前言
flink的 RestartStrategy 作用是什么? 一句话概括,提升任务健壮性和容错性,保证任务可以实时产出数据。 设置重启策略和公司处理数据业务需求有很大的关系,根据不同的业务需求设置处理任务的不同策略。
其实遇到上面这种问题比较常见的,比如有时候因为数据的问题(不合规范、为 null 等),这时在处理这些脏数据的时候可能就会遇到各种各样的异常错误,比如空指针、数组越界、数据类型转换错误等。可能你会说只要过滤掉这种脏数据就行了,或者进行异常捕获就不会导致 Job 不断重启的问题了。 所以日常开发中我们要尽力的保证代码的健壮性,但是也要配置好 Flink Job 的 RestartStrategy(重启策略)。
1.flink的重启策略(RestartStrategy)
RestartStrategy,重启策略,在遇到机器或者代码等不可预知的问题时导致 Job 或者 Task 挂掉的时候,它会根据配置的重启策略将 Job 或者受影响的 Task 拉起来重新执行,以使得作业恢复到之前正常执行状态。Flink 中的重启策略决定了是否要重启 Job 或者 Task,以及重启的次数和每次重启的时间间隔。
2. flink的4种重启策略
默认的重启策略是通过Flink的flink-conf.yaml来指定的,这个配置参数restart-strategy定义了哪种策略会被采用。如果checkpoint未启动,就会采用no restart策略,如果启动了checkpoint机制,但是未指定重启策略的话,就会采用fixed-delay策略,重试Integer.MAX_VALUE次。请参考下面的可用重启策略来了解哪些值是支持的。
配置参数 restart-strategy 定义了哪个策略被使用。
- 固定间隔 (Fixed delay)
- 失败率 (Failure rate)
- 无重启 (No restart)
- Fallback(备用重启策略)
FixedDelayRestartStrategy(固定延时重启策略) FixedDelayRestartStrategy 是固定延迟重启策略,程序按照集群配置文件中或者程序中额外设置的重启次数尝试重启作业,如果尝试次数超过了给定的最大次数,程序还没有起来,则停止作业,另外还可以配置连续两次重启之间的等待时间,在 flink-conf.yaml 中可以像下面这样配置。
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
在程序中设置固定延迟重启策略的话如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies
.fixedDelayRestart(3,Time.seconds(3)));
FailureRateRestartStrategy(故障率重启策略)
FailureRateRestartStrategy 是故障率重启策略,在发生故障之后重启作业,如果固定时间间隔之内发生故障的次数超过设置的值后,作业就会失败停止,该重启策略也支持设置连续两次重启之间的等待时间。
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
可以在应用程序中这样设置来配置故障率重启策略:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,
Time.of(2, TimeUnit.SECONDS),
Time.of(3, TimeUnit.SECONDS))
);
NoRestartStrategy(不重启策略)
NoRestartStrategy 作业不重启策略,直接失败停止,在 flink-conf.yaml 中配置如下:
restart-strategy: none
在程序中如下设置即可配置不重启:
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
Fallback(备用重启策略)
如果程序没有启用 Checkpoint,则采用不重启策略,如果开启了 Checkpoint 且没有设置重启策略,那么采用固定延时重启策略,最大重启次数为 Integer.MAX_VALUE。
|