一、前言
我们知道exactly-once ,大多知道有checkpoint ,但是在Flink1.4 之后,又新增了端到端的exactly-once。也就是输入和输出是对应的,没有丢失和重复。
1.1、Flink-1.4之前的exactly-once实现
Flink的基本思路就是将状态定时地checkpiont 到hdfs中去,当发生failure的时候恢复上一次的状态,然后将输出update到外部。这里需要注意的是输入流的offset也是状态的一部分,因此一旦发生failure就能从最后一次状态恢复,从而保证输出的结果是exactly once。
1.2、Flink-1.4之后的exactly-once实现
2017年12月,apache flink 1.4.0发布。其中有一个里程碑式的功能:两步提交的sink function(TwoPhaseCommitSinkFunction,relevant Jira here)。TwoPhaseCommitSinkFunction 就是把最后写入存储的逻辑分为两部提交,这样就有可能构建一个从数据源到数据输出的一个端到端的exactly-once语义的flink应用。当然,TwoPhaseCommitSinkFunction的数据输出包括apache kafka 0.11以上的版本。flink提供了一个抽象的TwoPhaseCommitSinkFunction类,来让开发者用更少的代码来实现端到端的exactly-once语义。
Flink的 checkpoint 在保证exactly-once 是内部应用exactly-once ,不需要重复计算等 Flink是通过两步提交协议来保证从数据源到数据输出的exactly-once 语义(外部)
接下来,我们通过一个例子来解释如果应用TwoPhaseCommitSinkFunction 来实现一个exactly-once 的sink 。
二、Exactly-once Tow Phase Commit
下面我们来看看flink 消费并写入kafka 的例子是如何通过两部提交来保证exactly-once 语义的。
注意: 因为只有kafka 从0.11 开始支持事物操作,若要使用flink 端到端exactly-once 语义需要flink 的sink 的kafka 是0.11 版本以上的。 同时 DELL/EMC 的 Pravega 也支持使用flink 来保证端到端的exactly-once 语义。
这个例子包括以下几个步骤:
- 从
kafka 读取数据 - 一个聚合窗操作
- 向kafka写入数据
为了保证exactly-once ,所有写入kafka的操作必须是事务的。在两次checkpiont 之间要批量提交数据,这样在任务失败后就可以将没有提交的数据回滚。
然而一个简单的提交和回滚,对于一个分布式的流式数据处理系统来说是远远不够的。下面我们来看看flink是如何解决这个问题的。
Flink官方推荐所有需要保证exactly once的Sink逻辑都继承该抽象类。它定义了如下4个抽象方法,需要子类实现。
protected abstract TXN beginTransaction() throws Exception;
protected abstract void preCommit(TXN transaction) throws Exception;
protected abstract void commit(TXN transaction);
protected abstract void abort(TXN transaction);
2.1、预提交 (preCommit)
首先我们看下 preCommit 代码实现。
@Override
protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction)
throws FlinkKafkaException {
switch (semantic) {
case EXACTLY_ONCE:
case AT_LEAST_ONCE:
flush(transaction);
break;
}
}
preCommit 在TwoPhaseCommitSinkFunction#snapshotState() 中调用
public void snapshotState(FunctionSnapshotContext context) throws Exception {
long checkpointId = context.getCheckpointId();
preCommit(currentTransactionHolder.handle);
pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
currentTransactionHolder = beginTransactionInternal();
state.clear();
state.add(new State<>(
this.currentTransactionHolder,
new ArrayList<>(pendingCommitTransactions.values()),
userContext));
}
TwoPhaseCommitSinkFunction也继承了CheckpointedFunction 接口,所以2PC是与检查点机制一同发挥作用的。
每当需要做checkpoint 时,JobManager 就在数据流中打入一个屏障(barrier ),作为检查点的界限。屏障随着算子链向下游传递,每到达一个算子都会触发将状态快照写入状态后端(state BackEnd)的动作。当屏障到达Kafka sink后,触发preCommit(实际上是KafkaProducer.flush())方法刷写消息数据,但还未真正提交。接下来还是需要通过检查点来触发提交阶段。
2.1.1、插入检查点
flink 的jobmanager 会在数据流中插入一个检查点的标记(这个标记可以用来区别这次checkpoint 的数据和下次checkpoint 的数据)。 这个标记会在整个dag中传递。每个dag中的算子遇到这个标记就会触发这个算子状态的快照。(图2)
2.1.2、触发将状态快照写入状态后端
读取kafka的算子,在遇到检查点标记时会存储kafka的offset。之后,会把这个检查点标记传到下一个算子。
接下来就到了flink的内存操作算子。这些内部算子就不用考虑两步提交协议了,因为他们的状态会随着flink整体的状态来更新或者回滚。
2.1.3、和外部系统,两步提交协议来保证数据不丢失不重复
到了和外部系统打交道的时候,就需要两步提交协议来保证数据不丢失不重复了。在预提交这个步骤下,所有向kafka提交的数据都是预提交。
一旦开启了checkpoint功能,JobManager就在数据流中源源不断地打入屏障(barrier),作为检查点的界限。屏障随着算子链向下游传递,每到达一个算子都会触发将状态快照写入状态后端的动作。当屏障到达Kafka sink后,通过KafkaProducer.flush() 方法刷写消息数据,但还未真正提交。
接下来还是需要通过检查点来触发提交阶段
2.2、提交阶段
@Override
protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
try {
transaction.producer.commitTransaction();
} finally {
recycleTransactionalProducer(transaction.producer);
}
}
}
该方法的调用点位于TwoPhaseCommitSinkFunction.notifyCheckpointComplete() 方法中。顾名思义,当所有检查点都成功完成之后,会回调这个方法。
该方法每次从正在等待提交的事务句柄中取出一个,校验它的检查点ID,并调用commit()方法提交之。
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
Throwable firstError = null;
while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
Long pendingTransactionCheckpointId = entry.getKey();
TransactionHolder<TXN> pendingTransaction = entry.getValue();
if (pendingTransactionCheckpointId > checkpointId) {
continue;
}
try {
commit(pendingTransaction.handle);
} catch (Throwable t) {
pendingTransactionIterator.remove();
}
}
2.3、回滚
只有在所有检查点都成功完成这个前提下,写入才会成功。这符合2PC的流程,其中JobManager为协调者,各个算子为参与者(不过只有sink一个参与者会执行提交)。一旦有检查点失败,notifyCheckpointComplete()方法就不会执行。
如果重试也不成功的话,最终会调用abort()方法回滚事务。
2.4、总结一下flink的两步提交
当所有算子都完成他们的快照时,进行正式提交操作
当任意子任务在预提交阶段失败时,其他任务立即停止,并回滚到上一次成功快照的状态。
在预提交状态成功后,外部系统需要完美支持正式提交之前的操作。如果有提交失败发生,整个flink应用会进入失败状态并重启,重启后将会继续从上次状态来尝试进行提交操作。
参考
https://www.aboutyun.com/forum.php?mod=viewthread&tid=27395 https://dandelioncloud.cn/article/details/1441622512370266113 https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
|