import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
 * Demo job: reads text lines from a socket, turns each line into a
 * `(String, Long)` pair, assigns event-time timestamps/watermarks with
 * [[MyWWx]], groups everything into 5-second tumbling event-time windows
 * and prints what [[Myprocess]] emits for each window.
 */
object TestEventtime {

  /**
   * Builds and runs the streaming pipeline.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(1000)
    env.setParallelism(1)

    // Each line is split on a single space; token 1 becomes the string
    // element and token 2 is parsed as the Long timestamp.
    // NOTE(review): a line therefore needs at least three tokens and a
    // numeric third token, otherwise the map function throws — confirm the
    // expected input format.
    val timestamped: DataStream[(String, Long)] = env
      .socketTextStream("192.168.229.10", 9999)
      .map(line => line.split(" "))
      .map(fields => (fields(1), fields(2).toLong))

    val windowOutput: DataStream[String] = timestamped
      .assignTimestampsAndWatermarks(new MyWWx)
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
      .process(new Myprocess)

    windowOutput.print()
    env.execute("eventttime")
  }
}
/**
 * Periodic watermark assigner for `(String, Long)` elements whose second
 * component is the event timestamp.
 *
 * It remembers the largest timestamp seen so far and reports a watermark
 * that trails that maximum by `maxAllowLateness`.
 */
class MyWWx extends AssignerWithPeriodicWatermarks[(String, Long)] {

  /** Largest event timestamp observed so far; starts at 0. */
  var maxEventTime: Long = 0L

  /**
   * Amount subtracted from `maxEventTime` to obtain the watermark, in the
   * same unit as the event timestamps (presumably milliseconds, i.e. 2 s —
   * confirm against the data source).
   */
  var maxAllowLateness: Long = 2000

  /** @return a watermark at `maxEventTime - maxAllowLateness` */
  override def getCurrentWatermark: Watermark = {
    val watermarkTimestamp = maxEventTime - maxAllowLateness
    new Watermark(watermarkTimestamp)
  }

  /**
   * Updates the running maximum with this element's timestamp, logs the
   * resulting watermark value together with the current thread id, and
   * returns the element's own timestamp.
   *
   * @param t the element; `t._2` is used as its event timestamp
   * @param l the timestamp previously attached to the element (not used)
   * @return `t._2`
   */
  override def extractTimestamp(t: (String, Long), l: Long): Long = {
    val eventTimestamp = t._2
    maxEventTime = Math.max(maxEventTime, eventTimestamp)
    // Debug trace, printed once per element after the maximum is updated.
    println(s"线程编号是${Thread.currentThread().getId}的线程===>到目前为止,计算出来的水位线是:${maxEventTime-maxAllowLateness}")
    eventTimestamp
  }
}
/**
 * Window function that prints the bounds of the window being evaluated and
 * emits a single string: the first components of all elements in the
 * window, joined with commas in iteration order.
 */
class Myprocess extends ProcessAllWindowFunction[(String, Long), String, TimeWindow] {

  /**
   * @param context  gives access to the window being evaluated
   * @param elements all `(String, Long)` elements assigned to the window
   * @param out      collector that receives the comma-joined string
   */
  override def process(context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
    val (start, end) = (context.window.getStart, context.window.getEnd)
    println(s"窗口的开始以及结束时间是:[${start},${end})")

    // Only the string component of each element goes into the output.
    val joined = elements.iterator.map { case (name, _) => name }.mkString(",")
    out.collect(joined)
  }
}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Demo job for late data: same socket → 5-second tumbling event-time window
 * pipeline as a plain event-time job, but the window additionally allows
 * 2 seconds of lateness and routes elements that are later than that to a
 * side output, which is printed separately.
 */
object LateEvent {

  /**
   * Builds and runs the streaming pipeline.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(1000)
    env.setParallelism(1)

    // Each line is split on a single space; token 0 becomes the string
    // element and token 1 is parsed as the Long timestamp.
    val timestamped: DataStream[(String, Long)] = env
      .socketTextStream("192.168.229.10", 9999)
      .map(line => line.split(" "))
      .map(fields => (fields(0), fields(1).toLong))

    // Tag identifying the side output that receives the too-late elements.
    val lateTag = new OutputTag[(String, Long)]("迟到数据")

    val windowOutput: DataStream[String] = timestamped
      .assignTimestampsAndWatermarks(new MyWWx)
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
      .allowedLateness(Time.seconds(2))
      .sideOutputLateData(lateTag)
      .process(new Myprocess)

    windowOutput.print()

    // The side output is read from the windowed result stream.
    val tooLate: DataStream[(String, Long)] = windowOutput.getSideOutput(lateTag)
    tooLate.print("太晚的数据:")

    env.execute("laterdata")
  }
}
|