Flink窗口基础(一)
1、窗口概念:
流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。
2、窗口的分类:
时间驱动:基于时间的窗口
**时间驱动→滚动窗口(Tumbling Windows):**滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙。
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。如图
code:
env
.socketTextStream("CentOS", 9999)
.flatMap(new
FlatMapFunction<String,
Tuple2<String, Long>>**()
{
@Override
public void flatMap(String value, Collector<Tuple2<String,
Long>> out) throws Exception {
Arrays.stream(value.split("\\W+")).forEach(word -> out.collect(Tuple2.of(word, 1L)));
}
})
.keyBy(t -> t.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(8)))
.sum(1)
.print();
**时间驱动→滑动窗口(Sliding Windows):**与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动 窗口启动的频率. 所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中。
code:
env
.socketTextStream("CentOS", 9999)
.flatMap(new FlatMapFunction<String,
Tuple2<String, Long>>**()
{
@Override
public void flatMap(String value, Collector<Tuple2<String,
Long>> out) throws Exception {
Arrays.stream(value.split("\\W+")).forEach(word -> out.collect(Tuple2.of(word, 1L)));
}
})
.keyBy(t -> t.f0)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.sum(1)
.print();
env.execute();
**时间驱动→会话窗口(Session Windows):**会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间。如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)。我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口
code
静态gap
.window**(**ProcessingTimeSessionWindows.*withGap***(**Time.*seconds***(**10**)))
动态gap
.window**(**ProcessingTimeSessionWindows.*withDynamicGap***(new** SessionWindowTimeGapExtractor<Tuple2<String, Long>>**() {** @Override
**public long** extract**(**Tuple2<String, Long> element**) {** *//* *返回 gap**值,* *单**位毫秒* **return** element.**f0**.length**()** * 1000;
**} }))
创建原理:
? 因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动,滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction,AggregateFunction,ProcessWindowFunction
时间驱动→全局窗口(Global Windows):全局窗口分配器会分配相同key的所有元素进入同一个 Global window. 这种窗口机制只有指定自定义的触发器时才有用. 否则, 不会做任务计算, 因为这种窗口没有能够处理聚集在一起元素的结束点.
code
.window(GlobalWindows.create());
数据驱动:基于元素个数的窗口
数据驱动→滚动窗口:
默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。那个窗口先达到3个元素, 哪个窗口就关闭. 不影响其他的窗口.
code
.countWindow(3)
数据驱动→滑动窗口:
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。
code
.countWindow**(**3, 2**)**
|