大家好,我是雷恩Layne,这是《深入浅出flink》系列的第十六篇文章,希望能对您有所收获O(∩_∩)O
一、什么是State
我们知道,Flink的一个算子可能会有多个子任务,每个子任务可能分布在不同的实例(即slot)上,我们可以把Flink的状态理解为某个算子的子任务在其当前实例上的一个变量,该变量记录了流过当前实例算子的历史记录产生的结果。当新数据记录流入时,我们需要结合该结果(即状态,State)来进行计算。
实际上,Flink的状态是由算子的子任务来创建和管理的。一个状态的更新和获取的流程如下图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。一个简单的例子是对一个时间窗口内流入的某个整数字段进行求和,那么当算子子任务接收到新元素时,会获取已经存储在状态中的数值(历史记录的求和结果),然后将当前输入加到状态上,并将状态数据更新。
为了保证流式计算的高可用性(容错),子任务的状态除了会暂存在节点内,还需要进行持久化存储(快照),这就是所谓的Checkpoint。当子任务出现故障\或重启任务时,可以从持久化的Checkpoint中恢复。
二、Keyed State和Operator State
2.1 Managed State和Raw State
按照状态的管理方式来分,Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。从名称中也能读出两者的区别:Managed State是由Flink直接管理的,由Flink帮忙存储、恢复和优化;Raw State是开发者自己管理的,需要自己序列化。
两者的具体区别有:
- 从状态管理的方式上来说,Managed State由Flink Runtime托管,状态是自动存储、自动恢复的,Flink在存储管理和持久化上做了一些优化。当我们横向伸缩(即状态缩放),或者说我们修改Flink应用的并行度时,状态也能自动重新分布到多个并行实例上。Raw State是用户自定义的状态。
- 从状态的数据结构上来说,Managed State支持了一系列常见的数据结构,如ValueState、ListState、MapState等。Raw State只支持字节,任何上层数据结构需要序列化为字节数组。使用时,需要用户自己序列化,以非常底层的字节数组形式存储,Flink并不知道存储的是什么样的数据结构。
- 从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State是在已有算子和Managed State不够用时,用户自定义算子时使用。
用表格表示如下:
| Managed State | Raw State |
---|
状态管理方式 | Flink Runtime托管,状态是自动存储、自动恢复、自动伸缩 | 用户自己管理 | 状态数据结构 | Flink提供的常用数据结构,如ListState、MapState等 | 字节数组: byte[] | 使用场景 | 绝大多数Flink算子 | 用户自定义算子 |
实际上,在绝大多数场景下我们都不需要自行维护状态,所以这里只介绍托管状态。对Managed State继续细分,又可以分为两种类型:Keyed State和Operator State。
2.2 Keyed State
我们首先来看Keyed State。我们知道,env.addSource()方法返回的是一个类型为DataStream的数据流,而这个数据流再按照数据记录中的某个关键字段(比如id字段)为Key进行了keyBy分组操作,得到就是一个类型为KeyedStream的数据流。Keyed State就是这个KeyedStream上的状态。数据流中所有相同id值的的记录共享一个状态(比如数据记录求和的值),可以访问和更新这个状态。以此类推,每个Key对应一个自己的状态。下图展示了Keyed State,因为一个算子子任务可以处理一到多个Key,算子子任务1处理了两种Key,两种Key分别对应自己的状态。
Keyed State支持的数据结构如下:
- 值状态(Value state):将状态表示为单个的值
- 列表状态(List state):将状态表示为一组数据的列表
- 映射状态(Map state):将状态表示为一组 Key-Value 对
- 聚合状态(Reducing state & Aggregating State):将状态表示为一个用于聚合操作的元素
聚合状态(Reducing state & Aggregating State)内部是通过ReduceFunction和AggregateFunction进行聚合的
2.3 Operator State
介绍完Keyed State,我们再来看Operator State。顾名思义,Operator State就是算子上的状态,每个算子子任务管理自己的Operator State。虽然理论上它可以用在所有算子上,但在实际应用中它常常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。每个算子的子任务或者说每个算子实例共享同一个状态,流入这个算子子任务的数据可以访问和更新这个状态。下图展示了Operator State,算子子任务1上的所有数据可以共享第一个Operator State,以此类推,每个算子子任务上的数据共享自己的状态。
Operator State支持的数据结构如下:
- 列表状态(List state):将状态表示为一组数据的列表。
- 联合列表状态(Union list state):也将状态表示为数据的列表,它与常规列表状态的区别在于,状态缩放时状态该如何分配。ListState是将整个状态列表按照round-ribon的模式均匀分布到各个算子子任务上,而Union list state按照广播的模式,将所有状态合并,再分发给每个实例的子任务上。
- 广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。广播状态是固定维护在堆内存中的,不会写入文件系统或者RocksDB。广播流一侧修改广播状态的键值之后,数据流一侧就可以立即感知到变化。在开发过程中,如果遇到下发/广播配置、规则等低吞吐事件流到下游所有task时,就可以使用Broadcast state的特性。
2.4 两者区别
Keyed State和Operator State的区别如下:
| Keyed State | Operator State |
---|
适用算子类型 | 只适用于KeyedStream 上的算子 | 可以用于所有算子 | 状态分配 | 每个Key对应一个状态 | 一个算子子任务对应一个状态 | 创建和访问方式 | 重写对应的算子Rich Function,通过里面的RuntimeContext访问 | 实现CheckpointedFunction等接口 | 状态缩放 | 状态随着Key自动在多个算子任务上迁移 | 有多种状态重新分配方式 | 支持的数据结构 | ValueState、ListState、MapState、Reducing state、Aggregating State | List state、Union list state、Broadcast state |
三、状态缩放(rescale)
状态缩放(rescale),即状态的横向扩展问题。该问题主要是指因为一些业务原因,需要修改Flink作业的并行度(比如,发现某个运行中的作业的某个算子的耗时较长,影响了整体的计算速度,需要重新调整该算子的并行度,以提升作业的整体处理速度;又比如,发现某个运行的作业的资源利用率不高,可以减少一些算子的并行度)。对于Flink而言,当某个算子的并行实例数或算子的子任务数发生了变化,应用需要关停或新启动一些算子子任务,某些原来在某个算子子任务上的状态数据需要平滑地更新到新的算子子任务上。
如下图所示,Flink的Checkpoint机制,为状态数据在各算子间迁移提供了保障。Flink定期将分布式节点上的状态数据生成快照(SNAPSHOT),并保存到分布式存储(如rocksDb或hdfs)上。横向伸缩后,算子子任务的个数发生变化,子任务重启,相应的状态从分布式存储上重建即可。
以扩容为例,上图将算子B和C进行了扩容(并行度从2调整到了3)。算子的扩缩容涉及到状态的重新分配。显然,Keyed State和Operator State重新分配机制是不一样的。相对来说,Operator State的重新分配更为简单,有两种常见的状态分配方式:一种是均匀分配(即List state的方式),另一种是将所有状态合并(即Union list state的方式),再分发给每个实例上。下面以Source接入kafka消息为例,先介绍Operator State的重新分配机制。假如接入消息的topic的分区数为5,且Source一开始的并行度为1,扩容后的并行度为2,则扩容前后Operator State的重新分配结果如下图(缩容为反向过程):
我们接着来看Keyed State的重新分配。按照最简单的思路考虑,Flink中的key是按照hash(key) % parallelism的规则分配到各个Sub-Task上去的,那么我们可以在缩放完成后,根据新分配的key集合从hdfs直接取回对应的Keyed State数据。下图示出并行度从3增加到4后,Keyed State中各个key的重新分配过程。
在Checkpoint发生时,状态数据是顺序写入文件系统的。但从上图可以看出,从状态恢复时是随机读的(而不是顺序读),效率非常低下。并且缩放之后各SubTask处理的key有可能大多都不是缩放之前的那些key,无形中降低了本地性。为了解决这两个问题,在FLINK-3755对Keyed State专门引入了Key Group,下面具体看看。
四、Key Group的原理
以下引自Flink官方文档:
Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution each parallel instance of a keyed operator works with the keys for one or more Key Groups.
翻译一下,Key Group是Keyed State分配的原子单位,且Flink作业内Key Group的数量与最大并行度相同,也就是说Key Group的索引位于[0, maxParallelism-1]的区间内。每个Sub-Task都会处理一个到多个Key Group,在源码中,以KeyGroupRange这一数据结构来表示。即KeyGroupRange实际上是多个连续的Key Group组成的闭区间([startKeyGroup, endKeyGroup])。
我们还有两个问题需要解决:
- 如何决定一个key该分配到哪个Key Group中?
- 如何决定一个Sub-Task该处理哪些Key Group(即对应的KeyGroupRange)?
对于第一个问题,Flink实际上是对原始的key进行两重哈希(一次取hashCode,一次做MurmurHash)之后,再对最大并行度取余,得到Key Group的索引。源码如下:
public static int assignToKeyGroup(Object key, int maxParallelism) {
return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
return MathUtils.murmurHash(keyHash) % maxParallelism;
}
而对于第二个问题,由源码可知,SubTask处理哪些Key Group是由并行度、最大并行度和算子实例(即SubTask)的ID共同决定的。源码如下:
public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
int maxParallelism,
int parallelism,
int operatorIndex) {
checkParallelismPreconditions(parallelism);
checkParallelismPreconditions(maxParallelism);
Preconditions.checkArgument(maxParallelism >= parallelism,
"Maximum parallelism must not be smaller than parallelism.");
int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
return new KeyGroupRange(start, end);
}
简单来说就是,Flink会将[0, maxParallelism-1]的区间内的Key Group尽可能均匀地、连续地分给各SubTask。按照这样的Key Group分配逻辑,上一节中Keyed State重分配的场景就会变成下图所示(设最大并行度为10)。
很明显,将Key Group作为Keyed State的基本分配单元之后,上文所述本地性差和随机读的问题都部分得到了解决。当然还要注意,最大并行度对Key Group分配的影响是显而易见的,因此不要随意修改最大并行度的值。Flink内部确定默认最大并行度的逻辑如下代码所示:
public static int computeDefaultMaxParallelism(int operatorParallelism) {
checkParallelismPreconditions(operatorParallelism);
return Math.min(
Math.max(
MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
UPPER_BOUND_MAX_PARALLELISM);
}
其中,下限值DEFAULT_LOWER_BOUND_MAX_PARALLELISM 为128,上限值UPPER_BOUND_MAX_PARALLELISM 为32768。
看到了这里你可能有一些疑问,我刚看到这些内容时也有不少疑问,大致如下:
-
Flink作业内Key Group的数量与最大并行度相同,每个Sub-Task都会处理一个到多个Key Group。这句话怎么理解? Key Group是Keyed State分配的原子单位,一个subTask对应一个keyGroupRange,keyGroupRange(start和end的闭区间)包含多个keyGroup。keyGroupRange的个数跟算子的并行度一样,keyGroup的个数和最大并行度一样。一个keyGroupRange中的多个keyGroup会被分配到一个subTask。 -
为什么能改善随机读的问题? 在Checkpoint发生时,状态数据是顺序写入文件系统的。如果采用之前的方法,从状态恢复时是随机读的(而不是顺序读),效率非常低下。而keyGroupRange是多个连续的Key Group组成的闭区间([startKeyGroup, endKeyGroup]),所以获取数据时是顺序读。 -
为什么能改善本地读? 先来说下为什么之前的方法降低了本地读?我们知道,每个子任务在将状态保存到checkpoint时(比如保存到hdfs),肯定遵循本地性元素,即第一个副本优先保存到本地结点,然后再保存其它副本时才会选择远程结点。所以,虽然状态保存在hdfs中,但是子任务的实例和状态还是在一个结点中的。由于缩放之后并行度发生了改变,如果通过hash(key) % parallelism的方式获取相应的状态,很有可能大多都不是缩放之前的那些key,无形中降低了本地性。而现在hash的计算方式变成MathUtils.murmurHash(keyHash) % maxParallelism ,不再依赖于算子的parallelism,而是依赖于更稳定的maxParallelism。另外一个subTask获取的是keyGroupRange所有的连续的Key Group,所以很有可能是从本地获取的,从而改善了本地读。(个人见解)
需要注意的是,改善本地读的前提一定是程序在不重启的情况下动态改变并行度,如果整个程序重启,那么新生成的subTask很可能和之前的不一样,就没有本地性而言了,反转都是从hdfs获取的。但是,随机读还是能得到保障的。
五、常见状态使用方法
由于实际环境中使用最多的是Keyed State,所以这里先介绍Keyed State的使用方法。Flink提供了几种现成的数据结构供我们使用,他们的继承关系如下图所示。首先,State主要有三种实现,分别为ValueState、MapState和AppendingState,AppendingState又可以细分为ListState、ReducingState和AggregatingState。
5.1 KeyedState之ValueState
ValueState[T] 是单一变量的状态,T是某种具体的数据类型,比如Double 、String ,或我们自己定义的复杂数据结构。我们可以使用value() 方法获取状态,使用update(value: T) 更新状态。
需求:当接收到的相同 key 的元素个数等于 3 个,就计算这些元素的 value 的平均值。
(1)继承算子的RichFunction,创建状态并编写业务逻辑。
public class CountWindowAverageWithValueState
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
private ValueState<Tuple2<Long, Long>> countAndSum;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<Tuple2<Long, Long>>(
"average",
Types.TUPLE(Types.LONG, Types.LONG));
countAndSum = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Tuple2<Long, Long> element,
Collector<Tuple2<Long, Double>> out) throws Exception {
Tuple2<Long, Long> currentState = countAndSum.value();
if (currentState == null) {
currentState = Tuple2.of(0L, 0L);
}
currentState.f0 += 1;
currentState.f1 += element.f1;
countAndSum.update(currentState);
if (currentState.f0 >= 3) {
double avg = (double)currentState.f1 / currentState.f0;
out.collect(Tuple2.of(element.f0, avg));
countAndSum.clear();
}
}
}
(2)Main方法
public class TestKeyedStateMain {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L),
Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L,
5L));
dataStreamSource
.keyBy(0)
.flatMap(new CountWindowAverageWithValueState())
.print();
env.execute();
}
}
输出:
3> (1,5.0)
4> (2,3.6666666666666665)
5.2 KeyedState之ListState
ListState[T] 存储了一个由T类型数据组成的列表。我们可以使用add(value: T) 或addAll(values: java.util.List[T]) 向状态中添加元素,使用get(): java.lang.Iterable[T] 获取整个列表,使用update(values: java.util.List[T]) 来更新列表,新的列表将替换旧的列表。
需求:当接收到的相同 key 的元素个数等于 3 个,就计算这些元素的 value 的平均值。
(1)继承算子的RichFunction,创建状态并编写业务逻辑。
public class CountWindowAverageWithListState
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
private ListState<Tuple2<Long, Long>> elementsByKey;
@Override
public void open(Configuration parameters) throws Exception {
ListStateDescriptor<Tuple2<Long, Long>> descriptor =
new ListStateDescriptor<Tuple2<Long, Long>>(
"average",
Types.TUPLE(Types.LONG, Types.LONG));
elementsByKey = getRuntimeContext().getListState(descriptor);
}
@Override
public void flatMap(Tuple2<Long, Long> element,
Collector<Tuple2<Long, Double>> out) throws Exception {
Iterable<Tuple2<Long, Long>> currentState = elementsByKey.get();
if (currentState == null) {
elementsByKey.addAll(Collections.emptyList());
}
elementsByKey.add(element);
List<Tuple2<Long, Long>> allElements =
Lists.newArrayList(elementsByKey.get());
if (allElements.size() >= 3) {
long count = 0;
long sum = 0;
for (Tuple2<Long, Long> ele : allElements) {
count++;
sum += ele.f1;
}
double avg = (double) sum / count;
out.collect(Tuple2.of(element.f0, avg));
elementsByKey.clear();
}
}
}
(2)Main方法
将5.1的Main方法中flatMap的Function替换为CountWindowAverageWithListState
public class TestKeyedStateMain {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L),
Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L,
5L));
dataStreamSource
.keyBy(0)
.flatMap(new CountWindowAverageWithListState())
.print();
env.execute();
}
}
5.3 KeyedState之MapState
MapState[K, V] 存储一个Key-Value map,其功能与Java的Map 几乎相同。get(key: K) 可以获取某个key下的value,put(key: K, value: V) 可以对某个key设置value,contains(key: K) 判断某个key是否存在,remove(key: K) 删除某个key以及对应的value,entries(): java.lang.Iterable[java.util.Map.Entry[K, V]] 返回MapState 中所有的元素,iterator(): java.util.Iterator[java.util.Map.Entry[K, V]] 返回一个迭代器。需要注意的是,MapState 中的key和Keyed State的key不是同一个key。
需求:当接收到的相同 key 的元素个数等于 3 个,就计算这些元素的 value 的平均值。
(1)继承算子的RichFunction,创建状态并编写业务逻辑。
public class CountWindowAverageWithMapState
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
private MapState<String, Long> mapState;
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, Long> descriptor =
new MapStateDescriptor<String, Long>(
"average",
String.class, Long.class);
mapState = getRuntimeContext().getMapState(descriptor);
}
@Override
public void flatMap(Tuple2<Long, Long> element,
Collector<Tuple2<Long, Double>> out) throws Exception {
mapState.put(UUID.randomUUID().toString(), element.f1);
List<Long> allElements = Lists.newArrayList(mapState.values());
if (allElements.size() >= 3) {
long count = 0;
long sum = 0;
for (Long ele : allElements) {
count++;
sum += ele;
}
double avg = (double) sum / count;
out.collect(Tuple2.of(element.f0, avg));
mapState.clear();
}
}
}
(2)Main类
将5.1的Main方法中flatMap的Function替换为CountWindowAverageWithMapState
5.4 KeyedState之ReducingState
ReducingState[T] 和AggregatingState[IN, OUT] 与ListState[T] 同属于MergingState[T] 。与ListState[T] 不同的是,ReducingState[T] 只有一个元素,而不是一个列表。它的原理是新元素通过add(value: T) 加入后,与已有的状态元素使用ReduceFunction 合并为一个元素,并更新到状态里。AggregatingState[IN, OUT] 与ReducingState[T] 类似,也只有一个元素,只不过AggregatingState[IN, OUT] 的输入和输出类型可以不一样。ReducingState[T] 和AggregatingState[IN, OUT] 与窗口上进行ReduceFunction 和AggregateFunction 很像,都是将新元素与已有元素做聚合。
需求:求接收到的相同 key 的value的sum。
(1)继承算子的RichFunction,创建状态并编写业务逻辑。
public class SumFunction
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private ReducingState<Long> sumState;
@Override
public void open(Configuration parameters) throws Exception {
ReducingStateDescriptor<Long> descriptor =
new ReducingStateDescriptor<Long>(
"sum",
new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) throws
Exception {
return value1 + value2;
}
}, Long.class);
sumState = getRuntimeContext().getReducingState(descriptor);
}
@Override
public void flatMap(Tuple2<Long, Long> element,
Collector<Tuple2<Long, Long>> out) throws Exception {
sumState.add(element.f1);
out.collect(Tuple2.of(element.f0, sumState.get()));
}
}
(2)Main类
将5.1的Main方法中flatMap的Function替换为SumFunction
输出:
4> (2,4)
3> (1,3)
3> (1,5)
4> (2,7)
3> (1,7)
5.5 KeyedState之AggregatingState
需求:求接收到的相同 key 的value显示出来。
(1)继承算子的RichFunction,创建状态并编写业务逻辑。
public class ContainsValueFunction
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, String>> {
private AggregatingState<Long, String> totalStr;
@Override
public void open(Configuration parameters) throws Exception {
AggregatingStateDescriptor<Long, String, String> descriptor =
new AggregatingStateDescriptor<Long, String, String>(
"totalStr",
new AggregateFunction<Long, String, String>() {
@Override
public String createAccumulator() {
return "Contains:";
}
@Override
public String add(Long value, String accumulator) {
if ("Contains:".equals(accumulator)) {
return accumulator + value;
}
return accumulator + " and " + value;
}
@Override
public String getResult(String accumulator) {
return accumulator;
}
@Override
public String merge(String a, String b) {
return null;
}
}, String.class);
totalStr = getRuntimeContext().getAggregatingState(descriptor);
}
@Override
public void flatMap(Tuple2<Long, Long> element,
Collector<Tuple2<Long, String>> out) throws Exception {
totalStr.add(element.f1);
out.collect(Tuple2.of(element.f0, totalStr.get()));
}
}
(2)Main方法
将5.1的Main方法中flatMap的Function替换为SumFunction。
输出:
3> (1,Contains:3)
4> (2,Contains:4)
4> (2,Contains:4 and 2)
4> (2,Contains:4 and 2 and 5)
3> (1,Contains:3 and 5)
3> (1,Contains:3 and 5 and 7)
5.6 OperatorState之ListState
状态从本质上来说,是Flink算子子任务的一种本地数据,为了保证数据可恢复性,使用Checkpoint机制来将状态数据持久化输出到存储空间上。状态相关的主要逻辑有两项:
- 一、将算子子任务本地内存数据在Checkpoint时snapshot写入存储;
- 二、初始化或重启应用时,以一定的逻辑从存储中读出并变为算子子任务的本地内存数据。
Keyed State对这两项内容做了更完善的封装,开发者可以开箱即用。对于Operator State来说,每个算子子任务管理自己的Operator State,或者说每个算子子任务上的数据流共享同一个状态,可以访问和修改该状态。Flink的算子子任务上的数据在程序重启、横向伸缩等场景下不能保证百分百的一致性。换句话说,重启Flink应用后,某个数据流元素不一定会和上次一样,还能流入该算子子任务上。因此,我们需要根据自己的业务场景来设计snapshot和restore的逻辑。为了实现这两个步骤,Flink提供了最为基础的CheckpointedFunction 接口类。
public interface CheckpointedFunction {
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
}
在Flink的Checkpoint机制下,当一次snapshot触发后,snapshotState 会被调用,将本地状态持久化到存储空间上。这里我们可以先不用关心snapshot是如何被触发的,暂时理解成snapshot是自动触发的,后续文章会介绍Flink的Checkpoint机制。
initializeState 在算子子任务初始化时被调用,初始化包括两种场景:
- 一、整个Flink作业第一次执行,状态数据被初始化为一个默认值;
- 二、Flink作业重启,之前的作业已经将状态输出到存储,通过这个方法将存储上的状态读出并填充到这个本地状态中。
目前Operator State主要有三种,其中ListState和UnionListState在数据结构上都是一种ListState ,还有一种BroadcastState。这里我们主要介绍ListState 这种列表形式的状态。这种状态以一个列表的形式序列化并存储,以适应横向扩展时状态重分布的问题。每个算子子任务有零到多个状态S,组成一个列表ListState[S] 。各个算子子任务将自己状态列表的snapshot到存储,整个状态逻辑上可以理解成是将这些列表连接到一起,组成了一个包含所有状态的大列表。当作业重启或横向扩展时,我们需要将这个包含所有状态的列表重新分布到各个算子子任务上。
ListState和UnionListState的区别在于:
- ListState是将整个状态列表按照round-ribon的模式均匀分布到各个算子子任务上,每个算子子任务得到的是整个列表的子集;
- UnionListState按照广播的模式,将整个列表发送给每个算子子任务。
Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。这里我们来看一个Flink官方提供的Sink案例以了解CheckpointedFunction的工作原理。
需求: 每两条数据打印一次结果 1000
(1)实现SinkFunction和CheckpointedFunction
public class CustomSink
implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {
private List<Tuple2<String, Integer>> bufferElements;
private int threshold;
private ListState<Tuple2<String, Integer>> checkpointState;
public CustomSink(int threshold) {
this.threshold = threshold;
this.bufferElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws
Exception {
bufferElements.add(value);
if (bufferElements.size() == threshold) {
System.out.println("自定义格式:" + bufferElements);
bufferElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception
{
checkpointState.clear();
for (Tuple2<String, Integer> ele : bufferElements) {
checkpointState.add(ele);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws
Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<Tuple2<String, Integer>>(
"bufferd -elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>
() {}));
checkpointState =
context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (Tuple2<String, Integer> ele : checkpointState.get()) {
bufferElements.add(ele);
}
}
}
}
(2)Main方法
public class TestOperatorStateMain {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<String, Integer>> dataStreamSource =
env.fromElements(Tuple2.of("Spark", 3), Tuple2.of("Hadoop", 5),
Tuple2.of("Hadoop", 7),
Tuple2.of("Spark", 4));
dataStreamSource
.addSink(new CustomSink(2)).setParallelism(1);
env.execute("TestStatefulApi");
}
}
输出:
自定义格式:[(Spark,3), (Hadoop,5)]
自定义格式:[(Hadoop,7), (Spark,4)]
上面的代码在输出到Sink之前,先将数据放在本地缓存中,并定期进行snapshot,这实现了批量输出的功能,批量输出能够减少网络等开销。同时,程序能够保证数据一定会输出外部系统,因为即使程序崩溃,状态中存储着还未输出的数据,下次启动后还会将这些未输出数据读取到内存,继续输出到外部系统。
注册和使用Operator State的代码和Keyed State相似,也是先注册一个StateDescriptor ,并指定状态名字和数据类型,然后从FunctionInitializationContext 中获取OperatorStateStore ,进而获取ListState。如果是UnionListState,那么代码改为:context.getOperatorStateStore.getUnionListState 。
状态的初始化逻辑中,我们用context.isRestored 来判断是否为作业重启,这样可以从之前的Checkpoint中恢复并写到本地缓存中。
5.7 OperatorState之BroadCastState
广播状态是固定维护在堆内存中的,不会写入文件系统或者RocksDB。
下面我们通过BroadCastState控制程序的打印输出为例进行介绍。
(1)定义普通数据流,消费数据
DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
(2)定义广播流,用于广播规则,从而控制程序打印输出
DataStreamSource<String> broadStreamSource = env.socketTextStream("localhost", 8888);
(3)解析广播流中的数据,解析为二元组
DataStream<Tuple2<String, String>> broadStream =
broadStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String s) throws Exception {
String[] strings = s.split(" ");
return Tuple2.of(strings[0], (strings[1]));
}
});
(4)定义需要广播的状态类型,只支持
MapStateDescriptor<String, String> descriptor = new
MapStateDescriptor<String, String>(
"ControlStream",
String.class,
String.class
);
(5)用解析后的广播流将状态广播出去,从而生成BroadcastStream
BroadcastStream<Tuple2<String, String>> broadcastStream = broadStream.broadcast(descriptor);
(6)通过connect连接两个流,用process分别处理两个流中的数据。连接流时分为两种情况:
noKeyedStream.connect(BroadcastStream).process(new BroadcastProcessFunction<>(…)) : 非 KeyedStream 连接 BroadcastStream 的,只能使用 BroadcastProcessFunction 函数处理连接逻辑KeyedStream.connect(BroadcastStream).process(new KeyedBroadcastProcessFunction<>(…)) :KeyedStream 连接 BroadcastStream 的,只能使用 KeyedBroadcastProcessFunction 函数处理连接逻辑
KeyedBroadcastProcessFunction 比 BroadcastProcessFunction 多了计时器服务和获取当前 key 接口,当然,这两个功能不一定能用到。
我们这里使用的是 BroadcastProcessFunction<IN1, IN2, OUT> ,这三个泛型翻译分别代表:
IN1:数据流(即非广播流)的元素类型
IN2:广播流的元素类型
OUT:两个流连接完成后,输出流的元素类型。
BroadcastProcessFunction中定义了两个函数用于处理具体的连接逻辑和业务逻辑。因此主要需要实现以下两个函数:
public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
这里处理广播流的数据,将广播流数据保存到 BroadcastState 中。value 是广播流中的一个元素;ctx 是上下文,提供 BroadcastState 和修改方法;out 是输出流收集器。
public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
这个函数处理数据流的数据,这里之只能获取到 ReadOnlyBroadcastState,因为 Flink 不允许在这里修改 BroadcastState 的状态。value 是数据流中的一个元素;ctx 是上下文,可以提供上下文环境和只读的 BroadcastState;out 是输出流收集器。
注意:KeyedBroadcastProcessFunction中的ReadOnlyContext多了计时器服务和获取当前 key 接口
下面是完整的代码。
需求:通过BroadCastState控制程序的打印输出
public class TestBroadcastState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStreamSource =
env.socketTextStream("localhost", 9999);
DataStreamSource<String> broadStreamSource =
env.socketTextStream("localhost", 8888);
DataStream<Tuple2<String, String>> broadStream =
broadStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String s) throws Exception {
String[] strings = s.split(" ");
return Tuple2.of(strings[0], (strings[1]));
}
});
MapStateDescriptor<String, String> descriptor = new
MapStateDescriptor<String, String>(
"ControlStream",
String.class,
String.class
);
BroadcastStream<Tuple2<String, String>> broadcastStream =
broadStream.broadcast(descriptor);
dataStreamSource
.connect(broadcastStream)
.process(new KeyWordsCheckProcessor())
.print();
env.execute();
}
private static class KeyWordsCheckProcessor
extends BroadcastProcessFunction<String, Tuple2<String, String>,
String> {
MapStateDescriptor<String, String> descriptor =
new MapStateDescriptor<String, String>(
"ControlStream",
String.class,
String.class
);
@Override
public void processBroadcastElement(Tuple2<String, String> value,
Context ctx, Collector<String> out)
throws Exception {
ctx.getBroadcastState(descriptor).put(value.f0, value.f1);
System.out.println(Thread.currentThread().getName() + " 接收到控制信息 :" + value);
}
@Override
public void processElement(String value,
ReadOnlyContext ctx, Collector<String> out)
throws Exception {
String keywords = ctx.getBroadcastState(descriptor).get("key");
if (value.contains(keywords)) {
out.collect(value);
}
}
}
}
六、状态后端State backend
Flink的状态是由算子的子任务来创建和管理的,每传入一条数据,子任务都会读取和更新状态,子任务的状态除了会暂存在节点内,还需要进行持久化存储(快照),也就是所谓的Checkpoint,当子任务出现故障\或重启任务时,可以从持久化的Checkpoint中恢复。
也就是说,状态有两部分:一部分是本地的状态,检查点(checkpoint)中的状态。状态是存储在状态后端(State backend)的,它专门负责状态的存储、访问以及维护,主要做两件事:
- Local State Management(本地状态管理)
- Remote State Checkpointing(远程状态备份)
Flink提供了三种类型的状态后端,分别是基于内存的状态后端MemoryStateBackend( 默认的state的类型就是这种)、基于文件系统的状态后端FsStateBackend以及基于RockDB作为存储介质的RocksDB StateBackend。这三种类型的StateBackend都能够有效地存储Flink流式计算过程中产生的状态数据,在默认情况下Flink使用的是MemoryStateBackend,区别见下表。下面分别对每种状态后端的特点进行说明。
6.1 MemoryStateBackend
MemoryStateBackend,运行时所需的 State 数据全部保存在 TaskManager JVM堆上内存中,执行检查点的时候,会把 State 的快照数据保存到 JobManager 进程的内存中。
MemoryStateBackend 可以使用异步的方式进行快照,(也可以同步),推荐异步,避免阻塞算子处理数据。
默认情况下,每一个状态最大为 5 MB。可以通过 MemoryStateBackend 的构造函数增加最大大小。状态的总大小不能超过TaskManager的内存。
- 特点:快速、低延迟
- 缺点:状态在内存中可能会丢失,只能保存数据量小的状态
- 用于:开发测试
6.2 FSStateBackend
FSStateBackend,运行时所需的 State 数据全部保存在 TaskManager 的内存中(状态的总大小不能超过TaskManager的内存,默认5M), 执行检查点的时候,会把 State 的快照数据保存到配置的文件系统中。
可以是分布式或者本地文件系统,路径如:
- HDFS 路径:
hdfs://namenode:40010/flink/checkpoints - 本地路径:
file://data/flink/checkpoints
默认情况下,FsStateBackend 会配置提供异步快照,以避免在写状态 checkpoint 时阻塞数据流的处理。该特性可以通过在实例化 FsStateBackend 时将布尔标志设置为 false 来禁用,例如:
new FsStateBackend(path, false);
- 缺点:状态大小受TaskManager内存限制(默认支持5M)
- 优点:状态访问速度很快;状态信息不会丢失
- 用于:因为状态信息不会丢失,所以生成环境下可用
6.3 RocksDBStateBackend
RocksDBStateBackend,运行时所需的 State 数据保存在RocksDB 数据库(key-value 的数据存储服务),不会受限于 TaskManager 的内存大小,在执行检查点的时候,再将整个 RocksDB 中保存的 State 数据全量或者增量持久化到配置的文件系统中。RocksDB 克服了 State 受内存限制的问题,同时又能够持久化到远端文件系统中,比较适合在生产中使用。
RocksDBStateBackend 的配置同样需要文件系统的 URL(类型,地址,路径)等来配置,如hdfs://namenode:40010/flink/checkpoints 。
- 缺点:状态访问速度有所下降
- 优点:可以存储超大量的状态信息;状态信息不会丢失
- 用于:生产,可以存储超大量的状态信息(受限于磁盘可用空间的大小)
RocksDBStateBackend 是目前唯一支持有状态流处理应用程序增量检查点的状态后端。
6.4 StateBackend配置方式
(1)单任务调整
修改当前任务代码
env.setStateBackend(new
FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】
(2)全局调整(不建议)
修改flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend),
filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
(3)其它高级配置
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), Time.minutes(1)));
6.5 StateBackend持久化策略
Flink 的状态最终都要持久化到第三方存储中,确保集群故障或者作业挂掉后能够恢复。RocksDBStateBackend 持久化策略有两种:
- 全量持久化策略:
RocksFullSnapshotStrategy - 增量持久化策略:
RocksIncementalSnapshotStrategy
全量持久化策略每次将全量的 State 写入到状态存储中(HDFS),上述三种状态后端都支持这种策略。快照保存策略类体系在执行持久化策略的时候,可以使用异步机制,每个算子启动 1 个独立的线程,将自身的状态写入分布式存储可靠存储中。
增量持久化策略就是每次持久化增量的 State,只有 RocksDBStateBackend 支持增量持久化。Flink 增量式的检查点以 RocksDB 为基础的,具体就不展开介绍了。
参考资料(文章第一至四章主要整理自参考资料的1~5)
- https://mp.weixin.qq.com/s/JLl-LMjcnVrIyHCCq7Yv7A
- https://mp.weixin.qq.com/s/twA5HiVJbTGwVpn-uiVx2g
- https://zhuanlan.zhihu.com/p/104171679
- https://blog.csdn.net/nazeniwaresakini/article/details/104220138
- https://mp.weixin.qq.com/s/ggHmSc86mN3I7r6snjqxWQ
- https://mp.weixin.qq.com/s/ZVLIuekZQt7hQ8XND6NUSQ
- https://blog.csdn.net/u013411339/article/details/112934975
本文仅供学术交流使用,加上自己的思考、实践和摘录,整理出本文,若有部分章节侵权,请联系博主删除。
|