Flink的Window操作
Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而 窗口(window)就是从 Streaming 到 Batch 的一个桥梁。Flink 提供了非常完善的窗口机制。 什么是window 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息 就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这 种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。 如果在数据流上,截取固定大小的一部分,这部分是可以进行统计的。 截取方式主要有两种: 1.根据时间 进行截取(time-driven-window),比如每1分钟统计一次或每10分钟统计一次。 2.根据消息数量进行截取(data-driven-window),比如每5个数据统计一次或每50个数据统计一次。 tumbling-time-window (翻滚窗口-无重叠数据) 按照 时间 来进行窗口划分,每次窗口的 滑动距离 等于窗口的长度,这样数据不会重复计算
package com.ccj.pxj.heima.stream.window
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object StreamingTumblingTimeWindow {
def main(args: Array[String]): Unit = {
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val datass: DataStream[String] = senv.socketTextStream("192.168.25.60", 8886)
val datas = datass.map(x => {
val data = x.split(",")
WordCountCart(data(0).toInt, data(1).toInt)
}).keyBy(_.sen).timeWindow(Time.seconds(5)).sum(1)
datas.print()
senv.execute("pxj")
}
}
case class WordCountCart(sen: Int, cardNum: Int)
sliding-time-window (滑动窗口-有重叠数据)
按照时间来进行窗口划分,每次窗口的滑动距离小于窗口的长度,这样数据就会有一部分重复计算
package com.ccj.pxj.heima.stream.window
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object StreamingTimeSlidingWindow {
def main(args: Array[String]): Unit = {
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val datas: DataStream[String] = senv.socketTextStream("pxj60", 8886)
val data: DataStream[WordCountCart] = datas.map(x => {
val data: Array[String] = x.split(",")
WordCountCart(data(0).toInt, data(1).toInt)
}).keyBy(_.sen).timeWindow(Time.seconds(10), Time.seconds(2)).sum(1)
data.print()
senv.execute("pxj")
}
}
case class WordCountCart(sen: Int, cardNum: Int)
小结: 1.如果窗口计算时间 > 窗口时间,会出现数据丢失 2.如果窗口计算时间 < 窗口时间,会出现数据重复计算 3.如果窗口计算时间 = 窗口时间,数据不会被重复计算 窗口计算时间 > 窗口时间 窗口计算时间 < 窗口时间 窗口计算时间 = 窗口时间
Count-Window
tumbling-count-window (无重叠数据)
按照个数进行统计,比如: 每个路口分别统计,收到关于它的5条消息时,统计在最近5条消息中,各自路口通过的汽车数量
package com.ccj.pxj.heima.stream.window
import org.apache.flink.streaming.api.scala._
object StreamingCountTumblingWindow {
def main(args: Array[String]): Unit = {
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val datas = senv.socketTextStream("pxj60", 8886)
val data = datas.map(x => {
val data = x.split(",")
CountCart(data(0).toInt, data(1).toInt)
}).keyBy(_.sen).countWindow(5).sum(1)
data.print()
senv.execute("pxj")
}
}
case class CountCart(sen:Int, cardNum:Int)
sliding-count-window (有重叠数据)
同样也是窗口长度和滑动窗口的操作:窗口长度是5,滑动长度是3
package com.ccj.pxj.heima.stream.window
import org.apache.flink.streaming.api.scala._
object StreamingCountSlidingWindow {
def main(args: Array[String]): Unit = {
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val data: DataStream[String] = senv.socketTextStream("pxj60", 8886)
val data1: DataStream[CountCart] = data.map(x => {
val da = x.split(",")
CountCart(da(0).toInt, da(1).toInt)
}).keyBy(_.sen).countWindow(5, 3).sum(1)
data1.print()
senv.execute("pxj")
}
}
case class CountCart(sen:Int, cardNum:Int)
2,1
2,2
2,3
2,4
2,5
2,7,2
12> CountCart(2,6)
12> CountCart(2,21)
Window apply
apply方法可以进行一些自定义处理,通过匿名内部类的方法来实现。当有一些复杂计算时使用。 用法 1.实现 WindowFunction 类 2.指定该类的泛型为 [输入数据类型, 输出数据类型, keyBy中使用分组字段的类型, 窗口类型] 示例 使用apply方法来实现单词统计 步骤 1.获取流处理运行环境 2.构建socket流数据源,并指定IP地址和端口号 3.对接收到的数据转换成单词元组 4.使用keyBy 进行分流(分组) 5.使用timeWinodw 指定窗口的长度(每3秒计算一次) 6.实现一个WindowFunction匿名内部类在apply方法中实现聚合计算使用Collector.collect收集数据 7.打印输出 8.启动执行 9.在Linux中,使用 nc -lk 端口号 监听端口,并发送单词
package com.ccj.pxj.heima.stream.window
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object WindowApply {
def main(args: Array[String]): Unit = {
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val datas = senv.socketTextStream("pxj60", 8886)
val dataStream: DataStream[(String, Int)] = datas.flatMap(x => {
x.split(" ").map(_ -> 1)
})
val data = dataStream.keyBy(_._1).timeWindow(Time.seconds(5))
val datass: DataStream[(String, Int)] = data.apply(new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
val re: (String, Int) = input.reduce(
(x, y) => (x._1, x._2 + y._2)
)
out.collect(re)
}
})
datass.print()
senv.execute("pxj")
}
}
作者:pxj 日期:2021-08-03 22:44:58 你若安好便是晴天!
|