References: https://blog.jrwang.me/2019/flink-source-code-checkpoint/#checkpoint-%E7%9A%84%E5%8F%91%E8%B5%B7%E6%B5%81%E7%A8%8B
https://cloud.tencent.com/developer/article/1593969
https://blog.csdn.net/zc19921215/article/details/108171455
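A Flink checkpoint mainly consists of generating barriers, propagating barriers, and persisting state; this walk-through of the source code follows those same stages.
Background knowledge:
- The snapshot of each operator is abstracted as OperatorSnapshotFutures, which holds the snapshot results of both operator state and keyed state.
- CheckpointCoordinator plays a central role in the checkpoint process. It is the "coordinator" of the whole flow and is responsible for:
  - sending the messages that trigger a checkpoint and receiving the responses (Ack) from the individual tasks
  - maintaining a global view of the state handles (state-handle) carried in the Acks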
Start .......
If checkpointing is enabled for the job, DefaultExecutionGraphBuilder calls executionGraph.enableCheckpointing():
class DefaultExecutionGraphBuilder
// configure the state checkpointing
if (isCheckpointingEnabled(jobGraph)) {
...... N lines of code omitted ......
executionGraph.enableCheckpointing(
chkConfig,
hooks,
checkpointIdCounter,
completedCheckpointStore,
rootBackend,
rootStorage,
checkpointStatsTracker,
checkpointsCleaner);
}
This is where the CheckpointCoordinator object is created:
class DefaultExecutionGraph
// create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator =
new CheckpointCoordinator(
jobInformation.getJobId(),
chkConfig,
operatorCoordinators,
checkpointIDCounter,
checkpointStore,
checkpointStorage,
ioExecutor,
checkpointsCleaner,
new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
SharedStateRegistry.DEFAULT_FACTORY,
failureManager,
createCheckpointPlanCalculator(
chkConfig.isEnableCheckpointsAfterTasksFinish()),
new ExecutionAttemptMappingProvider(getAllExecutionVertices()));
A job status listener, CheckpointCoordinatorDeActivator, is registered as well:
class CheckpointCoordinator
// ------------------------------------------------------------------------
// job status listener that schedules / cancels periodic checkpoints
// ------------------------------------------------------------------------
public JobStatusListener createActivatorDeactivator() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
if (jobStatusListener == null) {
jobStatusListener = new CheckpointCoordinatorDeActivator(this);
}
return jobStatusListener;
}
}
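CheckpointCoordinatorDeActivator is notified when the job status changes. On a change to RUNNING it starts the checkpoint timer through CheckpointCoordinator.startCheckpointScheduler; on any other status it stops the timer.
public class CheckpointCoordinatorDeActivator implements JobStatusListener {
...... N lines of code omitted ......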
@Override
public void jobStatusChanges(
JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
if (newJobStatus == JobStatus.RUNNING) {
// start the checkpoint scheduler
coordinator.startCheckpointScheduler();
} else {
// anything else should stop the trigger for now
coordinator.stopCheckpointScheduler();
}
}
}
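The periodic task is wrapped in ScheduledTrigger, which calls CheckpointCoordinator.triggerCheckpoint() at run time to trigger one checkpoint.
CheckpointCoordinator sends the message that triggers the checkpoint, which ends up as an RPC call to TaskExecutorGateway.triggerCheckpoint (TaskExecutorGateway is an interface; the implementation lives in TaskExecutor), that is, a request to run TaskExecutor.triggerCheckpoint(). Because one TaskExecutor may be running several Tasks, the Task is looked up by the ExecutionAttemptID for which the checkpoint was triggered, and Task.triggerCheckpointBarrier() is then called on it. Only Tasks that act as sources receive the triggerCheckpointBarrier() call.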
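As a rough illustration of this start/stop behaviour, here is a minimal sketch in plain Java. MiniCoordinator, MiniDeActivator and MiniJobStatus are hypothetical stand-ins invented for this example, not Flink classes, and a simple counter takes the place of the real triggerCheckpoint() call.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical job status enum, reduced to a few values for the sketch.
enum MiniJobStatus { CREATED, RUNNING, FAILING, FINISHED }

// Hypothetical, heavily simplified stand-in for CheckpointCoordinator.
class MiniCoordinator {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicLong triggerCount = new AtomicLong();
    private final long intervalMillis;
    private ScheduledFuture<?> periodicTrigger; // null while the scheduler is stopped

    MiniCoordinator(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    synchronized void startCheckpointScheduler() {
        stopCheckpointScheduler(); // never keep two periodic triggers alive
        // The counter increment stands in for triggering one checkpoint.
        periodicTrigger = timer.scheduleAtFixedRate(
                triggerCount::incrementAndGet,
                intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void stopCheckpointScheduler() {
        if (periodicTrigger != null) {
            periodicTrigger.cancel(false);
            periodicTrigger = null;
        }
    }

    synchronized boolean isSchedulerRunning() {
        return periodicTrigger != null;
    }

    void shutdown() {
        timer.shutdownNow();
    }
}

// Hypothetical listener mirroring the RUNNING / not-RUNNING branch shown above.
class MiniDeActivator {
    private final MiniCoordinator coordinator;

    MiniDeActivator(MiniCoordinator coordinator) {
        this.coordinator = coordinator;
    }

    void jobStatusChanges(MiniJobStatus newStatus) {
        if (newStatus == MiniJobStatus.RUNNING) {
            coordinator.startCheckpointScheduler();
        } else {
            coordinator.stopCheckpointScheduler();
        }
    }
}
```

Keeping the scheduled future in a field makes "stop" a cheap cancel, and restarting simply replaces the old trigger with a new one.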
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
...... N lines of code omitted ......
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
...... N lines of code omitted ......
}
}
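The actual logic with which a Task performs a checkpoint is encapsulated in the AbstractInvokable class, which has two methods that trigger a checkpoint:
- triggerCheckpointAsync: invoked when the checkpoint coordinator triggers a checkpoint. It is the origin of the checkpoint: barriers are injected at the source task and flow downstream from there.
- triggerCheckpointOnBarrier: invoked on a downstream node when it receives the barriers from upstream.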
class AbstractInvokable
public Future<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
throw new UnsupportedOperationException(
String.format(
"triggerCheckpointAsync not supported by %s", this.getClass().getName()));
}
public void triggerCheckpointOnBarrier(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetricsBuilder checkpointMetrics)
throws IOException {
throw new UnsupportedOperationException(
String.format(
"triggerCheckpointOnBarrier not supported by %s",
this.getClass().getName()));
}
The concrete implementations of these two methods differ in small details, but the main logic is the same: both end up calling performCheckpoint().
class StreamTask
private boolean performCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetricsBuilder checkpointMetrics)
throws Exception {
...... N lines of code omitted ......
subtaskCheckpointCoordinator.checkpointState(
checkpointMetaData,
checkpointOptions,
checkpointMetrics,
operatorChain,
finishedOperators,
this::isRunning);
}
checkpointState does two things: 1) it first sends the barrier downstream, and 2) it stores the checkpoint snapshot.
class SubtaskCheckpointCoordinatorImpl
// Step (2): send the barrier downstream
operatorChain.broadcastEvent(
new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
// handling for unaligned barriers
options.isUnalignedCheckpoint());
// Step (4): take the snapshot
Map<OperatorID, OperatorSnapshotFutures> snapshotFutures =
new HashMap<>(operatorChain.getNumberOfOperators());
try {
if (takeSnapshotSync(
snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
finishAndReportAsync(
snapshotFutures,
metadata,
metrics,
operatorChain.isFinishedOnRestore(),
isOperatorsFinished,
isRunning);
} else {
cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
}
} catch (Exception ex) {
cleanup(snapshotFutures, metadata, metrics, ex);
throw ex;
}
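This touches on aligned-barrier checkpoints versus unaligned checkpoints, which are explained later; both can guarantee exactly-once.
Step (4) shows that the state snapshot is taken first and the result is then reported to the JobManager; the report eventually reaches the CheckpointCoordinator via RPC.
After a Task finishes its checkpoint, CheckpointCoordinator receives the Ack response. The Ack is handled as follows:
- Use the checkpointID in the Ack to look up the matching PendingCheckpoint in Map<Long, PendingCheckpoint> pendingCheckpoints. A PendingCheckpoint is a checkpoint that has been triggered but has not completed yet.
- If a matching PendingCheckpoint exists:
  - If this PendingCheckpoint has not been discarded, call PendingCheckpoint.acknowledgeTask to process the Ack. Depending on the result:
    - SUCCESS: check whether all required Acks have been received (more on this below); if so, call completePendingCheckpoint to complete the checkpoint
    - DUPLICATE: the Ack was received more than once; ignore it
    - UNKNOWN: the Ack is unknown; clean up the state handles carried by the reported Ack
    - DISCARDED: the checkpoint has already been discarded; clean up the state handles carried by the reported Ack
  - If this PendingCheckpoint has already been discarded, throw an exception
- If no matching PendingCheckpoint exists, clean up the state handles carried by the reported Ack
The source code is as follows: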
class CheckpointCoordinator
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
if (shutdown) {
return false;
}
final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
if (checkpoint != null && !checkpoint.isDisposed()) {
switch (checkpoint.acknowledgeTask(
message.getTaskExecutionId(),
message.getSubtaskState(),
message.getCheckpointMetrics(),
getStatsCallback(checkpoint))) {
case SUCCESS:
LOG.debug(
"Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
checkpointId,
message.getTaskExecutionId(),
message.getJob(),
taskManagerLocationInfo);
if (checkpoint.isFullyAcknowledged()) {
completePendingCheckpoint(checkpoint);
}
break;
case DUPLICATE:
LOG.debug(
"Received a duplicate acknowledge message for checkpoint {}, task {}, job {}, location {}.",
message.getCheckpointId(),
message.getTaskExecutionId(),
message.getJob(),
taskManagerLocationInfo);
break;
case UNKNOWN:
LOG.warn(
"Could not acknowledge the checkpoint {} for task {} of job {} at {}, "
+ "because the task's execution attempt id was unknown. Discarding "
+ "the state handle to avoid lingering state.",
message.getCheckpointId(),
message.getTaskExecutionId(),
message.getJob(),
taskManagerLocationInfo);
discardSubtaskState(
message.getJob(),
message.getTaskExecutionId(),
message.getCheckpointId(),
message.getSubtaskState());
break;
case DISCARDED:
LOG.warn(
"Could not acknowledge the checkpoint {} for task {} of job {} at {}, "
+ "because the pending checkpoint had been discarded. Discarding the "
+ "state handle tp avoid lingering state.",
message.getCheckpointId(),
message.getTaskExecutionId(),
message.getJob(),
taskManagerLocationInfo);
discardSubtaskState(
message.getJob(),
message.getTaskExecutionId(),
message.getCheckpointId(),
message.getSubtaskState());
}
return true;
} else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded but non-removed checkpoint "
+ checkpointId);
} else {
reportStats(
message.getCheckpointId(),
message.getTaskExecutionId(),
message.getCheckpointMetrics());
boolean wasPendingCheckpoint;
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
if (recentPendingCheckpoints.contains(checkpointId)) {
wasPendingCheckpoint = true;
LOG.warn(
"Received late message for now expired checkpoint attempt {} from task "
+ "{} of job {} at {}.",
checkpointId,
message.getTaskExecutionId(),
message.getJob(),
taskManagerLocationInfo);
} else {
LOG.debug(
"Received message for an unknown checkpoint {} from task {} of job {} at {}.",
checkpointId,
message.getTaskExecutionId(),
message.getJob(),
taskManagerLocationInfo);
wasPendingCheckpoint = false;
}
// try to discard the state so that we don't have lingering state lying around
discardSubtaskState(
message.getJob(),
message.getTaskExecutionId(),
message.getCheckpointId(),
message.getSubtaskState());
return wasPendingCheckpoint;
}
}
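How does PendingCheckpoint handle an Ack message? Internally it maintains two Maps:
- Map<OperatorID, OperatorState> operatorStates: the state handles of the operators whose Acks have already been received
- Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks: the Tasks that must Ack but have not done so yet
Whenever an Ack arrives, PendingCheckpoint removes the corresponding Task from notYetAcknowledgedTasks and stores the state handle carried by the Ack. When notYetAcknowledgedTasks is empty, all Acks have been received.
Once PendingCheckpoint confirms that all Acks have been received, the checkpoint can be completed. The SUCCESS case checks whether every Ack has arrived: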
if (checkpoint.isFullyAcknowledged()) {
completePendingCheckpoint(checkpoint);
}
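The Ack bookkeeping described above can be sketched in a few lines of plain Java. This is only an illustration under simplifying assumptions: MiniPendingCheckpoint is a hypothetical class, tasks and state handles are plain strings, and state is tracked per task rather than per operator as in the real PendingCheckpoint.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the Ack bookkeeping; not Flink's class.
class MiniPendingCheckpoint {
    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    // Tasks that must Ack but have not done so yet.
    private final Set<String> notYetAcknowledged;
    // Tasks whose Ack has already been accepted.
    private final Set<String> acknowledged = new HashSet<>();
    // State handle reported by each acknowledged task.
    private final Map<String, String> stateHandles = new HashMap<>();
    private boolean disposed;

    MiniPendingCheckpoint(String... tasksToAck) {
        this.notYetAcknowledged = new HashSet<>(Arrays.asList(tasksToAck));
    }

    AckResult acknowledgeTask(String taskId, String stateHandle) {
        if (disposed) {
            return AckResult.DISCARDED;
        }
        if (!notYetAcknowledged.remove(taskId)) {
            // Either the task Acked before, or it was never part of this checkpoint.
            return acknowledged.contains(taskId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
        }
        acknowledged.add(taskId);
        stateHandles.put(taskId, stateHandle);
        return AckResult.SUCCESS;
    }

    boolean isFullyAcknowledged() {
        return !disposed && notYetAcknowledged.isEmpty();
    }

    void dispose() {
        disposed = true;
    }
}
```

For a checkpoint that expects two tasks, the first Ack from each returns SUCCESS, a repeated Ack returns DUPLICATE, and isFullyAcknowledged() becomes true only after both tasks have reported.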
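Once the Acks of all operators have been received, the following steps are performed:
- Call PendingCheckpoint.finalizeCheckpoint() to turn the PendingCheckpoint into a CompletedCheckpoint
- Store the CompletedCheckpoint in the CompletedCheckpointStore
- Remove the PendingCheckpoints that have been subsumed: because CheckpointIDs increase monotonically, every PendingCheckpoint with an ID smaller than the one just completed can be discarded
- Call Execution.notifyCheckpointComplete() on each Task in turn to notify it that the checkpoint has completed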
class CheckpointCoordinator
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
throws CheckpointException {
// call PendingCheckpoint.finalizeCheckpoint() to turn the PendingCheckpoint into a CompletedCheckpoint
completedCheckpoint =
pendingCheckpoint.finalizeCheckpoint(
checkpointsCleaner,
this::scheduleTriggerRequest,
executor,
getStatsCallback(pendingCheckpoint));
// store the CompletedCheckpoint in the CompletedCheckpointStore
completedCheckpointStore.addCheckpoint(
completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
// remove the subsumed PendingCheckpoints
dropSubsumedCheckpoints(checkpointId);
// inside this method, Execution.notifyCheckpointComplete() is called on each Task in turn to notify it that the checkpoint has completed
sendAcknowledgeMessages(
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
checkpointId,
completedCheckpoint.getTimestamp());
}
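The "remove subsumed checkpoints" step relies only on the IDs being ordered. A minimal sketch, assuming pending checkpoints are kept in a sorted map keyed by ID, might look like this. SubsumeSketch and its string values are hypothetical, and the sketch ignores any exemptions the real implementation may apply to particular checkpoints.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical helper illustrating subsumption by checkpoint ID; not Flink code.
class SubsumeSketch {
    // Removes every pending checkpoint with an ID strictly smaller than
    // completedId and returns the removed IDs in ascending order.
    static List<Long> dropSubsumed(TreeMap<Long, String> pending, long completedId) {
        // headMap is a live view of all entries with key < completedId.
        SortedMap<Long, String> older = pending.headMap(completedId);
        List<Long> dropped = new ArrayList<>(older.keySet());
        older.clear(); // clearing the view removes the entries from 'pending'
        return dropped;
    }
}
```

With pending IDs 3, 4, 5 and 6, completing checkpoint 5 drops 3 and 4 and leaves 5 and 6 in the map.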
Extra notes
1. Aligned-barrier and unaligned-barrier checkpoints
[Figure 1: aligned barrier]
[Figure 2: unaligned barrier]
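Aligned barriers are familiar to most readers. The drawback of this first approach is that it blocks: the checkpoint is triggered only after the barriers from all upstream inputs have arrived, so under backpressure the checkpoint may complete late or fail.
With unaligned barriers, as soon as one barrier arrives it jumps ahead in the operator's buffered data queues (both input and output Channels) to the very front, as in the second figure. Even under backpressure the barrier can therefore travel fairly smoothly from the Source to the Sink. The data that was overtaken, together with the data that precedes the Barrier on the other input Channels, is written into the snapshot. By persisting Input + Output + State, Unaligned Checkpoint still provides Exactly_Once semantics.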
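To make the "jumping ahead" idea concrete, here is a toy sketch for a single buffer queue. It assumes records and the barrier are plain strings; UnalignedSketch and the "BARRIER" marker are hypothetical, and real Flink works on network buffers across several channels rather than on one Deque.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of a barrier overtaking buffered records; not Flink code.
class UnalignedSketch {
    static final String BARRIER = "BARRIER";

    // Moves the barrier to the head of the buffer and returns the records it
    // overtook. In an unaligned checkpoint these overtaken records are the
    // in-flight data that has to be persisted together with the state.
    static List<String> overtake(Deque<String> buffer) {
        List<String> overtaken = new ArrayList<>();
        List<String> afterBarrier = new ArrayList<>();
        boolean barrierSeen = false;
        for (String element : buffer) {
            if (element.equals(BARRIER)) {
                barrierSeen = true;
            } else if (barrierSeen) {
                afterBarrier.add(element);
            } else {
                overtaken.add(element);
            }
        }
        if (!barrierSeen) {
            return new ArrayList<>(); // no barrier: leave the buffer untouched
        }
        buffer.clear();
        buffer.add(BARRIER);        // the barrier now sits at the head
        buffer.addAll(overtaken);   // overtaken records keep their relative order
        buffer.addAll(afterBarrier);
        return overtaken;
    }
}
```

For a buffer holding r1, r2, BARRIER, r3, the call returns r1 and r2 as data to persist and leaves the buffer as BARRIER, r1, r2, r3. The persisted copy of the overtaken records is what adds to the checkpoint size.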
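Unaligned Checkpoint also has some drawbacks:
- Because the buffered data has to be persisted, the State Size can grow considerably and the load on the disks increases.
- As the State Size grows, job recovery may take longer and operations become harder to manage.
As of Flink 1.11, the official recommendation was to use it only for jobs that are prone to backpressure and have light I/O pressure (for example, jobs whose original state is not too large). I have not checked how much this has been optimized since then.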
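2. How exactly-once and at-least-once relate to barrier alignment
Every Task consumes the data produced by its upstream Tasks through an InputGate. CheckpointBarrierHandler sits on top of the InputGate as an extra layer that handles events such as CheckpointBarrier. CheckpointBarrierHandler has two implementations:
- CheckpointBarrierTracker
- SingleCheckpointBarrierHandler
They correspond to Flink's two modes, AT_LEAST_ONCE and EXACTLY_ONCE, respectively.
With AT_LEAST_ONCE, data is processed as soon as it arrives and the checkpoint is taken once all barriers have arrived. Data that follows an early barrier is not buffered; it flows straight on to be processed by the operator. When the job has to roll back, this data is processed again, which leads to duplicate processing.
With EXACTLY_ONCE, once a barrier arrives on an input, the data behind it is buffered and is processed only after the barriers from the other inputs have arrived and the snapshot has been taken, which guarantees that each record is processed exactly once.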
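The difference between the two modes can be simulated with a small toy model. The sketch below is an assumption-laden illustration, not Flink's handler logic: BarrierModeSketch is hypothetical, each event is a {channel, payload} pair of strings, and "processed" simply means "reflected in the state when the snapshot is taken".

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy simulation of aligned vs. non-aligned barrier handling.
class BarrierModeSketch {
    static final String BARRIER = "BARRIER";

    // events: arrival order of {channelName, payload}; payload is a record or BARRIER.
    // aligned = true models EXACTLY_ONCE, aligned = false models AT_LEAST_ONCE.
    // Returns the records already applied to the state when the snapshot is taken.
    static List<String> snapshotContents(
            List<String[]> events, int numChannels, boolean aligned) {
        List<String> processed = new ArrayList<>();
        Set<String> channelsWithBarrier = new HashSet<>();
        for (String[] event : events) {
            String channel = event[0];
            String payload = event[1];
            if (payload.equals(BARRIER)) {
                channelsWithBarrier.add(channel);
                if (channelsWithBarrier.size() == numChannels) {
                    return processed; // all barriers arrived: snapshot is taken now
                }
            } else if (aligned && channelsWithBarrier.contains(channel)) {
                // EXACTLY_ONCE: the record is held back until after the snapshot.
            } else {
                processed.add(payload); // applied to the state immediately
            }
        }
        throw new IllegalStateException("not every channel delivered a barrier");
    }
}
```

Take the arrival order A:a1, A:BARRIER, A:a2, B:b1, B:BARRIER. In aligned mode the snapshot reflects a1 and b1 only. In non-aligned mode it also reflects a2, which lies behind A's barrier and is therefore replayed after recovery, so a2 is counted twice.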