IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark Streaming 滑动窗口实践 -> 正文阅读

[大数据]Spark Streaming 滑动窗口实践

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.junit.Test

/** spark 滑动窗口测试(slide window)
  *
  * 参考 https://www.cnblogs.com/duanxz/p/4408789.html
  *
  * window: 对每个滑动窗口的数据执行自定义的计算
  * countByWindow: 对每个滑动窗口的数据执行count操作
  * reduceByWindow: 对每个滑动窗口的数据执行reduce操作
  * reduceByKeyAndWindow: 对每个滑动窗口的数据执行countByValue操作
  * countByValueAndWindow: 对每个滑动窗口的数据执行reduceByKey操作
  * groupByKeyAndWindow: 对每个滑动窗口的数据执行groupByKey操作
  *
  * This program connects to a server socket and reads strings from the socket.
  * The easiest way to try this out is to open a text sever (at port 12345)
  * using the ''netcat'' tool via
  * {{{
  * nc -l 12345 on Linux or nc -l -p 12345 on Windows
  * 本地使用的是  ncat -lk 12345,安装 https://nmap.org/download.html
  * }}}
  * and run this example with the hostname and the port as arguments..
  *
  * @ClassName: WindowHotWords
  * @Author: 撒哈拉之风
  * @Date: 2021/7/22 15:35
  * @Version: V1.0.0
  */
class WindowTest {

  /*
  所有测试都依次输入:
  q
  a
  z
  q
  a
  q
  */

  /*
  输出结果:
  windowTest q
  windowTest a
  windowTest z
  windowTest q
  windowTest a
  windowTest q
   */
  @Test
  def windowTest: Unit = {
    val conf = new SparkConf().setAppName("windowTest").setMaster("local[2]")

    // Scala中,创建的是StreamingContext,拉取数据间隔为5秒
    val ssc = new StreamingContext(conf, Seconds(5))

    val searchLogsDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 12345)

    val words = searchLogsDStream.flatMap(_.split(" "))

    /*
      第一个参数,是窗口长度,这是是60秒
      第二个参数,是滑动间隔,这里是10秒
      也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
      而是只是放在那里
      然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,所以之前60秒,就有12个RDD
      所以这里的 window,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
      每隔10秒钟,出来 之前60秒的收集到的单词
    */
    val windowTest = words.window(Seconds(60), Seconds(10))

    windowTest.foreachRDD(rdd=>rdd.foreach(x=>println(s"windowTest $x")))

    ssc.start()
    ssc.awaitTermination()
  }

  /*
  输出结果:
  countByWindowTest 6
   */
  @Test
  def countByWindowTest: Unit = {
    val conf = new SparkConf().setAppName("countByWindowTest").setMaster("local[2]")

    // Scala中,创建的是StreamingContext,拉取数据间隔为5秒
    val ssc = new StreamingContext(conf, Seconds(5))

    val searchLogsDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 12345)

    val words = searchLogsDStream.flatMap(_.split(" "))

    /*
      第一个参数,是窗口长度,这是是60秒
      第二个参数,是滑动间隔,这里是10秒
      也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
      而是只是放在那里
      然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,所以之前60秒,就有12个RDD,给聚合起来,然后统一执行 count 操作
      所以这里的 countByWindow,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
      每隔10秒钟,出来 之前60秒的收集到的单词的统计次数(本代码暂不不分组)
    */
    val countByWindowTest = words.countByWindow(Seconds(60), Seconds(10))

    countByWindowTest.foreachRDD(rdd=>rdd.foreach(x=>println(s"countByWindowTest $x")))

    // notes: 要加checkpoint,不加报错 2021/7/22
    ssc.checkpoint("x")
    ssc.start()
    ssc.awaitTermination()
  }

  /*
  输出结果:
  reduceByWindowTest q-a-z-q-a-q
   */
  @Test
  def reduceByWindowTest: Unit = {
    val conf = new SparkConf().setAppName("reduceByWindowTest").setMaster("local[2]")

    // Scala中,创建的是StreamingContext,拉取数据间隔为5秒
    val ssc = new StreamingContext(conf, Seconds(5))

    val searchLogsDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 12345)

    val words = searchLogsDStream.flatMap(_.split(" "))

    /*
      第一个参数,func
      第二个参数,是窗口长度,这是是60秒
      第三个参数,是滑动间隔,这里是10秒
      也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
      而是只是放在那里
      然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,所以之前60秒,就有12个RDD,给聚合起来,然后统一执行 reduce 操作
      所以这里的 reduceByWindow,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
      每隔10秒钟,出来 之前60秒的收集到的单词的统计次数
    */
     val reduceByWindowTest = words.reduceByWindow(_ + "-" + _, Seconds(60), Seconds(10))
    reduceByWindowTest.foreachRDD(rdd=>rdd.foreach(x=>println(s"reduceByWindowTest $x")))

    ssc.start()
    ssc.awaitTermination()
  }

  /*
  输出结果:
  top3SearchWordCounts (q,3)
  top3SearchWordCounts (a,2)
  top3SearchWordCounts (z,1)
  reduceByKeyAndWindowTest (z,1)
  reduceByKeyAndWindowTest (a,2)
  reduceByKeyAndWindowTest (q,3)
  finalDStream (q,3)
  finalDStream (a,2)
  finalDStream (z,1)
   */
  @Test
  def reduceByKeyAndWindowTest: Unit = {
    val conf = new SparkConf().setAppName("reduceByKeyAndWindowTest").setMaster("local[2]")

    // Scala中,创建的是StreamingContext,拉取数据间隔为5秒
    val ssc = new StreamingContext(conf, Seconds(5))

    val searchLogsDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 12345)

    val words = searchLogsDStream.flatMap(_.split(" "))

    val searchWordPairDStream = words.map { searchWord => (searchWord, 1) }

    /*
      调用该操作的DStream中的元素格式为(k, v)
      第一个参数,func
      第二个参数,是窗口长度,这是是60秒
      第三个参数,是滑动间隔,这里是10秒
      也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
      而是只是放在那里
      然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,所以之前60秒,就有12个RDD,给聚合起来,然后统一执行 reduceByKey 操作
      所以这里的reduceByKeyAndWindow,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
      每隔10秒钟,出来 之前60秒的收集到的单词的统计次数
    */
    val reduceByKeyAndWindowTest: DStream[(String, Int)] = searchWordPairDStream.reduceByKeyAndWindow((a:Int , b:Int) => (a + b), Seconds(60), Seconds(10))
    reduceByKeyAndWindowTest.foreachRDD(rdd=>rdd.foreach(x=>println(s"reduceByKeyAndWindowTest $x")))

    val finalDStream = reduceByKeyAndWindowTest.transform(searchWordCountsRDD => {
      // notes: 数据转置 (K,V) > (V,K) 2021/7/22
      val countSearchWordsRDD = searchWordCountsRDD.map(tuple => (tuple._2, tuple._1))

      val sortedCountSearchWordsRDD = countSearchWordsRDD.sortByKey(false)
      // notes: 数据还原 (V,K) > (K,V) 2021/7/22
      val sortedSearchWordCountsRDD = sortedCountSearchWordsRDD.map(tuple => (tuple._2, tuple._1))
      val top3SearchWordCounts = sortedSearchWordCountsRDD.take(3)

      top3SearchWordCounts.foreach(x=>println(s"top3SearchWordCounts $x"))

      sortedSearchWordCountsRDD
    })

    finalDStream.foreachRDD(rdd=>rdd.foreach(x=>println(s"finalDStream $x")))

    ssc.start()
    ssc.awaitTermination()
  }

  /*
  输出结果:
  top3SearchWordCounts (q,3)
  top3SearchWordCounts (a,2)
  top3SearchWordCounts (z,1)
  reduceByKeyAndWindowTest (z,1)
  reduceByKeyAndWindowTest (a,2)
  reduceByKeyAndWindowTest (q,3)
  finalDStream (q,3)
  finalDStream (a,2)
  finalDStream (z,1)

  一段时间不输入任何信息,输出结果:
  top3SearchWordCounts (z,0)
  top3SearchWordCounts (a,0)
  top3SearchWordCounts (q,0)
  reduceByKeyAndWindowTest2 (z,0)
  reduceByKeyAndWindowTest2 (a,0)
  reduceByKeyAndWindowTest2 (q,0)
  finalDStream (z,0)
  finalDStream (a,0)
  finalDStream (q,0)
   */
  @Test
  def reduceByKeyAndWindowTest2: Unit = {
    val conf = new SparkConf().setAppName("reduceByKeyAndWindowTest2").setMaster("local[2]")

    // Scala中,创建的是StreamingContext,拉取数据间隔为5秒
    val ssc = new StreamingContext(conf, Seconds(5))

    val searchLogsDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 12345)

    val words = searchLogsDStream.flatMap(_.split(" "))

    val searchWordPairDStream = words.map { searchWord => (searchWord, 1) }

    /*
      调用该操作的DStream中的元素格式为(k, v)
      第一个参数,func
      第二个参数,invFunc,invFunc是用于处理流出rdd
      第三个参数,是窗口长度,这是是60秒
      第四个参数,是滑动间隔,这里是10秒
      也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
      而是只是放在那里
      然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,所以之前60秒,就有12个RDD,给聚合起来,然后统一执行reduceByKey操作
      所以这里的reduceByKeyAndWindow,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
      每隔10秒钟,出来 之前60秒的收集到的单词的统计次数
    */
    // notes: 第二个参数是做什么用? 如果把60秒的时间窗口当成一个池塘,池塘每一秒都会有鱼游进或者游出,那么第一个函数表示每由进来一条鱼,就在该类鱼的数量上累加。而第二个函数是,每由出去一条鱼,就将该鱼的总数减去一。 2021/7/23
    val reduceByKeyAndWindowTest2: DStream[(String, Int)] = searchWordPairDStream.reduceByKeyAndWindow((a: Int, b:Int ) => (a + b) , (a:Int, b: Int) => (a - b), Seconds(60), Seconds(10))
    reduceByKeyAndWindowTest2.foreachRDD(rdd=>rdd.foreach(x=>println(s"reduceByKeyAndWindowTest2 $x")))

    val finalDStream = reduceByKeyAndWindowTest2.transform(searchWordCountsRDD => {
      // notes: 数据转置 (K,V) > (V,K) 2021/7/22
      val countSearchWordsRDD = searchWordCountsRDD.map(tuple => (tuple._2, tuple._1))

      val sortedCountSearchWordsRDD = countSearchWordsRDD.sortByKey(false)
      // notes: 数据还原 (V,K) > (K,V) 2021/7/22
      val sortedSearchWordCountsRDD = sortedCountSearchWordsRDD.map(tuple => (tuple._2, tuple._1))
      val top3SearchWordCounts = sortedSearchWordCountsRDD.take(3)

      top3SearchWordCounts.foreach(x=>println(s"top3SearchWordCounts $x"))

      sortedSearchWordCountsRDD
    })

    finalDStream.foreachRDD(rdd=>rdd.foreach(x=>println(s"finalDStream $x")))

    // notes: 要加checkpoint,不加报错 2021/7/22
    ssc.checkpoint("x")
    ssc.start()
    ssc.awaitTermination()
  }

  /*
  输出结果:
  countByValueAndWindowTest (z,1)
  countByValueAndWindowTest (a,2)
  countByValueAndWindowTest (q,3)
   */
  @Test
  def countByValueAndWindowTest: Unit = {
    val conf = new SparkConf().setAppName("countByValueAndWindowTest").setMaster("local[2]")

    // Scala中,创建的是StreamingContext,拉取数据间隔为5秒
    val ssc = new StreamingContext(conf, Seconds(5))

    val searchLogsDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 12345)

    val words = searchLogsDStream.flatMap(_.split(" "))

    /*
      第一个参数,是窗口长度,这是是60秒
      第二个参数,是滑动间隔,这里是10秒
      也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
      而是只是放在那里
      然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,所以之前60秒,就有12个RDD,给聚合起来,然后统一执行 countByValue 操作
      所以这里的 countByValueAndWindow,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
      每隔10秒钟,出来 之前60秒的收集到的单词的统计次数
    */
    val countByValueAndWindowTest = words.countByValueAndWindow(Seconds(60), Seconds(10))
    countByValueAndWindowTest.foreachRDD(rdd=>rdd.foreach(x=>println(s"countByValueAndWindowTest $x")))

    // notes: 要加checkpoint,不加报错 2021/7/22
    ssc.checkpoint("x")
    ssc.start()
    ssc.awaitTermination()
  }

  /*
  输出结果:
  groupByKeyAndWindowTest (z,ArrayBuffer(1))
  groupByKeyAndWindowTest (a,ArrayBuffer(1, 1))
  groupByKeyAndWindowTest (q,ArrayBuffer(1, 1, 1))
   */
  @Test
  def groupByKeyAndWindowTest: Unit = {
    val conf = new SparkConf().setAppName("groupByKeyAndWindowTest").setMaster("local[2]")

    // Scala中,创建的是StreamingContext,拉取数据间隔为5秒
    val ssc = new StreamingContext(conf, Seconds(5))

    val searchLogsDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 12345)

    val words = searchLogsDStream.flatMap(_.split(" "))

    val searchWordPairDStream = words.map { searchWord => (searchWord, 1) }

    /*
      调用该操作的DStream中的元素格式为(k, v)
      第一个参数,是窗口长度,这是是60秒
      第二个参数,是滑动间隔,这里是10秒
      也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
      而是只是放在那里
      然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,所以之前60秒,就有12个RDD,给聚合起来,然后统一执行 groupByKey 操作
      所以这里的 groupByKeyAndWindow,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
      每隔10秒钟,出来 之前60秒的收集到的单词的统计次数
    */
    val groupByKeyAndWindowTest = searchWordPairDStream.groupByKeyAndWindow(Seconds(60), Seconds(10))
    groupByKeyAndWindowTest.foreachRDD(rdd=>rdd.foreach(x=>println(s"groupByKeyAndWindowTest $x")))

    ssc.start()
    ssc.awaitTermination()
  }

}

参考:

https://spark.apache.org/docs/3.1.2/streaming-programming-guide.html

https://www.cnblogs.com/duanxz/p/4408789.html

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-24 11:33:51  更:2021-07-24 11:34:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 6:05:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码