flink应用开发完成并且部署上线以后,正常情况下要求是持续运行的,也就是不可以中断执行,比如在按天分组统计业务数据的应用中,将之前收到的数据量累计并记录在缓存中,后续接收到数据后,分组累加到之前的数据上。
不管是程序存在bug需要修复后运行,还是程序需要升级功能后重新运行都是不可避免的,此时就需要借助于checkpoint来进行检查点持久化,以便在下次启动应用的时候,利用检查点恢复数据并且继续从上次中断的位置继续运行。
1. 检查点参数配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 3s 做一次 checkpoint
// 根据业务场景,可以灵活设置该参数
env.enableCheckpointing(3000);
// 设置文件后端
env.setStateBackend(new FsStateBackend("file:///opt/flink/chkpoint"));
// checkpoint 语义设置为 EXACTLY_ONCE,这是默认语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 两次 checkpoint 的间隔时间至少为 1 s,默认是 0,立即进行下一次 checkpoint
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
// checkpoint 必须在 60s 内结束,否则被丢弃,默认是 10 分钟
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只能允许有一个 checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 最多允许 checkpoint 失败 3 次
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// 当 Flink 任务取消时,保留外部保存的 checkpoint 信息
// 这个是重点,取消和恢复需要此配置
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
// 当有较新的 Savepoint 时,作业也会从 Checkpoint 处恢复
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
2. 查看并记录checkpoint
?通过flink的web ui可以查看任务的checkpoint,不管是正在运行的,还是已经结束的任务,都可以看到checkpoint。
3. 启动任务时指定checkpoint
?上传flink程序jar包以后,submit时需要指定checkpoint,只有这样,程序才会从之前保存的信息中恢复信息到缓存中,继续进行数据处理。
4. checkpoint保存和恢复的数据
checkpoint保存和恢复的数据,包括中间统计数据,kafka消息队列的消费偏移量等。
|