IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink SQL与Kafka整合的哪些事儿 -> 正文阅读

[大数据]Flink SQL与Kafka整合的哪些事儿

1 Flink Table与Kafka集成案例

1.1?需求

需求:Flink Table从kafka消费点击日志(JSON),转化为CSV格式之后输出到Kafka。

1.2?添加Maven依赖

FlinkTable集成Kafka需引入如下依赖:

<dependency>

??????<groupId>org.apache.flink</groupId>

??????<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>

??????<version>${flink.version}</version>

</dependency>

1.3?代码实现

Flink Table API实现Kafka生产者与消费者的完整代码如下所示。

package com.bigdata.chap02;

import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkTableAPIKafka2Kafka {

????public static final String input_topic = "clicklog_input";

????public static final String out_topic = "clicklog_output";

????public static void main(String[] args) {

????????//1、创建TableEnvironment

????????EnvironmentSettings settings = EnvironmentSettings

????????????????.newInstance()

????????????????.build();

????????TableEnvironment tEnv = TableEnvironment.create(settings);

????????//2、创建kafka source table

????????final Schema schema = Schema.newBuilder()

????????????????.column("user", DataTypes.STRING())

????????????????.column("url", DataTypes.STRING())

????????????????.column("cTime", DataTypes.STRING())

????????????????.build();

????????tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka")

????????????????.schema(schema)

????????????????.format("json")

????????????????.option("topic",input_topic)

????????????????.option("properties.bootstrap.servers","hadoop1:9092")

????????????????.option("properties.group.id","clicklog")//每次都从最早的offsets开始

????????????????.option("scan.startup.mode","latest-offset")

????????????????.build());

????????//3、创建?kafka sink table

????????tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("kafka")

????????????????.schema(schema)

????????????????.format("csv")

????????????????.option("topic",out_topic)

????????????????.option("properties.bootstrap.servers","hadoop1:9092")

????????????????.build());

????????//5、输出(包括执行,不需要单独在调用tEnv.execute("job"))

????????tEnv.from("sourceTable")

????????????????.select($("user"), $("url"),$("cTime"))

????????????????.executeInsert("sinkTable");

????}

}

1.4?打开Kafka数据生产与消费

#查看topic列表

bin/kafka-topics.sh --zookeeper localhost:2181 --list

#创建输入与输出topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic clicklog_input --replication-factor 3 --partitions 3

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic clicklog_output --replication-factor 3 --partitions 3

#Kafka 输出topic打开消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic clicklog_output

#打开Kafka输入Topic生产者

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic clicklog_input

{"user":"Mary","url":"./home","cTime":"2022-02-02 12:00:00"}

如果clicklog_input topic的生产者输入数据之后,在clicklog_output topic端能消费到数据,则说明Flink Table打通了Kafka端到端的数据流程。

2 Kafka SQL Connector高级特性

2.1 key和value格式

Kafka消息的key和value均可指定format。

#仅指定value format

tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka")

????????????????.schema(schema)

????????????????.format("json")

????????????????.option("topic",input_topic)

????????????????.option("properties.bootstrap.servers","hadoop1:9092")

????????????????.option("properties.group.id","clicklog")//每次都从最早的offsets开始

????????????????.option("scan.startup.mode","latest-offset")

????????????????.build());

#指定key和value format

tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka")

????????????????.schema(schema)

????????????????.option("key.format","json")

???.option("value.format","json")

????????????????.option("topic",input_topic)

????????????????.option("properties.bootstrap.servers","hadoop1:9092")

????????????????.option("properties.group.id","clicklog")//每次都从最早的offsets开始

????????????????.option("scan.startup.mode","latest-offset")

????????????????.build());

注意:format("json")和option("value.format","json")二选一,二者等价

2.2 Topic和Partition发现

可以通过topic或者topic-pattern来配置主题。

注意:要允许在作业开始运行后发现动态创建的topic,请为 scan.topic-partition-discovery.interval 设置一个非负值。

2.3?读取位置

作为source,是可以通过scan.startup.mode选项指定从哪个位置开始消费,可选的值如下。

2.4 Sink分区

当kafka作为sink时,可以通过sink.partitioner指定partitioner。支持的选项值如下。

2.5?致性保证

默认情况下,如果启用checkpoint,Kafka sink使用at-least-once一致性语意。在启用checkpoint的前题下,可通过sink.delivery-guarantee来调整一致性语意:

一旦启用了事物来保证exactly-once语意,一定要注意下游消费者要配置isolation.level为read_committed(默认是read_uncommitted)。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-01 15:49:32  更:2022-05-01 15:51:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 8:40:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码