内容: 本文主要介绍使用flink读写kafka,如何保证exactly-once
关键点:
Flink的checkpoint机制
Kafka source支持重新消费,手动commit
Kafka sink支持2PC(two-phase commit protocol)
flink实践任务配置:
Kafka端到端一致性需要注意的点:
- Flink任务需要开启checkpoint配置为CheckpointingMode.EXACTLY_ONCE
- Flink任务FlinkKafkaProducer需要指定参数Semantic.EXACTLY_ONCE
- Flink任务FlinkKafkaProducer配置需要配置transaction.timeout.ms,checkpoint间隔(代码指定)<transaction.timeout.ms(默认为1小时)<transaction.max.timeout.ms(默认为15分钟)
- 消费端在消费FlinkKafkaProducer的topic时需要指定isolation.level(默认为read_uncommitted)为read_committed
例如: flink任务设置:
env.enableCheckpointing(XXX, CheckpointingMode.EXACTLY_ONCE);
若为生产者,还需设置:
properties.setProperty("transaction.timeout.ms", "900000");
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("topic-name")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 配置容错
.build();
若为消费者,还需设置:
properties.setProperty("isolation.level", "read_committed");
说明:
1、生产者
Kafka 的 Exactly Once 语义是通过它的事务和生产幂等两个特性来共同实现的。 kafka从0.11开始支持事务(exactly-once语义),这为实现端到端的精确一致性语义提供了支持。 (1)幂等性:为了实现 Producer 的幂等语义,Kafka 引入了Producer ID(即PID)和Sequence Number。 原理:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,该 PID 对用户完全透明而不会暴露给用户。 对于每个 PID,该 Producer 发送数据的每个<Topic, Partition>都对应一个从 0 开始单调递增的Sequence Number。 类似地,Broker 端也会为每个<PID, Topic, Partition>维护一个序号,并且每次 Commit 一条消息时将其对应序号递增。对于接收的每条消息,如果其序号比 Broker 维护的序号(即最后一次 Commit 的消息的序号)大1,则 Broker 会接受它,否则将其丢弃:
- 如果消息序号比 Broker 维护的序号大1以上,说明中间有数据尚未写入,也即乱序,此时 Broker 拒绝该消息,Producer 抛出InvalidSequenceNumber
- 如果消息序号小于等于 Broker 维护的序号,说明该消息已被保存,即为重复消息,Broker 直接丢弃该消息,Producer 抛出DuplicateSequenceNumber
(2)事务性:幂等设计只能保证单个 Producer 对于同一个<Topic, Partition>的Exactly Once语义。事务保证可使得应用程序将生产数据和消费数据当作一个原子单元来处理,要么全部成功,要么全部失败,即使该生产或消费跨多个<Topic, Partition>。 应用程序必须提供一个稳定的(重启后不变)唯一的 ID,也即Transaction ID。Transactin ID与PID可能一一对应。区别在于Transaction ID由用户提供,而PID是内部的实现对用户透明。 为了保证新的 Producer 启动后,旧的具有相同Transaction ID的 Producer 即失效,每次 Producer 通过Transaction ID拿到 PID 的同时,还会获取一个单调递增的 epoch。由于旧的 Producer 的 epoch 比新 Producer 的 epoch 小,Kafka 可以很容易识别出该 Producer 是老的 Producer 并拒绝其请求。 原理:Kafka 0.11.0.0 引入了一个服务器端的模块,名为Transaction Coordinator,用于管理 Producer 发送的消息的事务性。 该Transaction Coordinator维护Transaction Log,该 log 存于一个内部的 Topic 内。由于 Topic 数据具有持久性,因此事务的状态也具有持久性。Producer 并不直接读写Transaction Log,它与Transaction Coordinator通信,然后由Transaction Coordinator将该事务的状态插入相应的Transaction Log。
2、消费者
为了保证事务特性,需要配置isolation.level = read_committed参数 消费者设置read_uncommitted可以读取到未提交的事务数据(默认); 消费者设置read_committed只有在消费者提交事务的时候,才可以读取到数据,如果事务取消了,那么读取不到数据。
|