IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark 处理流入数据入湖 -> 正文阅读

[大数据]spark 处理流入数据入湖

场景:mysql 的 binlog 通过 canal 采集放入 kafka ,生成结果表

iceberg 处理 change log stream业务库产生的数据操作, insert update delete , 入湖iceberg 表

处理思路: 主要是利用 spark 的merge into?

--flink 建表sql

CREATE?TABLE?`sample_stream_test01` (

??`id`?BIGINT?NOT?NULL,

??`data`?VARCHAR(2147483647)

) PARTITIONED?BY?(`id`)

WITH?(

??'catalog-database'?=?'test001',

??'write.metadata.delete-after-commit.enabled'?=?'true',

??'warehouse'?=?'hdfs://nameservice2/user/hive/warehouse/',

??'uri'?=?'thrift://10.8.49.114:9083,thrift://10.8.49.115:9083',

??'write.metadata.previous-versions-max'?=?'2',

??'catalog-table'?=?'sample_stream_test02',

??'catalog-type'?=?'hive',

??'write.distribution-mode'?=?'hash'

);

CREATE?TABLE?`sample_stream_test02` (

??`id`?BIGINT?NOT?NULL,

??`data`?VARCHAR(2147483647),

??`op`?VARCHAR(2147483647)

) PARTITIONED?BY?(`id`)

WITH?(

??'catalog-database'?=?'test001',

??'write.metadata.delete-after-commit.enabled'?=?'true',

??'warehouse'?=?'hdfs://nameservice2/user/hive/warehouse/',

??'uri'?=?'thrift://10.8.49.114:9083,thrift://10.8.49.115:9083',

??'write.metadata.previous-versions-max'?=?'2',

??'catalog-table'?=?'sample_stream_test02',

??'catalog-type'?=?'hive',

??'write.distribution-mode'?=?'hash'

);

// sample_stream_test01 表为合并之后的示例表,sample_stream_test02 为kafka 接入的 canal 表模拟示例表,op 里面为?insert?|?update?|detele

insert?into?sample_stream_test01?values?(1,'1');

insert?into?sample_stream_test01?values?(2,'2');

insert?into?sample_stream_test02?values?(1,'111','update');

insert?into?sample_stream_test02?values?(2,'22','delete');

insert?into?sample_stream_test02?values?(3,'333','update');

insert?into?sample_stream_test02?values?(4,'4','insert');

MERGE?INTO?test001.sample_stream_test01 t

USING (SELECT?*?from?test001.sample_stream_test02 ) s?

ON?t.id = s.id?

WHEN?MATCHED?AND?s.op='delete'?THEN?DELETE

WHEN?MATCHED?AND?s.op='update'?THEN?UPDATE?SET?*

WHEN?NOT?MATCHED??THEN?INSERT?*

;

注意点:
1.sample_stream_test02 表中 单批次必须同一个 on 关联条件下,必须唯一,不然报错
2.merge into 之后 产生的 状态为 overwrite ,? 不能再流式读取
3.如果 sample_stream_test01 表中不存在数据,?sample_stream_test02表中有记录op 为delete , 这样就出一个问题,执行的不是delete ,而是 not metched 执行insert 操作,结果就出现问题
官方merge into 特性,?WHEN MATCHED? 为 delete 和 update ,?WHEN NOT MATCHED? 为insert
所以,这种情况,delete 在单独处理,merge into 只处理 insert? 和 update ,
4. 接入kafka 数据 如有数据为 同一批次,同一 主键,只有保留一条记录, 那么如id=1,? 1,insert; 1,update , 这样处理的 成一条数据就成了 1,update ; 写入表中又无数据,匹配不到update 条件
所以在 进入?WHEN NOT MATCHED ,不要加入?AND s.op='insert' ,? 和3 的相互的效果。要注意细节。

? ? ? ? ? ? ??

参考文章:业务数据实时流入iceberg数据湖的方法 - 知乎

1.流式读取kafka表

val?df?=?spark.readStream

??.format("kafka")

??.option("kafka.bootstrap.servers",?"bootstrapServers地址")

??.option("subscribe",?"topic地址")

??//????? .option("startingOffsets", "latest")

??.option("startingOffsets",?"earliest")

??.option("failOnDataLoss",?"false")

??.load()

2.流式写入处理

val?query?=?df.writeStream

??//????? .format("console") // 打印控制台

??.trigger(Trigger.ProcessingTime("2 seconds"))??// 触发时间

??//????? .outputMode(OutputMode.Append())?????? // 写入模式

??.option("checkpointLocation", DEFAULTFS +?"/tmp/lz")?// 地址为 hdfs 地址,一定要注意,否则报 空指针异常

??//????? .option("checkpointLocation", "~/tmp/ll") // 地址为 hdfs 地址,一定要注意,否则报 空指针异常

??.option("fanout-enabled",?"true")

??.foreachBatch {

????(batchDF:?DataFrame, batchId:?Long)?=> {

??????executorBatchDf(spark, batchDF.selectExpr("CAST(value AS STRING)"))

????}

??}

??.start()

query.awaitTermination()

3. 处理批式处理

def?executorBatchDf(spark:?SparkSession, batchDF:?DataFrame)?=?{

????val?schema?=?new?StructType()

??????.add("database", StringType)

??????.add("table", StringType)

??????.add("type", StringType)

??????.add("ts", LongType)

??????.add("id", LongType)

??????.add("data", StringType)

????batchDF

??????.withColumn("value", from_json(col("value"), schema))

??????.select(col("value.*"))

??????.createOrReplaceTempView("batch_all_data")

????batchDF.sparkSession.sqlContext.cacheTable("batch_all_data")

????import?spark.implicits._

????import?scala.collection.JavaConverters._

????val?tableData?=?batchDF.sparkSession.sql(

??????"""

?????????select database,table,type,ts,id,data from batch_all_data

??????""".stripMargin

????).flatMap(

??????line?=> {

????????println(line)

????????val?rows?=?JSON.parseArray(line.getAs[String]("data"))

????????val?database?=?line.getAs[String]("database")

????????val?table?=?line.getAs[String]("table")

????????val?op?=?line.getAs[String]("type")

????????val?ts?=?line.getAs[Long]("ts")

????????rows.asInstanceOf[JSONArray].asScala.map(

??????????r?=> {

????????????val?rows?=?JSON.parseObject(r.toString)

????????????val?key?=?s"${database}:${table}:${rows.getString("oms_order_id")}"

????????????val?jsonStr?=?JSON.toJSONString(r, SerializerFeature.WriteMapNullValue)

????????????RecordItem(key, op, ts, database, table, jsonStr)

??????????})

??????})

??????.rdd

??????.groupBy(_.key)

??????.map(

????????records?=> {

??????????val?items?=?records._2.toSeq.sortBy(_.ts)

??????????items.last

????????}

??????).toDF("data","op")

????/*// 同一批次 转化 op

??????示例1: 1 insert

?????????????1 update

?????????????转成 1 update

??????示例2:? 2 insert

?????????????2 update

?????????????2 delete

?????????????转成 2 delete

????*/

//??? tableData.collect().foreach(

//????? line => {

//??????? println(line)

//????? }

//??? )

????val?tableName?=?"test001.order_source"

????val?whereSql?=?" t.order_id = s.order_id "

????val?specName?=?" order_create_time "

????val?icebergTable?=?Spark3Util.loadIcebergTable(spark, tableName)

????val?icebergSchema?=?SparkSchemaUtil.convert(icebergTable.schema())

????tableData.select(from_json(col("data"), icebergSchema) as?"data", col("op")).createOrReplaceTempView("tmp_merge_data")

????val?mergeSql?=?new?StringBuffer()

????mergeSql.append(s"MERGE INTO $tableName t \n")

????mergeSql.append(s"USING (SELECT data.*, op FROM tmp_merge_data ORDER BY $specName DESC) s \n")

????mergeSql.append(s" ON ${whereSql.toString} "?+

??????s" AND DATE_FORMAT(t.$specName, 'yyyy-MM-dd') >= DATE_SUB(DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyyy-MM-dd'), 30)\n")

????mergeSql.append(" WHEN MATCHED AND s.op = 'delete' THEN DELETE \n")

????mergeSql.append(" WHEN MATCHED AND s.op = 'update' THEN UPDATE SET * \n")

????mergeSql.append(" WHEN NOT MATCHED? THEN INSERT * \n")

????tableData.sparkSession.sql(mergeSql.toString)

????icebergTable.refresh()

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-14 10:00:40  更:2022-05-14 10:02:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 6:33:21-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码