| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> Flink SQL与Kafka整合的哪些事儿 -> 正文阅读 |
|
[大数据]Flink SQL与Kafka整合的哪些事儿 |
1 Flink Table与Kafka集成案例1.1?需求需求:Flink Table从kafka消费点击日志(JSON),转化为CSV格式之后输出到Kafka。 1.2?添加Maven依赖FlinkTable集成Kafka需引入如下依赖: <dependency> ??????<groupId>org.apache.flink</groupId> ??????<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> ??????<version>${flink.version}</version> </dependency> 1.3?代码实现Flink Table API实现Kafka生产者与消费者的完整代码如下所示。 package com.bigdata.chap02; import org.apache.flink.table.api.*; import static org.apache.flink.table.api.Expressions.$; public class FlinkTableAPIKafka2Kafka { ????public static final String input_topic = "clicklog_input"; ????public static final String out_topic = "clicklog_output"; ????public static void main(String[] args) { ????????//1、创建TableEnvironment ????????EnvironmentSettings settings = EnvironmentSettings ????????????????.newInstance() ????????????????.build(); ????????TableEnvironment tEnv = TableEnvironment.create(settings); ????????//2、创建kafka source table ????????final Schema schema = Schema.newBuilder() ????????????????.column("user", DataTypes.STRING()) ????????????????.column("url", DataTypes.STRING()) ????????????????.column("cTime", DataTypes.STRING()) ????????????????.build(); ????????tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka") ????????????????.schema(schema) ????????????????.format("json") ????????????????.option("topic",input_topic) ????????????????.option("properties.bootstrap.servers","hadoop1:9092") ????????????????.option("properties.group.id","clicklog")//每次都从最早的offsets开始 ????????????????.option("scan.startup.mode","latest-offset") ????????????????.build()); ????????//3、创建?kafka sink table ????????tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("kafka") ????????????????.schema(schema) ????????????????.format("csv") ????????????????.option("topic",out_topic) ????????????????.option("properties.bootstrap.servers","hadoop1:9092") ????????????????.build()); ????????//5、输出(包括执行,不需要单独在调用tEnv.execute("job")) ????????tEnv.from("sourceTable") ????????????????.select($("user"), $("url"),$("cTime")) ????????????????.executeInsert("sinkTable"); ????} } 1.4?打开Kafka数据生产与消费#查看topic列表 bin/kafka-topics.sh --zookeeper localhost:2181 --list #创建输入与输出topic bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic clicklog_input --replication-factor 3 --partitions 3 bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic clicklog_output --replication-factor 3 --partitions 3 #Kafka 输出topic打开消费者 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic clicklog_output #打开Kafka输入Topic生产者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic clicklog_input {"user":"Mary","url":"./home","cTime":"2022-02-02 12:00:00"} 如果clicklog_input topic的生产者输入数据之后,在clicklog_output topic端能消费到数据,则说明Flink Table打通了Kafka端到端的数据流程。 2 Kafka SQL Connector高级特性2.1 key和value格式Kafka消息的key和value均可指定format。 #仅指定value format tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka") ????????????????.schema(schema) ????????????????.format("json") ????????????????.option("topic",input_topic) ????????????????.option("properties.bootstrap.servers","hadoop1:9092") ????????????????.option("properties.group.id","clicklog")//每次都从最早的offsets开始 ????????????????.option("scan.startup.mode","latest-offset") ????????????????.build()); #指定key和value format tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka") ????????????????.schema(schema) ????????????????.option("key.format","json") ???.option("value.format","json") ????????????????.option("topic",input_topic) ????????????????.option("properties.bootstrap.servers","hadoop1:9092") ????????????????.option("properties.group.id","clicklog")//每次都从最早的offsets开始 ????????????????.option("scan.startup.mode","latest-offset") ????????????????.build());
2.2 Topic和Partition发现可以通过topic或者topic-pattern来配置主题。
2.3?读取位置作为source,是可以通过scan.startup.mode选项指定从哪个位置开始消费,可选的值如下。 2.4 Sink分区当kafka作为sink时,可以通过sink.partitioner指定partitioner。支持的选项值如下。 2.5?致性保证默认情况下,如果启用checkpoint,Kafka sink使用at-least-once一致性语意。在启用checkpoint的前题下,可通过sink.delivery-guarantee来调整一致性语意: 一旦启用了事物来保证exactly-once语意,一定要注意下游消费者要配置isolation.level为read_committed(默认是read_uncommitted)。 |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/16 8:40:22- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |