1.sparkStreaming 流式处理框架,是Spark API的扩展,RDD最终封装到DStream中
2.第一个wordcount
pom依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
<scope>provided</scope>
</dependency>
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkStreaming01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("streaming01")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val ssc = new StreamingContext(sc, Durations.seconds(10))
val lines = ssc.socketTextStream("hadoop102", 999)
val words=lines.flatMap(line=>line.split(" "))
val pairWords=words.map(word=>new Tuple2(word,1))
val result= pairWords.reduceByKey((v1,v2)=>{v1 + v2})
result.print()
ssc.start()
ssc.awaitTermination()
}
}
数据来源
3.foreachRDD算子
1.foreachRDD可以获取DStream中的RDD,可以对RDD使用RDD的算子操作,但是一定要使用RDD的action算子触发执行
result.foreachRDD((rdd: RDD[(String, Int)]) => {
val rdd1: RDD[String] = rdd.map(tp => {
println("======="+tp)
tp._1 + "=" + tp._2
})
rdd1.count()
})
4.transform transformation类算子,对Dstream做RDD到RDD的任意操作
5.updateStateByKey transformation类算子,对每一个key的状态进行更新
6.Driver HA
第一:提交任务层面,在提交任务的时候加上选项 --supervise,当Driver挂掉的时候会自动重启Driver。 第二:代码层面,使用JavaStreamingContext.getOrCreate(checkpoint路径,JavaStreamingContextFactory)
|