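In the previous article, I briefly covered reading state (Flink Tips: A Brief Look at the State Processor API (1), Reading State). This article focuses on writing and modifying state, mainly using Keyed State as the example. Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/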
1. Writing New Savepoints
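This builds on the keyed state example from the previous article. The state written here has the same structure as the state in that article.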
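import java.util.Arrays;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

// The enclosing class name is illustrative; the original snippet showed only the method body.
public class WriteSavepointDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        // Existing checkpoint (loaded again under "Modifying Savepoints") and the target path of the new savepoint
        String oldsavepointPath = "file:///D:\\IDEAspaces\\bigdata_study\\bigdata_flink\\data\\checkpoint\\ce4e7457dfcd7bf92d046a0e70b4a992\\chk-1";
        String savepointPath = "file:///D:\\IDEAspaces\\bigdata_study\\bigdata_flink\\data\\checkpoint\\ce4e7457dfcd7bf92d046a0e70b4a992\\chk-3";
        // Max parallelism of the new savepoint, i.e. the number of key groups
        int maxParallelism = 128;
        // Bootstrap data: one (key, value) pair per key
        DataSource<Tuple2<String, Integer>> fromCollection = bEnv.fromCollection(Arrays.asList(
                Tuple2.of("a", 10),
                Tuple2.of("b", 10),
                Tuple2.of("d", 10)));
        // Key the data by f0 so that KeyBootstrapper writes state per key
        BootstrapTransformation<Tuple2<String, Integer>> transform = OperatorTransformation
                .bootstrapWith(fromCollection)
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .transform(new KeyBootstrapper());
        // Start from an empty savepoint, register the operator under uid "key_uid", and write it out
        Savepoint.create(new MemoryStateBackend(), maxParallelism)
                .withOperator("key_uid", transform)
                .write(savepointPath);
        // write() only declares the DataSet sink; execute() actually runs the batch job
        bEnv.execute();
    }

    public static class KeyBootstrapper extends KeyedStateBootstrapFunction<String, Tuple2<String, Integer>> {
        ValueState<Integer> state;     // "state": the value written for each key
        ListState<Long> updateTimes;   // "times": timestamps of each update
        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
            state = getRuntimeContext().getState(stateDescriptor);
            ListStateDescriptor<Long> updateDescriptor = new ListStateDescriptor<>("times", Types.LONG);
            updateTimes = getRuntimeContext().getListState(updateDescriptor);
        }
        @Override
        public void processElement(Tuple2<String, Integer> value, Context ctx) throws Exception {
            // Called once per element; state is scoped to the current key (value.f0)
            state.update(value.f1 + 1);
            updateTimes.add(System.currentTimeMillis());
        }
    }
}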
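Running the job creates a new _metadata file in the savepointPath directory. Reading the savepoint back, as in the previous article, gives: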
KeyedStates{key='a', value=11, times=[1626158781896]}
KeyedStates{key='d', value=11, times=[1626158781896]}
KeyedStates{key='b', value=11, times=[1626158781896]}
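The maxParallelism = 128 passed to Savepoint.create sets the number of key groups for the keyed state. Each key hashes to one key group, and each key group belongs to exactly one subtask. As far as I know, a job restored from this savepoint must keep the same max parallelism. The plain-Java sketch below shows that mapping. It assumes the logic mirrors Flink's MathUtils.murmurHash and KeyGroupRangeAssignment. The class and method names are my own, and the job parallelism of 4 is hypothetical. Check the Flink source before relying on exact numbers.

```java
import java.util.Arrays;

/**
 * Sketch of key -> key group -> subtask assignment.
 * Assumption: mirrors Flink's MathUtils.murmurHash and KeyGroupRangeAssignment (1.13);
 * names here are hypothetical, not Flink's API.
 */
public class KeyGroupSketch {

    // Murmur3-style mixing of key.hashCode(), folded to a non-negative int
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // Key group in [0, maxParallelism)
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Subtask index in [0, parallelism) that owns the given key group
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // same value passed to Savepoint.create(...)
        int parallelism = 4;      // hypothetical job parallelism
        for (String key : Arrays.asList("a", "b", "d")) {
            int group = keyGroupFor(key, maxParallelism);
            System.out.println(key + " -> keyGroup " + group
                    + " -> subtask " + operatorIndexFor(group, maxParallelism, parallelism));
        }
    }
}
```

The key group depends only on the key and maxParallelism. So the same savepoint can be restored with any parallelism up to 128, and the key groups are redistributed among the subtasks.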
2. Modifying Savepoints
Load the existing state, add an operator under a new uid, and write the result as a new savepoint:
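// Load the old savepoint, add the bootstrapped operator under the new uid "key_uid2",
// and write everything to savepointPath; operators already in the old savepoint are kept
Savepoint.load(bEnv, oldsavepointPath, new MemoryStateBackend())
        .withOperator("key_uid2", transform)
        .write(savepointPath);
bEnv.execute();
Reading the new savepoint back shows two sets of state. The first three entries are the state carried over from the old savepoint (keys a, d, c with value=2). The last three are the state written under key_uid2 (value=11):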
KeyedStates{key='a', value=2, times=[1626157285704]}
KeyedStates{key='d', value=2, times=[1626157292202]}
KeyedStates{key='c', value=2, times=[1626157289201]}
KeyedStates{key='a', value=11, times=[1626158781896]}
KeyedStates{key='d', value=11, times=[1626158781896]}
KeyedStates{key='b', value=11, times=[1626158781896]}
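Conceptually, a modified savepoint is the old per-operator state plus one more entry under the new uid. The toy model below illustrates that relationship with plain Java maps. It is not Flink's API and keeps only the ValueState values. It assumes the operator in the old savepoint was registered as key_uid, since the old uid is not shown here. Rejecting a duplicate uid is a choice of this sketch, not a statement about Flink.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model: savepoint = operator uid -> (key -> value state). Not Flink's API. */
public class SavepointMergeSketch {

    static Map<String, Map<String, Integer>> withOperator(
            Map<String, Map<String, Integer>> oldSavepoint,
            String newUid,
            Map<String, Integer> newOperatorState) {
        if (oldSavepoint.containsKey(newUid)) {
            // Assumption of this sketch: each uid appears at most once
            throw new IllegalArgumentException("uid already present: " + newUid);
        }
        // Shallow copy: operators from the old savepoint are carried over unchanged
        Map<String, Map<String, Integer>> result = new LinkedHashMap<>(oldSavepoint);
        result.put(newUid, newOperatorState);
        return result;
    }

    public static void main(String[] args) {
        // Old savepoint state, taken from the output above (uid "key_uid" is assumed)
        Map<String, Integer> oldKeyed = new LinkedHashMap<>();
        oldKeyed.put("a", 2);
        oldKeyed.put("d", 2);
        oldKeyed.put("c", 2);
        Map<String, Map<String, Integer>> old = new LinkedHashMap<>();
        old.put("key_uid", oldKeyed);

        // New operator state: mirrors state.update(value.f1 + 1) with f1 = 10
        Map<String, Integer> bootstrapped = new LinkedHashMap<>();
        for (String key : new String[] {"a", "b", "d"}) {
            bootstrapped.put(key, 10 + 1);
        }

        Map<String, Map<String, Integer>> merged = withOperator(old, "key_uid2", bootstrapped);
        // prints {key_uid={a=2, d=2, c=2}, key_uid2={a=11, b=11, d=11}}
        System.out.println(merged);
    }
}
```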