Flink管理机制中,很重要的一个功能就是对状态进行持久化保存,这样就可以在发生故障进行重启恢复,持久化方式为当前状态,拍一个快照,并写入检查点,存储在外部存储系统中,存储介质一般为分步式文件系统(例如HDFS)。
检查点(Checkpoint)
检查点是任务的状态在某个时间点的一个快照,简单来说,就是一次存盘,让之前的数据不会丢掉,Flink会定期保存检查点,记录状态,如果发生故障,就会用最近一次成功保存的检查点来恢复之前的状态,重新启动处理数据。
如果保存检查点之后又处理了一些数据,然后发生了故障,那么重启恢复状态之后这些数据带来的状态改变会丢失。为了让最终处理结果正确,我们还需要让源(Source)算子重新读取这些数据,再次处理一遍。这就需要流的数据源具有“数据重放”的能力,一个典型的例子就是 Kafka,我们可以通过保存消费数据的偏移量、故障重启后重新提交来实现数据的重放。这是对“至少一次”(at least once)状态一致性的保证,如果希望实现“精确一次”(exactly once)的一致性,还需要数据写入外部系统时的相关保证。
默认情况下,检查点是被禁用的,需要在代码中手动开启。直接调用执行环境的.enableCheckpointing()方法就可以开启检查点。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();
env.enableCheckpointing(1000L);
这里传入的参数是检查点的间隔时间,单位为毫秒。
除了检查点之外,Flink 还提供了“保存点”(savepoint)的功能。保存点在原理和形式上跟检查点完全一样,也是状态持久化保存的一个快照;区别在于,保存点是自定义的镜像保存,所以不会由 Flink 自动创建,而需要用户手动触发。这在有计划地停止、重启应用时非常有用。
状态后端(State Backends)
检查点的保存离不开 JobManager 和 TaskManager,以及外部存储系统的协调。在应用进行检查点保存时,首先会由 JobManager 向所有 TaskManager 发出触发检查点的命令;TaskManger 收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中;完成之后向 JobManager 返回确认信息。这个过程是分布式的,当 JobManger 收到所有TaskManager 的返回信息后,就会确认当前检查点成功保存,而这一切工作的协调,就需要一个“专职人员”来完成。
在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作 状态后端(state backend)。 状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。
分类
1)哈希表状态后端(HashMapStateBackend)
把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在 Taskmanager 的 JVM 堆(heap)上。普通的状态,以及窗口中收集的数据和触发器(triggers),都会以键值对(key-value)的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。对于检查点的保存,一般是放在持久化的分布式文件系统(file system)中,也可以通过配置“检查点存储”(CheckpointStorage)来另外指定。
HashMapStateBackend 是将本地状态全部放入内存的,这样可以获得最快的读写速度,使计算性能达到最佳;代价则是内存的占用。它适用于具有大状态、长窗口、大键值状态的作业,对所有高可用性设置也是有效的。
2)内嵌 RocksDB 状态后端(EmbeddedRocksDBStateBackend)
RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend 后,会将处理中的数据全部放入 RocksDB 数据库中,RocksDB默认存储在 TaskManager 的本地数据目录里。
与 HashMapStateBackend 直接在堆内存中存储对象不同,这种方式下状态主要是放在RocksDB 中的。数据被存储为序列化的字节数组(Byte Arrays),读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key 的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。
如何选择
HashMapStateBackend 是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。
而 RocksDB 是硬盘存储,所以可以根据可用的磁盘空间进行扩展,而且是唯一支持增量检查点的状态后端,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比 HashMapStateBackend 慢一个数量级。
状态后端的配置
1)配置默认的状态后端 在 flink-conf.yaml 中,可以使用 state.backend 来配置默认状态后端。配置项的可能值为 hashmap,这样配置的就是 HashMapStateBackend;也可以是 rocksdb,这样配置的就是 EmbeddedRocksDBStateBackend。另外,也可以是一个实现了状态后端工厂StateBackendFactory 的类的完全限定类名。
下面是一个配置 HashMapStateBackend 的例子:
state.backend: hashmap
state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints
2)为每个作业(Per-job)单独配置状态后端 每个作业独立的状态后端,可以在代码中,基于作业的执行环境直接设置。代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
上面代码设置的是 HashMapStateBackend,如果想要设置EmbeddedRocksDBStateBackend, 可以用下面的配置方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
需要注意,如果想在 IDE 中使用 EmbeddedRocksDBStateBackend,需要为 Flink 项目添加 依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>1.13.0</version>
</dependency>
而由于 Flink 发行版中默认就包含了 RocksDB,所以只要我们的代码中没有使用 RocksDB的相关内容,就不需要引入这个依赖。即使我们在 flink-conf.yaml 配置文件中设定了state.backend 为 rocksdb,也可以直接正常运行,并且使用 RocksDB 作为状态后端。
|