一.SparkStreaming概述
1.1 Spark Streaming是什么
Spark Streaming用于流式数据的处理 1.Spark Streaming支持的数据输入源头很多,例如:Kafka,Flume,HDFS等 2.数据输入后可以用Spark的高度抽象原语如:map,reduce,join,window等 3.而结果也能保存在很多地方,如HDFS,数据库等
1.2 Spark Streaming架构原理
1.2.1 什么是DStream 
1.2.2 架构图 整体架构图 
SparkStreaming架构图 
1.2.3 背压机制 Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。 为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure):根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。 通过属性“spark.streaming.backpressure.enabled”来控制是否启用背压机制,默认值false,即不启用。
1.3 Spark Streaming特点
易用 
容错 
易整合到Spark体系 
二.DStream入门
2.1 WordCount案例实操
需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数
1)添加依赖
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
2)编写代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreaming01_WordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val lineDStream = ssc.socketTextStream("node1", 9999)
val wordDStream = lineDStream.flatMap(_.split(" "))
val wordToOneDStream = wordDStream.map((_, 1))
val wordToSumDStream = wordToOneDStream.reduceByKey(_+_)
wordToSumDStream.print()
ssc.start()
ssc.awaitTermination()
}
}
3)更改日志打印级别 将log4j.properties文件添加到resources里面,就能更改打印日志的级别为error,spark conf中也有log4j.properties
log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
4)启动程序并通过netcat发送数据:
[linux@node1 ~]$ nc -lk 9999
hello spark
5)在Idea控制台输出如下内容:
Time: 1602731772000 ms
(hello,1)
(spark,1)
注意:目前用的算子,只能处理本批次数据的累加,不能统计所有批次总的单词个数。
2.2 WordCount解析
DStream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark算子操作后的结果数据流。 在内部实现上,每一批次的数据封装成一个RDD,一系列连续的RDD组成了DStream。对这些RDD的转换是由Spark引擎来计算。 说明:DStream中批次与批次之间计算相互独立。如果批次设置时间小于计算时间会出现计算任务叠加情况,需要多分配资源。通常情况,批次设置时间要大于计算时间。 
三.DStream创建
版本选型 
注意:目前spark3.0.0以上版本只有Direct模式。 http://spark.apache.org/docs/2.4.7/streaming-kafka-integration.html 
http://spark.apache.org/docs/3.0.0/streaming-kafka-0-10-integration.html 总结:不同版本的offset存储位置 0-8 ReceiverAPI offset默认存储在:Zookeeper中 0-8 DirectAPI offset默认存储在:CheckPoint 手动维护:MySQL等有事务的存储系统 0-10 DirectAPI offset默认存储在:_consumer_offsets系统主题 手动维护:MySQL等有事务存储系统
Kafka 0-10 Direct模式
1)需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。 
2)导入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
3)编写代码
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object $02_SparkStreamingKafkaSource {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("test")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topics: Array[String] = Array("testTopic")
val properties: Map[String, Object] = Map[String, Object](
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"bootstrap.servers" -> "node1:9092,node2:9092,node3:9092",
"group.id" -> "sparkStreaming",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> "true"
)
val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, properties))
ds.foreachRDD(rdd => {
println(s"rdd分区数=${rdd.partitions.length}")
rdd.map(x => x.value())
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.foreach(println(_))
})
ssc.start()
ssc.awaitTermination()
}
}
4)测试 (1)分别启动Zookeeper和Kafka集群
[linux@node1 ~]$ zookeeper.sh start
[linux@node1 ~]$ kafka.sh start
(2)创建一个Kafka的Topic主题testTopic,两个分区
[linux@node1 kafka]$ bin/kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --create --topic testTopic --replication-factor 1 --partitions 2
(3)查看Topic列表
[linux@node1 kafka]$ bin/kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --list
(4)查看Topic详情
[linux@node1 kafka]$ bin/kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic testTopic --describe
(5)创建Kafka生产者
[linux@node1 kafka]$ bin/kafka-console-producer.sh --broker-list node1:9092 --topic testTopic
Hello spark
Hello spark
(6)创建Kafka消费者
[linux@node1 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic testTopic
5)查看_consumer_offsets主题中存储的offset
[linux@node1 kafka]$ bin/kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group sparkStreaming
Consumer group 'sparkStreaming' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sparkStreaming testTopic 0 12 12 0 - - -
sparkStreaming testTopic 1 11 11 0 - - -
sparkStreaming sparksqlsource 0 0 0 0 - - -
在生产者中生产数据,再次观察offset变化
四.DStream转换
DStream上的操作与RDD的类似,分为转换和输出两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。
4.1 无状态转化操作
无状态转化操作:就是把RDD转化操作应用到DStream每个批次上,每个批次相互独立,自己算自己的。
DStream的部分无状态转化操作列在了下表中,都是DStream自己的API。 注意,针对键值对的DStream转化操作,要添加import StreamingContext._才能在Scala中使用,比如reduceByKey()。  需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD批次组成,且无状态转化操作是分别应用到每个RDD批次上的。
Transform算子 需求:通过Transform可以将DStream每一批次的数据直接转换为RDD的算子操作。
1)代码编写
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object $03_SparkStreaming05_Transform {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node1", 9999)
println("111111111:" + Thread.currentThread().getName)
val wordToSumDStream: DStream[(String, Int)] = lineDStream.transform(
rdd => {
println("222222:" + Thread.currentThread().getName)
val words: RDD[String] = rdd.flatMap(_.split(" "))
val wordToOne: RDD[(String, Int)] = words.map(x => {
println("333333:" + Thread.currentThread().getName)
(x, 1)
})
val value: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
value
}
)
wordToSumDStream.print
ssc.start()
ssc.awaitTermination()
}
}
2)测试
[linux@node1 ~]$ nc -lk 9999
hello spark
4.2 有状态转化操作
有状态转化操作:计算当前批次RDD时,需要用到历史RDD的数据。
4.2.1 UpdateStateByKey
updateStateByKey()用于键值对形式的DStream,可以记录历史批次状态。例如可以实现累加WordCount。 updateStateByKey()参数中需要传递一个函数,在函数内部可以根据需求对新数据和历史状态进行整合处理,返回一个新的DStream。
注意:使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。 checkpoint小文件过多 checkpoint记录最后一次时间戳,再次启动的时候会把间隔时间的周期再执行一次
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object $04_UpdateStateByKey {
def main(args: Array[String]): Unit = {
val ssc = new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"), Seconds(5))
ssc.checkpoint("checkpoint")
val ds: ReceiverInputDStream[String] = ssc.socketTextStream("node1", 9999)
ds.flatMap(_.split(" "))
.map((_, 1))
.updateStateByKey((currBath, state: Option[Int]) => {
val num: Int = state.getOrElse(0)
val sum: Int = currBath.sum
Some(sum + num)
})
.print()
ssc.start()
ssc.awaitTermination()
}
}
4.2.2 Window
Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。 窗口时长:计算内容的时间范围; 滑动步长:隔多久触发一次计算。 注意:这两者都必须为采集批次大小的整数倍。
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object $05_Window {
def main(args: Array[String]): Unit = {
val ssc = new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"), Seconds(3))
ssc.sparkContext.setLogLevel("error")
ssc.checkpoint("checkpoint")
val ds = ssc.socketTextStream("node1", 9999)
val ds2 = ds.flatMap(_.split(" "))
.map((_, 1))
.window(Seconds(6), Seconds(3))
.reduceByKey(_ + _)
.print()
ssc.start()
ssc.awaitTermination()
}
}
4.2.3 reduceByKeyAndWindow
基本语法:window(windowLength, slideInterval): 基于对源DStream窗化的批次进行计算返回一个新的DStream。 1.reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值 2.reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并“反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于“可逆的reduce函数”,也就是这些reduce函数有相应的“反reduce”函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object $06_reduceByKeyAndWindow {
def main(args: Array[String]): Unit = {
val ssc = new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"), Seconds(3))
ssc.checkpoint("checkpoint")
val ds = ssc.socketTextStream("node1", 9999)
val ds2 = ds.flatMap(_.split(" "))
.map((_, 1))
.reduceByKeyAndWindow((agg: Int, curr: Int) => agg + curr, Seconds(6), Seconds(3))
.print()
val ds3 = ds.flatMap(_.split(" "))
.map((_, 1))
.reduceByKeyAndWindow((agg: Int, curr: Int) => agg + curr, (agg: Int, curr: Int) => {
println(s"agg:${agg} curr:${curr}")
agg - curr
}, Seconds(6), Seconds(3))
.print()
ssc.start()
ssc.awaitTermination()
}
}
Window的其他操作 (1)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数; (2)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
五.DStream输出
DStream通常将数据输出到,外部数据库或屏幕上。 DStream与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个Context就都不会启动。 1)输出操作API如下: saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。“prefix-Time_IN_MS[.suffix]”。 saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将DStream中的数据保存为 SequenceFiles 。每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。 saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files。每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。 注意:以上操作都是每一批次写出一次,会产生大量小文件,在生产环境,很少使用。 print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。 foreachRDD(func):这是最通用的输出操作,即将函数func用于产生DStream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者写入数据库。 在企业开发中通常采用foreachRDD(),它用来对DStream中的RDD进行任意计算。这和transform()有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到如MySQL的外部数据库中。 2)foreachRDD代码实操
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object $07_DStreamingOutPut {
def main(args: Array[String]): Unit = {
val ssc = new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("text"), Seconds(5))
ssc.sparkContext.setLogLevel("warn")
val ds = ssc.socketTextStream("node1", 9999)
val ds2 = ds.flatMap(x => x.split(" "))
.map(x => (x, 1))
.reduceByKey((curr, agg) => agg + curr)
ds2.foreachRDD(rdd => {
rdd.foreachPartition(it => {
var connection: Connection = null
var statement: PreparedStatement = null
try {
connection = DriverManager.getConnection("jdbc:mysql://node1:3306/test", "root", "root")
statement = connection.prepareStatement("insert into xx values(?,?)")
it.foreach(x => {
statement.setString(1, x._1)
statement.setInt(2, x._2)
statement.executeUpdate()
})
} catch {
case e: Exception =>
} finally {
if (statement != null)
statement.close()
if (connection != null)
connection.close()
}
})
})
ssc.start()
ssc.awaitTermination()
}
}
3)注意 (1)连接不能写在Driver层面(序列化) (2)如果写在foreach则每个RDD中的每一条数据都创建,得不偿失; (3)增加foreachPartition,在分区创建(获取)。
六.优雅关闭
流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅的关闭就显得至关重要了。 关闭方式:使用外部文件系统来控制内部程序关闭。 1)主程序
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}
object SparkStreaming8_stop {
def main(args: Array[String]): Unit = {
//1.初始化Spark配置信息
val sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")
//2.初始化SparkStreamingContext
val ssc: StreamingContext = new StreamingContext(sparkconf, Seconds(3))
// 设置优雅的关闭
sparkconf.set("spark.streaming.stopGracefullyOnShutdown", "true")
// 接收数据
val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node1", 9999)
// 执行业务逻辑
lineDStream.flatMap(_.split(" "))
.map((_,1))
.print()
// 开启监控程序
new Thread(new MonitorStop(ssc)).start()
//4 启动SparkStreamingContext
ssc.start()
// 将主线程阻塞,主线程不退出
ssc.awaitTermination()
}
}
// 监控程序
class MonitorStop(ssc: StreamingContext) extends Runnable{
override def run(): Unit = {
// 获取HDFS文件系统
val fs: FileSystem = FileSystem.get(new URI("hdfs://node1:9820"),new Configuration(),"atguigu")
while (true){
Thread.sleep(5000)
// 获取/stopSpark路径是否存在
val result: Boolean = fs.exists(new Path("hdfs://node1:9820/stopSpark"))
if (result){
val state: StreamingContextState = ssc.getState()
// 获取当前任务是否正在运行
if (state == StreamingContextState.ACTIVE){
// 优雅关闭
ssc.stop(stopSparkContext = true, stopGracefully = true)
System.exit(0)
}
}
}
}
}
2)测试 (1)发送数据
[linux@node1 ~]$ nc -lk 9999
hello world
(2)启动Hadoop集群
[linux@node1 hadoop-3.1.3]$ sbin/start-dfs.sh
[linux@node1 hadoop-3.1.3]$ hadoop fs -mkdir /stopSpark
|