Event-time Window
Event Time
Flink在流计算的过程中,支持多种时间概念。Event Time / Processing Time / Ingestion Time
- Processing Time:处理时间是指执行相应操作的机器的系统时间。
- Event Time:事件时间是每个事件在其生产设备上发生的时间。处理乱序数据(数据的处理和数据的生成顺序乱啦)
- Ingestion:摄取时间是事件进入Flink的时间
时间机制:在事件时间中,时间的进度取决于数据,而不是任何时钟。事 件时间程序必须指定如何生成事件时间Watermarks,这是事件时间进程的信号机制
Watermark-水位线
概念 Watermark是Flink中测量事件时间进度的机制。Watermark作为数据流的一部分流动,并带有时间戳t。数据流中的Watermark用于表示时间戳小Watermark的数据,都已经到达了。因此流中不应该再有时间戳t’<=watermark的元素。因此只有水位线越过对应窗口的结束时间,窗口才会关闭和进行计算
- watermark是一个衡量事件时间进度的机制:水位线就是时间戳
- watermark是解决乱序问题的重要依据
- 在窗口计算中,watermark没过了窗口的结束时间就触发窗口计算(属于该窗口的元素都已经到达);被认为早于这个时间的数据都已经到达
- watermark的计算应该是:最大事件时间-最大允许迟到时间
所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。
因为有可能乱序,如果只根据eventTime决定窗口的运行,就不能明确数据是否全部到位,但又不能无限期的等待,此时必须要有一种机制来保证一个特定的时间后,必须触发window function进行计算,这个机制就是Watermark。
Flink接收到每一条数据时,都会产生一条Watermark,这条Watermark就等于当前所有到达数据中的maxEventTime - 允许延迟的时间
watermark=maxEventTime-maxAllowedLateness
watermark计算
Flink中常用watermark计算方式有两种
- With Periodic Watermarks–定期提取水位线:比如每间隔1秒计算出来一个水位线
- With Punctuated Watermarks–每一个event计算一个水位线(不常用)
- With Periodic Watermarks
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
class MyAssignerWithPeriodicWatermarks extends AssignerWithPeriodicWatermarks[(String,Long)]{
var maxEventTime:Long=_
var maxAllowedLateness:Long=2000
override def getCurrentWatermark: Watermark = {
new Watermark(maxEventTime-maxAllowedLateness)
}
override def extractTimestamp(element: (String, Long), previousElementTimestamp:
Long): Long = {
val format = new SimpleDateFormat("HH:mm:ss")
maxEventTime=Math.max(maxEventTime,element._2)
println("当前元素:"+(element._1,format.format(element._2))+",水位 线:"+format.format(maxEventTime-maxAllowedLateness))
element._2
}
}
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
class MyAllWindowFunctionForEventTime extends AllWindowFunction[(String,Long),String,TimeWindow]{
override def apply(window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
val start: Long = window.getStart
val end: Long = window.getEnd
val format = new SimpleDateFormat("HH:mm:ss")
println("窗口时间范围:["+format.format(start)+","+format.format(end)+")")
val elements: String =input.map(word=>word._1+":"+format.format(word._2)).mkString(" | ") out.collect(elements)
}
}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
object AssignerWithPeriodicWatermarksDemo { def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(1)
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
environment.getConfig.setAutoWatermarkInterval(1000)
dataStream: DataStream[String] =environment.socketTextStream("flink.baizhiedu.com",9999)
val result: DataStream[String] = dataStream.map(line => line.split("\\s+"))
.map(word => (word(0), word(1).toLong))
.assignTimestampsAndWatermarks(new MyAssignerWithPeriodicWatermarks)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
.apply(new MyAllWindowFunctionForEventTime)
result.print()
environment.execute("AssignerWithPeriodicWatermarksDemoJob")
}
}
迟到数据
当有多个并行度的时候,水位线是按照最低的进行计算的
在?ink中对于迟到数据进行了三种处理
- 默认处理方式:直接丢弃(spark就是采用这个方式)
- 在允许迟到的范围内,会重新开启窗口进行重新计算
- 超出了范围的数据tooLate,可以采用边输出的方式呈现处理方便后续的处理
在Flink中,水位线一旦没过窗口的EndTime,如果还有数据落入到此窗口,这些数据被定义为迟到数据。默认情况 下,迟到数据将被删除。但是,Flink允许为窗口操作符指定允许的最大延迟,在允许的延迟范围内到达的元素仍然 会添加到窗口中。根据使用的触发器,延迟但未丢弃的元素可能会导致窗口再次触发。
- 如果Watermarker时间t < 窗口EndTime t’’ +允许迟到时间 t’ ,则该数据还可以参与窗口计算。
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream,StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object AssignerWithPunctuatedWatermarksForLatenessDemo { def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(1)
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
dataStream: DataStream[String] = environment.socketTextStream("flink.baizhiedu.com",9999)
val result: DataStream[String] = dataStream.map(line => line.split("\\s+"))
.map(word => (word(0), word(1).toLong))
.assignTimestampsAndWatermarks(new MyAssignerWithPunctuatedWatermarks)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
.allowedLateness(Time.seconds(2))
.apply(new MyAllWindowFunctionForEventTime)
result.print()
environment.execute("AssignerWithPeriodicWatermarksDemoJob")
}
}
- 如果Watermarker时间t >= 窗口EndTime t’’ + 允许迟到时间t’ 则该数据会被丢弃。为了能够更直观的呈现这些too late数据,可以通过side out输出
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream,StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object AssignerWithPunctuatedWatermarksForTooLateDemo { def main(args: Array[String]): Unit = {
val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(1)
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
dataStream: DataStream[String] = environment.socketTextStream("flink.baizhiedu.com",9999)
var outputTag:OutputTag[(String,Long)]=new OutputTag[(String,Long)]("too late")
val result: DataStream[String] = dataStream.map(line => line.split("\\s+"))
.map(word => (word(0), word(1).toLong))
.assignTimestampsAndWatermarks(new MyAssignerWithPunctuatedWatermarks)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
.allowedLateness(Time.seconds(2))
.sideOutputLateData(outputTag)
.apply(new MyAllWindowFunctionForEventTime)
result.print("正常")
result.getSideOutput(outputTag).printToErr("太迟的数据")
environment.execute("AssignerWithPeriodicWatermarksDemoJob")
}
}
|