【README】
0)本文编写了多个flink水位线watermark的代码例子,加深对watermark的理解 ;
1)时间分类
- Event Time: 事件创建的时间(事件发生时间);
- Ingestion Time:数据进入flink的实际;
- Processing Time:执行算子的本地机器时间 ;
我们主要讨论的是 事件时间;
2)flink窗口分为 滚动窗口,滑动窗口, 本文使用了 滚动窗口;
- 滚动窗口: 只有1个参数,窗口长度与窗口步长(窗口创建频率)相等;
- 滑动窗口:有2个参数,即窗口长度,窗口步长;可以手动设置,可以相等也可以不等;
3)本文结合代码示例讲了 水位线, 窗口,窗口属性 lateness 延迟属性, 窗口流的 siteOutputLateData 侧输出流(旁路输出),及其它们的作用;
【1】水位线
1)定义(本文自定义总结,非官方):水位线 watermark,指的是 flink底层在数据流中添加的带有时间戳的数据,当这些水位线数据到达算子时(如窗口算子),算子会认为 小于水位线的业务数据都来了;(数据可以理解为 一条日志,或温度传感器采集的温度信息)
2)作用: 水位线可以用来处理无序数据流;(下文代码例子会给出);
3)如何产生水位线?
- 指定水位线的时间戳如何获取? 可以指定 水位线时间戳从业务数据(抽象为javabean)的某个属性获取;
- 指定水位线可以延迟多长时间,即允许无序数据最多可以晚来多长时间;(超过这个时间会被丢弃)
【1.1】事件迟到被丢弃
1)建立一个 10s 滚动窗口算子(每10s新开一个长度为10s的窗口),水位线取温度bean的时间戳,且延迟 0 秒,如下:
其中 窗口用于收集id号码,即属于同一个窗口的元素的id会被收集到一起;
public class WindowTest3_EventTimeWatermarkWindow3 {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从socket读取数据,数据格式参见 sensorTimeWatermarkWindow.txt
// DataStream<String> textStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensorTimeWatermarkWindow.txt");
// nc -lk 7777
DataStream<String> textStream = env.socketTextStream("192.168.163.201", 7778);
// 转换为 SensorReader pojo类型
DataStream<SensorReadingTimeWatermarkWindow> sensorStream = textStream.map(x -> {
String[] arr = x.split(",");
return new SensorReadingTimeWatermarkWindow(arr[0], arr[1], arr[2], new BigDecimal(arr[3]));
});
// 设置抽取时间戳,水位线延迟2秒(如当前时间戳为 20:00:10 ,水位线的时间是 20:00:08),窗口是看水位线时间,而不是时间时间
SingleOutputStreamOperator<SensorReadingTimeWatermarkWindow> streamWithWatermark = sensorStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<SensorReadingTimeWatermarkWindow>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp().getTime())
);
// 开窗聚合
SingleOutputStreamOperator<String> aggForWindowStream =
streamWithWatermark.keyBy(SensorReadingTimeWatermarkWindow::getType)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new AggregateFunction<SensorReadingTimeWatermarkWindow, String, String>() {
@Override
public String createAccumulator() {
return "";
}
@Override
public String add(SensorReadingTimeWatermarkWindow sensorReadingTimeWatermarkWindow, String s) {
return s + ", " + sensorReadingTimeWatermarkWindow.getId();
}
@Override
public String getResult(String s) {
return s;
}
@Override
public String merge(String s, String acc1) {
return s + ", " + acc1;
}
});
// 打印
aggForWindowStream.print("aggForWindowStream");
// 执行
env.execute("aggForWindowStream");
}
}
上述代码中的水位线的延迟时间为0s,即水位线时间戳等于事件时间戳;?
元素抽象为 传感器信息bean,如下:
public class SensorReadingTimeWatermarkWindow {
private String id;
private String type;
private Timestamp timestamp;
private BigDecimal temperature;
public SensorReadingTimeWatermarkWindow() {
}
public SensorReadingTimeWatermarkWindow(String id, String type, String timeStr, BigDecimal temperature) {
this.id = id;
this.type = type;
this.temperature = temperature;
this.parseTimestamp(timeStr);
}
private void parseTimestamp(String timeStr) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
this.timestamp = new Timestamp(simpleDateFormat.parse(timeStr).getTime());
} catch (ParseException e) {
this.timestamp = new Timestamp(System.currentTimeMillis());
}
}
}
接收的是 nc 客户端的socket文本流,窗口算子计算结果如下:
详情如下:
1,sensor1,2022-04-17 22:07:01,36.1
7,sensor1,2022-04-17 22:07:02,36.7
8,sensor1,2022-04-17 22:07:04,36.8
11,sensor1,2022-04-17 22:07:07,36.9
12,sensor1,2022-04-17 22:07:11,36.9 -> 1, 7, 8, 11
13,sensor1,2022-04-17 22:07:09,36.9
15,sensor1,2022-04-17 22:07:16,36.9
16,sensor1,2022-04-17 22:07:23,36.9 -> 12,15
【结果分析】
- 发现1:当事件12(id=12)出现时,因水位线延迟时间为0,所以水位线时间戳等于事件12的时间戳=22:07:11,这个时间戳大于窗口结束时间(22:07:10),第1个窗口被关闭并输出计算结果为【1,7,8,11】;
- 发现2: 当事件16(id=16)出现时,因水位线延迟时间为0,所以水位线时间戳等于事件16的时间戳=22:07:23,这个时间戳大于窗口结束时间(22:07:20),第2个窗口被关闭并输出计算结果为【12,15】;
- 发现3:事件13没有更新水位线,因为水位线必须单调递增(事件12发生时的水位线是22:07:11,事件13的时间戳是22:07:09,所以事件13发生时不会更新水位线);
问题来了: 事件13去哪里了? 被 flink 丢弃了,因为事件13迟到了;
- 如何理解事件迟到了: 因为事件12 的时间戳为 22:07:11,又水位线延迟0s,所以水位线的 时间戳也是 22:07:11,这大于窗口结束时间,所以窗口关闭并计算结果,窗口关闭后,事件13才来,因此被丢弃。
【补充】窗口范围是左闭右开;如上图,第1个窗口的范围是 [0,10),第2个窗口是 [10,20)
【1.2】 事件迟到但被正常处理
1)修改上述水位线代码, 设置延迟时间为5s,重新录入上述数据,结果如下:
// 设置抽取时间戳,水位线延迟2秒(如当前时间戳为 20:00:10 ,水位线的时间是 20:00:08),窗口是看水位线时间,而不是事件时间
SingleOutputStreamOperator<SensorReadingTimeWatermarkWindow> streamWithWatermark =
sensorStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<SensorReadingTimeWatermarkWindow>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 水位线延迟时间修改为 5s
.withTimestampAssigner((event, timestamp) -> event.getTimestamp().getTime())
);
?
1,sensor1,2022-04-17 22:07:01,36.1
7,sensor1,2022-04-17 22:07:02,36.7
8,sensor1,2022-04-17 22:07:04,36.8
11,sensor1,2022-04-17 22:07:07,36.9
12,sensor1,2022-04-17 22:07:11,36.9
13,sensor1,2022-04-17 22:07:09,36.9
15,sensor1,2022-04-17 22:07:16,36.9 -> 1, 7, 8, 11, 13
16,sensor1,2022-04-17 22:07:23,36.9
21,sensor1,2022-04-17 22:07:20,36.9
22,sensor1,2022-04-17 22:07:25,36.9 -> 12, 15
【结果分析】
- 发现1:事件13,事件21 不会更新水位线时间戳,原因上文已经解释过了;
- 发现2:当事件15(id=15)出现时,因水位线延迟时间为5s,所以水位线等于事件15的时间戳减去5s = 22:07:11,这个时间戳大于窗口结束时间(22:07:10),第1个窗口被关闭并输出计算结果为【1,7,8,11,13】;
- 发现3:事件13没有被丢弃,因为水位线延迟了5s,窗口在事件15发生时才关闭,所以可以探测到事件13,这也阐述了为啥 flink水位线可以处理无序数据的原理,flink的设计者的水位线idea真的很棒(对比来看,【1.1】中的例子事件13被丢弃);
- 发现4:当事件22(id=22)出现时,因水位线延迟时间为5s,所以水位线等于事件22的时间戳减去5s = 22:07:20,这个时间戳大于等于窗口结束时间(22:07:20),第2个窗口被关闭并输出计算结果为【12,15】;(大于等于窗口结束时间,窗口就被关闭,因为窗口范围是左开右闭)
【2】窗口的 lateness 延迟属性
此外,窗口还有 lateness 属性,表示延迟多长时间关闭窗口;
如下面代码每10s 创建一个长度为12s的窗口; (如果没有 lateness参数或其为0的话, 就是 每10s 创建一个长度为10s的窗口)
代码修改如下:
SingleOutputStreamOperator<String> aggForWindowStream =
streamWithWatermark.keyBy(SensorReadingTimeWatermarkWindow::getType)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.seconds(2)) // 允许窗口延迟 2 秒后关闭窗口
窗口算子计算结果如下:
详情如下:
1,sensor1,2022-04-17 22:07:01,36.1
7,sensor1,2022-04-17 22:07:02,36.7
8,sensor1,2022-04-17 22:07:04,36.8
11,sensor1,2022-04-17 22:07:07,36.9
12,sensor1,2022-04-17 22:07:11,36.9
13,sensor1,2022-04-17 22:07:09,36.9
15,sensor1,2022-04-17 22:07:15,36.9 -> 1, 7, 8, 11, 13
16,sensor1,2022-04-17 22:07:09,36.9 -> 1, 7, 8, 11, 13, 16
17,sensor1,2022-04-17 22:07:16,36.9
18,sensor1,2022-04-17 22:07:09,36.9 -> 1, 7, 8, 11, 13, 16, 18
19,sensor1,2022-04-17 22:07:17,36.9 窗口关闭
20,sensor1,2022-04-17 22:07:09,36.9 被丢弃
21,sensor1,2022-04-17 22:07:20,36.9
22,sensor1,2022-04-17 22:07:25,36.9 -> 12, 15, 17, 19
【结果分析】
- 事件15发生时:因水位线延迟5s,所以水位线时间戳=22:07:15-5s=22:07:10,等于第1个窗口的结束时间,故第1个窗口计算,结果为 【1, 7, 8, 11, 13】,但窗口没有关闭,因为lateness为2s,延迟2秒关闭,即当水位线大于等于 22:07:12 时,窗口关闭;
- 事件16发生时:第1个窗口因为 lateness=2s 没有关闭,又事件16时间戳=22:07:09,所以还是参与窗口1的计算,输出结果【1, 7, 8, 11, 13, 16】;
- 事件17发生时:时间戳=22:07:16,水位线时间戳=22:07:11,这小于带lateness=2s的窗口1的关闭时间 22:07:12,所以窗口1还是不会关闭;
- 事件18发生时:时间戳=22:07:09, 因水位线单调递增,故不变,还是22:07:11;事件18参与窗口1的计算,结果为 【1, 7, 8, 11, 13, 16, 18】
- 事件19发生时:时间戳=22:07:17,水位线=22:07:12,等于带lateness=2s的窗口1的关闭时间,窗口1关闭;
- 事件20发生时:时间戳=22:07:09,落入了窗口1的范围(22:07:00~22:07:10),但因窗口1已经关闭,所以事件20被丢弃;
通过以上示例,本文应该是把窗口的lateness属性 讲清楚了;
【问题】 事件20被丢弃的话, 不满足业务场景对数据一致性的要求;
- 因为服务1发送了10条数据,到达服务2的时候却只有9条数据,这不满足业务需求,是开发团队不愿意看到的事情;那如何找回这些被丢弃的事件呢?通过旁路输出;
【3】如何收集迟到数据
?从旁路输出(side output)获取迟到数据; 通过 Flink 的 旁路输出 功能,可以获得迟到数据的数据流。 首先,需要在开窗后的 stream 上使用 sideOutputLateData(OutputTag) 表明需要把迟到数据存入 旁输出流。
代码修改如下:添加旁路输出流(侧输出流)
// 侧输出流,对于延迟的且没有进入窗口的数据,放到侧输出流(旁路输出流)
OutputTag<SensorReadingTimeWatermarkWindow> lateOutputTag = new OutputTag<SensorReadingTimeWatermarkWindow>("late") {
};
// 开窗聚合
SingleOutputStreamOperator<String> aggForWindowStream =
streamWithWatermark.keyBy(SensorReadingTimeWatermarkWindow::getType)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.seconds(2)) // 允许延迟 2 秒后关闭窗口
.sideOutputLateData(lateOutputTag) // 无法进入窗口,则进入侧输出流
.aggregate(new AggregateFunction<SensorReadingTimeWatermarkWindow, String, String>() {
@Override
public String createAccumulator() {
return "";
}
@Override
public String add(SensorReadingTimeWatermarkWindow sensorReadingTimeWatermarkWindow, String s) {
return s + ", " + sensorReadingTimeWatermarkWindow.getId();
}
@Override
public String getResult(String s) {
return s;
}
@Override
public String merge(String s, String acc1) {
return s + ", " + acc1;
}
});
// 打印窗口算子结果
aggForWindowStream.print("aggForWindowStream");
// 打印旁输出流
aggForWindowStream.getSideOutput(lateOutputTag).print("lateOutputTag");
// 执行
env.execute("aggForWindowStream");
事件发生详情如下:
1,sensor1,2022-04-17 22:07:01,36.1
7,sensor1,2022-04-17 22:07:02,36.7
8,sensor1,2022-04-17 22:07:04,36.8
11,sensor1,2022-04-17 22:07:07,36.9
12,sensor1,2022-04-17 22:07:11,36.9
13,sensor1,2022-04-17 22:07:09,36.9
15,sensor1,2022-04-17 22:07:15,36.9 -> 1, 7, 8, 11, 13
16,sensor1,2022-04-17 22:07:09,36.9 -> 1, 7, 8, 11, 13, 16
17,sensor1,2022-04-17 22:07:16,36.9
18,sensor1,2022-04-17 22:07:09,36.9 -> 1, 7, 8, 11, 13, 16, 18
19,sensor1,2022-04-17 22:07:17,36.9 窗口关闭
20,sensor1,2022-04-17 22:07:09,36.9 -> lateOutputTag> SensorReadingTimeWindow{id='20', type='sensor1', timestamp=2022-04-17 22:07:09.0, temperature=36.9}
结果分析:
- 相比于【2】中代码示例, 事件20被丢弃了;而【3】中代码,当事件20出现时,由于窗口已经关闭,但存在侧输出流(旁路输出),所以事件20 存入侧输出流(解决了乱序数据迟到事件过长导致数据不一致的问题);相反如果没有侧输出流,则事件20会被丢弃;
|