一、什么是SparkStreaming
Spark Streaming 是 Spark 核心 API 的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。为了实现流式处理数据。
二、创建SparkStreaming环境对象
val conf = new SparkConf().setMaster("local[*]").setAppName("streaming")
// 创建环境对象,需要传递两个参数 第一个是环境配置 第二个是采集的周期
val streamContext = new StreamingContext(conf, Seconds(3))
... // 业务代码
streamContext.start() // 开始采集数据
... // 这里可以放一些自定义的DStream值
streamContext.awaitTermination() // 等待采集器结束
streamContext.stop()
三、SparkStreaming读入数据
1. 通过socket方式读入数据
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val streamContext = new StreamingContext(conf, Seconds(3))
// 通过socket获取数据
val lines = streamContext.socketTextStream("localhost", 9999)
lines .print()
streamContext.start()
streamContext.awaitTermination()
}
2. 自定义DS数据
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val streamContext = new StreamingContext(conf, Seconds(3))
// 创建一个dstream 类似创建一个rdd
val intQueue = new mutable.Queue[RDD[Int]]()
// oneAtATime 是否在每个间隔中只应从队列中使用一个RDD
val Dstream = streamContext.queueStream(intQueue, oneAtATime = false)
// 开始采集数据,数据开始采集之后放值不是一开始就是赋值好
streamContext.start()
// 将数据入队
intQueue.enqueue(streamContext.sparkContext.makeRDD(seq = 1 to 30, numSlices = 8))
streamContext.awaitTermination()
}
四、自定义数据采集器
object Dstream {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val streamContext = new StreamingContext(conf, Seconds(3))
// 使用自定义采集数据
val messageDs = streamContext.receiverStream(new MyReceiver())
messageDs.print()
streamContext.start()
streamContext.awaitTermination()
}
// 自定义数据采集器,需要继承Receiver并传一个接收数据的泛型,参数传存储级别
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){
var tag = true;
// 开始采集数据的时候需要开辟另外一个线程
override def onStart(): Unit = {
new Thread(() => {
while (tag){
val message = new Random().nextInt().toString
// 自动将数据存储
store(message)
Thread.sleep(500)
}
}).start()
}
override def onStop(): Unit = {
tag = false
}
}
}
五、从kafka读数据
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val streamContext = new StreamingContext(conf, Seconds(3))
val kafkaPara: Map[String, Object] = Map[String, Object](
// 这里是kafka的访问地址或者是集群地址
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "",
// 组名称
ConsumerConfig.GROUP_ID_CONFIG -> "",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
// kafka util 创建连接, 需要一个泛型用于代表接受参数的类型
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](streamContext,
LocationStrategies.PreferConsistent, // 类似sparkCore的首选位置
// 消费者策略,需要topic的名称和kafka的参数
ConsumerStrategies.Subscribe[String, String](List("asd"), kafkaPara)
)
// _.value()就会获得kafka传过来的值
kafkaStream.map(_.value())
}
六、数据的状态
有状态和无状态的区别:
无状态操作只会对当前周期内的数据进行处理。
有状态就是数据有状态,可以对所有数据做汇总,并不是对一批数据进行汇总。
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val streamContext = new StreamingContext(conf, Seconds(3))
// 使用有状态的数据,需要设置检查点路径
streamContext.checkpoint("cp")
val lines = streamContext.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordToOne: DStream[(String, Int)] = words.map((_, 1))
// 根据key更新数据状态,需要传两个参数,第一个参数 相同key的value数据 第二个参数 缓冲区相同key的value数据
wordToOne.updateStateByKey((seq: Seq[Int], opt: Option[Int]) => {
// 由于这里拿到的都是二元组的第二个数据,直接sum进行求和
val value = opt.getOrElse(0) + seq.sum
Option(value) // 返回需要是缓冲区类型
}).print()
streamContext.start()
streamContext.awaitTermination()
}
七、转换操作
val lines = streamContext.socketTextStream("localhost", 9999)
// 用于周期性执行代码或者Dstream功能不完善 可以直接用底层rdd的方法
val rawRdd: DStream[String] = lines.transform(rdd => rdd)
transform和map的区别
val rawRdd: DStream[String] = lines.transform(
rdd => {
// 在这里的代码会周期性的执行在driver端
rdd.map(a => {
a // 在executor执行
})
}
)
val rawMap: DStream[String] = lines.map(
rdd => {
rdd // 在executor执行
}
)
八、无状态的join操作
val line1 = streamContext.socketTextStream("localhost", 9999)
val line2 = streamContext.socketTextStream("localhost", 8888)
val stream1 = line1.map((_, 0))
val stream2 = line2.map((_, 1))
// 无状态的操作join, 相同的key把关联的v放在一起
val value: DStream[(String, (Int, Int))] = stream1.join(stream2)
九、窗口操作(有状态)
val lines = streamContext.socketTextStream("localhost", 9999)
val wordToOne: DStream[(String, Int)] = lines.map((_, 1))
// 窗口的周期最好是采集周期的整数倍, 默认情况一个采集周期一个滑动
// 第一个参数 窗口大小 第二个参数 步长(防止窗口出现重叠)
val Dstream = wordToOne.window(Seconds(6), slideDuration = Seconds(6))
val value = Dstream.reduceByKey(_ + _)
可以自定义对于新数据和移除数据的处理
val lines = streamContext.socketTextStream("localhost", 9999)
val wordToOne: DStream[(String, Int)] = lines.map((_, 1))
// 需要设置检查点 按照窗口进行叠加
val value = wordToOne.reduceByKeyAndWindow(
(x, y) => {x + y}, // 当窗口滑动的时候新进来的数据怎么做(这里新增的数据做加法)
(x, y) => {x - y}, // 当窗口滑动的时候新移除的数据怎么做(这里新增的数据做减 法)
Seconds(9), // 窗口大小
Seconds(3) // 窗口周期
)
十、优雅的关闭
val conf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val streamContext = new StreamingContext(conf, Seconds(6))
val line1 = streamContext.socketTextStream("localhost", 9999)
val stream1 = line1.map((_, 0))
stream1.start()
// 需要开启另外一个线程进行关闭,一般去读取第三方数据的状态,例如mysql、redis等等
new Thread(() => {
// 这里只是模拟
while(true){
if(streamContext.getState() == StreamingContextState.ACTIVE){
// 优雅的关闭, 计算节点不再接收新的数据 等节点处理完内部数据以后在进行关闭
streamContext.stop(true,true)
}
}
}).start()
streamContext.awaitTermination()
十一、数据的恢复
// 创建检查点 每次从检查点恢复数据,如果没有数据就去创建
val context = StreamingContext.getActiveOrCreate("cp", () => {
val conf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val streamContext = new StreamingContext(conf, Seconds(6))
val line1 = streamContext.socketTextStream("localhost", 9999)
val stream1 = line1.map((_, 0))
stream1.print()
streamContext
})
context.checkpoint("cp")
context.start()
context.awaitTermination()
|