从某种意义上说,算子状态是更底层的状态类型,因为它只针对当前算子并行任务有效,不需要考虑不同 key 的隔离。
基本概念和特点
算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务,与Key无关,不同Key的数据只要分发到同一个并行子任务,就会访问到同一个算子状态。
算子状态一般用在 Source 或 Sink 等与外部系统连接的算子上,或者完全没有 key 定义的场景。比如 Flink 的 Kafka 连接器中,就用到了算子状态。在我们给 Source 算子设置并行度后,Kafka 消费者的每一个并行实例,都会为对应的主题(topic)分区维护一个偏移量, 作为算子状态保存起来。这在保证 Flink 应用“精确一次”(exactly-once)状态一致性时非常有用。
状态类型
算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState 和 BroadcastState。
列表状态(ListState)
与 Keyed State 中的 ListState 一样,将状态表示为一组数据的列表。
与 Keyed State 中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。
当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的 rebanlance 数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-splitredistribution)。
算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。
联合列表状态(UnionListState)
与 ListState 类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。
UnionListState 的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式
代码实现
状态的本质就是算子并行子任务实例上的一个特殊的本地变量,它的特殊之处就在于Flink会提供完整的管理机制,来保证它的持久化缓存,以便故障进行恢复。
注意:在发生故障,重启应用后,无论按键分区,还是轮询分区,都会发生变化,就好比,打牌的时候从三个人变成四个人时,即使牌的顺序不发生变化,但牌的数量到每个人手里都不同。那么问题来了,如何保证原先状态和故障恢复后数据的对应关系呢?
对于按键状态,因为状态是和key相关的,相同key的数据不管在哪个分区,最后都是在同一个分区,所以只要将状态按照key的哈希值计算对应分区,重新分配即可。
对于算子状态,因为不存在key,所有数据发往哪个分区是不可知的,当故障重启之后,不能保证数据和以前一样进入同一个子任务,访问同一个状态,所以Flink提供了一个接口,根据需求来自行设计快照保存(snapshot)和恢复(restore)逻辑。
CheckpointedFunction 接口
在 Flink 中,对状态进行持久化保存的快照机制叫作“检查点”(Checkpoint)。于是使用算子状态时,就需要对检查点的相关操作进行定义,实现一个CheckpointedFunction 接口。 CheckpointedFunction 接口在源码中定义如下:
public interface CheckpointedFunction {
void snapshotState(FunctionSnapshotContext context) throws Exception
void initializeState(FunctionInitializationContext context) throws Exception;
}
每次应用保存检查点做快照时,都会调用.snapshotState()方法,将状态进行外部持久化。而在算子任务进行初始化时,会调用. initializeState()方法。这又有两种情况:一种是整个应用第一次运行,这时状态会被初始化为一个默认值(default value);另一种是应用重启时,从检查点(checkpoint)或者保存点(savepoint)中读取之前状态的快照,并赋给本地状态。所以,接口中的.snapshotState()方法定义了检查点的快照保存逻辑,而. initializeState()方法不仅定义了初始化逻辑,也定义了恢复逻辑。
这里需要注意,CheckpointedFunction 接口中的两个方法,分别传入了一个上下文(context)作为参数。不同的是,.snapshotState()方法拿到的是快照的上下文 FunctionSnapshotContext,它可以提供检查点的相关信息,不过无法获取状态句柄;而. initializeState()方法拿到的是FunctionInitializationContext,这是函数类进行初始化时的上下文,是真正的“运行时上下文”。FunctionInitializationContext 中提供了“算子状态存储”(OperatorStateStore)和“按键分区状态存储(” KeyedStateStore),在这两个存储对象中可以非常方便地获取当前任务实例中的 OperatorState 和 Keyed State。
例如:
ListStateDescriptor<String> descriptor = new ListStateDescriptor<>(
"buffered-elements",
Types.of( String ) );
ListState<String> checkpointedState = context.getOperatorStateStore().getListState( descriptor );
示例代码
需求:批量缓存输出 (攒到10条数据就做一个输出)
public class BufferingSinkExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
stream.print("input");
stream.addSink(new BufferingSink(10));
env.execute();
}
public static class BufferingSink implements SinkFunction<Event>, CheckpointedFunction {
private final int threshold;
private List<Event> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
private ListState<Event> checkpointedState;
@Override
public void invoke(Event value, Context context) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Event event : bufferedElements) {
System.out.println(event);
}
System.out.println("====================输出完毕====================");
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Event event : bufferedElements) {
checkpointedState.add(event);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Event> descriptor = new ListStateDescriptor<>("buffer", Event.class);
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (Event event : checkpointedState.get()){
bufferedElements.add(event);
}
}
}
}
}
当初始化好状态对象后,可以通过调用. isRestored()方法判断是否是从故障中恢复。在代码中 BufferingSink 初始化时,恢复出的 ListState 的所有元素会添加到一个局部变量bufferedElements 中,以后进行检查点快照时就可以直接使用了。在调用.snapshotState()时,直接清空 ListState,然后把当前局部变量中的所有元素写入到检查点中。
对于不同类型的算子状态,需要调用不同的获取状态对象的接口,对应地也就会使用不同的状态分配重组算法。比如获取列表状态时,调用.getListState() 会使用最简单的 平均分割重组(even-split redistribution)算法;而获取联合列表状态时,调用的是.getUnionListState() , 对应就会使用联合重组(union redistribution) 算法。
Gitee完整代码
|