【README】
- 本文记录了 窗口算子操作;
- 本文使用的flink为 1.14.4 版本;
- 本文部分内容总结自 flink 官方文档:
窗口 | Apache Flink窗口 # 窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。下面展示了 Flink 窗口在 keyed streams 和 non-keyed streams 上使用的基本结构。 我们可以看到,这两者唯一的区别仅在于:keyed streams 要调用 keyBy(...)后再调用 window(...) , 而 non-keyed streams 只用直接调用 windowAll(...)。留意这个区别,它能帮我们更好地理解后面的内容。Keyed Windowsstream .keyBy(...) <- 仅 keyed 窗口需要 .window(...) <- 必填项:"assigner" [.trigger(...)] <- 可选项:"trigger" (省略则使用默认 trigger) [.evictor(...)] <- 可选项:"evictor" (省略则不使用 evictor) [.allowedLateness(...)] <- 可选项:"lateness" (省略则为 0) [.sideOutputLateData(...)] <- 可选项:"output tag" (省略则不对迟到数据使用 side output) .reduce/aggregate/apply() <- 必填项:"function" [.getSideOutput(...)] <- 可选项:"output tag" Non-Keyed Windowshttps://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/operators/windows/
【1】flink window api概念
1)定义:窗口(Window)是处理无界流的关键所在。窗口可以将无界数据流装入大小有限的“桶”中(有界流),再对每个“桶”加以处理。即以窗口为单位把无界流切分为有界流。
【1.1】窗口分类
1)时间窗口:
2)计数窗口:
【1.2】几种常用窗口类型
【1.2.1】 滚动时间窗口
1)定义: 滚动窗口的时间范围是固定的,且各自范围之间不重叠。
2)例子:比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)。
基于时间的窗口用 start timestamp(包含)和 end timestamp(不包含)描述窗口的大小。(窗口区间为左闭右开)1个元素只能被分发给1个窗口;
3)api调用:
// 滚动窗口
sensorStream.keyBy(SensorReading::getId)
//.window(TumblingEventTimeWindows.of(Time.seconds(5))) // 事件时间滚动窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 处理时间滚动窗口
;
?【1.2.2】滑动时间窗口
1)定义:两个参数,窗口大小与滑动距离;
滑动窗口分配器WindowAssigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口还有滑动距离(window slide)参数来控制生成新窗口的频率。
- 因此,如果 滑动距离小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口(与滚动时间窗口不同)。
2)滚动时间窗口 VS 滑动时间窗口
- 滚动时间窗口:窗口大小与滑动距离相同的滑动时间窗口;(滚动时间窗口是滑动时间窗口的特例)
- 滑动时间窗口:窗口大小与滑动距离或相同或不同;
3)例子: 比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。
?4)api代码:
sensorStream.keyBy(SensorReading::getId)
//.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))); // 事件时间滑动窗口
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))); // 处理时间滑动窗口
?【1.2.3】会话窗口
1)定义:会话窗口分配器会把数据按活跃的会话分组。
- 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。
2)例子:
?3)api代码:
// 会话窗口
sensorStream.keyBy(SensorReading::getId)
//.window(EventTimeSessionWindows.withGap(Time.seconds(5))); // 事件时间会话窗口
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))); // 处理时间会话窗口
【1.2.4】全局窗口
1)定义:全局窗口分配器将拥有相同 key 的所有数据分发到一个全局窗口。
- 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。
?2)api代码:
// 全局窗口
sensorStream.keyBy(SensorReading::getId)
.window(GlobalWindows.create())
【2】窗口分配器:4种通用窗口分配器;
【3】窗口函数
1)定义:计算每个窗口中的数据的操作;
2)窗口函数有三种:
- ReduceFunction-约简函数:指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同,如增量聚合;(仅保存状态,如sum,仅保留结果和,不保存所有数据)
- AggregateFunction -聚合函数 (仅保存状态,如sum,仅保留结果和,不保存所有数据)
- ProcessWindowFunction- 处理窗口函数(需要得到当前窗口所有数据)
3)效率
- 前两者执行起来更高效:因为 Flink 可以在每条数据到达窗口后 进行增量聚合(incrementally aggregate)。(不需要存储所有数据)
- ProcessWindowFunction :而ProcessWindowFunction 会得到能够遍历当前窗口内所有数据的 Iterable(效率低),以及关于这个窗口的 meta-information。
使用 ProcessWindowFunction 的窗口转换操作没有其他两种函数高效,因为 Flink 在窗口触发前必须缓存里面的所有数据。 ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 合并来提高效率。
【补充】flink 水位线,refer2
Flink 水位线(Watermark)_Alienware^的博客-CSDN博客
【3.1】滚动时间窗口代码示例
1)滚动时间窗口代码:
/**
* @Description 时间窗口算子
* @author xiao tang
* @version 1.0.0
* @createTime 2022年04月21日
*/
public class WindowTest1_TimeWindow {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setAutoWatermarkInterval(1*4000L); // 设置水位线生成间隔
// 从socket读取数据
DataStream<String> fileStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensorTimeWindow.txt");
// 转换为 SensorReader pojo类型
DataStream<SensorReadingTimeWindow> sensorStream = fileStream.map(x -> {
String[] arr = x.split(",");
return new SensorReadingTimeWindow(arr[0], arr[1], arr[2], new BigDecimal(arr[3]));
});
// 滚动窗口,进行增来聚合来计算温度均值(采用 Tuple2 )
DataStream<Tuple2<BigDecimal, String>> windowAggStream = sensorStream
.assignTimestampsAndWatermarks( // 设置水位线
WatermarkStrategy.<SensorReadingTimeWindow>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<SensorReadingTimeWindow>(){
@Override
public long extractTimestamp(SensorReadingTimeWindow record, long l) {
return record.getTimestamp().getTime(); // 直接拿当前最大的时间戳作为水位线
}
}))
.keyBy(SensorReadingTimeWindow::getType)// 按照 type 分组
.window(TumblingEventTimeWindows.of(Time.seconds(3))) // 事件时间滚动窗口
.aggregate(new AggregateFunction<SensorReadingTimeWindow, Tuple3<BigDecimal, Integer, List<String>>, Tuple2<BigDecimal, String>>() {
@Override
public Tuple3<BigDecimal, Integer, List<String>> createAccumulator() {
return new Tuple3<>(BigDecimal.ZERO, 0, new ArrayList<>()); // 初始值
}
@Override
public Tuple3<BigDecimal, Integer, List<String>> add(SensorReadingTimeWindow sensorReading, Tuple3<BigDecimal, Integer, List<String>> accumulator) {
accumulator.f2.add(sensorReading.getId());
return new Tuple3<>(sensorReading.getTemperature().add(accumulator.f0).setScale(2,BigDecimal.ROUND_HALF_UP), accumulator.f1+1, accumulator.f2);
}
@Override
public Tuple2<BigDecimal, String> getResult(Tuple3<BigDecimal, Integer, List<String>> accumulator) {
return new Tuple2<>(accumulator.f0.divide(new BigDecimal(accumulator.f1)).setScale(2,BigDecimal.ROUND_HALF_UP), accumulator.f2.toString());
}
@Override
public Tuple3<BigDecimal, Integer, List<String>> merge(Tuple3<BigDecimal, Integer, List<String>> a, Tuple3<BigDecimal, Integer, List<String>> b) {
a.f2.addAll(b.f2);
return new Tuple3<BigDecimal, Integer, List<String>>(a.f0.add(b.f0), a.f1 + b.f1, a.f2);
}
})
;
// 打印
windowAggStream.print("TumblingEventTimeWindowsStream");
// 执行
env.execute("TumblingEventTimeWindowsJob");
}
}
文本内容:
1,sensor1,2022-04-17 22:07:01,36.1
2,sensor2,2022-04-17 22:07:02,36.2
3,sensor1,2022-04-17 22:07:03,36.3
4,sensor2,2022-04-17 22:07:04,36.4
5,sensor1,2022-04-17 22:07:05,36.5
6,sensor1,2022-04-17 22:07:06,36.6
7,sensor1,2022-04-17 22:07:07,36.7
8,sensor1,2022-04-17 22:07:08,36.8
打印结果:
TumblingEventTimeWindowsStream> (36.10,[1])
TumblingEventTimeWindowsStream> (36.20,[2])
TumblingEventTimeWindowsStream> (36.40,[4])
TumblingEventTimeWindowsStream> (36.40,[3, 5])
TumblingEventTimeWindowsStream> (36.70,[6, 7, 8])
?2)结果分析:
2.1)参数: 窗口大小为3,滑动步长为3,即每3秒都会生成一个大小为3个窗口;且时间窗口的时间范围是左闭右开;如第3秒生成窗口的时间范围是 [0, 3)大于等于0秒小于3秒;
2.2)以 sensor1为例,其数据id为 1,3,5, 6, 7, 8 ; 对应的时间为 第1、3、 5、 6、 7、 8秒,生成的窗口列表如下:
- 窗口1:时间范围在 [ 22:07:01, 22:07:03 );数据列表为 36.1,所以均值为 36.1 ;
- 窗口1:时间范围在 [?22:07:03,?22:07:06 );数据列表为 36.3,36.5,所以均值为 36.4
- 窗口3:时间范围在 [?22:07:06,?22:07:09 );数据列表为 36.6 36.7 36.8 ,所以均值为 36.7;
【注意】
- 通过 事件时间来进行窗口计算,必须定义事件时间是什么,以及水位线;上文代码的事件时间是传感器时间,水位先是递增的水位线,每4s生成一条水位记录;
DataStream<Tuple2<BigDecimal, String>> windowAggStream = sensorStream
.assignTimestampsAndWatermarks( // 设置水位线
WatermarkStrategy.<SensorReadingTimeWindow>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<SensorReadingTimeWindow>(){
@Override
public long extractTimestamp(SensorReadingTimeWindow record, long l) {
return record.getTimestamp().getTime(); // 直接拿当前最大的时间戳作为水位线
}
}))....
env.getConfig().setAutoWatermarkInterval(1*4000L); // 设置水位线生成间隔
【补充】 水位线作用
- 水位线是用来解决数据乱序问题,本身是一个特殊的时间戳;由flink加入到数据流;
- 水位线单调递增的,Flink认为事件时间早于水位线的数据都来了;
- 当前的watermark >= 窗口的最大时间戳时,窗口关闭;
- 需要设置水位线生成间隔时间,默认200ms;
?【3.2】滑动计数窗口示例
1)滑动计数窗口参数
- 窗口大小:5;
- 滑动步长:2,即生成新窗口频率,每2条数据就会生成一个新窗口;
2)代码
/**
* @Description 滑动计数窗口算子
* @author xiao tang
* @version 1.0.0
* @createTime 2022年04月17日
*/
public class WindowTest2_CountWindow {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从socket读取数据
DataStream<String> fileStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensorTimeWindow.txt");
// 转换为 SensorReader pojo类型
DataStream<SensorReadingWindow> sensorStream = fileStream.map(x -> {
String[] arr = x.split(",");
return new SensorReadingWindow(arr[0], arr[1], new BigDecimal(arr[2]));
});
// 滑动计数窗口,进行增量聚合来计算温度均值(采用 Tuple2 )
DataStream<Tuple2<String, BigDecimal>> windowAggStream = sensorStream
.keyBy(SensorReadingWindow::getId)// 按照id 分组
.countWindow(5, 2) // 滑动计数窗口
.aggregate(new AggregateFunction<SensorReadingWindow, Tuple3<String, BigDecimal, Integer>, Tuple2<String, BigDecimal>>() {
@Override
public Tuple3<String, BigDecimal, Integer> createAccumulator() {
return new Tuple3<>("", BigDecimal.ZERO, 0); // 初始值
}
@Override
public Tuple3<String, BigDecimal, Integer> add(SensorReadingWindow sensorReading, Tuple3<String, BigDecimal, Integer> accumulator) {
return new Tuple3<>(sensorReading.getId(), sensorReading.getTemperature().add(accumulator.f1).setScale(2,BigDecimal.ROUND_HALF_UP), accumulator.f2+1);
}
@Override
public Tuple2<String, BigDecimal> getResult(Tuple3<String, BigDecimal, Integer> accumulator) {
return new Tuple2<>(accumulator.f0, accumulator.f1.divide(new BigDecimal(accumulator.f2)).setScale(2,BigDecimal.ROUND_HALF_UP));
}
@Override
public Tuple3<String, BigDecimal, Integer> merge(Tuple3<String, BigDecimal, Integer> a, Tuple3<String, BigDecimal, Integer> b) {
return new Tuple3<String, BigDecimal, Integer>(a.f0, a.f1.add(b.f1), a.f2 + b.f2);
}
})
;
// 打印
windowAggStream.print("slideCountWindowAggStream");
// 执行
env.execute("slideCountWindowAggStreamJob");
}
}
sensor文本:
sensor1,2022-04-17 22:07:01,36.1
sensor2,2022-04-17 22:07:02,36.2
sensor1,2022-04-17 22:07:03,36.3
sensor2,2022-04-17 22:07:04,36.4
sensor1,2022-04-17 22:07:05,36.5
sensor1,2022-04-17 22:07:06,36.6
sensor1,2022-04-17 22:07:07,36.7
sensor1,2022-04-17 22:07:07,36.7
打印结果:
slideCountWindowAggStream> (sensor1,36.20)
slideCountWindowAggStream> (sensor2,36.30)
slideCountWindowAggStream> (sensor1,36.38)
slideCountWindowAggStream> (sensor1,36.56)
3)分析: 根据keyby分组后,
sensor1 的温度是 36.1、 36.3、 36.5、 36.6、 36.7、36.7 ;
sensor2 的温度是 36.2、36.4 ;
所以 sensor2 的温度均值是 36.3 ,打印结果没有问题;
对于 sensor1 的温度均值 (每2个数据生成一个新窗口):
- 第1个窗口包含的数据是 【36.1? 36.3 】,均值为 36.2;
- 第2个窗口包含的数据是 【36.1 ,36.3,36.5, 36.6】,均值为 36.375,四舍五入为 36.56;
- 第3个窗口包含的数据是 【36.3,36.5, 36.6 , 36.7, 36.7?】,均值为 36.56;
【4】window其他可选API
?
refer2 https://blog.csdn.net/PacosonSWJTU/article/details/124312346
|