EventTime 是事件在现实世界中实际发生的时间，ProcessingTime 是 Flink 系统处理该事件的时间。在实际业务处理中，我们使用 EventTime 的场景要多于 ProcessingTime。但问题也随之而来：由于各种原因，消息可能会延迟或乱序到达，因此引入了 Watermark 这一概念，在一定程度上减轻消息延迟带来的不便。
下面的代码演示的是理想情况：消息没有延迟、时间戳严格递增，因此不需要设置乱序容忍度（assignAscendingTimestamps 会直接按递增的时间戳自动生成 watermark）。
/**
 * Ideal case: events arrive with strictly ascending event-time timestamps, so
 * `assignAscendingTimestamps` is enough and no out-of-orderness bound is configured.
 *
 * The same keyed stream feeds three event-time window types (tumbling, sliding, session),
 * each emitting the record with the maximum `temp` per window. Every sink is labelled so the
 * interleaved stdout lines of the three windows can be told apart.
 */
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)

  // Each socket line is split on "," into TrainAlarm(field0, field1.toLong, field2.toDouble).
  val keyedStream = env.socketTextStream("node1", 666)
    .map(line => {
      val ps = line.split(",")
      TrainAlarm(ps(0), ps(1).toLong, ps(2).toDouble)
    })
    // Flink event time is in milliseconds; `ts` is presumably in seconds — TODO confirm.
    .assignAscendingTimestamps(_.ts * 1000L)
    .keyBy(_.id)

  // Non-overlapping 5 s windows.
  keyedStream
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .maxBy("temp")
    .print("tumbling")

  // 10 s windows sliding every 2 s, so each event belongs to 5 windows.
  keyedStream
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
    .maxBy("temp")
    .print("sliding")

  // A session closes after 5 s of event-time inactivity per key.
  keyedStream
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .maxBy("temp")
    .print("session")

  env.execute()
}
但在实际场景中，上面那种理想情况往往不会出现；下面的代码（使用有界乱序的 watermark，并处理迟到数据）才是我们经常会遇到的情况。
/**
 * Realistic case: events may arrive out of order and late.
 *
 *  - Watermarks tolerate up to 2 s of out-of-orderness.
 *  - Each 5 s tumbling window keeps accepting late records for 1 more minute
 *    (`allowedLateness`), updating its result for each one.
 *  - Records later than that are routed to the "late" side output and printed,
 *    instead of being dropped silently.
 */
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)

  val inputStream = env.socketTextStream("node1", 666)
    .map(line => {
      val ps = line.split(",")
      TrainAlarm(ps(0), ps(1).toLong, ps(2).toDouble)
    })
    .assignTimestampsAndWatermarks(
      // The explicit type argument is required in Scala: without it T is inferred as Nothing
      // and the TrainAlarm timestamp assigner below does not type-check.
      WatermarkStrategy.forBoundedOutOfOrderness[TrainAlarm](Duration.ofSeconds(2))
        .withTimestampAssigner(new SerializableTimestampAssigner[TrainAlarm] {
          // Flink event time is in milliseconds; `ts` is presumably in seconds — TODO confirm.
          override def extractTimestamp(element: TrainAlarm, recordTimestamp: Long): Long = element.ts * 1000L
        })
    )

  val lateTag = new OutputTag[TrainAlarm]("late")
  val result = inputStream
    .keyBy(_.id)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(lateTag)
    .maxBy("temp")

  result.print("window")
  // The side output must be consumed by a sink; otherwise records past the allowed
  // lateness are collected into it and then discarded.
  result.getSideOutput(lateTag).print("late")

  env.execute()
}
以下为自定义 Watermark 策略的实现：
/**
 * Demo of the custom watermark strategy [[AssignAndWm]].
 *
 * Parses comma-separated socket lines into TrainAlarm records and assigns timestamps and
 * watermarks with AssignAndWm. It then emits the max-`temp` record per id for each 5 s
 * event-time tumbling window, which is where the watermarks become observable.
 */
def main(args: Array[String]): Unit = {
  // Local imports keep this snippet self-contained.
  import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
  import org.apache.flink.streaming.api.windowing.time.Time

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)

  val inputStream = env.socketTextStream("node1", 666)
    .map(line => {
      val ps = line.split(",")
      TrainAlarm(ps(0), ps(1).toLong, ps(2).toDouble)
    })
    .assignTimestampsAndWatermarks(new AssignAndWm)

  // A sink and env.execute() are both required; without them the job graph is built
  // but never submitted, and the watermark strategy never runs.
  inputStream
    .keyBy(_.id)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .maxBy("temp")
    .print()

  env.execute()
}
}
class AssignAndWm extends WatermarkStrategy[TrainAlarm]{
/**
 * Builds the per-record timestamp extractor for this strategy.
 *
 * Any timestamp already attached to the record (`recordTimestamp`) is ignored. The event
 * time is the record's own `ts` field multiplied by 1000 (presumably seconds to epoch
 * milliseconds — TODO confirm the unit of `ts`).
 */
override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[TrainAlarm] = {
  val assigner: TimestampAssigner[TrainAlarm] = new TimestampAssigner[TrainAlarm] {
    override def extractTimestamp(element: TrainAlarm, recordTimestamp: Long): Long = {
      val scaled = element.ts * 1000L
      scaled
    }
  }
  assigner
}
/**
 * Builds a periodic bounded-out-of-orderness watermark generator.
 *
 * It tracks the largest event timestamp seen so far. On each periodic emit it publishes
 * `maxTimestamp - maxOutOfOrderness - 1`, the same semantics as Flink's built-in
 * BoundedOutOfOrdernessWatermarks.
 */
override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[TrainAlarm] = {
  new WatermarkGenerator[TrainAlarm] {
    // Tolerated out-of-orderness in milliseconds (2 s).
    val maxOutOfOrderness = 2000L

    // Largest event timestamp seen so far. This start value makes the first emitted
    // watermark exactly Long.MinValue and keeps the subtraction below from underflowing.
    var maxTimestamp: Long = Long.MinValue + maxOutOfOrderness + 1

    override def onEvent(event: TrainAlarm, eventTimestamp: Long, output: WatermarkOutput): Unit = {
      maxTimestamp = Math.max(maxTimestamp, eventTimestamp)
    }

    override def onPeriodicEmit(output: WatermarkOutput): Unit = {
      // A watermark t asserts "no more events with timestamp <= t". The extra -1 keeps an
      // event exactly maxOutOfOrderness ms behind the maximum on time rather than late.
      output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1))
    }
  }
}
|