检查点机制
Fink中基于异步轻量级的分布式快照技术提供了Checkpoints容错机制,分布式快照可以将同一时间点Task/Operator的状态数据全局统一快照处理。 Flink会在输入的数据集上间歇性地生成checkpoint barrier,通过barrier将间隔时间段内的数据划分到相应的checkpoint中,当应用出现异常时,Operator就能够从上一次快照中恢复所有算子之前的状态,从而保证一致性。 对于状态占用空间较小的应用,快照产生过程非常轻量,高频率创建且对flink任务性能影响相对较小。checkpoint过程中状态数据一般被保存在一个可配置的环境中,通常在JobManager节点或者hdfs上 检查点默认是不开启的
checkpoint开启和时间间隔指定
checkpoint开启和时间间隔指定(根据实际情况选择,状态较大,建议适当增加n的值)
env.enableCheckpointing(1000);
exactly-once和at-least-once语义的选择
默认是使用exactly-ance exactly-ance:保证整个应用内端到端的数据一致性,这种适合数据要求较高,不允许出现丢数据或者数据重复,同时这种情况flink的性能也相对较弱 at-least-once:适合于延时和吞吐量非常高但对数据的一致性要求不高的场景。
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
Checkpoint超时时间
超时时间指定了checkpoint执行过程中的上限时间范围,一但checkpoint执行时间超过阈值,flink会中断checkpoint过程,并按超时处理(默认时间是10min)。
env.getCheckpointConfig().setCheckpointingTimeout(60000);
检查点之间的最小时间间隔
防止出现状态数据过大导致checkpoint执行时间过长,从而导致checkpoint积压过多,最终flink密集的触发checkpoint操作,占用大量计算资源而影响整个应用的性能。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
最大并行执行的检查点数量
设定能够最大同时执行的Checkpoint数量。默认情况下只有一个,根据用户指定的数量可以同时触发多个checkpoint,进而提升checkpoint整体效率。
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
外部检查点
设置周期性的外部检查点,然后将状态数据持久化到外部系统中,这种方式不会在任务正常停止的过程中清理掉检查点,而是一直保存到外部系统介质中,而且可以通过外部检查点中对任务进行恢复。
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETIN_ON_CANCELLATION);
failOnCheckpointingErrors
failOnCheckpointingErrors参数决定了当Checkpoint执行过程中如果出现失败或者错误时,任务是否关闭默认是true。
env.getCheckpointConfig().setfailOnCheckpointingErrors(false);
|