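Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/windows/
1. What Is a Window
- Flink's underlying engine is a streaming engine. It treats batch as a special case of streaming and implements both stream processing and batch processing on top of that engine. A window is the bridge from streaming to batch.
- A window sets a start position and an end position on an unbounded stream, turning a slice of that stream into a bounded one on which the data can then be processed.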
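2. Window Categories
Windows can be classified in two ways:
By window type (covered in 2.1): tumbling, sliding, session, global, and count windows.
By stream type:
- Keyed Window: window operations on a stream that has been partitioned with keyBy
- Non-Keyed Window (called Global Window in these notes): window operations on a stream that has not been partitioned, applied with windowAll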
Keyed Window
stream
       .keyBy(...)                 <- keyed versus non-keyed windows
       .window(...)                <- required: "assigner"
      [.trigger(...)]              <- optional: "trigger" (else default trigger)
      [.evictor(...)]              <- optional: "evictor" (else no evictor)
      [.allowedLateness(...)]      <- optional: "lateness" (else zero)
      [.sideOutputLateData(...)]   <- optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()   <- required: "function"
      [.getSideOutput(...)]        <- optional: "output tag"
Non-Keyed Window (Global Window, applied with windowAll)
stream
       .windowAll(...)             <- required: "assigner"
      [.trigger(...)]              <- optional: "trigger" (else default trigger)
      [.evictor(...)]              <- optional: "evictor" (else no evictor)
      [.allowedLateness(...)]      <- optional: "lateness" (else zero)
      [.sideOutputLateData(...)]   <- optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()   <- required: "function"
      [.getSideOutput(...)]        <- optional: "output tag"
2.1 Window Assigners
A WindowAssigner is responsible for assigning each incoming element to one or more windows. Flink ships with built-in assigners for tumbling windows, sliding windows, session windows, and global windows.
- Tumbling Windows: windows of a fixed size that do not overlap, so each element belongs to exactly one window
// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);
// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);
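To make the "fixed size, no overlap" rule concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a timestamp can be mapped to its tumbling window. The class and method names are hypothetical, and the sketch assumes non-negative millisecond timestamps and windows aligned to the epoch with no offset.

```java
// Hypothetical helper, not part of Flink: maps a timestamp to its tumbling window.
class TumblingWindowSketch {

    // Inclusive start of the window that contains the timestamp.
    // Assumes timestampMs >= 0 and sizeMs > 0.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Exclusive end of that window.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows, as in the snippets above
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Because every timestamp maps to exactly one start value, no element can land in two tumbling windows.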
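- Sliding Windows: windows of a fixed size that start every slide interval. When the slide is smaller than the size, the windows overlap and an element can belong to several windows
// sliding event-time windows: size 10 s, slide 5 s
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);
// sliding processing-time windows: size 10 s, slide 5 s
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);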
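The overlap of sliding windows can be illustrated with a small plain-Java sketch that lists every window a single timestamp falls into. This is only an illustration with hypothetical names, not Flink's implementation; it assumes non-negative millisecond timestamps, no offset, and a size that is at least as large as the slide.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: finds all sliding windows containing a timestamp.
class SlidingWindowSketch {

    // Returns the start timestamps of all windows [start, start + sizeMs)
    // that contain timestampMs, newest window first.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that began at or before the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Size 10 s and slide 5 s, as in the snippets above.
        System.out.println(windowStarts(12_000L, 10_000L, 5_000L));
    }
}
```

With a size of 10 seconds and a slide of 5 seconds, each element belongs to two windows; when size equals slide, the sketch degenerates to the tumbling case and returns a single window.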
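- Session Windows
If no element arrives within the configured gap, the session is considered finished: the window closes and its result is computed.
// event-time session windows with a static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
// event-time session windows with a dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return the session gap
    }))
    .<windowed transformation>(<window function>);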
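The gap rule can be sketched in plain Java as follows. This is a simplified offline illustration with hypothetical names: it sorts all timestamps up front, whereas Flink merges session windows as elements arrive, and the handling of a distance exactly equal to the gap is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: groups timestamps into sessions by inactivity gap.
class SessionWindowSketch {

    // Returns one {firstTimestamp, lastTimestamp} pair per session.
    static List<long[]> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] current = result.isEmpty() ? null : result.get(result.size() - 1);
            if (current != null && ts - current[1] <= gapMs) {
                current[1] = ts; // still active: extend the current session
            } else {
                result.add(new long[] {ts, ts}); // inactivity exceeded the gap: new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (long[] s : sessions(new long[] {1_000L, 3_000L, 20_000L, 21_000L}, 10_000L)) {
            System.out.println("session from " + s[0] + " to " + s[1]);
        }
    }
}
```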
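- Global Windows: all elements with the same key are assigned to one single window. Because this window never ends on its own, it is only useful together with a custom trigger.
  - Count Window: not covered in detail here. It is built from GlobalWindows plus a trigger and, for the sliding variant, an evictor:
// tumbling count window: fire and purge every `size` elements
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
    return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
// sliding count window: fire every `slide` elements, keeping at most the last `size` elements
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
    return window(GlobalWindows.create())
            .evictor(CountEvictor.of(size))
            .trigger(CountTrigger.of(slide));
}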
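The behavior of countWindow(long size) can be imitated in a few lines of plain Java: buffer elements, emit a result once the count is reached, then purge the buffer. The class below is a hypothetical illustration of that "count trigger plus purge" idea, not Flink code, and summing integers is just an example computation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not part of Flink: a tumbling count window that sums integers.
class CountWindowSketch {
    private final int size;
    private final List<Integer> buffer = new ArrayList<>();

    CountWindowSketch(int size) {
        this.size = size;
    }

    // Adds one element. Returns the window sum when the window fires, otherwise null.
    Integer add(int value) {
        buffer.add(value);
        if (buffer.size() < size) {
            return null; // the count trigger has not fired yet
        }
        int sum = 0;
        for (int v : buffer) {
            sum += v;
        }
        buffer.clear(); // purge, so the next window starts empty
        return sum;
    }

    public static void main(String[] args) {
        CountWindowSketch window = new CountWindowSketch(3);
        for (int v = 1; v <= 7; v++) {
            Integer result = window.add(v);
            if (result != null) {
                System.out.println("window fired with sum " + result);
            }
        }
    }
}
```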
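2.2 Window Functions
A window function defines the computation performed on the elements of a window. Window functions fall roughly into two categories:
- Incremental aggregation functions: the window keeps a single intermediate aggregate and updates it as each new element arrives. These functions are usually very space-efficient. ReduceFunction and AggregateFunction are incremental aggregation functions.
- Full-window functions: the window collects all of its elements and iterates over them when it is evaluated. These functions usually need more space, because the data of the whole window is buffered, but they can express more complex logic. ProcessWindowFunction and WindowFunction are full-window functions.
Note: the two kinds of function can be combined.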
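The difference in state size between the two categories can be seen in a small plain-Java sketch that computes the same average both ways. The class names are hypothetical and nothing here uses the Flink API; the sketch assumes at least one element has been added before the result is read.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not part of Flink: incremental vs. full-window aggregation.
class AggregationStyleSketch {

    // Incremental style: state is two numbers, no matter how many elements arrive.
    static final class IncrementalAverage {
        private long sum;
        private long count;

        void add(long value) {
            sum += value;
            count++;
        }

        double result() {
            return (double) sum / count;
        }
    }

    // Full-window style: every element is buffered and iterated at evaluation time.
    static final class BufferedAverage {
        private final List<Long> elements = new ArrayList<>();

        void add(long value) {
            elements.add(value);
        }

        int bufferedElements() {
            return elements.size();
        }

        double result() {
            long sum = 0;
            for (long v : elements) {
                sum += v;
            }
            return (double) sum / elements.size();
        }
    }

    public static void main(String[] args) {
        IncrementalAverage incremental = new IncrementalAverage();
        BufferedAverage buffered = new BufferedAverage();
        for (long v : new long[] {2L, 4L, 9L}) {
            incremental.add(v);
            buffered.add(v);
        }
        System.out.println(incremental.result() + " vs " + buffered.result());
    }
}
```

Both variants produce the same result; the buffered one pays for its flexibility with state that grows with the number of elements in the window.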
- ReduceFunction
Note: a ReduceFunction combines two input elements into one output element of the same type.
// sums the second field of all tuples in the window
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
            return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
        }
    });
- AggregateFunction
Note: an AggregateFunction is a generalized ReduceFunction with three types: the input type, the accumulator type, and the output type. The example computes the average of the second field of the elements in the window.
private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
    // the accumulator holds (running sum, element count)
    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }
    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
    }
    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
    }
    // merges two partial accumulators, for example when session windows are merged
    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}
DataStream<Tuple2<String, Long>> input = ...;
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());
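- Combining a ReduceFunction with a ProcessWindowFunction
Note: the elements are aggregated incrementally as they arrive, and the ProcessWindowFunction receives only the single aggregated result when the window fires. The example returns the smallest reading in the window together with the window start time.
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// keeps the reading with the smaller value
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
    @Override
    public SensorReading reduce(SensorReading r1, SensorReading r2) {
        return r1.value() > r2.value() ? r2 : r1;
    }
}
private static class MyProcessWindowFunction
        extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
    @Override
    public void process(String key,
                        Context context,
                        Iterable<SensorReading> minReadings,
                        Collector<Tuple2<Long, SensorReading>> out) {
        // the iterable contains exactly one element: the pre-aggregated minimum
        SensorReading min = minReadings.iterator().next();
        out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
    }
}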
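2.3 Triggers
A trigger determines when a window is ready to be processed by the window function. Every window assigner comes with a default trigger. For example, the event-time window assigners (such as TumblingEventTimeWindows) use EventTimeTrigger as their default trigger.
Note: the default trigger of GlobalWindows is NeverTrigger, which never fires, so a custom trigger must always be defined when using GlobalWindows.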
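As a rough illustration of what an event-time trigger decides, the following plain-Java sketch checks whether a window is ready to fire for a given watermark. The names are hypothetical and the logic is a simplified reading of how EventTimeTrigger behaves (fire once the watermark reaches the last timestamp that belongs to the window); it is not the Flink source code.

```java
// Hypothetical illustration, not part of Flink: the firing condition of an event-time window.
class EventTimeTriggerSketch {

    // Largest timestamp that still belongs to the window [start, end).
    static long maxTimestamp(long windowEndMs) {
        return windowEndMs - 1;
    }

    // The window is ready once the watermark has reached its last timestamp.
    static boolean shouldFire(long windowEndMs, long watermarkMs) {
        return watermarkMs >= maxTimestamp(windowEndMs);
    }

    public static void main(String[] args) {
        long windowEnd = 5_000L; // window [0, 5000)
        for (long watermark : new long[] {4_000L, 4_999L, 6_000L}) {
            System.out.println("watermark " + watermark + " fires: " + shouldFire(windowEnd, watermark));
        }
    }
}
```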
2.4 Evictors
An evictor can remove elements from a window after the trigger has fired, either before the window function is applied, after it is applied, or both.
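A count-based evictor, like the CountEvictor used by countWindow(long size, long slide), can be pictured as "keep only the last N elements before the window function runs". The plain-Java sketch below shows that idea with hypothetical names; it is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not part of Flink: evicts all but the newest maxCount elements.
class CountEvictorSketch {

    // Returns the elements that remain in the window after eviction, in arrival order.
    static <T> List<T> evictBefore(List<T> windowContents, int maxCount) {
        int from = Math.max(0, windowContents.size() - maxCount);
        return new ArrayList<>(windowContents.subList(from, windowContents.size()));
    }

    public static void main(String[] args) {
        List<Integer> contents = Arrays.asList(1, 2, 3, 4, 5);
        // Only the surviving elements would be passed on to the window function.
        System.out.println(evictBefore(contents, 3));
    }
}
```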
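2.5 Allowed Lateness
Flink lets you specify a maximum allowed lateness for a window operation. Allowed lateness defines how late an element may arrive and still be added to its window; the default is 0. For example, when working with event-time windows, elements can arrive late: the watermark that Flink uses to track event-time progress has already passed the end of the window an element belongs to by the time the element arrives.
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>);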
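The effect of allowed lateness can be summarized as a three-way decision for an element that belongs to a given window. The plain-Java sketch below encodes that decision with hypothetical names; the exact boundary conditions reflect my understanding of Flink's behavior (a window is cleaned up once the watermark reaches its last timestamp plus the allowed lateness) and should be treated as an assumption.

```java
// Hypothetical illustration, not part of Flink: what happens to an element of a window
// depending on the current watermark and the configured allowed lateness.
class AllowedLatenessSketch {

    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, DROPPED }

    static Outcome classify(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        long maxTimestamp = windowEndMs - 1; // last timestamp inside [start, end)
        if (watermarkMs < maxTimestamp) {
            return Outcome.ON_TIME; // the window has not fired yet
        }
        if (watermarkMs < maxTimestamp + allowedLatenessMs) {
            return Outcome.LATE_BUT_ACCEPTED; // the window state is still kept
        }
        return Outcome.DROPPED; // the window state has already been cleaned up
    }

    public static void main(String[] args) {
        long windowEnd = 5_000L;
        long lateness = 2_000L;
        for (long watermark : new long[] {4_000L, 6_000L, 7_500L}) {
            System.out.println("watermark " + watermark + ": " + classify(windowEnd, lateness, watermark));
        }
    }
}
```

With the default lateness of 0 the middle case disappears, so any element arriving after its window has fired is dropped unless it is routed to a side output with sideOutputLateData.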