一、简介
Spark处理的是批量的数据(离线数据),Spark Streaming实际上是对接的外部数据流之后按照时间切分,批处理一个个切分后的文件,和Spark处理逻辑是相同的。 Dstream:Spark Streaming提供了表示连续数据流的、高度抽象的被称为离散流的DStream 假如外部数据不断涌入,按照一分钟切片,每个一分钟内部的数据是连续的(连续数据流),而一分钟与一分钟的切片却是相互独立的(离散流)。 DStream是Spark Streaming特有的数据类型。 Dstream可以看做一组RDDs,即RDD的一个序列: 将连续的数据持久化,离散化,然后进行批量处理。 持久化:接收到的数据暂存。做容错,当数据流出错了,因为没有得到计算,需要把数据从源头进行回溯,暂存的数据可以进行恢复。 离散化:按时间分片,形成处理单元。 分片处理:分批处理。
二、总结
park Streaming称之为微批处理 做这个微批处理主要体现在三个部分:1.数据的读取,2.数据的计算,3.计算结果 的输出
2.1.数据的读取:
Spark Streaming接收Kafka、Flume、HDFS等各种来源的实时输入数据,进行处理后,处理结构保存在HDFS、DataBase等各种地方。
2.1.数据的计算:
Spark Streaming将接收到的实时流数据,按照一定时间间隔,对数据进行拆分,交给Spark Engine引擎计算,最终得到一批批的结果 任何对DStream的操作都会转变为对底层RDD的操作(通过算子):
- transformation 转换算子:
reduce,count算子不会直接触发Dstreami计算。 - output 执行算子(输出算子):
·Print · saveAsObjectFile、saveAsTextFile、saveAsHadoopFiles:将一批数据输出到Hadoop文件系统中,用批量数据的开始时间 戳来命名 · forEachRDD:允许用户对DStream的每一批量数据对应的RDD本身做任意操作
2.3.数据的写出:
通过Print、saveAsTextFile、saveAsHadoopFiles等输出算子进行计算结果的输出
(1)print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。
(2)saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。`res.saveAsTextFiles(“file:///f:/result/a”) (3)saveAsObjectFiles(prefix,[suffix]):以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]".
(4)foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。注意:函数func在运行流应用的驱动中被执行,同时其中一般函数RDD操作从而强制其对于流RDD的运算。
三、简单应用之从不同数据源读取数据进行计算然后输出:
3.1从LINUX的一个端口中使用nc发送数据,然后读取
- 在IDEA中创建一个maven项目
- 导入依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.3</version>
</dependency>
- 编写测试代码:
object StreamWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lineStreams = ssc.socketTextStream("hadoop10", 9999)
val wordStreams = lineStreams.flatMap(_.split(" "))
val wordAndOneStreams = wordStreams.map((_, 1))
val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)
wordAndCountStreams.print()
ssc.start()
ssc.awaitTermination()
}
}
- 运行IDEA程序,然后打开hadoop10,使用nc,往9999端口发送数据
hello hello hello world
hello world
3.2从hadoop10的HDFS文件系统上读取
val sparkConf = new SparkConf().setMaster("local[*]")
.setAppName("FileStream")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val dirStream = ssc.textFileStream(" hdfs://hadoop10:9000/xx ")
val wordStreams = dirStream.flatMap(_.split("\t"))
val wordAndOneStreams = wordStreams.map((_, 1))
val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)
wordAndCountStreams.print()
ssc.start()
ssc.awaitTermination()
3.3从Kafka读取数据
- 导入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.3</version>
</dependency>
2.编写代码:
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaSparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val brokers = "hadoop10:9092,hadoop11:9092,hadoop12:9092"
val consumerGroup = "g1"
val kafkaParam: Map[String, String] = Map[String, String](
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.GROUP_ID_CONFIG -> consumerGroup,
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers
)
val records: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](List("topica"), kafkaParam))
records.map(record=>record.value)
.flatMap(line=>line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
.print()
ssc.start()
ssc.awaitTermination()
四、work在工作的时候 Executor从哪个分区中拿数据进行计算
- LocationStrategies.PreferConsistent策略
SparkStreaming读取数据使用 LocationStrategies.PreferConsistent 这种策略 , 这种策略会将kafka的分区均匀的分布在集群的Executor之间。【推荐】
- LocationStrategies.PreferBrokers
如果Executor在kafka 集群中的某些节点上,可以使用 LocationStrategies.PreferBrokers这种策略 那么当前这个Executor 中的数据会来自当前broker节点。
如果Executor和Kafka Broker在同一主机,则可使用此策略。
- LocationStrategies.PreferFixed
如果节点之间的分区有明显的分布不均,可以使用 LocationStrategies.PreferFixed 这种策略, 可以通过一个map 指定将topic分区分布在哪些节点中。
五、DStream中的状态转换
分为两种状态,一种是无状态转化,一种是有状态转化
无状态转化就是指每个批次的RDD在计算完之后就输出什么的,不涉及到下一个批次的计算
有状态转化指,上一个批次处理完得到的结果下个批次的数据在计算时候也需要使用
pdateStateByKey原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件 更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。 updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。 updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,你需要做下面两步:
- 定义状态,状态可以是一个任意的数据类型。
- 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。 例子:
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StateWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("file:///f:/streamCheck")
val lines = ssc.socketTextStream("spark0", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val stateDstream = pairs.updateStateByKey[Int](updateFunc)
stateDstream.print()
ssc.start()
ssc.awaitTermination()
|