IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink的window -> 正文阅读

[大数据]flink的window

Flink的Window操作

Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而
窗口(window)就是从 Streaming 到 Batch 的一个桥梁。Flink 提供了非常完善的窗口机制。
什么是window
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息
就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这
种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
如果在数据流上,截取固定大小的一部分,这部分是可以进行统计的。 截取方式主要有两种:
1.根据时间 进行截取(time-driven-window),比如每1分钟统计一次或每10分钟统计一次。
2.根据消息数量进行截取(data-driven-window),比如每5个数据统计一次或每50个数据统计一次。
tumbling-time-window (翻滚窗口-无重叠数据)
按照 时间 来进行窗口划分,每次窗口的 滑动距离 等于窗口的长度,这样数据不会重复计算

package com.ccj.pxj.heima.stream.window
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object StreamingTumblingTimeWindow {
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val datass: DataStream[String] = senv.socketTextStream("192.168.25.60", 8886)
    val datas = datass.map(x => {
      val data = x.split(",")
      WordCountCart(data(0).toInt, data(1).toInt)
    }).keyBy(_.sen).timeWindow(Time.seconds(5)).sum(1)
    datas.print()
    senv.execute("pxj")
  }
}
case class WordCountCart(sen: Int, cardNum: Int)

sliding-time-window (滑动窗口-有重叠数据)

按照时间来进行窗口划分,每次窗口的滑动距离小于窗口的长度,这样数据就会有一部分重复计算

在这里插入图片描述

package com.ccj.pxj.heima.stream.window
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object StreamingTimeSlidingWindow {
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val datas: DataStream[String] = senv.socketTextStream("pxj60", 8886)
    val data: DataStream[WordCountCart] = datas.map(x => {
      val data: Array[String] = x.split(",")
      WordCountCart(data(0).toInt, data(1).toInt)
    }).keyBy(_.sen).timeWindow(Time.seconds(10), Time.seconds(2)).sum(1)
    data.print()
    senv.execute("pxj")
  }
}
case class WordCountCart(sen: Int, cardNum: Int)

小结:
1.如果窗口计算时间 > 窗口时间,会出现数据丢失
2.如果窗口计算时间 < 窗口时间,会出现数据重复计算
3.如果窗口计算时间 = 窗口时间,数据不会被重复计算
窗口计算时间 > 窗口时间
窗口计算时间 < 窗口时间
窗口计算时间 = 窗口时间
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

Count-Window

tumbling-count-window (无重叠数据)

按照个数进行统计,比如:
每个路口分别统计,收到关于它的5条消息时,统计在最近5条消息中,各自路口通过的汽车数量

package com.ccj.pxj.heima.stream.window
import org.apache.flink.streaming.api.scala._
object StreamingCountTumblingWindow {
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val datas = senv.socketTextStream("pxj60", 8886)
    val data = datas.map(x => {
      val data = x.split(",")
      CountCart(data(0).toInt, data(1).toInt)
    }).keyBy(_.sen).countWindow(5).sum(1)
    data.print()
    senv.execute("pxj")
  }
}
case class CountCart(sen:Int, cardNum:Int)

sliding-count-window (有重叠数据)

同样也是窗口长度和滑动窗口的操作:窗口长度是5,滑动长度是3

package com.ccj.pxj.heima.stream.window
import org.apache.flink.streaming.api.scala._
object StreamingCountSlidingWindow {
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val data: DataStream[String] = senv.socketTextStream("pxj60", 8886)
    val data1: DataStream[CountCart] = data.map(x => {
      val da = x.split(",")
      CountCart(da(0).toInt, da(1).toInt)
    }).keyBy(_.sen).countWindow(5, 3).sum(1)
    data1.print()
    senv.execute("pxj")
  }
}
case class CountCart(sen:Int, cardNum:Int)
    
    
    2,1
2,2
2,3
2,4
2,5
2,7,2
12> CountCart(2,6)
12> CountCart(2,21)

Window apply

apply方法可以进行一些自定义处理,通过匿名内部类的方法来实现。当有一些复杂计算时使用。
用法
1.实现 WindowFunction 类
2.指定该类的泛型为 [输入数据类型, 输出数据类型, keyBy中使用分组字段的类型, 窗口类型]
示例
使用apply方法来实现单词统计
步骤
1.获取流处理运行环境
2.构建socket流数据源,并指定IP地址和端口号
3.对接收到的数据转换成单词元组
4.使用keyBy 进行分流(分组)
5.使用timeWinodw 指定窗口的长度(每3秒计算一次)
6.实现一个WindowFunction匿名内部类在apply方法中实现聚合计算使用Collector.collect收集数据
7.打印输出
8.启动执行
9.在Linux中,使用 nc -lk 端口号 监听端口,并发送单词

package com.ccj.pxj.heima.stream.window
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object WindowApply {
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val datas = senv.socketTextStream("pxj60", 8886)
    val dataStream: DataStream[(String, Int)] = datas.flatMap(x => {
      x.split(" ").map(_ -> 1)
    })
    val data = dataStream.keyBy(_._1).timeWindow(Time.seconds(5))
    val datass: DataStream[(String, Int)] = data.apply(new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
      override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
        //        -在apply方法中实现聚合计算
        val re: (String, Int) = input.reduce(
          (x, y) => (x._1, x._2 + y._2)
        )
        out.collect(re)
      }
    })
    datass.print()
    senv.execute("pxj")
  }
}

作者:pxj
日期:2021-08-03 22:44:58
你若安好便是晴天!

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-04 11:16:50  更:2021-08-04 11:17:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 9:44:28-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码