问题发现:
通过 Flink UI 查看,checkpoint 总是失败 Flink UI 里的每一项具体含义见:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/ops/monitoring/checkpoint_monitoring/
Flink 配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setStateBackend(new RocksDBStateBackend(filebackend, true));
原因1:全量 Checkpoint 导致 snapshot 持久化慢
Checkpoint 有两种模式:全量 Checkpoint 和 增量 Checkpoint。全量 Checkpoint 会把当前的 state 全部备份一次到持久化存储,而增量 Checkpoint,则只备份上一次 Checkpoint 中不存在的 state。 目前仅 RocksDBStateBackend 支持增量 Checkpoint。 因此在代码里设置 RocksDB 存储后端,配置依赖,然后代码里加入
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>1.8.0</version>
</dependency>
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = new RocksDBStateBackend("viewfs:///xxx", true);
env.setStateBackend(stateBackend);
参考: https://blog.csdn.net/lxhandlbb/article/details/90668040 https://blog.csdn.net/zc19921215/article/details/107051883 https://www.huaweicloud.com/articles/39bb5380b4ee56fc37ea3bc45d48543d.html
原因2:作业存在反压或者数据倾斜
通过查看 checkpoint failed 对应的 ID(全局搜索:n/a)或者时长最长的 ID,在 overview 里找到对应的数据量大小,可以看到该 task 数据明显多于其他的
解决方案:通常业务上这种情况是由于,有一些爬虫或者搜索次数特别多的用户,集中在了某一个 task 上导致的,因此可以使用预聚合的方式减少 hash 的不均匀。
- 首先将key打散,我们加入将key转化为 key-随机数 ,保证数据散列
- 对打散后的数据进行聚合统计,这时我们会得到数据比如 : (key1-12,1),(key1-13,19),(key1-1,20),(key2-123,11),(key2-123,10)
- 将散列key还原成我们之前传入的key,这时我们的到数据是聚合统计后的结果,不是最初的原数据
- 二次keyby进行结果统计,输出到addSink
参考链接: https://zhuanlan.zhihu.com/p/365136566 https://joccer.gitee.io/2019/12/15/Flink-%E6%95%B0%E6%8D%AE%E5%80%BE%E6%96%9C/ https://www.cxyzjd.com/article/zhangshenghang/105322423
原因3:Barrier 对齐慢
介绍Checkpoint Barrier 对齐机制,算子 Operator 收齐上游的 barrier n 才能触发 snapshot。例如,StateBackend 是 RocksDB,snapshot 开始的时候保存数据到 RocksDB,然后 RocksDB 异步持久化到 FS。如果 barrier n 一直对不齐的话,就不会开始做 snapshot。
另外:Buffered During Alignment:在对所有 Acknowledged Subtasks 进行 Barrier Alignment 期间缓冲的字节数。如果该值大于 0,说明在 Checkpoint 生产期间发生了 Barrier Alignment。如果 Checkpoint 使用的是 AT_LEAST_ONCE 模式,那么该值始终是 0。
具体表现是:Acknowledged 只有个位数到不了 100%,以及 Buffered During Alignment 有好几个G
通过排查日志(也就是看n/a 对应的 ID 的 Task Managers 里的日志)可以看到我这里是 Sedis 连接超时导致的,因此可以设置超时时间,或者在 Flink 里设置 Task 故障恢复:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
20,
org.apache.flink.api.common.time.Time.of(50, TimeUnit.SECONDS)
));
参考链接: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/execution/task_failure_recovery/ https://blog.csdn.net/nazeniwaresakini/article/details/107954076
原因4:sink 对应的 redis 或者 hbase 连接数超了
集群并行处理,每个 task 都会创建 redis 实例连接,如果并行度 Parallelism 设置的过大,有可能导致集群崩溃
|