由于有效的状态访问对于处理数据的低延迟只管重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问。
状态的存储、访问以及维护,有一个可插入的组件决定,这个组件就叫做状态后端(State Backends)
状态后端主要负责两件事:
- 本地的状态管理
- 将检查点(checkpoint)状态写入远程存储
Flink 提供的状态后端:
- MemoryStateBackend:内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在 TaskManager 的 JVM 堆上,而将 checkpoint 存储在 JobManger 的内存中。(生产不用)
- FsStateBackend:将 checkpoint 存到远程的持久化文件系统(FileSystem)上,面对于本地状态,跟 MemoryStateBackend 一样,也存到 TaskManger 的 JVM 堆上。
- RockDBStateBackend:将所有状态序列化后,存入本地的 RockDB中存储。RockDB 是基于 KV的,可以看做是一个本地数据库(实际使用内存+磁盘)。checkpoint 存到远程的持久化文件系统(FileSystem)上。RockDB 是一个用于快速存储的可嵌入持久化键值存储。他通过 Java Native接口(JNI)与Flink进行交互。
状态后端 | 特点 | 场景 | MemoryStateBackend | 快速、低延迟、但不稳定 | 生产环境不用 | FsStateBackend | 同时拥有内存级的本地访问速度和更好的容错保证,但还是会受到OOM影响 | 分钟级窗口聚合、join | RockDBStateBackend | 超大状态的作业,对状态读写性能要求不高的作业 | 天级窗口聚合 |
设置 StateBackend 的方式
1. 在配置文件中修改 flink-conf.yaml
jobmanager(即MemoryStateBackend)
filesystem(即FsStateBackend)
rocksdb(即RocksDBStateBackend)
state.backend:filesystem
state.checkpoints.dir:hdfs://namenode:9000/flink/checkpoints
2. 在代码中修改
//MemoryStateBackend
val env = StreamExecutionEnvironment.getExecutionEvironment
env.setStateBackend(new HashMapStateBackend)
env.getCheckpointConfig.setCheckpointStorage(new JobManagerCheckpointStorage)
//FsStateBackend
env.setStateBackend(new HashMapStateBackend)
env.getCheckpointConfig.setCheckpointStorage("hdfs://checkpoints")
//RockDBStateBackend
env.setStateBackend(new EmbeddedRocksDBStateBackend)
env.getCheckpointConfig.setCheckpointStorage("hdfs://checkpoints")
如果设置的是RockDBStateBackend,需要先引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
RockDBStateBa 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManger 的数据目录。
运行中的状态首先写入堆外/本机内存,然后当达到配置的阈值时刷新到本地磁盘。这意味着 RocksDBStateBendback 可以支持大于总配置堆容量的状态,或者说其状态大小只受限于整个集群中的可用磁盘空间。另外,因为 RockSDBStateBendback 不使用 JVM 堆来存储运行中的状态,故也不受 JVM 垃圾回收的影响,具有可预测的延迟。
Checkpoint 时,整个RocksDB 数据库 被 checkpoint 到配置的文件系统目录中。少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其存储到 Checkpoint 的元数据文件中)。
除了完整的、自包含的状态快照之外,RocksDBStateBackend 还支持作为性能调优选项的增量 checkpoint。增量 checkpoint 仅存储上次checkpoint之后发生的改变。与执行完整快照相比,这大大减少了checkpoint的时间。Rock是DBStateBendback 是当前唯一支持增量 checkpoint 的状态后端。
|