文:王东阳
前言
在Flink中根据数据集是否根据Key进行分区,将状态分为Keyed State和Operator State(Non-keyed State)两种类型 ,在之前的文章《Flink中基于KeyedState的计算开发方法》已经详细介绍了Keyed State的概念和用法,本文将继续介绍Operator State。
Operator State与Keyed State不同的是,Operator State只和并行的算子实例绑定,和数据元素中的key无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator State支持当算子实例并行度发生变化时自动重新分配状态数据, OperatorState目前只支持使用ListState。
Operator State与并行的操作算子实例相关联,例如在Kafka Connector中,每个Kafka消费端算子实例都对应到Kafka的一个分区中,维护Topic分区和Offsets偏移量作为算子的Operator State 在Flink中可以通过 Checkpointed-Function 或者 ListCheckpointed<T extends Serializable> 两个接口来定义操作Operator State的函数。
Operator State开发实战
本章节将通过实际的项目代码演示Operator State在两种不同计算场景下的开发方法。
在样例中将演示Operator State如何融合进入Flink 的DataStream API,让用户在开发Flink应用的时候,可以将临时数据保存在State中,从State中读取数据,在运行的时候,在运行层面上与算子、Function体系融合,自动对State进行备份Checkpoint,一旦出现异常能够从保存的State中恢复状态,实现Exactly-Once 。
通过CheckpointedFunction接口操作Operator State
CheckpointedFunction 接口定义如代码所示,需要实现两个方法,当checkpoint触发时就会调用snapshotState() 方法,当初始化自定义函数的时候会调用initializeState() 方法,其中包括第一次初始化函数和从之前的checkpoints中恢复状态数据,同时initializeState() 方法中需要包含两套逻辑,
- 一个是不同类型状态数据初始化的逻辑,
- 一个是从之前的状态中恢复数据的逻辑
@Public
public interface CheckpointedFunction {
void snapshotState(FunctionSnapshotContext var1) throws Exception;
void initializeState(FunctionInitializationContext var1) throws Exception;
}
在每个算子中Managed Operator State都是以List形式存储,算子和算子之间的状态数据相互独立,List存储比较适合于状态数据的重新分布,Flink目前支持对Managed Operator State两种重分布的策略,分别是Even-split Redistribution和Union Redistribution。
- Even-split Redistribution:每个算子实例中含有部分状态元素的List列表,整个状态数据是所有List列表的合集。当触发 restore/redistribution 动作时,通过将状态数据平均分配成与算子并行度 相同数量的List列表,每个task实例中有一个List,其可以为空或者含有多个元素
- Union Redistribution:每个算子实例中含有所有状态元素的List列表,当触发 restore/redistribution 动作时,每个算子都能够获取到完整的状态元素列表
实现FlatMapFunction和CheckpointedFunction
在实际项目中可以通过实现FlatMapFunction 和CheckpointedFunction 完成对输入数据中每个key的数据元素数量和算子中的元素数量的统计。如代码所示,通过在initializeState() 方法中分别创建keyedState 和operatorState 两种State,存储基于Key相关的状态值以及基于算子的状态值。
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;
public class CheckpointCount
implements FlatMapFunction<Tuple2<Integer, Long>, Tuple3<Integer, Long, Long>>, CheckpointedFunction {
private static final Logger logger = Logger.getLogger(CheckpointCount.class);
private Long operatorCount;
private ValueState<Long> keyedState;
private ListState<Long> operatorState;
public void CheckpointCount() {
}
@Override
public void flatMap(
Tuple2<Integer, Long> integerLongTuple2, Collector<Tuple3<Integer, Long, Long>> collector) throws Exception {
if (integerLongTuple2.f0 == 4) {
throw new IOException("input ");
}
if (keyedState.value() == null) {
keyedState.update(1L);
} else {
keyedState.update(keyedState.value() + 1L);
}
operatorCount = operatorCount + 1;
collector.collect(Tuple3.of(integerLongTuple2.f0, keyedState.value(), operatorCount));
}
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
System.out.println("snapshot");
operatorState.clear();
operatorState.add(operatorCount);
}
@Override
public void initializeState(FunctionInitializationContext ctx) throws Exception {
System.out.println("initialize");
logger.debug("init");
keyedState = ctx.getKeyedStateStore().getState(new ValueStateDescriptor<Long>("keyedState", Long.class));
operatorState = ctx.getOperatorStateStore().getListState(new ListStateDescriptor<Long>(
"operatorState",
Long.class));
operatorCount = 0L;
if (ctx.isRestored()) {
List<Long> op = Lists.newArrayList(operatorState.get());
if (op.size() > 0 ) {
operatorCount = op.get(op.size()-1);
}
System.out.println("restored");
}
}
}
代码地址:CheckpointCount.java
可以从上述代码中看到的是,在 snapshotState() 方法中清理掉上一次checkpoint中存储的operatorState 的数据,然后再添加并更新本次算子中需要checkpoint的operatorCount 状态变量。当系统重启时会调用initializeState 方法,重新恢复keyedState 和operatorState ,其中operatorCount 数据可以从最新的operatorState 中恢复。
验证代码
构建验证代码如下:
private static void checkpointOperatorStateWithMapFunction() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(3000);
DataStreamSource<String> localhost = env.socketTextStream("localhost", 1111);
SingleOutputStreamOperator<Tuple2<Integer, Long>> inputStream= localhost.map(new MapFunction<String,
Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> map(String s) throws Exception {
String[] split = s.split(",");
return new Tuple2<>(Integer.valueOf(split[0]), Long.valueOf(split[1]));
}
});
inputStream.keyBy(0).flatMap(new CheckpointCount()).print();
env.execute("checkpoint state for map");
}
代码地址:checkpointOperatorStateWithMapFunction
通过***nc***,我们输入以下数据
DESKTOP-SPIDEIC:~$ nc -lk 1111
2,1
2,2
2,3
3,1
3,2
3,3
得到打印输出
initialize
snapshot
snapshot
snapshot
(2,1,1)
(2,2,2)
(2,3,3)
snapshot
(3,1,4)
(3,2,5)
snapshot
(3,3,6)
snapshot
可以看到
- 由于 keyedState 是跟key相关的,所以当
integerLongTuple2.f0 从2变为3的时候, keyedState 是重新初始化,从1开始递增 - 由于 operatorState只跟算子相关的,所以一直在递增
- 由于代码中使用
env.enableCheckpointing(3000) 开启了checkpoint,可以看到 snapshotState 中的日志打印出来
我们等(3,3,6)后面的snapshot打印出来后,接下来通过***nc***继续输入
4,3
由于我们在代码设置当 if (integerLongTuple2.f0 == 4) 的时候抛出异常,所以此刻flink程序就会退出,然后重启,进入到 initializeState
看到对应的打印如下
initialize
restored
snapshot
snapshot
可以看到初始化(initialize)以及恢复(restored)的逻辑都执行到了,我们通过nc继续输入 3,5 ,可以看到程序打印出 (3,4,7) , 说明 keyedState 以及operatorCount 都正常恢复了之前的值。
对于状态数据重分布策略的使用,可以在创建operatorState的过程中通过相应的方法指定:如果使用Even-split Redistribution策 略,则通过context. getListState(descriptor) 获取OperatorState;如果使用Union Redistribution策略,则通过context.getUnionList State(descriptor) 来获取。实例代码中默认使用的Even-split Redistribution策略。
通过CheckpointedFunction构建带缓冲区的Sink
下面我们再看另外一个例子,构建一个带缓冲的Sink。如代码所示,通过checkpointState保存当前已经接受的所有元素列表。
实现SinkFunction和CheckpointedFunction
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
public class BufferingSink
implements SinkFunction<Tuple2<Integer, Long>>,
CheckpointedFunction {
private final int threshold;
private transient ListState<Tuple2<Integer, Long>> checkpointState;
private List<Tuple2<Integer, Long>> bufferedElements;
public BufferingSink(int threshold){
this.bufferedElements = new ArrayList<>();
this.threshold = threshold;
}
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
System.out.println("snapshot");
checkpointState.clear();
checkpointState.addAll(bufferedElements);
}
@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
System.out.println("initialize");
ListStateDescriptor<Tuple2<Integer, Long>> descriptor =
new ListStateDescriptor<Tuple2<Integer, Long>>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})
);
checkpointState = functionInitializationContext.getOperatorStateStore().getListState(descriptor);
if (functionInitializationContext.isRestored()) {
for (Tuple2<Integer, Long> element : checkpointState.get()){
bufferedElements.add(element);
}
}
}
@Override
public void invoke(
Tuple2<Integer, Long> value, Context context) throws Exception {
System.out.println(String.format("recv %d %d", value.f0, value.f1));
bufferedElements.add(value);
if (value.f0 == 4) {
throw new IOException("input ");
}
}
@Override
public void finish() throws Exception {
for (Tuple2<Integer, Long> element: bufferedElements) {
System.out.println(element);
}
}
}
代码地址:CheckpointBufferingSink.java
验证代码
构建验证程序如下:
private static void checkpointListStateWithSinkFunction() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(3000);
DataStreamSource<String> localhost = env.socketTextStream("localhost", 1111);
SingleOutputStreamOperator<Tuple2<Integer, Long>> inputStream= localhost.map(new MapFunction<String,
Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> map(String s) throws Exception {
String[] split = s.split(",");
return new Tuple2<>(Integer.valueOf(split[0]), Long.valueOf(split[1]));
}
});
inputStream.addSink(new BufferingSink(2));
env.execute("checkpoint state for sink");
}
代码地址:checkpointListStateWithSinkFunction
通过 nc 输入如下数据
wdy@DESKTOP-SPIDEIC:~$ nc -lk 1111
2,1
2,2
2,3
flink程序输出如下打印:
initialize
snapshot
recv 2 1
recv 2 2
snapshot
recv 2 3
snapshot
等 recv 2 3 之后看到 snapshot 打印出来后,用于确保最后一条(2,3)也保存到了checkpoint中,通过 nc 输入
4,0
由于我们在代码设置当 if (integerLongTuple2.f0 == 4) 的时候抛出异常,所以此刻flink程序就会退出,然后重启,进入到 initializeState
flink程序输出如下打印:
recv 4 0
initialize
restored
snapshotrecv 4 0
initialize
restored
snapshot
可以看到 正确进入到initializeState 并且执行了恢复逻辑,接下来通过 nc 输入
2,4
flink程序输出如下打印
recv 2 4
snapshot
接下来 Ctrl+C 停止 nc ,进入到 finish() 函数中,flink程序输出
(2,1)
(2,2)
(2,3)
(2,4)
标明flink异常退出重启后,正确从checkpointState 恢复了之前的数据。
通过ListCheckpointed接口定义Operator State
ListCheckpointed 接口和CheckpointedFunction 接口相比在灵活性上相对弱一些,只能支持List类型的状态,并且在数据恢复的时候仅支持even-redistribution策略。在ListCheckpointed 接口中需要实现以下两个方法来操作Operator State:
public interface ListCheckpointed<T extends Serializable> {
List<T> snapshotState(long var1, long var3) throws Exception;
void restoreState(List<T> var1) throws Exception;
}
其中snapshotState 方法定义数据元素List存储到checkpoints的逻辑,restoreState 方法则定义从checkpoints中恢复状态的逻辑。如果状态数据不支持List形式,则可以在snapshotState 方法中返回Collections.singletonList(STATE) 。
这个接口在Flink 1.14中已经不建议使用,所以本文也不再进行实例演示。
总结
本文介绍了OperateState在MapFunction以及SlinkFunction两种操作场景中的应用,同时展示了如何通过结合CheckpointedFunction自动对State进行备份Checkpoint,从而在任务出现异常时能够从保存的State中恢复状态,实现Exactly-Once。
参考资料
- 《Flink原理、实战与性能优化》5.1 有状态计算
- 《Flink内核原理与实现》第7章 状态原理
- Flink教程(17) Keyed State状态管理之AggregatingState使用案例 求平均值 https://blog.csdn.net/winterking3/article/details/115133519
|