IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SparkSql批量插入或更新,保存数据到Mysql中 -> 正文阅读

[大数据]SparkSql批量插入或更新,保存数据到Mysql中

在sparksql 中,保存数据到数据,只有 Append , Overwrite , ErrorIfExists, Ignore 四种模式,不满足项目需求 ,此处大概说一下我们需求,当业务库有数据发生变化,需要更新、插入、删除数仓中ods层的数据,因此需要改造源码。

现依据 spark save 源码,进行进一步的改造, 批量保存数据,存在则更新 不存在 则插入

import com.sun.corba.se.impl.activation.ServerMain.logError
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.getCommonJDBCType
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties

object TestInsertOrUpdateMysql {

  val url: String = "jdbc:mysql://192.168.1.1:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowMultiQueries=true&autoReconnect=true&failOverReadOnly=false"
  val driver: String = "com.mysql.jdbc.Driver"
  val user: String = "123"
  val password: String = "123"
  val sql: String = "select * from testserver "
  val table: String = "testinsertorupdate"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("testSqlServer").getOrCreate()

    val dbtable = "(" + sql + ") AS Temp"

    val jdbcDF = spark.read
      .format("jdbc")
      .option("driver", driver)
      .option("user", user)
      .option("password", password)
      .option("url", url)
      .option("dbtable", dbtable)
      .load()
    jdbcDF.show()

    //普通写入数据库
    //commonWrite(jdbcDF)

    //saveorupdate
    insertOrUpdateToMysql("id", jdbcDF, spark)

    println("======================程序结束======================")
  }

1.先看一下普通的插入怎么写的

def commonWrite(jdbcDF: DataFrame): Unit = {
    val properties = new Properties()
    properties.put("user", user)
    properties.put("password", password)
    properties.put("driver", driver)
    jdbcDF.write.mode(SaveMode.Append).jdbc(url, table, properties)
  }

这种方式比较局限,只能做一些简单的插入(追加或覆盖等SaveMode.Append)

那么新的写法是什么呢,首先写出mysql的更新或插入的语法规则:

INSERT INTO t_name ( c1, c2, c3 )
VALUES
	( 1, '1', '1') 
	ON DUPLICATE KEY UPDATE
	c2 = '2';

需要注意的是一定要有主键,没主键没法更新;

2.看一下insertorupdate的写法

//写入数据库,批量插入 或更新 数据 ,该方法 借鉴Spark.write.save() 源码
  // 规则如下:
  //没有关键主键字段即为插入,有即为更新
  def insertOrUpdateToMysql(primaryKey: String, jdbcDF: DataFrame, spark: SparkSession): Unit = {
    val sc: SparkContext = spark.sparkContext
    spark.conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    //1.加载驱动程序
    Class.forName(driver);
    //2. 获得数据库连接
    val conn: Connection = DriverManager.getConnection(url, user, password);

    val tableSchema = jdbcDF.schema
    val columns = tableSchema.fields.map(x => x.name).mkString(",")
    val placeholders = tableSchema.fields.map(_ => "?").mkString(",")
    val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders) on duplicate key update "
    val update = tableSchema.fields.map(x =>
      x.name.toString + "=?"
    ).mkString(",")

    //ON DUPLICATE KEY UPDATE
    //on conflict($primaryKey) do update set
    val realsql = sql.concat(update)
    conn.setAutoCommit(false)
    val dialect = JdbcDialects.get(conn.getMetaData.getURL)
    val broad_ps = sc.broadcast(conn.prepareStatement(realsql))
    val numFields = tableSchema.fields.length * 2
    //调用spark中自带的函数,获取属性字段与字段类型
    val nullTypes = tableSchema.fields.map(f => getJdbcType(f.dataType, dialect).jdbcNullType)
    val setters = tableSchema.fields.map(f => makeSetter(conn, f.dataType))

    var rowCount = 0
    val batchSize = 2000
    val updateindex = numFields / 2
    try {
      jdbcDF.foreachPartition(iterator => {
        //遍历批量提交
        val ps = broad_ps.value
        try {
          while (iterator.hasNext) {
            val row = iterator.next()
            var i = 0
            while (i < numFields) {
              i < updateindex match {
                case true => {
                  if (row.isNullAt(i)) {
                    ps.setNull(i + 1, nullTypes(i))
                  } else {
                    setters(i).apply(ps, row, i, 0)
                  }
                }
                case false => {
                  if (row.isNullAt(i - updateindex)) {
                    ps.setNull(i + 1, nullTypes(i - updateindex))
                  } else {
                    setters(i - updateindex).apply(ps, row, i, updateindex)
                  }
                }
              }
              i = i + 1
            }
            ps.addBatch()
            rowCount += 1
            if (rowCount % batchSize == 0) {
              ps.executeBatch()
              rowCount = 0
            }
          }
          if (rowCount > 0) {
            ps.executeBatch()
          }
        } finally {
          ps.close()
        }
      })
      conn.commit()
    } catch {
      case e: Exception =>
        logError("Error in execution of insert. " + e.getMessage)
        conn.rollback()
      // insertError(connectionPool("OuCloud_ODS"),"insertOrUpdateToPgsql",e.getMessage)
    } finally {
      conn.close()
    }

  }

几个源码包

private def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = {
    dialect.getJDBCType(dt).orElse(getCommonJDBCType(dt)).getOrElse(
      throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.catalogString}"))
  }

  private type JDBCValueSetter_add = (PreparedStatement, Row, Int, Int) => Unit

  private def makeSetter(conn: Connection, dataType: DataType): JDBCValueSetter_add = dataType match {
    case IntegerType =>
      (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
        stmt.setInt(pos + 1, row.getInt(pos - currentpos))

    case LongType =>
      (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
        stmt.setLong(pos + 1, row.getLong(pos - currentpos))

    case DoubleType =>
      (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
        stmt.setDouble(pos + 1, row.getDouble(pos - currentpos))

    case FloatType =>
      (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
        stmt.setFloat(pos + 1, row.getFloat(pos - currentpos))

    case ShortType =>
      (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
        stmt.setInt(pos + 1, row.getShort(pos - currentpos))

    case ByteType =>
      (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
        stmt.setInt(pos + 1, row.getByte(pos - currentpos))

    case BooleanType =>
      (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
        stmt.setBoolean(pos + 1, row.getBoolean(pos - currentpos))

    case StringType =>
      (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
        stmt.setString(pos + 1, row.getString(pos - currentpos))

    case BinaryType =>
      (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
        stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos - currentpos))

    case TimestampType =>
      (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
        stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos - currentpos))

    case DateType =>
      (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
        stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos - currentpos))

    case t: DecimalType =>
      (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
        stmt.setBigDecimal(pos + 1, row.getDecimal(pos - currentpos))
    case _ =>
      (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
        throw new IllegalArgumentException(
          s"Can't translate non-null value for field $pos")
  }

这里面有个属性比较关键我列出来,不加会报错 Exception in thread "main" java.io.NotSerializableException: com.mysql.jdbc.JDBC42PreparedStatement:

spark.conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

添加下这个配置 这个第三方序列化 用默认的javaSerializer 不行

结果如下,这块有一些坑,有需要的朋友,我们可以交流

附postgreesql的更新或插入语法:

INSERT INTO test_001 ( c1, c2, c3 )
VALUES( ?, ?, ? )
ON conflict ( ID ) DO
UPDATE SET c1=?,c2 = ?,c3 = ?;

MySQL的on duplicate key update 的使用_厄尔尼诺的夏天的博客-CSDN博客?

博主qq:907044657,欢迎大家一起交流学习,有问题请指出,转载麻烦注明出处,多谢啦

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-07-21 21:37:17  更:2022-07-21 21:38:25 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 0:01:37-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码