IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 从0到1Flink的成长之路(二十一)-Flink+Kafka:End-to-End Exactly-Once代码示例 -> 正文阅读

[大数据]从0到1Flink的成长之路(二十一)-Flink+Kafka:End-to-End Exactly-Once代码示例

Flink+Kafka:End-to-End Exactly-Once代码示例

analysis/

package xx.xxxxxx.flink.exactly;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
/**
* 1. Kafka Producer的容错-Kafka 0.9 and 0.10
* 如果Flink开启了checkpoint,针对FlinkKafkaProducer09 和FlinkKafkaProducer010 可以提供 at-least-once的语义,配置如下参数
* setLogFailuresOnly(false)
* setFlushOnCheckpoint(true)
* retries【这个参数的值默认是0】 //注意:建议修改kafka 生产者的重试次数
** 2. Kafka Producer的容错-Kafka 0.11+
* 如果Flink开启了checkpoint,针对FlinkKafkaProducer011+ 就可以提供 exactly-once的语义
* 但是需要选择具体的语义
* Semantic.NONE
* Semantic.AT_LEAST_ONCE【默认】
* Semantic.EXACTLY_ONCE
*/
public class StreamExactlyOnceKafkaDemo {
 /*
/export/server/kafka/bin/kafka-topics.sh --list --bootstrap-server node1.itcast.cn:9092
Source Kafka:
/export/server/kafka/bin/kafka-topics.sh --create \
--bootstrap-server node1.itcast.cn:9092 --replication-factor 1 --partitions 3 --topic flink-topic-source
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 \
--topic flink-topic-source
Sink Kafka:
/export/server/kafka/bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 \
--replication-factor 1 --partitions 3 --topic flink-topic-sink
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 \
--topic flink-topic-sink --from-beginning
*/
 public static void main(String[] args) throws Exception {
 // 1. 执行环境-env
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(2);
 // 设置Checkpoint
 env.enableCheckpointing(5000);
 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
 env.getCheckpointConfig().enableExternalizedCheckpoints(
 CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
 );
 // 2. 数据源-source
 // 2.1. Kafka Consumer 消费数据属性设置
 Properties sourceProps = new Properties();
 sourceProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092");
 sourceProps.setProperty("group.id","flink-1001");
 //如果有记录偏移量从记录的位置开始消费,如果没有从最新的数据开始消费
 sourceProps.setProperty("auto.offset.reset","latest");
 //开一个后台线程每隔5s检查Kafka的分区状态
 sourceProps.setProperty("flink.partition-discovery.interval-millis","5000");
 // 2.2. 实例化FlinkKafkaConsumer对象
 FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
 "flink-topic-source", new SimpleStringSchema(), sourceProps
 );
 // 从group offset记录的位置位置开始消费,如果没有该group信息,会根据"auto.offset.reset"的设置来决定从哪开始消费
 kafkaSource.setStartFromGroupOffsets();
 // Flink执行Checkpoint的时候提交偏移量(一份在Checkpoint中, 一份在Kafka主题中__comsumer_offsets(方便外部监控工具去看))
 kafkaSource.setCommitOffsetsOnCheckpoints(true) ;
 // 2.3. 添加数据源
 DataStreamSource<String> kafkaDataStream = env.addSource(kafkaSource);
 // 3. 数据转换-transformation
 // 4. 数据终端-sink
 // 4.1. Kafka Producer 生成者属性配置
 Properties sinkProps = new Properties();
 sinkProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
 //设置事务超时时间,也可在kafka配置中设置
 sinkProps.setProperty("transaction.timeout.ms", 60000 * 15 + "");
 // 4.2. 创建序列化实例对象
 KafkaSerializationSchema<String> kafkaSchema = new KafkaSerializationSchema<String>(){
 @Override
 public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
 ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
 "flink-topic-sink", element.getBytes(StandardCharsets.UTF_8)
 );
 return record;
 }
 } ;
 // 4.3. 实例化FlinkKafkaProducer对象
 FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
 "flink-topic-sink",
 kafkaSchema,
 sinkProps,
 FlinkKafkaProducer.Semantic.EXACTLY_ONCE
 );
 // 4.4. 添加sink
 kafkaDataStream.addSink(kafkaSink) ;
 // 5. 触发执行-execute
 env.execute(StreamExactlyOnceKafkaDemo.class.getSimpleName());
  } }
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-09 17:33:49  更:2021-07-09 17:34:07 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 21:37:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码