一、Flink中的时间语义
在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:
1.三种时间语义
1.1 Event Time
是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。
1.2 Ingestion Time
是事件进入Flink的时间。
1.3 Processing Time
是每一个执行基于时间操作的算子的本地系统时间,与机器 相关,默认的时间属性就是 Processing Time。
1.4总结
例如,一条日志进入Flink的时间为2017-11-12 10:00:00.123,到达Window的系统时间为2017-11-12 10:00:01.234,日志的内容如下:
2017-11-02 18:37:15.624 INFO Fail over to rm2
对于业务来说,要统计1min内的故障日志个数,哪个时间是最有意义的? 毫无疑问是eventTime,因为我们要根据日志的生成时间进行统计。
2.EventTime 的引入
在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。 如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
二、Watermark
1.产生原因
流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 Event Time 顺序排列的。 假设时间窗口为5。 这里的1,2,3…是事件发生的事件,这里是按照eventTime来计算的。 按照理想情况来说,数据是按正常的顺序到达的,所以1~4的数据会放到第一个桶,当遇到5这个数据,会放到第二个桶,同时第一个桶关闭并计算输出结果。 如果出现了乱序的情况1和4两个数据放到第一个桶,到5这个数据,第一个桶关闭了,后面2和3这两个数据发生了丢失。 为了避免前面问题的发生,我们不能只根据eventTime来决定window的运行,要使用一个机制来保证在一定时间后,数据全部到达并且去触发window进行计算,这个机制就是Watermark。
2.Watermark的理解
1)Watermark是一种衡量Event Time进展的机制。 2)Watermark是用于处理乱序事件的,而正确处理乱序事件,通常用Watermark机制结合window来实现。 3)数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经到达了,因此,window的执行也是由Watermark触发的。 4)Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 eventTime小于maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行。
2.1有序流的Watermarker
如下图所示:
2.2乱序流的Watermarker
Watermarker=maxEventTime-延迟时长,而不是说设置的Watermark为2 上图中,我们设置的允许最大延迟到达时间为 2s,所以时间戳为 7s 的事件对应的 Watermark 是 5s,时间戳为 12s 的事件的 Watermark 是 10s,如果我们的窗口 1是1s到5 s,窗口2是6s到10s,那么时间戳7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。 Watermark 就是触发前一窗口的"关窗时间",一旦触发关门那么以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。 只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗。
2.3总结
1)当Flink接收到数据时,会按照一定的规则去生成Watermark,这条Watermark就等于当前所有到达数据中的maxEventTime-延迟时长,所以Watermark是基于数据携带的时间戳生成的。 2)一旦Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。 3)如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不会被触发。
3.Watermark的传递
Watermark可以向下游传递,Watermark代表的是时间,代表的是之前的数据都处理完了,所以Watermark从上游到下游是广播形式传递的。 同时,上游也可能有并行子任务,即可能有多个Watermark,不同子任务的数据是不一样的,而且处理的速度也不一样,上游两个Watermark给下游一个任务发送Waterark,要取二者之间最小的那个Watermark。
4.Watermark的引入
4.1 乱序数据Watermark的引入
使用方法如下:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<SensorReading> sensorReadingSingleOutputStreamOperator = mapResult.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading sensorReading) {
return sensorReading.getTime() * 1000L;
}
});
BoundedOutOfOrdernessTimestampExtractor意思就是有界的、乱序的(超出一定顺序的)、时间戳提取器。 这个方法首先就要求传一个参数,作为Watermark的延迟时间ts,要求实现extractTimestamp方法,获取当前数据的时间戳,时间戳格式是毫秒。 注意:Event Time 的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事件时间是什么(数据源里的数据没有时间戳的话,就只能使用 Processing Time 了)。 代码进阶: assignTimestampsAndWatermarks这个类要求我们实现一个接口,这个接口有两种形式: 1)第一种是AssignerWithPeriodicWatermarks,它是周期性生成Watermark的方式。 如下图: 它是按照一种固定的时间生成的,不需要传参数。 再看下图: BoundedOutOfOrdernessTimestampExtractor是实现了AssignerWithPeriodicWatermarks接口,这个接口是周期性生成Watermark的分配器,并不是一个数据来了就生成一个Watermark,而是隔一个固定的周期就生成一个Watermark。 类中有一个变量currentMaxTimestamp,保存的是当前时间为止最大的时间戳。 lastEmittedWatermark变量表示的是上一次的Watermark的值,初始值设置为Long的最小值。 maxOutOfOrderness变量表示的是最大延迟时间。 对下面语句的说明:
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = -9223372036854775808L + this.maxOutOfOrderness;
public final Watermark getCurrentWatermark() {
long potentialWM = this.currentMaxTimestamp - this.maxOutOfOrderness;
if (potentialWM >= this.lastEmittedWatermark) {
this.lastEmittedWatermark = potentialWM;
}
return new Watermark(this.lastEmittedWatermark);
}
因为这是周期性生成Watermark,而不是来一个数据生成一个Watermark,所以可能会出现刚开始没有数据的时候就在生成Watermark,因为Watermark是由当前最大时间戳减去延迟时间,所以在第一次生成时间戳的时候可能会导致Long的最小值减去一个延迟时间ts导致数据溢出,所以要设置默认的当前最大时间戳为Long的最小值加上延迟时间ts,这样一减数据不会溢出。 下面是获取当前Watermark的代码:
public final Watermark getCurrentWatermark() {
long potentialWM = this.currentMaxTimestamp - this.maxOutOfOrderness;
if (potentialWM >= this.lastEmittedWatermark) {
this.lastEmittedWatermark = potentialWM;
}
return new Watermark(this.lastEmittedWatermark);
}
生成Watermark的代码实际上就是那本次时间戳和延迟时间计算得到的watermark跟之前最大的watermark作比较,取最大值。
2)第二种是AssignerWithPunctuatedWatermarks这个接口(断点式生成方式),它是来一个数据就生成一个时间戳 第一个参数是刚刚接收到的数据,第二个参数是提取到的时间戳,返回一个Watermark。
4.2 有序数据Watermark的引入
代码如下:
SingleOutputStreamOperator<SensorReading> sensorReadingSingleOutputStreamOperator1 = mapResult.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {
@Override
public long extractAscendingTimestamp(SensorReading sensorReading) {
return sensorReading.getTime() * 1000L;
}
});
有序数据生成watermark要实现AscendingTimestampExtractor这个类,它也是一个周期性生成watermark的类,它也实现了AssignerWithPeriodicWatermarks接口,如下代码:
@PublicEvolving
public abstract class AscendingTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
private static final long serialVersionUID = 1L;
private long currentTimestamp = -9223372036854775808L;
private AscendingTimestampExtractor.MonotonyViolationHandler violationHandler = new AscendingTimestampExtractor.LoggingHandler();
public AscendingTimestampExtractor() {
}
public abstract long extractAscendingTimestamp(T var1);
public AscendingTimestampExtractor<T> withViolationHandler(AscendingTimestampExtractor.MonotonyViolationHandler handler) {
this.violationHandler = (AscendingTimestampExtractor.MonotonyViolationHandler)Objects.requireNonNull(handler);
return this;
}
public final long extractTimestamp(T element, long elementPrevTimestamp) {
long newTimestamp = this.extractAscendingTimestamp(element);
if (newTimestamp >= this.currentTimestamp) {
this.currentTimestamp = newTimestamp;
return newTimestamp;
} else {
this.violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
return newTimestamp;
}
}
public final Watermark getCurrentWatermark() {
return new Watermark(this.currentTimestamp == -9223372036854775808L ? -9223372036854775808L : this.currentTimestamp - 1L);
}
public static final class LoggingHandler implements AscendingTimestampExtractor.MonotonyViolationHandler {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(AscendingTimestampExtractor.class);
public LoggingHandler() {
}
public void handleViolation(long elementTimestamp, long lastTimestamp) {
LOG.warn("Timestamp monotony violated: {} < {}", elementTimestamp, lastTimestamp);
}
}
public static final class FailingHandler implements AscendingTimestampExtractor.MonotonyViolationHandler {
private static final long serialVersionUID = 1L;
public FailingHandler() {
}
public void handleViolation(long elementTimestamp, long lastTimestamp) {
throw new RuntimeException("Ascending timestamps condition violated. Element timestamp " + elementTimestamp + " is smaller than last timestamp " + lastTimestamp);
}
}
public static final class IgnoringHandler implements AscendingTimestampExtractor.MonotonyViolationHandler {
private static final long serialVersionUID = 1L;
public IgnoringHandler() {
}
public void handleViolation(long elementTimestamp, long lastTimestamp) {
}
}
public interface MonotonyViolationHandler extends Serializable {
void handleViolation(long var1, long var3);
}
}
它的生成watermark的方式:
public final Watermark getCurrentWatermark() {
return new Watermark(this.currentTimestamp == -9223372036854775808L ? -9223372036854775808L : this.currentTimestamp - 1L);
}
如果当前数据的时间戳是Long的最小值,就取这个值,否则就取当前数据时间戳减1,相当于延迟了1ms,例如时间窗口为0~4,当前数据为5秒,则watermark为4,代表4之前的数据全部到达,同时触发时间窗口的关闭,若不减1,则会把4点多的数据也放进去。
5.进一步理解Assigner with periodic watermarks
周期性的生成 watermark:系统会周期性的将 watermark 插入到流中(水位线也是一种特殊的事件!)。默认周期是 200 毫秒。可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。
env.getConfig.setAutoWatermarkInterval(5000);
产生 watermark 的逻辑:每隔 5 秒钟,Flink 会调用 AssignerWithPeriodicWatermarks 的 getCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的 watermark 会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会 产生新的 watermark。 自定义一个周期性的时间戳抽取:
public static class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks<SensorReading>{
private Long bound = 60 * 1000L;
private Long maxTs = Long.MIN_VALUE;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(maxTs - bound);
}
@Override
public long extractTimestamp(SensorReading element, long previousElementTimestamp)
{
maxTs = Math.max(maxTs, element.getTimestamp());
return element.getTimestamp();
}
}
6.进一步理解Assigner with punctuated watermarks
间断式地生成 watermark,和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理。直接上代码来举个例子,我们只给sensor_1 的传感器的数据流插入 watermark,代码如下:
public static class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<SensorReading> {
private Long bound = 60 * 1000L;
@Nullable
@Override
public Watermark checkAndGetNextWatermark(SensorReading lastElement, long extractedTimestamp) {
if(lastElement.getId().equals("sensor_1"))
return new Watermark(extractedTimestamp - bound);
else
return null;
}
@Override
public long extractTimestamp(SensorReading element, long previousElementTimestamp)
{
return element.getTime();
}
}
7.Watermark的设定
在Flink中,watermark由应用程序开发人员生成,这通常需要对相应的领域有一定的了解。 如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果。 而如果watermark到达得太早,则可能收到错误结果,不过Flink处理迟到数据的机制可以解决这个问题。 间断性生成watermark的优点是实时性,每来一条数据就能生成一个watermark;缺点是数据量过大时,时间戳都差不多,可能有很多重复的watermark,很浪费资源。适用于:数据稀疏的情况。 周期型生成watermark的优点是不会生成那么频繁,在数据量过大情况下不会生成很多重复的数据,缺点是如果数据很长时间才生成一次,它还会生成watermark,这个时候生成的watermark是浪费的。适用于:数据稠密的情况。 周期型生成watermark的时间设置: 在设置事件语义的方法中setStreamTimeCharacteristic:
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = (TimeCharacteristic)Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
this.getConfig().setAutoWatermarkInterval(0L);
} else {
this.getConfig().setAutoWatermarkInterval(200L);
}
}
三、时间语义下的窗口测试
1.时间语义下的窗口测试
需求:计算十五秒内的温度的最小值。 代码如下:
public class WindowTest_03 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> readDataStream = env.readTextFile("D:\\opt\\idea-workspace\\Flume_Interceptor\\src\\main\\java\\com\\atguigu\\flinkTest\\Sensor.txt");
DataStream<SensorReading> mapDataStream = readDataStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields = s.split(",");
return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
}
});
mapDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading sensorReading) {
return sensorReading.getTime()*1000L;
}
});
SingleOutputStreamOperator<SensorReading> resultDataStream = mapDataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.minBy("temperature");
resultDataStream.print();
env.execute();
}
}
测试数据: 这里是模拟端口输入数据 执行结果: 第一次输出:在时间戳到达212时,控制台打印: 第二次输出:在时间戳为227时,控制台多打印了一条信息: 说明它的时间窗口为[195,210),在212时,watermark的值为212-2=210,触发了窗口的关闭和结果的计算输出。 第二个时间窗口为[210,225],在227时,watermark的值为227-2=225,触发了窗口的关闭和结果的计算输出。
2.时间窗口的划分
从上面图可以看出来,我们输入的第一个数据的时间戳时199,但是窗口却是[195,210),原因如下,在设置时间窗口的timewindow方法中:
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
return this.environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime ? this.window(TumblingProcessingTimeWindows.of(size)) : this.window(TumblingEventTimeWindows.of(size));
}
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private final long size;
private final long offset;
protected TumblingEventTimeWindows(long size, long offset) {
if (Math.abs(offset) >= size) {
throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
} else {
this.size = size;
this.offset = offset;
}
}
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > -9223372036854775808L) {
long start = TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.size);
return Collections.singletonList(new TimeWindow(start, start + this.size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
public String toString() {
return "TumblingEventTimeWindows(" + this.size + ")";
}
public static TumblingEventTimeWindows of(Time size) {
return new TumblingEventTimeWindows(size.toMilliseconds(), 0L);
}
public static TumblingEventTimeWindows of(Time size, Time offset) {
return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
}
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new Serializer();
}
public boolean isEventTime() {
return true;
}
}
上面的assginWindows方法是用来开时间窗口的,当第一条数据来了的时候,getWindowStartWithOffset方法会去计算时间窗口的值:
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
默认的offset为0,windowSize%windowSize的结果也为0,实际上就是当前时间戳减去当前时间戳对窗口大小的余数,这个结果是窗口大小的整数倍。 即第一条数据的时间戳并不一定是窗口的起始值,窗口的起始值是第一条数据时间戳之前的最近的是窗口大小整数倍的时间戳。
3.时间窗口的修改
2中展示的时间窗口的offset为0,我们要想根据自己的需求修改时间窗口,那么就要修改offset的值。 1)对于滑动窗口 滑动窗口可以用window+TumblingEventTimeWindows的方法去创建,TumblingEventTimeWindows的of方法可以传两个参数,第二个参数就是offset的值。
mapResult.keyBy(data->data.getId()).window(TumblingEventTimeWindows.of(Time.seconds(15)).minBy(1);
public static TumblingEventTimeWindows of(Time size) {
return new TumblingEventTimeWindows(size.toMilliseconds(), 0L);
}
public static TumblingEventTimeWindows of(Time size, Time offset) {
return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
}
2)对于滚动窗口 滚动窗口可以使用window+SlidingEventTimeWindows的方式创建,SlidingEventTimeWindows的of方法可以传offset参数。
mapResult.keyBy("id")
.window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5),Time.seconds(2)))
.minBy("temperature");
public static SlidingEventTimeWindows of(Time size, Time slide) {
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0L);
}
public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
}
PS:偏移量一般用于处理不同时区的时间,可以设置时间窗口为1天,偏移量为-8小时
四、迟到数据的处理
watermark只是延迟了时间,相当于把表调慢了,但是这并不能保证所有的数据在watermark这个机制下能全部到达,因此要结合allowedLateness(Time.minutes(1))这个允许迟到数据的方法。 1)只使用watermark方法 如果只使用watermark方法,那么在watermark的值达到窗口[a,b)中的b的值时,窗口会触发计算并打印结果,同时会关闭窗口。 2)使用watermark+allowedLateness 使用watermark+allowedLateness时,那么在watermark的值达到窗口[a,b)中的b的值时,窗口会触发计算并打印结果,但是窗口并不会关闭,会将计算的结果保存为一个状态,在watermark<窗口结束时间+允许延迟时间内,来一条数据,就要跟之前的状态进行计算,在watermark>=窗口结束时间+允许延迟时间时,窗口真正关闭,后面来的数据统统写入侧输出流中。 例子如下:
OutputTag<SensorReading> outputTag=new OutputTag<SensorReading>("late"){};
SingleOutputStreamOperator<SensorReading> sum = mapResult.keyBy("id")
.timeWindow(Time.seconds(15))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(outputTag)
.sum("temperature");
sum.getSideOutput(outputTag).print("late");
i.当输入数据在红线框内时,还未触发计算,时间戳到212时,watermark为210,触发第一个窗口计算,并输出一个计算结果: ii.窗口触发计算后,在窗口真正关闭前,每来一条数据就会更新一次数据并打印,在下图红框内窗口还未关闭(时间戳272前),当有数据到第一个桶中时,会更新数据:
iii.当watermark大于等于窗口结束时间+允许迟到时间,即watermark大于等于210+60=270时,窗口关闭,下图中时间戳为272时,watermark为270,此时窗口关闭,后面来的第一个桶的数据会写入侧输出流:
|