import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.junit.Test
/** spark 滑动窗口测试(slide window)
*
* 参考 https://www.cnblogs.com/duanxz/p/4408789.html
*
* window: 对每个滑动窗口的数据执行自定义的计算
* countByWindow: 对每个滑动窗口的数据执行count操作
* reduceByWindow: 对每个滑动窗口的数据执行reduce操作
* reduceByKeyAndWindow: 对每个滑动窗口的数据执行countByValue操作
* countByValueAndWindow: 对每个滑动窗口的数据执行reduceByKey操作
* groupByKeyAndWindow: 对每个滑动窗口的数据执行groupByKey操作
*
* This program connects to a server socket and reads strings from the socket.
* The easiest way to try this out is to open a text sever (at port 12345)
* using the ''netcat'' tool via
* {{{
* nc -l 12345 on Linux or nc -l -p 12345 on Windows
* 本地使用的是 ncat -lk 12345,安装 https://nmap.org/download.html
* }}}
* and run this example with the hostname and the port as arguments..
*
* @ClassName: WindowHotWords
* @Author: 撒哈拉之风
* @Date: 2021/7/22 15:35
* @Version: V1.0.0
*/
class WindowTest {
/*
所有测试都依次输入:
q
a
z
q
a
q
*/
/*
输出结果:
windowTest q
windowTest a
windowTest z
windowTest q
windowTest a
windowTest q
*/
@Test
def windowTest: Unit = {
val conf = new SparkConf().setAppName("windowTest").setMaster("local[2]")
// Scala中,创建的是StreamingContext,拉取数据间隔为5秒
val ssc = new StreamingContext(conf, Seconds(5))
val searchLogsDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 12345)
val words = searchLogsDStream.flatMap(_.split(" "))
/*
第一个参数,是窗口长度,这是是60秒
第二个参数,是滑动间隔,这里是10秒
也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
而是只是放在那里
然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,所以之前60秒,就有12个RDD
所以这里的 window,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
每隔10秒钟,出来 之前60秒的收集到的单词
*/
val windowTest = words.window(Seconds(60), Seconds(10))
windowTest.foreachRDD(rdd=>rdd.foreach(x=>println(s"windowTest $x")))
ssc.start()
ssc.awaitTermination()
}
/*
输出结果:
countByWindowTest 6
*/
@Test
def countByWindowTest: Unit = {
val conf = new SparkConf().setAppName("countByWindowTest").setMaster("local[2]")
// Scala中,创建的是StreamingContext,拉取数据间隔为5秒
val ssc = new StreamingContext(conf, Seconds(5))
val searchLogsDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 12345)
val words = searchLogsDStream.flatMap(_.split(" "))
/*
第一个参数,是窗口长度,这是是60秒
第二个参数,是滑动间隔,这里是10秒
也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
而是只是放在那里
然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,所以之前60秒,就有12个RDD,给聚合起来,然后统一执行 count 操作
所以这里的 countByWindow,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
每隔10秒钟,出来 之前60秒的收集到的单词的统计次数(本代码暂不不分组)
*/
val countByWindowTest = words.countByWindow(Seconds(60), Seconds(10))
countByWindowTest.foreachRDD(rdd=>rdd.foreach(x=>println(s"countByWindowTest $x")))
// notes: 要加checkpoint,不加报错 2021/7/22
ssc.checkpoint("x")
ssc.start()
ssc.awaitTermination()
}
/*
输出结果:
reduceByWindowTest q-a-z-q-a-q
*/
@Test
def reduceByWindowTest: Unit = {
val conf = new SparkConf().setAppName("reduceByWindowTest").setMaster("local[2]")
// Scala中,创建的是StreamingContext,拉取数据间隔为5秒
val ssc = new StreamingContext(conf, Seconds(5))
val searchLogsDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 12345)
val words = searchLogsDStream.flatMap(_.split(" "))
/*
第一个参数,func
第二个参数,是窗口长度,这是是60秒
第三个参数,是滑动间隔,这里是10秒
也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
而是只是放在那里
然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,所以之前60秒,就有12个RDD,给聚合起来,然后统一执行 reduce 操作
所以这里的 reduceByWindow,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
每隔10秒钟,出来 之前60秒的收集到的单词的统计次数
*/
val reduceByWindowTest = words.reduceByWindow(_ + "-" + _, Seconds(60), Seconds(10))
reduceByWindowTest.foreachRDD(rdd=>rdd.foreach(x=>println(s"reduceByWindowTest $x")))
ssc.start()
ssc.awaitTermination()
}
/*
输出结果:
top3SearchWordCounts (q,3)
top3SearchWordCounts (a,2)
top3SearchWordCounts (z,1)
reduceByKeyAndWindowTest (z,1)
reduceByKeyAndWindowTest (a,2)
reduceByKeyAndWindowTest (q,3)
finalDStream (q,3)
finalDStream (a,2)
finalDStream (z,1)
*/
@Test
def reduceByKeyAndWindowTest: Unit = {
val conf = new SparkConf().setAppName("reduceByKeyAndWindowTest").setMaster("local[2]")
// Scala中,创建的是StreamingContext,拉取数据间隔为5秒
val ssc = new StreamingContext(conf, Seconds(5))
val searchLogsDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 12345)
val words = searchLogsDStream.flatMap(_.split(" "))
val searchWordPairDStream = words.map { searchWord => (searchWord, 1) }
/*
调用该操作的DStream中的元素格式为(k, v)
第一个参数,func
第二个参数,是窗口长度,这是是60秒
第三个参数,是滑动间隔,这里是10秒
也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
而是只是放在那里
然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,所以之前60秒,就有12个RDD,给聚合起来,然后统一执行 reduceByKey 操作
所以这里的reduceByKeyAndWindow,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
每隔10秒钟,出来 之前60秒的收集到的单词的统计次数
*/
val reduceByKeyAndWindowTest: DStream[(String, Int)] = searchWordPairDStream.reduceByKeyAndWindow((a:Int , b:Int) => (a + b), Seconds(60), Seconds(10))
reduceByKeyAndWindowTest.foreachRDD(rdd=>rdd.foreach(x=>println(s"reduceByKeyAndWindowTest $x")))
val finalDStream = reduceByKeyAndWindowTest.transform(searchWordCountsRDD => {
// notes: 数据转置 (K,V) > (V,K) 2021/7/22
val countSearchWordsRDD = searchWordCountsRDD.map(tuple => (tuple._2, tuple._1))
val sortedCountSearchWordsRDD = countSearchWordsRDD.sortByKey(false)
// notes: 数据还原 (V,K) > (K,V) 2021/7/22
val sortedSearchWordCountsRDD = sortedCountSearchWordsRDD.map(tuple => (tuple._2, tuple._1))
val top3SearchWordCounts = sortedSearchWordCountsRDD.take(3)
top3SearchWordCounts.foreach(x=>println(s"top3SearchWordCounts $x"))
sortedSearchWordCountsRDD
})
finalDStream.foreachRDD(rdd=>rdd.foreach(x=>println(s"finalDStream $x")))
ssc.start()
ssc.awaitTermination()
}
/*
输出结果:
top3SearchWordCounts (q,3)
top3SearchWordCounts (a,2)
top3SearchWordCounts (z,1)
reduceByKeyAndWindowTest (z,1)
reduceByKeyAndWindowTest (a,2)
reduceByKeyAndWindowTest (q,3)
finalDStream (q,3)
finalDStream (a,2)
finalDStream (z,1)
一段时间不输入任何信息,输出结果:
top3SearchWordCounts (z,0)
top3SearchWordCounts (a,0)
top3SearchWordCounts (q,0)
reduceByKeyAndWindowTest2 (z,0)
reduceByKeyAndWindowTest2 (a,0)
reduceByKeyAndWindowTest2 (q,0)
finalDStream (z,0)
finalDStream (a,0)
finalDStream (q,0)
*/
@Test
def reduceByKeyAndWindowTest2: Unit = {
val conf = new SparkConf().setAppName("reduceByKeyAndWindowTest2").setMaster("local[2]")
// Scala中,创建的是StreamingContext,拉取数据间隔为5秒
val ssc = new StreamingContext(conf, Seconds(5))
val searchLogsDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 12345)
val words = searchLogsDStream.flatMap(_.split(" "))
val searchWordPairDStream = words.map { searchWord => (searchWord, 1) }
/*
调用该操作的DStream中的元素格式为(k, v)
第一个参数,func
第二个参数,invFunc,invFunc是用于处理流出rdd
第三个参数,是窗口长度,这是是60秒
第四个参数,是滑动间隔,这里是10秒
也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
而是只是放在那里
然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,所以之前60秒,就有12个RDD,给聚合起来,然后统一执行reduceByKey操作
所以这里的reduceByKeyAndWindow,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
每隔10秒钟,出来 之前60秒的收集到的单词的统计次数
*/
// notes: 第二个参数是做什么用? 如果把60秒的时间窗口当成一个池塘,池塘每一秒都会有鱼游进或者游出,那么第一个函数表示每由进来一条鱼,就在该类鱼的数量上累加。而第二个函数是,每由出去一条鱼,就将该鱼的总数减去一。 2021/7/23
val reduceByKeyAndWindowTest2: DStream[(String, Int)] = searchWordPairDStream.reduceByKeyAndWindow((a: Int, b:Int ) => (a + b) , (a:Int, b: Int) => (a - b), Seconds(60), Seconds(10))
reduceByKeyAndWindowTest2.foreachRDD(rdd=>rdd.foreach(x=>println(s"reduceByKeyAndWindowTest2 $x")))
val finalDStream = reduceByKeyAndWindowTest2.transform(searchWordCountsRDD => {
// notes: 数据转置 (K,V) > (V,K) 2021/7/22
val countSearchWordsRDD = searchWordCountsRDD.map(tuple => (tuple._2, tuple._1))
val sortedCountSearchWordsRDD = countSearchWordsRDD.sortByKey(false)
// notes: 数据还原 (V,K) > (K,V) 2021/7/22
val sortedSearchWordCountsRDD = sortedCountSearchWordsRDD.map(tuple => (tuple._2, tuple._1))
val top3SearchWordCounts = sortedSearchWordCountsRDD.take(3)
top3SearchWordCounts.foreach(x=>println(s"top3SearchWordCounts $x"))
sortedSearchWordCountsRDD
})
finalDStream.foreachRDD(rdd=>rdd.foreach(x=>println(s"finalDStream $x")))
// notes: 要加checkpoint,不加报错 2021/7/22
ssc.checkpoint("x")
ssc.start()
ssc.awaitTermination()
}
/*
输出结果:
countByValueAndWindowTest (z,1)
countByValueAndWindowTest (a,2)
countByValueAndWindowTest (q,3)
*/
@Test
def countByValueAndWindowTest: Unit = {
val conf = new SparkConf().setAppName("countByValueAndWindowTest").setMaster("local[2]")
// Scala中,创建的是StreamingContext,拉取数据间隔为5秒
val ssc = new StreamingContext(conf, Seconds(5))
val searchLogsDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 12345)
val words = searchLogsDStream.flatMap(_.split(" "))
/*
第一个参数,是窗口长度,这是是60秒
第二个参数,是滑动间隔,这里是10秒
也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
而是只是放在那里
然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,所以之前60秒,就有12个RDD,给聚合起来,然后统一执行 countByValue 操作
所以这里的 countByValueAndWindow,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
每隔10秒钟,出来 之前60秒的收集到的单词的统计次数
*/
val countByValueAndWindowTest = words.countByValueAndWindow(Seconds(60), Seconds(10))
countByValueAndWindowTest.foreachRDD(rdd=>rdd.foreach(x=>println(s"countByValueAndWindowTest $x")))
// notes: 要加checkpoint,不加报错 2021/7/22
ssc.checkpoint("x")
ssc.start()
ssc.awaitTermination()
}
/*
输出结果:
groupByKeyAndWindowTest (z,ArrayBuffer(1))
groupByKeyAndWindowTest (a,ArrayBuffer(1, 1))
groupByKeyAndWindowTest (q,ArrayBuffer(1, 1, 1))
*/
@Test
def groupByKeyAndWindowTest: Unit = {
val conf = new SparkConf().setAppName("groupByKeyAndWindowTest").setMaster("local[2]")
// Scala中,创建的是StreamingContext,拉取数据间隔为5秒
val ssc = new StreamingContext(conf, Seconds(5))
val searchLogsDStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 12345)
val words = searchLogsDStream.flatMap(_.split(" "))
val searchWordPairDStream = words.map { searchWord => (searchWord, 1) }
/*
调用该操作的DStream中的元素格式为(k, v)
第一个参数,是窗口长度,这是是60秒
第二个参数,是滑动间隔,这里是10秒
也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续计算
而是只是放在那里
然后,等待我们的滑动间隔到了以后,10秒到了,会将之前60秒的RDD,因为一个batch间隔是5秒,所以之前60秒,就有12个RDD,给聚合起来,然后统一执行 groupByKey 操作
所以这里的 groupByKeyAndWindow,是针对每个窗口执行计算的,而不是针对 某个DStream中的RDD
每隔10秒钟,出来 之前60秒的收集到的单词的统计次数
*/
val groupByKeyAndWindowTest = searchWordPairDStream.groupByKeyAndWindow(Seconds(60), Seconds(10))
groupByKeyAndWindowTest.foreachRDD(rdd=>rdd.foreach(x=>println(s"groupByKeyAndWindowTest $x")))
ssc.start()
ssc.awaitTermination()
}
}
参考:
https://spark.apache.org/docs/3.1.2/streaming-programming-guide.html
https://www.cnblogs.com/duanxz/p/4408789.html
|