Sink部分Flink根据Kafka分为了2个部分
0.11之前因为没有kafka的事务相关 所以没法做到 消息的exactly_once
0.11之后是可以实现的
FlinkKafkakProducer 的创建有多个重载构造方法,当我们没有指定相关的 流checkpoint 语义,
那么默认 是at_least_once
FlinkKafkaProducerBase
public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
继承 sinkFunction 基于实现checkpointFunction
对于SinkFunction相关的处理逻辑是在invoke,snapshot里面,但我们首先看下其一些前置准备工作 open(), init()
Open
open方法做了一些前置工作的准备,主要是一下几个
- 序列化方式
- 创建KafkaProducer
- metrics
- checkpoint配置
Invoke
invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context)
如果 transcation 如果是 EXACTLY_ONCE 每次会在每次 checkpoint的时候变化
做一些消息转发的动作, 如果是 exactly_once模式 这时候是不会真正的提交,只有在checkpoint的时候才会
将本次事务的消息进行提交,然后再开启下一个事务
Snapshot checkpoint 阶段 (重点)
- FlinkKafkaProducer.snapshot()
- super.snapshotState(context) 实际调用 TwoPhaseCommitSinkFunction
- TwoPhaseCommitSinkFunction.snapshotState
- 获取checkpointId
- preCommit 将当前事务的数据进行提交
- kafkaProducer: 将exactly_once 和 at_least_once 的数据进行 producer.flush
? ? ? ? 3. currentTransactionHolder = beginTransactionInternal() 开启一个新的事务
- ????????????????这里只有对 exactly_once 模式进行了事务创建
- ????????????????at_least_once && none 默认复用之前的
|