1.Flink 中的状态
1.1 状态管理
-
状态的访问权限。我们知道 Flink 上的聚合和窗口操作,一般都是基于 KeyedStream的,数据会按照 key 的哈希值进行分区,聚合处理的结果也应该是只对当前 key 有效。然而同一个分区(也就是 slot)上执行的任务实例,可能会包含多个 key 的数据,它 们同时访问和更改本地变量,就会导致计算结果错误。所以这时状态并不是单纯的本地变量。 -
容错性,也就是故障后的恢复。状态只保存在内存中显然是不够稳定的,我们需要将它持久化保存,做一个备份;在发生故障后可以从这个备份中恢复状态。 -
我们还应该考虑到分布式应用的横向扩展性。比如处理的数据量增大时,我们应该相应地对计算资源扩容,调大并行度。这时就涉及到了状态的重组调整。
1.2 状态的分类
托管状态(Managed State)和原始状态(Raw State)
- 托管状态是由 Flink 的运行时(Runtime)来托管的
- 托管状态是由 Flink 的运行时(Runtime)来托管的
托管状态细分: 算子状态(Operator State)和按键分区状态(Keyed State)
- 算子状态: 列表状态, 联合列表状态, 广播状态
- 按键分区状态: 值状态, 列表状态, 映射状态, 聚合状态, reducingState 归约状态
算子状态: 按键分区状态: 算子状态可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别——因为本地变量的作用域也是当前任务实例。在使用时,我们还需进一步实现 CheckpointedFunction 接口。
按键分区状态应用非常广泛。之前讲到的聚合算子必须在 keyBy 之后才能使用,就是因为聚合的结果是以 Keyed State 的形式保存的。另外,也可以通过富函数类(Rich Function)来自定义 Keyed State,所以只要提供了富函数类接口的算子,也都可以使用 Keyed State。
2.按键分区状态 (keyed state)
2.1 值类型
public interface ValueState<T> extends State {
T value() throws IOException;
void update(T value) throws IOException;
}
[需求]
我们这里会使用用户 id 来进行分流,然后分别统计每个用户的 pv 数据,由于我们并不想每次 pv 加一,就将统计结果发送到下游去,所以这里我们注册了一个定时器,用来隔一段时间发送 pv 的统计结果,这样对下游算子的压力不至于太大。具体实现方式是定义一个用来保 存定时器时间戳的值状态变量。当定时器触发并向下游发送数据以后,便清空储存定时器时间戳的状态变量,这样当新的数据到来时,发现并没有定时器存在,就可以注册新的定时器了,注册完定时器之后将定时器的时间戳继续保存在状态变量中。
public class PeriodicPvExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
SingleOutputStreamOperator<Event> stream = environment.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
stream.print("input");
stream.keyBy(data -> data.user)
.process(new MyProcessKeyedFunction())
.print();
environment.execute();
}
public static class MyProcessKeyedFunction extends KeyedProcessFunction<String, Event, String>{
ValueState<Long> countState;
ValueState<Long> timerTsState;
@Override
public void open(Configuration parameters) throws Exception {
countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", Long.class));
}
@Override
public void processElement(Event event, Context context, Collector<String> collector) throws Exception {
Long value = countState.value();
countState.update(value == null? 1: value + 1);
if (timerTsState.value() == null){
context.timerService().registerEventTimeTimer(event.timestamp + 10 * 1000L);
timerTsState.update(event.timestamp + 10 * 1000);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
out.collect(ctx.getCurrentKey() + " pv" + countState.value());
timerTsState.clear();
ctx.timerService().registerEventTimeTimer(timestamp + 10 * 1000L);
timerTsState.update(timestamp + 10 * 1000);
}
}
}
2.2 列表状态 List State
public interface ListState<T> extends MergingState<T, Iterable<T>> {
void update(List<T> var1) throws Exception;
void addAll(List<T> var1) throws Exception;
}
[需求]
SELECT * FROM A INNER JOIN B WHERE A.id = B.id;
public class TwoStreamJoinExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = environment.fromElements(
Tuple3.of("1", "1", 1000L),
Tuple3.of("2", "2", 2000L),
Tuple3.of("3", "3", 3000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
@Override
public long extractTimestamp(Tuple3<String, String, Long> stringStringLongTuple3, long l) {
return stringStringLongTuple3.f2;
}
})
);
SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = environment.fromElements(
Tuple3.of("1", "11", 1000L),
Tuple3.of("2", "22", 1000L),
Tuple3.of("3", "33", 1000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
@Override
public long extractTimestamp(Tuple3<String, String, Long> stringStringLongTuple3, long l) {
return stringStringLongTuple3.f2;
}
})
);
stream1.keyBy(data -> data.f0)
.connect(stream2.keyBy(data -> data.f0))
.process(new CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
ListState<Tuple2<String, Long>> stream1List;
ListState<Tuple2<String, Long>> stream2List;
@Override
public void open(Configuration parameters) throws Exception {
stream1List = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Long>>("stream1-list", Types.TUPLE(Types.STRING, Types.LONG)));
stream2List = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Long>>("stream2-list", Types.TUPLE(Types.STRING, Types.LONG)));
}
@Override
public void processElement1(Tuple3<String, String, Long> left, Context context, Collector<String> collector) throws Exception {
for (Tuple2<String, Long> right : stream1List.get()) {
collector.collect(left.f0 + " " + left.f2 + "=>" + right);
}
stream1List.add(Tuple2.of(left.f0, left.f2));
}
@Override
public void processElement2(Tuple3<String, String, Long> right, Context context, Collector<String> collector) throws Exception {
for (Tuple2<String, Long> left : stream1List.get()) {
collector.collect(right.f0 + " " + right.f2 + "=>" + left);
}
stream1List.add(Tuple2.of(right.f0, right.f2));
}
})
.print();
environment.execute();
}
}
2.3 映射状态 Map State
public interface MapState<UK, UV> extends State {
UV get(UK var1) throws Exception;
void put(UK var1, UV var2) throws Exception;
void putAll(Map<UK, UV> var1) throws Exception;
void remove(UK var1) throws Exception;
boolean contains(UK var1) throws Exception;
Iterable<Entry<UK, UV>> entries() throws Exception;
Iterable<UK> keys() throws Exception;
Iterable<UV> values() throws Exception;
Iterator<Entry<UK, UV>> iterator() throws Exception;
boolean isEmpty() throws Exception;
}
[需求]
映射状态的用法和 Java 中的 HashMap 很相似。在这里我们可以通过 MapState 的使用来探索一下窗口的底层实现,也就是我们要用映射状态来完整模拟窗口的功能。这里我们模拟一个滚动窗口 。我们要计算的是每一个 url 在每一个窗口中的 pv 数据。我们之前使用增量聚合和全窗口聚合结合的方式实现过这个需求。这里我们用 MapState 再来实现一下。
public class FakeWindowExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
SingleOutputStreamOperator<Event> stream = environment.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
stream.print("input");
stream.keyBy(data -> data.url)
.process(new FakeWindowResult(10000L))
.print();
environment.execute();
}
public static class FakeWindowResult extends KeyedProcessFunction<String, Event, String>{
private Long windowSize;
public FakeWindowResult(Long windowSize) {
this.windowSize = windowSize;
}
MapState<Long, Long> mapState;
@Override
public void open(Configuration parameters) throws Exception {
mapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("map-count", Long.class, Long.class));
}
@Override
public void processElement(Event event, Context context, Collector<String> collector) throws Exception {
Long windowStart = event.timestamp / windowSize * windowSize;
Long windowEnd = windowStart + windowSize;
context.timerService().registerEventTimeTimer(windowEnd - 1);
if (mapState.contains(windowStart)){
mapState.put(windowStart, mapState.get(windowStart) + 1);
}else {
mapState.put(windowStart, 1L);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
Long windowEnd = timestamp + 1;
Long windowStart = windowEnd - windowSize;
Long count = mapState.get(windowStart);
out.collect("窗口" + new Timestamp(windowStart) + " ~ " + new Timestamp(windowEnd) + " url: " + ctx.getCurrentKey() + " count: " + count);
mapState.remove(windowStart);
}
}
}
2.4 聚合状态 Aggregating State
public interface AggregatingState<IN, OUT> extends MergingState<IN, OUT> {
}
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> {
}
public interface AppendingState<IN, OUT> extends State {
OUT get() throws Exception;
void add(IN var1) throws Exception;
}
public interface State {
void clear();
}
[需求]
我们举一个简单的例子,对用户点击事件流每 5 个数据统计一次平均时间戳 。这是一个类似计数窗口 (CountWindow)求平均值的计算 ,这里我们可以使用一个有聚合状态的RichFlatMapFunction 来实现。
public class AverageTimestampExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
SingleOutputStreamOperator<Event> stream = environment.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
stream.print("input");
stream.keyBy(data -> data.user)
.flatMap(new AvgTsResult(5L))
.print();
environment.execute();
}
public static class AvgTsResult extends RichFlatMapFunction<Event, String>{
private Long count;
public AvgTsResult(Long count) {
this.count = count;
}
AggregatingState<Event, Long> aggregatingState;
ValueState<Long> valueState;
@Override
public void open(Configuration parameters) throws Exception {
aggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Event, Tuple2<Long, Long>, Long>("agg",
new AggregateFunction<Event, Tuple2<Long, Long>, Long>() {
@Override
public Tuple2<Long, Long> createAccumulator() {
return Tuple2.of(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Event event, Tuple2<Long, Long> longLongTuple2) {
return Tuple2.of(event.timestamp + longLongTuple2.f0, longLongTuple2.f1 + 1);
}
@Override
public Long getResult(Tuple2<Long, Long> longLongTuple2) {
return longLongTuple2.f0/longLongTuple2.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> longLongTuple2, Tuple2<Long, Long> acc1) {
return null;
}
}
, Types.TUPLE(Types.LONG, Types.LONG)));
valueState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
}
@Override
public void flatMap(Event event, Collector<String> collector) throws Exception {
Long value = valueState.value();
if (value == null){
value = 1L;
}else {
value ++;
}
valueState.update(value);
aggregatingState.add(event);
if (value.equals(count)){
collector.collect(event.user + "过去" + count + "平均时间戳为: " + aggregatingState.get());
valueState.clear();
aggregatingState.clear();
}
}
}
}
3.状态生存时间 Ttl
失效时间 = 当前时间 + TTL
ValueState<Event> valueState = null;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Event> stateDescriptor = new ValueStateDescriptor<>("value-state", Event.class);
valueState = getRuntimeContext().getState(stateDescriptor);
StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.hours(1))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
.build();
stateDescriptor.enableTimeToLive(stateTtlConfig);
}
- .newBuilder():
状态 TTL 配置的构造器方法,必须调用,返回一个 Builder 之后再调用.build()方法就可以得到 StateTtlConfig 了。方法需要传入一个 Time 作为参数,这就是设定的状态生存时间。 - setUpdateType()
设置更新类型。更新类型指定了什么时候更新状态失效时间 - .setStateVisibility()
设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能基于存在,这时如果对它进行访问,能否正常读取到就是一个问题了。
4.状态一致性说明
4.1 一致性级别说明
在流处理中,一致性可以分为 3 个级别:
- at-most-once: 这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。同样的还有 udp。
- at-least-once: 这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。
- exactly-once: 这指的是系统保证在发生故障后得到的计数结果与正确值一致。
曾经,用户不得不在保证exactly-once 与获得低延迟和效率之间权衡利弊。Flink 避免了这种权衡。 Flink 的一个重大价值在于:它既保证了 exactly-once,也具有低延迟和高吞吐的处理能力。
4.2 端到端状态一致性
目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。
端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。
- 内部保证 —— 依赖 checkpoint
- source 端 —— 需要外部源可重设数据的读取位置
- sink 端 —— 需要保证从故障恢复时,数据不会重复写入外部系统而对于 sink 端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。
- 幂等写入
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。 - 事务写入
需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint真正完成的时候,才把所有对应的结果写入 sink 系统中。
对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。DataStream API 提供了 GenericWriteAheadSink 模板类和TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。
|