IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 用Flink程序数据分流项目 -> 正文阅读

[大数据]用Flink程序数据分流项目

首先消费到源数据:

这里源数据放在了kafka 发布订阅消息系统(专门用来处理流数据平台)

用flink 消费kafka中的数据:

这里提供一个获取消费者的方法


def getFlinkKafkaConsume(kafkaServer:String,kafkaGroupId:String,topicName:String): FlinkKafkaConsumer[String] ={

val props = new Properties()

props.setProperty("bootstrap.servers",kafkaServer)
props.setProperty("group.id",kafkaGroupId)
props.setProperty("key.deserializer",classOf[StringSerializer].getName)
props.setProperty("value.deserializer",classOf[StringSerializer].getName)
props.setProperty("auto.offset.reset","latest")   // latest        earliest

val kafkaConsume = new FlinkKafkaConsumer[String](topicName,new SimpleStringSchema(),props)
kafkaConsume
}

分流操作主要是写操作(将消费到的数据向kafka 的一个或者多个主题中写入)

在这里插入图片描述
声明一个kafka生产者:

val properties = new Properties
properties.setProperty("bootstrap.servers", kafkaServer)

properties.setProperty("transaction.timeout.ms", s"${timeoutKafka}")


val myProducer = new FlinkKafkaProducer[String](
  "dwd-default-topic",
  new BinLogOdsToDwdProducerKafkaSchema("dwd_qy_db"),
  properties,
  FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)

mapDataSourde.addSink(produce)  //将数据写入到下一级kafka中

BinLogOdsToDwdProducerKafkaSchema 类实现

import java.lang
import java.nio.charset.StandardCharsets

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord
import java.nio.charset.StandardCharsets

import com.zw.bigdata.common.pojo.MaxwellBinlogRecord
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JObject, JValue}
import org.json4s.jackson.JsonMethods.parse
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.json4s._


           
class BinLogOdsToDwdProducerKafkaSchema(layerPrefix:String)  extends KafkaSerializationSchema[String]{

    val ODS_PREFIX=layerPrefix       //"dwd_yd_db"
    
    override def serialize(t: String, aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    
    implicit val formats = DefaultFormats
    
    val value = parse(t).extract[MaxwellBinlogRecord[JValue]]
    
    var dbName=value.database
    
    if (value.database.matches("^cps_user_(\\d+).*")){
    dbName="cps_user"
    }
    
    if (value.database.matches("^cps_shard_(\\d+).*")){
    dbName="cps_shard"
    }

    val targetTopic = s"${ODS_PREFIX}_${dbName}_${value.table}"


    //    println(compact(render(value.data)))      
    //    println(value)

    new ProducerRecord[Array[Byte], Array[Byte]](targetTopic, t.getBytes(StandardCharsets.UTF_8))
    
    //new ProducerRecord[Array[Byte], Array[Byte]](targetTopic, compact(render(value.data)).getBytes(StandardCharsets.UTF_8))
    
      }
    }

入口类:

import java.util.Properties

import com.zw.bigdata.common.config.EnvConfig
import com.zw.bigdata.common.kafka.BinLogOdsToDwdProducerKafkaSchema
import com.zw.bigdata.common.util.{FlinkExecutionEnvUtil, FlinkKafkaUtil}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer



object BinlogOds2DwdWriter {


val flinkCheckpointPath = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.LogOds2DwdWriter.flink.checkout.path")
val jobName = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.LogOds2DwdWriter.flink.job.name")
val timeoutKafka = 15 * 60 * 1000
val kafkaServer = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.kafka.bootstrap.servers")

val kafkaGroupId = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.kafka.group.id")

val binlogTopicName = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.kafka.binlog.topic.name")

val offsetPosition = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.kafka.topic.offset")


def main(args: Array[String]): Unit = {
val environment = FlinkExecutionEnvUtil.getStreamEnv(flinkCheckpointPath)

binlogOds2DwdWrite(environment)


environment.execute(jobName)

}



def binlogOds2DwdWrite(env:StreamExecutionEnvironment):Unit={
val qyBinlogKafka = FlinkKafkaUtil.getFlinkKafkaConsume(kafkaServer,kafkaGroupId,binlogTopicName)
qyBinlogKafka.setStartFromEarliest()
val qyBinlogDs = env.addSource(qyBinlogKafka)

val properties = new Properties
properties.setProperty("bootstrap.servers", kafkaServer)

properties.setProperty("transaction.timeout.ms", s"${timeoutKafka}")
val myProducer = new FlinkKafkaProducer[String](
  "dwd-default-topic",
  new BinLogOdsToDwdProducerKafkaSchema("dwd_qy_db"),
  properties,
  FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)

//    qyBinlogDs.print()
qyBinlogDs.addSink(myProducer)


  }

}

数据聚合:
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-09 12:46:25  更:2022-05-09 12:47:57 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 6:54:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码