Window Computation
Overview
Window computation is at the core of stream processing.
Window computation splits an unbounded data stream into finite-sized "buckets" (windows/panels) and applies a computation function on each window to produce results.
Overall program structure
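No code followed this heading in the notes, so below is a minimal sketch of the typical keyed-window pipeline. The socket source hadoop10:9999 and the word-count logic mirror the examples later in this section; the package, object, and job names are made up for this sketch, and the commented-out trigger and evictor calls are optional steps covered at the end.

package com.baizhi.flink.overview

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowSkeletonJob {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    environment
      .socketTextStream("hadoop10", 9999)                          // source
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)                                                 // keyed stream
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))   // required: window assigner
      // .trigger(...)                                             // optional: custom trigger
      // .evictor(...)                                             // optional: evictor
      .reduce((sum, elem) => (elem._1, sum._2 + elem._2))          // required: window function
      .print()

    environment.execute("windowSkeletonJob")
  }
}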
Window splitting
The formal term is window assigner: it decides how the unbounded data stream is divided into windows.
Tumbling Windows
The window size is fixed and the end of one window is the start of the next, so windows never overlap.
Sliding Windows
The window size is fixed, but windows may overlap. A slide interval determines how far the start of the next window is shifted from the start of the previous one.
Session Windows
The window size is not fixed; windows are separated by a gap of inactivity.
Global Window
The whole data stream forms a single window. Because the stream is unbounded, a global window never triggers computation by default (a trigger must be supplied).
Window code: window assigners
Window splitting: using a WindowAssigner
TumblingWindowsAssigner
package com.baizhi.flink.assigner

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object TumblingWindowsAssignerJob {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    // Tumbling windows: fixed 5-second, non-overlapping windows based on processing time
    val windowedStream: WindowedStream[(String, Int), String, TimeWindow] =
      keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

    val result: DataStream[(String, Int)] = windowedStream.reduce((sum, elem) => (elem._1, sum._2 + elem._2))
    result.print()

    environment.execute("tumblingWindowsAssignerJob")
  }
}
SlidingWindowsAssigner
package com.baizhi.flink.assigner

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object SlidingWindowsAssignerJob {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    // Sliding windows: 10-second windows that start every 5 seconds, so they overlap
    val windowedStream: WindowedStream[(String, Int), String, TimeWindow] =
      keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

    val result: DataStream[(String, Int)] = windowedStream.reduce((sum, elem) => (elem._1, sum._2 + elem._2))
    result.print()

    environment.execute("slidingWindowsAssignerJob")
  }
}
SessionWindowsAssigner
package com.baizhi.flink.assigner

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object SessionWindowsAssignerJob {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    // Session windows: a window closes after a 5-second gap with no incoming data
    val windowedStream: WindowedStream[(String, Int), String, TimeWindow] =
      keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))

    val result: DataStream[(String, Int)] = windowedStream.reduce((sum, elem) => (elem._1, sum._2 + elem._2))
    result.print()

    environment.execute("sessionWindowsAssignerJob")
  }
}
GlobalWindowAssigner
package com.baizhi.flink.assigner

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

object GlobalWindowsAssignerJob {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    // A global window never fires on its own, so a trigger is required:
    // here CountTrigger fires the window function once every 3 elements per key
    val windowedStream: WindowedStream[(String, Int), String, GlobalWindow] = keyedStream
      .window(GlobalWindows.create())
      .trigger(CountTrigger.of[GlobalWindow](3))

    val result: DataStream[(String, Int)] = windowedStream.reduce((sum, elem) => {
      println("***********")
      (elem._1, sum._2 + elem._2)
    })
    result.print()

    environment.execute("globalWindowsAssignerJob")
  }
}
Window code: window functions
ReduceFunction
A ReduceFunction performs incremental computation: every element that arrives is immediately folded into the running result. When the window fires, only the final result is emitted.
package com.baizhi.flink.function

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object ReduceFunctionJob {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    val result: DataStream[(String, Int)] = keyedStream
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce(new MyReduceFunction)

    result.print()
    environment.execute("reduceFunctionJob")
  }
}

// Incrementally folds each new element into the running per-key count
class MyReduceFunction extends ReduceFunction[(String, Int)] {
  override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
    (value2._1, value2._2 + value1._2)
  }
}
ProcessWindowFunction
A ProcessWindowFunction performs batch computation: every element that arrives is buffered, and when the window fires all buffered elements are processed in a single pass.
It also has access to window metadata, such as the window start and end time.
package com.baizhi.flink.function

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object ProcessWindowFunctionJob {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    val result: DataStream[(String, Int)] = keyedStream
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new MyProcessWindowFunction)

    result.print()
    environment.execute("processWindowFunctionJob")
  }
}

// Buffers all elements of the window and processes them in one pass when the window fires;
// the Context gives access to window metadata such as the start and end time
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
    val count: Int = elements.map(_._2).sum
    val timeWindow: TimeWindow = context.window
    val start: Long = timeWindow.getStart
    val end: Long = timeWindow.getEnd
    println(s"window time: [${start}-${end}), elements in window: ${elements.mkString(",")}")
    out.collect((key, count))
  }
}
Combining efficient computation with window metadata
- Efficient computation means incremental computation, done by a ReduceFunction or an AggregateFunction (a short AggregateFunction sketch follows the combined example below).
- Window metadata is provided by a ProcessWindowFunction.
So the two should be used together:
package com.baizhi.flink.function

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object ReduceAndProcessWindowFunctionJob {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    // The ReduceFunction aggregates incrementally; when the window fires,
    // the ProcessWindowFunction receives only the single pre-aggregated element plus the window metadata
    val result: DataStream[String] = keyedStream
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce(new MyReduceFunction2, new MyProcessWindowFunction2)

    result.print()
    environment.execute("reduceAndProcessWindowFunctionJob")
  }
}

class MyReduceFunction2 extends ReduceFunction[(String, Int)] {
  override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
    println(value1 + "***" + value2)
    (value2._1, value1._2 + value2._2)
  }
}

class MyProcessWindowFunction2 extends ProcessWindowFunction[(String, Int), String, String, TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = {
    println(elements.mkString(","))
    val timeWindow: TimeWindow = context.window
    val start: Long = timeWindow.getStart
    val end: Long = timeWindow.getEnd
    println(s"window time: [${start}-${end})")
    val list: List[(String, Int)] = elements.toList
    out.collect("count of " + list(0)._1 + " is: " + list(0)._2)
  }
}
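As mentioned above, the incremental side does not have to be a ReduceFunction: an AggregateFunction allows the accumulator and output types to differ from the input type. Below is a minimal sketch under the same word-count setup; the object, class, and job names (AggregateFunctionJob, MyCountAggregate) are made up for illustration.

package com.baizhi.flink.function

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object AggregateFunctionJob {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    environment.socketTextStream("hadoop10", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .aggregate(new MyCountAggregate)   // incremental like reduce, but with separate accumulator and output types
      .print()

    environment.execute("aggregateFunctionJob")
  }
}

// IN = (String, Int), ACC = (word, running count), OUT = a formatted String per window
class MyCountAggregate extends AggregateFunction[(String, Int), (String, Long), String] {
  override def createAccumulator(): (String, Long) = ("", 0L)
  override def add(value: (String, Int), acc: (String, Long)): (String, Long) = (value._1, acc._2 + value._2)
  override def getResult(acc: (String, Long)): String = acc._1 + " appeared " + acc._2 + " times in this window"
  override def merge(a: (String, Long), b: (String, Long)): (String, Long) = (a._1, a._2 + b._2)
}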
Using state in window computation
- Per-window state (windowState): one state instance per window, used to store data scoped to that window.
- Global state across windows (globalState): accumulates data across all windows of the same key.
package com.baizhi.flink.function

import org.apache.flink.api.common.state.{KeyedStateStore, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object StateJob {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    val result: DataStream[String] = keyedStream
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new MyProcessWindowFunction3)

    result.print()
    environment.execute("stateJob")
  }
}

class MyProcessWindowFunction3 extends ProcessWindowFunction[(String, Int), String, String, TimeWindow] {
  // Descriptor for per-window state (one state instance per window)
  var vsdForEachWindow: ValueStateDescriptor[Int] = _
  // Descriptor for global state shared across all windows of the same key
  var vsdForAllWindows: ValueStateDescriptor[Int] = _

  override def open(parameters: Configuration): Unit = {
    vsdForEachWindow = new ValueStateDescriptor[Int]("vsdfew", createTypeInformation[Int])
    vsdForAllWindows = new ValueStateDescriptor[Int]("vsdfaw", createTypeInformation[Int])
  }

  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = {
    val count: Int = elements.map(_._2).sum

    // Per-window state: scoped to the current window (retrieved here but not updated in this example)
    val windowRuntimeContext: KeyedStateStore = context.windowState
    val valueStateForEachWindow: ValueState[Int] = windowRuntimeContext.getState(vsdForEachWindow)

    // Global state: accumulates the count across all windows of this key
    val globalRuntimeContext: KeyedStateStore = context.globalState
    val valueStateForAllWindows: ValueState[Int] = globalRuntimeContext.getState(vsdForAllWindows)
    val oldCount: Int = valueStateForAllWindows.value()
    valueStateForAllWindows.update(oldCount + count)

    println(s"so far, the total count of ${key} is: " + valueStateForAllWindows.value())
    out.collect("count of " + key + " in this window is: " + count)
  }
}
Trigger
A trigger determines when the window function is executed for a window.
If the default trigger of a window assigner does not meet the business requirements, a trigger can be set explicitly in code:
keyedStream.window(...).trigger(trigger)
Flink provides the Trigger abstraction along with many concrete Trigger implementations that can be used directly:
- CountTrigger: fires the window when the number of elements in it reaches a given count
- DeltaTrigger: computes the delta between two data points and fires when it exceeds a given threshold
If none of the built-in triggers meets the business requirements, a custom Trigger can be implemented.
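The GlobalWindow example above already uses CountTrigger on a global window; the same call can also override the default trigger of a time window. Below is a minimal sketch, assuming we want a 5-second tumbling window to fire every 3 elements per key instead (note that setting the trigger replaces the assigner's default processing-time trigger, so the window no longer fires at its end time). The package, object, and job names are made up for illustration.

package com.baizhi.flink.trigger

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object CountTriggerJob {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    environment.socketTextStream("hadoop10", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      // overrides the window's default trigger: the window function now runs once every 3 elements per key
      .trigger(CountTrigger.of[TimeWindow](3))
      .reduce((sum, elem) => (elem._1, sum._2 + elem._2))
      .print()

    environment.execute("countTriggerJob")
  }
}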
Evictor
An evictor can remove elements from the window before and/or after the window function is applied.
keyedStream.window(...).evictor(evictor)
Flink provides the Evictor abstraction along with several concrete Evictor implementations:
- CountEvictor: keeps at most a given number of elements in the window and evicts the oldest extra elements
If none of the built-in evictors meets the business requirements, a custom Evictor can be implemented.
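Below is a minimal sketch of wiring an evictor into the pipeline, assuming we only want the last 2 elements of each window to be counted; CountEvictor here evicts the older elements before the window function runs. The package, object, and job names are made up for illustration.

package com.baizhi.flink.evictor

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object CountEvictorJob {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    environment.socketTextStream("hadoop10", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      // before the window fires, evict everything except the last 2 elements of the window
      .evictor(CountEvictor.of[TimeWindow](2))
      .reduce((sum, elem) => (elem._1, sum._2 + elem._2))
      .print()

    environment.execute("countEvictorJob")
  }
}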