import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializerFeature
import org.apache.iceberg.spark.{Spark3Util, SparkSchemaUtil}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.JavaConverters._

// Must be defined at top level (or inside an object) so Spark can derive an Encoder for it.
case class RecordItem(key: String, op: String, ts: Long, database: String, table: String, jsonStr: String)

def executorBatchDf(spark: SparkSession, batchDF: DataFrame): Unit = {
  // Canal-style CDC envelope carried in the Kafka "value" column.
  val schema = new StructType()
    .add("database", StringType)
    .add("table", StringType)
    .add("type", StringType)
    .add("ts", LongType)
    .add("id", LongType)
    .add("data", StringType)

  batchDF
    .withColumn("value", from_json(col("value"), schema))
    .select(col("value.*"))
    .createOrReplaceTempView("batch_all_data")
  spark.catalog.cacheTable("batch_all_data")

  import spark.implicits._

  // Explode each CDC envelope: "data" is a JSON array of row images, one RecordItem per row.
  val tableData = spark.sql(
    "SELECT database, table, type, ts, id, data FROM batch_all_data"
  ).flatMap { line =>
    // println(line)  // per-record debug output on the executors
    val rows     = JSON.parseArray(line.getAs[String]("data"))
    val database = line.getAs[String]("database")
    val table    = line.getAs[String]("table")
    val op       = line.getAs[String]("type")
    val ts       = line.getAs[Long]("ts")
    rows.asScala.map { r =>
      val obj     = JSON.parseObject(r.toString)
      val key     = s"$database:$table:${obj.getString("oms_order_id")}"
      val jsonStr = JSON.toJSONString(r, SerializerFeature.WriteMapNullValue)
      RecordItem(key, op, ts, database, table, jsonStr)
    }
  }
    /* Collapse ops on the same key within one micro-batch: keep only the newest change.
       Example 1: id 1 insert, then id 1 update         -> keep the update
       Example 2: id 2 insert, id 2 update, id 2 delete -> keep the delete */
    .rdd
    .groupBy(_.key)
    .map { case (_, records) => records.maxBy(_.ts) }
    .map(item => (item.jsonStr, item.op)) // RecordItem has six fields; project the two MERGE needs
    .toDF("data", "op")

  // Debug: inspect the deduplicated batch.
  // tableData.collect().foreach(println)

  val tableName = "test001.order_source"
  val whereSql  = "t.order_id = s.order_id"
  val specName  = "order_create_time"

  val icebergTable  = Spark3Util.loadIcebergTable(spark, tableName)
  val icebergSchema = SparkSchemaUtil.convert(icebergTable.schema())

  // Re-parse each row image against the live Iceberg schema so MERGE sees typed columns.
  tableData
    .select(from_json(col("data"), icebergSchema) as "data", col("op"))
    .createOrReplaceTempView("tmp_merge_data")

  val mergeSql =
    s"""MERGE INTO $tableName t
       |USING (SELECT data.*, op FROM tmp_merge_data ORDER BY $specName DESC) s
       |ON $whereSql
       |  AND DATE_FORMAT(t.$specName, 'yyyy-MM-dd') >= DATE_SUB(DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyyy-MM-dd'), 30)
       |WHEN MATCHED AND s.op = 'delete' THEN DELETE
       |WHEN MATCHED AND s.op = 'update' THEN UPDATE SET *
       |WHEN NOT MATCHED THEN INSERT *
       |""".stripMargin

  spark.sql(mergeSql)
  icebergTable.refresh()
}
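
// --- Usage sketch (an assumption, not from the original post) ---
// The (spark, batchDF) signature suggests executorBatchDf is meant to be driven by
// Structured Streaming's foreachBatch sink. The snippet below shows one plausible
// wiring from a Kafka source carrying Canal CDC JSON; the bootstrap server, topic
// name, and checkpoint path are placeholders. The value column is cast to STRING
// so the from_json() call inside executorBatchDf can parse it.
val cdcStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "canal_cdc")                    // placeholder topic
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

cdcStream.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    executorBatchDf(spark, batchDF)
  }
  .option("checkpointLocation", "/tmp/ckpt/order_source") // placeholder
  .start()
  .awaitTermination()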