解决的问题：从 GP 读取数据集，转换数据格式后发送到 Kafka（GP -> Kafka）。
// Load the peer-relation events with create_time in (2020-06-27 00:00:00, 2020-06-28 00:00:00]
// from the bigdata_dwd database over JDBC.
//
// The window is split into `sliceCount` disjoint, contiguous time slices. Each slice is read
// by its own JDBC query, and the per-slice DataFrames are unioned. Previously every one of the
// 10 queries selected the *whole* window (only the subquery alias differed), so the union
// contained each row 10 times and every event was later published to Kafka 10 times.
val sourceDF = {
  import java.time.LocalDateTime
  import java.time.format.DateTimeFormatter

  val sliceCount = 10
  val windowStart = LocalDateTime.of(2020, 6, 27, 0, 0, 0)
  val windowEnd = LocalDateTime.of(2020, 6, 28, 0, 0, 0)
  val windowSeconds = java.time.Duration.between(windowStart, windowEnd).getSeconds
  val sqlTimeFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Slice boundary i as a SQL timestamp literal: i = 0 is windowStart and i = sliceCount is
  // windowEnd, so the slices (boundary(i), boundary(i + 1)] tile the original
  // `create_time > start AND create_time <= end` window with no gap and no overlap.
  def boundary(i: Int): String =
    windowStart.plusSeconds(windowSeconds * i / sliceCount).format(sqlTimeFormat)

  Range(0, sliceCount)
    .map { index =>
      val dbtable =
        s"""(
           |select
           |aid1,
           |aid2,
           |t1,
           |t2,
           |along_interval,
           |source_id,
           |create_time,
           |dt,
           |thumbnail_id1,
           |thumbnail_url1,
           |image_id1,
           |image_url1,
           |thumbnail_id2,
           |thumbnail_url2,
           |image_id2,
           |image_url2,
           |score1,
           |score2
           |from dwd_bigdata_relation_peer_day_5030
           |WHERE create_time > '${boundary(index)}' AND create_time <= '${boundary(index + 1)}') AS t_tmp_$index""".stripMargin
      println(dbtable)
      spark
        .read
        .format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option("url", "jdbc:postgresql://192.168.11.33:2222/bigdata_dwd")
        .option("dbtable", dbtable)
        // NOTE(review): credentials are hard-coded in source; consider loading them from config.
        .option("user", "张三")
        .option("password", "123456")
        .option("fetchsize", "5000")
        .load()
    }
    .reduce((df1, df2) => df1.union(df2))
}
// Log message: "loading peer events".
println("加载同行事件")
sourceDF.show()
// Publish every loaded row to Kafka as a JSON-serialized RelationshipPeer. Each record is keyed
// "key<i>", where i is the row's position in the collected array.
//
// Fixes relative to the previous version:
//   * t2 was filled with the t1 value; it is now read from the t2 column.
//   * the producer was never closed. KafkaProducer.send is asynchronous, so records still
//     buffered when the job ended could be dropped. close() in `finally` waits for pending
//     sends and releases the producer even if a row fails to convert or serialize.
//
// NOTE(review): collect pulls the whole result onto the driver; for large windows consider
// sourceDF.foreachPartition with one producer per partition.
val peerArray = sourceDF.collect
val producer = new KafkaProducer[String, String](props)
try {
  // json4s formats are loop-invariant, so they are defined once rather than per row.
  implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
  for (i <- peerArray.indices) {
    val row = peerArray(i)
    // Named arguments bind each column to its field explicitly, so a value cannot silently
    // land in the wrong slot (as t1 previously did for t2).
    val event = RelationshipPeer(
      aid1 = row.getAs[String]("aid1"),
      aid2 = row.getAs[String]("aid2"),
      t1 = row.getAs[String]("t1"),
      t2 = row.getAs[String]("t2"),
      along_interval = row.getAs[Int]("along_interval"),
      source_id = row.getAs[String]("source_id"),
      create_time = row.getAs[Timestamp]("create_time"),
      dt = row.getAs[String]("dt"),
      thumbnail_id1 = row.getAs[String]("thumbnail_id1"),
      thumbnail_url1 = row.getAs[String]("thumbnail_url1"),
      image_id1 = row.getAs[String]("image_id1"),
      image_url1 = row.getAs[String]("image_url1"),
      thumbnail_id2 = row.getAs[String]("thumbnail_id2"),
      thumbnail_url2 = row.getAs[String]("thumbnail_url2"),
      image_id2 = row.getAs[String]("image_id2"),
      image_url2 = row.getAs[String]("image_url2"),
      score1 = row.getAs[String]("score1"),
      score2 = row.getAs[String]("score2")
    )
    val data: String = Serialization.write(event)
    producer.send(new ProducerRecord[String, String](topic, s"key$i", data))
  }
} finally {
  producer.close()
}
/** One peer-relation event in the shape that is published to Kafka.
  *
  * Instances are built from rows of the peer-relation query and serialized with json4s, so
  * each field name below becomes a JSON key. The names mirror the selected column names,
  * snake_case included.
  */
case class RelationshipPeer(
    aid1: String,
    aid2: String,
    t1: String,
    t2: String,
    along_interval: Int,
    source_id: String,
    create_time: Timestamp,
    dt: String,
    thumbnail_id1: String,
    thumbnail_url1: String,
    image_id1: String,
    image_url1: String,
    thumbnail_id2: String,
    thumbnail_url2: String,
    image_id2: String,
    image_url2: String,
    score1: String,
    score2: String
)
|