First, consume the source data.
The source data here lives in Kafka, a publish/subscribe messaging system built for handling streaming data.
Flink is used to consume the data from Kafka.
Here is a helper method that builds the consumer (this is the FlinkKafkaUtil.getFlinkKafkaConsume used by the entry class further below):
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

def getFlinkKafkaConsume(kafkaServer: String, kafkaGroupId: String, topicName: String): FlinkKafkaConsumer[String] = {
  val props = new Properties()
  props.setProperty("bootstrap.servers", kafkaServer)
  props.setProperty("group.id", kafkaGroupId)
  // these must be deserializer classes; the actual decoding is done by the SimpleStringSchema passed below
  props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
  props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
  props.setProperty("auto.offset.reset", "latest") // latest / earliest
  new FlinkKafkaConsumer[String](topicName, new SimpleStringSchema(), props)
}
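The messages on this topic are Maxwell binlog events serialized as JSON. For orientation, an illustrative record (the database, table, and row values here are made up) has roughly this shape:

{"database":"cps_user_3","table":"order_info","type":"insert","ts":1681700000,"xid":987654,"commit":true,"data":{"id":1001,"user_id":42,"status":"paid"}}

The database, table, and data fields are what the routing schema below relies on.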
The stream-splitting step is essentially a write operation: the consumed records are written out to one or more Kafka topics.
Declare a Kafka producer:
val properties = new Properties
properties.setProperty("bootstrap.servers", kafkaServer)
properties.setProperty("transaction.timeout.ms", s"$timeoutKafka")
val myProducer = new FlinkKafkaProducer[String](
  "dwd-default-topic",
  new BinLogOdsToDwdProducerKafkaSchema("dwd_qy_db"),
  properties,
  FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
mapDataSource.addSink(myProducer) // mapDataSource is the consumed DataStream[String]; write it into the downstream Kafka topics
Implementation of the BinLogOdsToDwdProducerKafkaSchema class:
import java.lang
import java.nio.charset.StandardCharsets
import com.zw.bigdata.common.pojo.MaxwellBinlogRecord
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord
import org.json4s.{DefaultFormats, JValue}
import org.json4s.jackson.JsonMethods._
class BinLogOdsToDwdProducerKafkaSchema(layerPrefix: String) extends KafkaSerializationSchema[String] {

  val ODS_PREFIX = layerPrefix // e.g. "dwd_qy_db"

  override def serialize(t: String, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    implicit val formats: DefaultFormats.type = DefaultFormats
    val value = parse(t).extract[MaxwellBinlogRecord[JValue]]

    // Collapse sharded databases (cps_user_0, cps_user_1, ...) onto one logical name
    var dbName = value.database
    if (value.database.matches("^cps_user_(\\d+).*")) {
      dbName = "cps_user"
    }
    if (value.database.matches("^cps_shard_(\\d+).*")) {
      dbName = "cps_shard"
    }

    // Route each record to a topic named <layerPrefix>_<database>_<table>
    val targetTopic = s"${ODS_PREFIX}_${dbName}_${value.table}"
    new ProducerRecord[Array[Byte], Array[Byte]](targetTopic, t.getBytes(StandardCharsets.UTF_8))
    // To forward only the row payload instead of the full Maxwell envelope:
    // new ProducerRecord[Array[Byte], Array[Byte]](targetTopic, compact(render(value.data)).getBytes(StandardCharsets.UTF_8))
  }
}
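The MaxwellBinlogRecord POJO referenced above (com.zw.bigdata.common.pojo.MaxwellBinlogRecord) is not shown in this post. A minimal sketch, assuming it simply mirrors the Maxwell JSON fields (the exact field set is an assumption), might look like:

// Hypothetical sketch; the real com.zw.bigdata.common.pojo.MaxwellBinlogRecord may carry more fields.
case class MaxwellBinlogRecord[T](
  database: String, // source MySQL database, used for topic routing above
  table: String,    // source MySQL table, used for topic routing above
  `type`: String,   // insert / update / delete (assumed field)
  ts: Long,         // event timestamp from Maxwell (assumed field)
  data: T           // row payload, extracted as JValue by the schema above
)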
Entry class:
import java.util.Properties
import com.zw.bigdata.common.config.EnvConfig
import com.zw.bigdata.common.kafka.BinLogOdsToDwdProducerKafkaSchema
import com.zw.bigdata.common.util.{FlinkExecutionEnvUtil, FlinkKafkaUtil}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
object BinlogOds2DwdWriter {

  val flinkCheckpointPath = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.LogOds2DwdWriter.flink.checkout.path")
  val jobName = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.LogOds2DwdWriter.flink.job.name")
  val timeoutKafka = 15 * 60 * 1000 // 15 minutes; must not exceed the broker's transaction.max.timeout.ms
  val kafkaServer = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.kafka.bootstrap.servers")
  val kafkaGroupId = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.kafka.group.id")
  val binlogTopicName = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.kafka.binlog.topic.name")
  val offsetPosition = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.kafka.topic.offset")

  def main(args: Array[String]): Unit = {
    val environment = FlinkExecutionEnvUtil.getStreamEnv(flinkCheckpointPath)
    binlogOds2DwdWrite(environment)
    environment.execute(jobName)
  }

  def binlogOds2DwdWrite(env: StreamExecutionEnvironment): Unit = {
    // Source: the Maxwell binlog topic
    val qyBinlogKafka = FlinkKafkaUtil.getFlinkKafkaConsume(kafkaServer, kafkaGroupId, binlogTopicName)
    qyBinlogKafka.setStartFromEarliest() // overrides auto.offset.reset; offsetPosition is read from config but not applied here
    val qyBinlogDs = env.addSource(qyBinlogKafka)

    // Sink: per-database/per-table DWD topics, routed by BinLogOdsToDwdProducerKafkaSchema
    val properties = new Properties
    properties.setProperty("bootstrap.servers", kafkaServer)
    properties.setProperty("transaction.timeout.ms", s"$timeoutKafka")
    val myProducer = new FlinkKafkaProducer[String](
      "dwd-default-topic", // fallback topic; the schema sets the real topic per record
      new BinLogOdsToDwdProducerKafkaSchema("dwd_qy_db"),
      properties,
      FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)

    // qyBinlogDs.print()
    qyBinlogDs.addSink(myProducer)
  }
}
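FlinkExecutionEnvUtil.getStreamEnv(flinkCheckpointPath) is another project-internal helper that is not shown here. A minimal sketch, under the assumption that it only creates the environment and enables checkpointing against the configured path (the interval, mode, and state backend below are assumptions), could be:

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object FlinkExecutionEnvUtil {
  // Hypothetical helper: the real utility may configure more (restart strategy, parallelism, ...)
  def getStreamEnv(checkpointPath: String): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE) // assumed interval
    env.setStateBackend(new FsStateBackend(checkpointPath))            // checkpoints go to the configured path
    env
  }
}

Since the producer above runs with AT_LEAST_ONCE semantics, enabling checkpointing is what makes the Kafka sink flush its pending records on every checkpoint.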
Data aggregation: