By adding the setting execution.savepoint.path to the Configuration used to create the Flink StreamExecutionEnvironment, the job will restore from the previous state by default when it starts:
Configuration configuration1 = new Configuration();
// savepoint state from a run with parallelism=16
// configuration1.setString("execution.savepoint.path",
//         "file:///Users/wenbao/checkPoint/d8ca368f349922c36c20498e1bedb9e7/chk-1");
// savepoint state from a run with parallelism=3
configuration1.setString("execution.savepoint.path",
        "file:///Users/wenbao/checkPoint/efc27bd1f33fcbd5eae9ab8431d64251/chk-1");
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration1);
Below is the code for testing that a hybrid source recovers from a Flink checkpoint:
import java.io.File;
import java.io.IOException;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public static void main(String[] args) throws Exception {
    Configuration configuration1 = new Configuration();
    // savepoint state from a run with parallelism=16
    // configuration1.setString("execution.savepoint.path",
    //         "file:///Users/wenbao/checkPoint/d8ca368f349922c36c20498e1bedb9e7/chk-1");
    // savepoint state from a run with parallelism=3
    configuration1.setString("execution.savepoint.path",
            "file:///Users/wenbao/checkPoint/efc27bd1f33fcbd5eae9ab8431d64251/chk-1");
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(configuration1);
    env.setParallelism(4);
    env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(2 * 60 * 1000);
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // retain completed checkpoints on cancellation so the job can be restored from them
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setStateBackend(new FsStateBackend("file:///Users/wenbao/checkPoint"));

    // first source: a bounded file source reading the local text file "a"
    Path path = Path.fromLocalFile(new File("a"));
    FileSource<String> source =
            FileSource.forRecordStreamFormat(new TextLineFormat(), path).build();

    // second source: an unbounded Kafka source that takes over once the file is exhausted
    HybridSource<String> hybridSource =
            HybridSource.builder(source)
                    .addSource(
                            KafkaSource.<String>builder()
                                    .setStartingOffsets(
                                            OffsetsInitializer.timestamp(1648542406228L))
                                    .setBootstrapServers("localhost:9092")
                                    .setTopics("wb_test")
                                    .setGroupId("wb-test")
                                    .setDeserializer(
                                            new KafkaRecordDeserializationSchema<String>() {
                                                @Override
                                                public void deserialize(
                                                        ConsumerRecord<byte[], byte[]> record,
                                                        Collector<String> out)
                                                        throws IOException {
                                                    out.collect(new String(record.value()));
                                                }

                                                @Override
                                                public TypeInformation<String> getProducedType() {
                                                    return TypeInformation.of(String.class);
                                                }
                                            })
                                    .build())
                    .build();

    DataStreamSource<String> stringDataStreamSource = env.fromSource(hybridSource,
            WatermarkStrategy.noWatermarks(), "file-source", TypeInformation.of(String.class));
    stringDataStreamSource.print();
    env.execute("aaa");
}
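Outside of local testing, the savepoint path is usually supplied at submission time rather than hardcoded via execution.savepoint.path. A sketch using the flink CLI's -s/--fromSavepoint flag (the jar name and entry class below are placeholders, not from this example):

```shell
# Resume the job from the retained checkpoint instead of setting
# execution.savepoint.path in code. Jar name and entry class are
# hypothetical placeholders for illustration.
flink run \
    -s file:///Users/wenbao/checkPoint/efc27bd1f33fcbd5eae9ab8431d64251/chk-1 \
    -c com.example.HybridSourceJob \
    my-job.jar
```

This keeps the recovery path out of the application code, so the same jar can be started fresh or restored from any retained checkpoint.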