一、简介
持久化,说的是两种不同的持久化方式,Checkpoint自动持久化。和Savepoints手动持久化
checkpoint
checkpoint是由flink定期的,自动的进行数据的持久化(把状态中的数据写入到磁盘(HDFS))。新的checkpoint执行完成之后,会把老的checkpoint丢弃掉
JobManager负责checkpoint的发起以及协调。JobManager节点会定期向TaskManager节点发送Barrier(实际上 是JobManager创建的CheckpointCoordinator),TaskManager接收到Barrier信号,会把Barrier信号作为数据流 的一部分传递给所有算子。每一个算子接收到Barrier信号后会预先提交自己的状态信息并且给JobManger应答, 同时会将Barrier信号传递给下游算子。JobManager接收到所有算子的应答后才认定此次Checkpoint是成功的,并 且会自动删除上一次Checkpoint数据。否则,如果在规定的时间内没有收到所有算子的应答,则认为本次 Checkpoint快照制作失败 。
Savepoints
Savepoint是手动触发的checkpoint,它获取程序的快照并将其写入state backend。可以通过命令在任何的时间点上进行数据的持久化
Savepoint是手动触发的checkpoint,它获取程序的快照并将其写入state backend。Checkpoint依赖于常规的检 查点机制:在执行过程中个,程序会定期在TaskManager上快照并且生成checkpoint。为了恢复,只需要最后生成 的checkpoint。旧的checkpoint可以在新的checkpoint完成后安全地丢弃。 Savepoint与上述的定期checkpoint类似,只是他们由用户触发,并且在新的checkpoint完成时不会自动过期。 Savepoint可以通过命令行创建,也可以通过REST API在取消Job时创建。
二、区别:
Checkpoint依赖于常规的检查点机制:在执行过程中个,程序会定期在TaskManager上快照并且生成checkpoint。为了恢复,只需要最后生成的checkpoint。旧的checkpoint可以在新的checkpoint完成后安全地丢弃。
Savepoint与上述的定期checkpoint类似,只是他们由用户触发,并且在新的checkpoint完成时不会自动过期。Savepoint可以通过命令行创建,也可以通过REST API在取消Job时创建。
首先最容易注意到的是 Savepoint 是一种特殊的 Checkpoint,实际上它们的存储格式也是一致的,它们主要的不同在于定位。Checkpoint 机制的目标在于保证 Flink 作业意外崩溃重启不影响 exactly once 准确性,通常是配合作业重启策略使用的。而 Savepoint 的目的在于在 Flink 作业维护(比如更新作业代码)时将作业状态写到外部系统,以便维护结束后重新提交作业可以到恢复原本的状态。换句话讲,Checkpoint 是为 Flink runtime 准备的,Savepoint 是为 Flink 用户准备的。因此 Checkpoint 是由 Flink runtime 定时触发并根据运行配置自动清理的,一般不需要用户介入,而 Savepoint 的触发和清理都由用户掌控。
其次,由于 Checkpoint 的频率远远大于 Savepoint,Flink 对 Checkpoint 格式进行了针对不同 StateBackend 的优化,因此它在底层储存效率更高,而代价是耦合性更强,比如不保证 rescaling (即改变作业并行度)的特性和跨版本兼容。这里说”不保证”而不是”不支持”,原因是实际上 RocksDB 的 Checkpoint 是支持 rescaling 的,”不保证”更多是从系统设计出发而言。跨版本兼容性也同理。
最后,Savepoint 的定义有提及它是 non-incremental 的,这是相对于 incremental Checkpoint 来说。因为 Checkpoint 是秒级频繁触发的,两个连续 Checkpoint 通常高度相似,因此对于 State 特别大的作业来说,每次 Checkpoint 只增量地补充 diff 可以大大地节约成本,这就是 incremental Checkpoint 的由来。而 Savepoint 并不会连续地触发,而且比起效率,Savepoint 更关注的是可移植性和版本兼容性,所以不支持 incremental 也是理所当然。
三、checkpoint在Flink中的使用
默认情况下,Flink的Checkpoint机制是禁用的,如果需要开启,可以通过以下API完成
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
|