IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark批量写数据到mysql -> 正文阅读

[大数据]Spark批量写数据到mysql

1.读取数据

private def runJdbcDatasetExample(spark: SparkSession): Unit = {
    
    // 从JDBC source加载数据(load)
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://127.0.0.1:3306/test")
      .option("dbtable", "mytable")
      .option("user", "root")
      .option("password", "root")
      .load()

    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "root")
    val jdbcDF2 = spark.read
      .jdbc("jdbc:mysql://127.0.0.1:3306/test", "mytable", connectionProperties)
    // 指定读取schema的数据类型
    connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
    val jdbcDF3 = spark.read
      .jdbc("jdbc:mysql://127.0.0.1:3306/test", "mytable", connectionProperties)

  }

值得注意的是,上面的方式如果不指定分区的话,Spark默认会使用一个分区读取数据,这样在数据量特别大的情况下,会出现OOM。在读取数据之后,调用DataFrameDF.rdd.partitions.size方法可以查看分区数。

2.批量写数据到mysql

  case class Person(name: String, age: Int)
  def main(args: Array[String]): Unit = {

    // 创建sparkSession对象
    val conf = new SparkConf()
      .setAppName("BatchInsertMySQL")
    val spark: SparkSession =  SparkSession.builder()
      .config(conf)
      .getOrCreate()
    import spark.implicits._
    // MySQL连接参数
    val url = JDBCUtils.url
    val user = JDBCUtils.user
    val pwd = JDBCUtils.password

    // 创建Properties对象,设置连接mysql的用户名和密码
    val properties: Properties = new Properties()

    properties.setProperty("user", user) // 用户名
    properties.setProperty("password", pwd) // 密码
    properties.setProperty("driver", "com.mysql.jdbc.Driver")
    properties.setProperty("numPartitions","10")

    // 读取mysql中的表数据
    val testDF: DataFrame = spark.read.jdbc(url, "test", properties)
     println("testDF的分区数:  " + testDF.rdd.partitions.size)
   testDF.createOrReplaceTempView("test")
   testDF.persist(StorageLevel.MEMORY_AND_DISK)
   testDF.printSchema()

    val result =
      s"""-- SQL代码
               """.stripMargin

    val resultBatch = spark.sql(result).as[Person]
    println("resultBatch的分区数: " + resultBatch.rdd.partitions.size)

    // 批量写入MySQL
    // 此处最好对处理的结果进行一次重分区
    // 由于数据量特别大,会造成每个分区数据特别多
    resultBatch.repartition(500).foreachPartition(record => {

      val list = new ListBuffer[Person]
      record.foreach(person => {
        val name = Person.name
        val age = Person.age
        list.append(Person(name,age))
      })
      upsertDateMatch(list) //执行批量插入数据
    })
    // 批量插入MySQL的方法
    def upsertPerson(list: ListBuffer[Person]): Unit = {

      var connect: Connection = null
      var pstmt: PreparedStatement = null

      try {
        connect = JDBCUtils.getConnection()
        // 禁用自动提交
        connect.setAutoCommit(false)

        val sql = "REPLACE INTO `person`(name, age)" +
          " VALUES(?, ?)"

        pstmt = connect.prepareStatement(sql)

        var batchIndex = 0
        for (person <- list) {
          pstmt.setString(1, person.name)
          pstmt.setString(2, person.age)
          // 加入批次
          pstmt.addBatch()
          batchIndex +=1
          // 控制提交的数量,
          // MySQL的批量写入尽量限制提交批次的数据量,否则会把MySQL写挂!!!
          if(batchIndex % 1000 == 0 && batchIndex !=0){
            pstmt.executeBatch()
            pstmt.clearBatch()
          }

        }
        // 提交批次
        pstmt.executeBatch()
        connect.commit()
      } catch {
        case e: Exception =>
          e.printStackTrace()
      } finally {
        JDBCUtils.closeConnection(connect, pstmt)
      }
    }

    spark.close()
  }
}

JDBC连接工具类:

object JDBCUtils {
  val user = "root"
  val password = "root"
  val url = "jdbc:mysql://localhost:3306/mydb"
  Class.forName("com.mysql.jdbc.Driver")
  // 获取连接
  def getConnection() = {
    DriverManager.getConnection(url,user,password)
  }
// 释放连接
  def closeConnection(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }
}

Spark写入大量数据到MySQL时,在写入之前尽量对写入的DF进行重分区处理,避免分区内数据过多。在写入时,要注意使用foreachPartition来进行写入,这样可以为每一个分区获取一个连接,在分区内部设定批次提交,提交的批次不易过大,以免将数据库写挂。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-16 11:48:52  更:2021-08-16 11:50:03 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:27:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码