4.DStream转换
DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的算子,如:updateStateByKey()、transform()以及各种Window相关的算子。
4.1 无状态转换操作
4.1.1 Transform
Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。 代码实现:
object SparkStreaming01_Transform {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkStreaming01_Transform").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(3))
val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
val resDS: DStream[(String, Int)] = socketDS.transform(
rdd => {
val flatMapRDD: RDD[String] = rdd.flatMap(_.split(" "))
val mapRDD: RDD[(String, Int)] = flatMapRDD.map((_, 1))
val redeceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
redeceRDD.sortByKey()
}
)
resDS.print()
ssc.start()
ssc.awaitTermination()
}
}
向hadoop102:9999端口发送数据
> a a a a c c d b b d c
输出:
-------------------------------------------
Time: 1628500017000 ms
-------------------------------------------
(a,4)
(b,2)
(c,3)
(d,2)
4.2 有状态转换操作
4.2.1 UpdateStateByKey
UpdateStateByKey算子用于将历史结果应用到当前批次,该操作允许在使用新信息不断更新状态的同时能够保留他的状态。 有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。 UpdateStateByKey() 的结果会是一个新的DStream,其内部的RDD 序列是由每个时间区间对应的(键,状态)对组成的。 为使用这个功能,需要做下面两步:
- 定义状态,状态可以是一个任意的数据类型。
- 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。 更新版的wordcount
代码实现:
object SparkStreaming02_updateStateByKey {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkStreaming02_updateStateByKey").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(3))
ssc.checkpoint("E:\\spark-0701\\cp")
val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
val flatMapDS: DStream[String] = socketDS.flatMap(_.split(" "))
val mapDS: DStream[(String, Int)] = flatMapDS.map((_, 1))
val stateDS: DStream[(String, Int)] = mapDS.updateStateByKey(
(seq: Seq[Int], state: Option[Int]) => {
Option(seq.sum + state.getOrElse(0))
}
)
stateDS.print()
ssc.start()
ssc.awaitTermination()
}
}
发送数据:
> a a
>
> a a a a
状态可以累加下去
输出:
-------------------------------------------
Time: 1628501622000 ms
-------------------------------------------
(a,2)
-------------------------------------------
Time: 1628501625000 ms
-------------------------------------------
(a,6)
4.2.2 Window Operations(窗口操作)
Spark Streaming 也提供了窗口计算, 允许执行转换操作作用在一个窗口内的数据。默认情况下, 计算只对一个时间段内的RDD进行, 有了窗口之后, 可以把计算应用到一个指定的窗口内的所有 RDD 上。一个窗口可以包含多个时间段,基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
窗口时长:计算内容的时间范围; 滑动步长:隔多久触发一次计算。 注意:这两者都必须为采集周期的整数倍。
窗口操作数据流解析: 代码实现:
object SparkStreaming03_window {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkStreaming01_Transform").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(3))
val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
val windowDS: DStream[String] = socketDS.window(Seconds(6), Seconds(3))
val resDS: DStream[(String, Int)] = windowDS.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
resDS.print()
ssc.start()
ssc.awaitTermination()
}
}
发送数据:
a
a
a
a
a
a
a
a
a
a
a
a
a
a
a
a
a
a
-------------------------------------------
Time: 1628506935000 ms
-------------------------------------------
(a,1)
-------------------------------------------
Time: 1628506938000 ms
-------------------------------------------
(a,12)
-------------------------------------------
Time: 1628506941000 ms
-------------------------------------------
(a,17)
-------------------------------------------
Time: 1628506944000 ms
-------------------------------------------
(a,6)
4.2.3 关于Window的其他操作
1) window(windowLength, slideInterval) 基于对源DStream窗化的批次进行计算返回一个新的Dstream 2) countByWindow(windowLength, slideInterval) 返回一个滑动窗口计数流中的元素个数 3) countByValueAndWindow() 返回的DStream则包含窗口中每个值的个数 4) reduceByWindow(func, windowLength, slideInterval) 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流 5) reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值 6) reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。如果把3秒的时间窗口当成一个池塘,池塘每一秒都会有鱼游进或者游出,那么第一个函数表示每由进来一条鱼,就在该类鱼的数量上累加。而第二个函数是,每由出去一条鱼,就将该鱼的总数减去一。
5.DStream输出
输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。
5.1 常用输出操作
1) print() 在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()。 2) saveAsTextFiles(prefix, [suffix]) 以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。 3) saveAsObjectFiles(prefix, [suffix]) 以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。 4) saveAsHadoopFiles(prefix, [suffix]) 将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API 中目前不可用。 5) foreachRDD(func) 这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。 通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中,但是在使用的时候需要注意以下几点 连接不能写在driver层面(序列化); 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失; 增加foreachPartition,在分区创建(获取)。
6.DStream 编程进阶
6.1 累加器和广播变量
和RDD中的累加器和广播变量的用法完全一样,RDD中怎么用, 这里就怎么用。
6.2 DataFrame and SQL Operations
你可以很容易地在流数据上使用 DataFrames 和SQL,你必须使用SparkContext来创建StreamingContext要用的SQLContext。此外,这一过程可以在驱动失效后重启。我们通过创建一个实例化的SQLContext单实例来实现这个工作。 如下例所示,我们对前例WordCount进行修改从而使用DataFrames和SQL来实现。每个RDD被转换为DataFrame,以临时表格配置并用SQL进行查询。
val spark = SparkSession.builder.config(conf).getOrCreate()
import spark.implicits._
mapDS.foreachRDD(rdd =>{
val df: DataFrame = rdd.toDF("word", "count")
df.createOrReplaceTempView("words")
spark.sql("select * from words").show
})
6.3 Caching/Persistence
和 RDDs 类似,DStreams 同样允许开发者将流数据保存在内存中。也就是说,在DStream上使用persist()方法将会自动把DStreams中的每个RDD保存在内存中。 当DStream中的数据要被多次计算时,这个非常有用(如在同样数据上的多次操作)。对于像reduceByWindow和reduceByKeyAndWindow以及基于状态的(updateStateByKey)这种操作,保存是隐含默认的。因此,即使开发者没有调用persist(),由基于窗操作产生的DStreams也会自动保存在内存中。
7.项目实战
7.1 数据准备
分析处理用户对广告点击的行为数据
7.1.1 数据生成方式
使用代码的方式持续的生成数据,然后写入到kafka中,然后从kafka消费数据,并对数据根据需求进行分析。
7.1.2 模拟数据生成及从kafka中读取数据
随机生成数据
...
msg = 1628517618902,华北,北京,104,2
msg = 1628517618914,华南,深圳,105,1
msg = 1628517618926,华南,深圳,105,1
msg = 1628517618938,华北,北京,103,4
msg = 1628517618950,华南,广州,103,1
msg = 1628517618962,华中,杭州,100,5
msg = 1628517618974,华南,深圳,100,4
msg = 1628517618986,华东,上海,102,1
msg = 1628517618998,华南,深圳,101,1
msg = 1628517619010,华南,深圳,103,5
msg = 1628517619022,华南,深圳,105,1
msg = 1628517619033,华南,广州,103,2
msg = 1628517619045,华东,上海,103,3
msg = 1628517619059,华南,广州,103,5
msg = 1628517619071,华东,上海,100,4
msg = 1628517619083,华北,北京,104,5
msg = 1628517619095,华南,深圳,102,4
步骤: 1)开启集群 kafka,zookeeper 2)创建topic
[atguigu@hadoop102 kafka]$ kafka-topics.sh --zookeeper hadoop102:2181 --create --topic my-ads --partitions 2 --replication-factor 2
3)循环产生数据指定的topic,创建spark-realtime模块
导入依赖:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-native_2.11</artifactId>
<version>3.2.11</version>
</dependency>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_2.11</artifactId>
<version>3.2.11</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
</dependencies>
样例类AdsInfo.scala
import java.sql.Timestamp
case class AdsInfo(ts: Long,
timestamp: Timestamp,
dayString: String,
hmString: String,
area: String,
city: String,
userId: String,
adsId: String)
样例类CityInfo.scala
case class CityInfo(city_id: Long,
city_name: String,
area: String)
生成随机数 RandomNumUtil
import java.util.Random
import scala.collection.mutable
object RandomNumUtil {
val random = new Random()
def randomInt(from: Int, to: Int): Int = {
if (from > to) throw new IllegalArgumentException(s"from = $from 应该小于 to = $to")
random.nextInt(to - from + 1) + from
}
def randomLong(from: Long, to: Long): Long = {
if (from > to) throw new IllegalArgumentException(s"from = $from 应该小于 to = $to")
random.nextLong().abs % (to - from + 1) + from
}
def randomMultiInt(from: Int, to: Int, count: Int, canReat: Boolean = true): List[Int] = {
if (canReat) {
(1 to count).map(_ => randomInt(from, to)).toList
} else {
val set: mutable.Set[Int] = mutable.Set[Int]()
while (set.size < count) {
set += randomInt(from, to)
}
set.toList
}
}
def main(args: Array[String]): Unit = {
println(randomMultiInt(1, 15, 10))
println(randomMultiInt(1, 8, 10, false))
}
}
用于生成带有比重的随机选项 RandomOptions.scala
import scala.collection.mutable.ListBuffer
object RandomOptions {
def apply[T](opts: (T, Int)*): RandomOptions[T] = {
val randomOptions = new RandomOptions[T]()
randomOptions.totalWeight = (0 /: opts) (_ + _._2)
opts.foreach {
case (value, weight) => randomOptions.options ++= (1 to weight).map(_ => value)
}
randomOptions
}
def main(args: Array[String]): Unit = {
val opts = RandomOptions(("张三", 10), ("李四", 30), ("ww", 20))
println(opts.getRandomOption())
println(opts.getRandomOption())
}
}
class RandomOptions[T] {
var totalWeight: Int = _
var options = ListBuffer[T]()
def getRandomOption() = {
options(RandomNumUtil.randomInt(0, totalWeight - 1))
}
}
生成模拟数据 MockRealTime.scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.collection.mutable.ArrayBuffer
object MockRealTime {
def mockRealTimeData(): ArrayBuffer[String] = {
val array = ArrayBuffer[String]()
val randomOpts = RandomOptions(
(CityInfo(1, "北京", "华北"), 30),
(CityInfo(2, "上海", "华东"), 30),
(CityInfo(3, "广州", "华南"), 10),
(CityInfo(4, "深圳", "华南"), 20),
(CityInfo(4, "杭州", "华中"), 10))
(1 to 50).foreach {
i => {
val timestamp = System.currentTimeMillis()
val cityInfo = randomOpts.getRandomOption()
val area = cityInfo.area
val city = cityInfo.city_name
val userid = RandomNumUtil.randomInt(100, 105)
val adid = RandomNumUtil.randomInt(1, 5)
array += s"$timestamp,$area,$city,$userid,$adid"
Thread.sleep(10)
}
}
array
}
def createKafkaProducer: KafkaProducer[String, String] = {
val props: Properties = new Properties
props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
new KafkaProducer[String, String](props)
}
def main(args: Array[String]): Unit = {
val topic = "my-ads"
val producer: KafkaProducer[String, String] = createKafkaProducer
while (true) {
mockRealTimeData().foreach {
msg => {
producer.send(new ProducerRecord(topic, msg))
println("msg = " + msg);
Thread.sleep(100)
}
}
Thread.sleep(1000)
}
}
}
测试读取数据 RealtimeApp.scala
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
object RealTimeApp {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")
val ssc = new StreamingContext(conf, Seconds(3))
val brokers = "hadoop102:9092,hadoop103:9092,hadoop104:9092"
val topic = "my-ads"
val group = "bigdata"
val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
val kafkaParams = Map(
ConsumerConfig.GROUP_ID_CONFIG -> group,
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization
)
val kafkaDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Set(topic))
val resRDD: DStream[String] = kafkaDS.map(_._2)
resRDD.print()
ssc.start()
ssc.awaitTermination()
}
}
7.2每天每地区热门广告Top3
代码实现:
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import java.text.SimpleDateFormat
import java.util.Date
object RealTime_req1 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RealTime_req1")
val ssc = new StreamingContext(conf, Seconds(3))
ssc.sparkContext.setCheckpointDir("E:\\spark-realtime\\cp")
val brokers = "hadoop102:9092,hadoop103:9092,hadoop104:9092"
val topic = "my-ads"
val group = "bigdata"
val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
val kafkaParams = Map(
ConsumerConfig.GROUP_ID_CONFIG -> group,
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization
)
val kafkaDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Set(topic))
val dataDS: DStream[String] = kafkaDS.map(_._2)
val mapDS: DStream[(String, Int)] = dataDS.map {
line => {
val fields: Array[String] = line.split(",")
val timStamp: Long = fields(0).toLong
val day = new Date(timStamp)
val sdf = new SimpleDateFormat("yyyy-MM-dd")
val dayStr: String = sdf.format(day)
var area = fields(1)
var adv = fields(4)
(dayStr + "_" + area + "_" + adv, 1)
}
}
val updateDS: DStream[(String, Int)] = mapDS.updateStateByKey(
(seq: Seq[Int], buffer: Option[Int]) => {
Option(seq.sum + buffer.getOrElse(0))
}
)
val mapDS1: DStream[(String, (String, Int))] = updateDS.map {
case (k, sum) => {
val fields: Array[String] = k.split("_")
(fields(0) + "_" + fields(1), (fields(2), sum))
}
}
val groupDS: DStream[(String, Iterable[(String, Int)])] = mapDS1.groupByKey()
val resDS: DStream[(String, List[(String, Int)])] = groupDS.mapValues {
datas => {
datas.toList.sortBy(-_._2).take(3)
}
}
resDS.print()
ssc.start()
ssc.awaitTermination()
}
}
输出
-------------------------------------------
Time: 1628517585000 ms
-------------------------------------------
(2021-08-09_华北,List((5,7), (2,6), (4,4)))
(2021-08-09_华中,List((1,3), (4,2), (2,1)))
(2021-08-09_华南,List((2,7), (5,3), (1,3)))
(2021-08-09_华东,List((4,6), (3,5), (5,4)))
-------------------------------------------
Time: 1628517588000 ms
-------------------------------------------
(2021-08-09_华北,List((5,8), (2,7), (3,5)))
(2021-08-09_华中,List((1,3), (4,2), (2,1)))
(2021-08-09_华南,List((5,8), (2,7), (1,5)))
(2021-08-09_华东,List((2,6), (3,6), (4,6)))
|