```scala
import com.alibaba.fastjson.serializer.SerializerFeature
import com.alibaba.fastjson.JSON
import org.apache.iceberg.spark.{Spark3Util, SparkSchemaUtil}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

def executorBatchDf(spark: SparkSession, batchDF: DataFrame): Unit = {
  // Schema of the CDC envelope carried in the Kafka `value` column.
  val schema = new StructType()
    .add("database", StringType)
    .add("table", StringType)
    .add("type", StringType)
    .add("ts", LongType)
    .add("id", LongType)
    .add("data", StringType)

  batchDF
    .withColumn("value", from_json(col("value"), schema))
    .select(col("value.*"))
    .createOrReplaceTempView("batch_all_data")
  batchDF.sparkSession.sqlContext.cacheTable("batch_all_data")

  import spark.implicits._
  import scala.collection.JavaConverters._

  val tableData = batchDF.sparkSession.sql(
    "select database, table, type, ts, id, data from batch_all_data"
  ).flatMap { line =>
    println(line) // debug output per envelope
    // `data` holds a JSON array of row images; emit one RecordItem per row.
    val rows = JSON.parseArray(line.getAs[String]("data"))
    val database = line.getAs[String]("database")
    val table = line.getAs[String]("table")
    val op = line.getAs[String]("type")
    val ts = line.getAs[Long]("ts")
    rows.asScala.map { r =>
      val row = JSON.parseObject(r.toString)
      val key = s"$database:$table:${row.getString("oms_order_id")}"
      val jsonStr = JSON.toJSONString(r, SerializerFeature.WriteMapNullValue)
      RecordItem(key, op, ts, database, table, jsonStr)
    }
  }
    .rdd
    .groupBy(_.key)
    .map { records =>
      // Keep only the latest change per key within this batch.
      records._2.toSeq.sortBy(_.ts).last
    }
    .map(latest => (latest.data, latest.op)) // keep just the columns that toDF names
    .toDF("data", "op")

  /* Collapse operations on the same key within one batch.
     Example 1: 1 insert
                1 update
                -> collapses to 1 update
     Example 2: 2 insert
                2 update
                2 delete
                -> collapses to 2 delete
  */
//  tableData.collect().foreach(line => println(line))

  val tableName = "test001.order_source"
  val whereSql = " t.order_id = s.order_id "
  val specName = " order_create_time "
  val icebergTable = Spark3Util.loadIcebergTable(spark, tableName)
  val icebergSchema = SparkSchemaUtil.convert(icebergTable.schema())

  // Re-parse the row JSON against the Iceberg table's schema so MERGE sees typed columns.
  tableData
    .select(from_json(col("data"), icebergSchema) as "data", col("op"))
    .createOrReplaceTempView("tmp_merge_data")

  val mergeSql = new StringBuilder()
  mergeSql.append(s"MERGE INTO $tableName t \n")
  mergeSql.append(s"USING (SELECT data.*, op FROM tmp_merge_data ORDER BY $specName DESC) s \n")
  mergeSql.append(s" ON $whereSql " +
    s" AND DATE_FORMAT(t.$specName, 'yyyy-MM-dd') >= DATE_SUB(DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyyy-MM-dd'), 30)\n")
  mergeSql.append(" WHEN MATCHED AND s.op = 'delete' THEN DELETE \n")
  mergeSql.append(" WHEN MATCHED AND s.op = 'update' THEN UPDATE SET * \n")
  mergeSql.append(" WHEN NOT MATCHED THEN INSERT * \n")

  tableData.sparkSession.sql(mergeSql.toString)
  icebergTable.refresh()
}
```
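The listing references a `RecordItem` case class that is not shown. A plausible definition, inferred from the constructor call above; the field names (in particular `data` for the serialized row, which the dedup step relies on) are assumptions:

```scala
// Assumed shape of RecordItem, matching RecordItem(key, op, ts, database, table, jsonStr) above.
case class RecordItem(
  key: String,      // "<database>:<table>:<oms_order_id>" dedup key
  op: String,       // CDC operation: insert / update / delete
  ts: Long,         // CDC event timestamp, used to pick the latest change per key
  database: String,
  table: String,
  data: String      // single row serialized as JSON (nulls preserved)
)
```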
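For context, a minimal sketch of how `executorBatchDf` could be driven from a Structured Streaming query over the Kafka topic carrying the CDC envelopes. The bootstrap servers, topic name, and checkpoint path are hypothetical placeholders, not values from the original setup:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object OrderSourceSync {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("iceberg-cdc-merge")
      .getOrCreate()

    // Read raw CDC envelopes from Kafka; executorBatchDf expects a string `value` column.
    val cdcStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical
      .option("subscribe", "cdc_order_source")             // hypothetical topic
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

    // Typed val avoids the Scala foreachBatch overload ambiguity.
    val handler: (DataFrame, Long) => Unit =
      (batchDF, _) => executorBatchDf(spark, batchDF)

    cdcStream.writeStream
      .foreachBatch(handler)
      .option("checkpointLocation", "/tmp/ckpt/order_source") // hypothetical
      .start()
      .awaitTermination()
  }
}
```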