spark-Streaming
1、SparkStreaming简介
- SparkStreaming是流式处理框架,是Spark API(RDD)的扩展,支持可扩展、高吞吐量、容错的准实时数据流处理 。
- 实时数据的来源可以是:Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,在接受数据同时可以使用高级功能的复杂算子来处理流数据。
- 最终处理后的数据可以存放在文件系统,数据库等,方便实时展现。
2、SparkStreaming与Storm的区别
- 处理模型以及延迟
- 都提供了可扩展性(scalability)和可容错性(fault tolerance),
- Storm可以实现亚秒级时延的处理,而每次只处理一条event,而 Spark Streaming可以在一个短暂的时间窗口里面处理多条(batches)Event。
- 说Storm可以实现亚秒级时延的处理,而Spark Streaming则有一定的时延。
- 容错和数据保证
- 在Storm中,每条记录在系统的移动过程中都需要被标记跟踪,所以Storm只能保证每条记录最少被处理一次,但是允许从错误状态恢复时被处理多次。
- Spark Streaming仅仅需要在批处理级别对记录进行追踪,所以他能保证每个批处理记录仅仅被处理一次,
- 实现和编程API
- Storm主要是由Clojure语言实现,Spark Streaming是由Scala实现。
- Storm提供了Java API,同时也支持其他语言的API。 Spark Streaming支持Scala和Java语言 (其实也支持Python)。
- 批处理框架集成
- 它是在Spark框架上运行的。这样你就可以想使用其他批处理代码一样来写Spark Streaming程序,或者是在Spark中交互查询。这就减少了单独 编写流批量处理程序和历史数据处理程序。
- 生产支持
3、SparkStreaming流式计算
3.1. 流式计算过程
- Spark Streaming是将流式计算分解成一系列短小的批处理作业(batch)。
- 批处理引擎是Spark Core,按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每段数据都转换成Spark中的RDD(Resilient Distributed Dataset)
- Streaming中对DStream的Transformation操作变为针对Spark中对RDD的 Transformation操作
- 将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结 果进行叠加或者存储到外部设备。
-
Spark Streaming在内部的处理机制:
- 接收实时流的数据,并根据一定的时间间隔拆分成一批批的数据,然后通过Spark Engine处 理这些批数据,最终得到处理后的一批批结果数据。
- 对应的批数据,在Spark内核对应一个RDD实例,因此,对应流数据的DStream可以看成是一 组RDDs,即RDD的一个序列。
- 通俗点理解的话,在流数据分成一批一批后,通过一个先进先出的队列,然后 Spark Engine 从该队列中依次取出一个个批数据,把批数据封装成一个RDD,然后进行处理,这是一个典型的生产者消费者模型,对应的就有生产者消费者模型的问题,即如何协调生产速率和消费速 率。
-
receiver task
-
-
receiver task是一直在执行,一直接受数据,将一段时间内接收来的数据保 存到batch中。 -
假设batchInterval 为 5s,那么会将接收来的数据每隔 5 秒封装到一个 batch 中 -
batch 没有分布式计算特性,这一个batch的数据又被封装到一个RDD中最终封装到一个 DStream中,然后sparkStreaming回启动一个job去计算.
3.2. 流式计算特性
- 容错性
- 每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系 (lineage)
- 只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是可以 利用原始输入数据通过转换操作而重新算出的。
- 每一行最后一个 RDD则表示每一个Batch Size所产生的中间结果RDD。都是通过lineage相连接的。
- 实时性
- Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程。
- 最小的Batch Size的选取在0.5~2秒钟之间(Storm 目前最小的延迟是100ms左右)
- 扩展性与吞吐量
- 已能够线性扩展到100个节点(每个节点4Core),可以以数秒的延迟处理 6GB/s的数据量(60M records/s),其吞吐量也比流行的Storm高2~5倍
3.3. 编程模型DStream
- DStream(Discretized Stream)作为Spark Streaming的基础抽象,它代表持续性的数据流。
- DStream由一组时间序列上连续的RDD来表示。每个RDD都包含了自己特定时间间隔内的数据流。
- DStream与RDD
- Spark Streaming 计算还是基于Spark Core的,Spark Core 的核心又是RDD. 所以Spark Streaming 肯定也要和RDD扯上关系。
- Spark Streaming 并没有直接让用户使用RDD而是自己抽象了一套DStream的概念。
- DStream 和 RDD 是包含的关系,你可以理解为Java里的装饰模式,也就是DStream 是对 RDD的增强,但是行为表现和RDD是基本上差不多的。
4、SparkStreaming代码实现
4.1. 代码实现
-
pom.xml
-
代码实现
-
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("App")
val streamingContext = new StreamingContext(sparkConf, streaming.Seconds(5))
val value: ReceiverInputDStream[String] = streamingContext.socketTextStream("localhost", 9999)
val dStream: DStream[(String, Int)] = value.flatMap(_.split("\\s")).map((_, 1)).reduceByKey((x, y) => x + y)
dStream.print()
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop(false)
}
4.2. DStream转换操作
- transform(func)
- 通过对源DStream的每RDD应用RDD-to-RDD函数返回一个 新的DStream,这可以用来在DStream做任意RDD操作。
- updateStateByKey(func)
- 该 updateStateByKey 操作可以让你保持任意状态,同时不断有新的信息进行更新。要使用此功能,必须进行两个步骤 :
- 定义状态 - 状态可以是任意的数据类型。
- 定义状态更新函数 - 用一个函数指定如何使用先前的状态和从输入流中获取的新值 更新状态
- 使用到updateStateByKey要开启checkpoint机制和功能。
4.3. DStream窗口操作
-
在Spark Streaming中,数据处理是按批进行的,而数据采集是逐条进行的,因此在Spark Streaming中会先设置好批处理间隔(batch duration),当超过批处理间隔的时候就会把采 集到的数据汇总起来成为一批数据交给系统去处理。 -
窗口操作而言,在其窗口内部会有N个批处理数据,批处理数据的大小由窗口间隔 (window duration)决定,而窗口间隔指的就是窗口的持续时间,只有窗口 的长度满足了才会触发批数据的处理。 -
有另一参数就是滑动间隔(slide duration),它指的是经过多长时间窗口滑动一次形成新的窗口,滑动窗口默认情况下和批次间隔的相同, -
-
批处理间隔是1个时间单位,窗口间隔是3个时间单位,滑动间隔是2个时间单位。 -
窗口长度和滑动间隔必须是batchInterval的整数倍。如果不是整数倍会检测报错。 -
window(windowLength, slideInterval)
- – 窗口总长度(window length):你想计算多长时间的数据
- – 滑动时间间隔(slide interval):你每多长时间去更新一次
-
window(windowLength, slideInterval) 返回一个基于源DStream的窗口批次计算后得到新的DStream。 -
countByWindow(windowLength,slideInterval) 返回基于滑动窗口的DStream中的元素的数量。 -
reduceByWindow(func, windowLength,slideInterval) 基于滑动窗口对源DStream中的元素 进行聚合操作,得到一个新的 DStream。 -
countByValueAndWindow(windowLength,slideInterval, [numTasks]) 基于滑动窗口计算源DStream中每个 RDD内每个元素出现的频次并返回 DStream[(K,Long)],其中K是RDD 中元素的类型,Long是元素频次。
4.4. DStream输出操作
- print() 打印出DStream中数据的前10个元素。
- saveAsTextFiles(prefix, [suffix]) 将DStream中的内容以文本的形式保存为文本文件,件以prefix-TIME_IN_MS[.suffix]的 方式命名。
- saveAsObjectFiles(prefix, [suffix]) 按对象序列化并且以SequenceFile的格式保存。
- saveAsHadoopFiles(prefix, [suffix]) 容以文本的形式保存为Hadoop文件,
- foreachRDD(func) 将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统。
5、SparkStreaming数据源
5.1. 基础数据源
- streamingContext.socketTextStream()方法,可以通过 TCP 套接字连接,从文本数据中创建了一个 DStream。
- streamingContext.**fileStream(dataDirectory)方法可以从任何文件系统 (如:HDFS、S3、NFS 等)**的文件中读取数据。
- streamingContext.textFileStream(dataDirectory)来读取简单的文本件。
- streamingContext.actorStream(actorProps, actor-name)可以基于自定 义 Actors 的流创建DStream。
- treamingContext.queueStream(queueOfRDDs)方法可以创建基于 RDD 队列的DStream。
5.2. 高级数据源
- Twitter:Spark Streaming的TwitterUtils工具类使用Twitter4j。
- Flume:Spark Streaming可以从Flume中接受数据。
- Kafka:Spark Streaming可以从Kafka中接受数据。
- Kinesis:Spark Streaming可以从Kinesis中接受数据。
7、SparkStreaming2.2(包含以前)+Kafka0.8
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>2.4.6</version>
</dependency>
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("stream05_kafka")
.set("spark.streaming.stopGracefullyOnShutdown","true")
val streamingContext = new StreamingContext(sparkConf, streaming.Seconds(3))
val kafkaPar: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "yjx_bigdata",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (true: lang.Boolean)
)
val topics: Array[String] = Array("userlog")
val kfDStream: InputDStream[ConsumerRecord[String, String]]
= KafkaUtils.createDirectStream(streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaPar))
val result: DStream[(String, Int)] = kfDStream.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
result.print()
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop(false)
}
8、Kafka接受数据方式
- Kafka与Spark Streaming集成时有两种方法:旧的基于receiver的方法,新的基于direct stream的 方法。
8.1. 基于receiver的方法
- 基于receiver的方法采用Kafka的高级消费者API,每个executor进程都不断拉取消息,并同时保存在executor内存与HDFS上的预写日志(write-ahead log/WAL)。当消息写入WAL后, 自动更新ZooKeeper中的offset。
- 它可以保证at least once语义,但无法保证exactly once语义。虽然引入了WAL来确保消息不会丢失,但还有可能会出现消息已经写入WAL,但offset更新失败的情况,Kafka就会按上一次的offset重新发送消息。
- receiver的并行度是由spark.streaming.blockInterval来决定的,默认为200ms;
- 假设 batchInterval为5s,那么每隔5s就会产生一个block,这里就对应每批次产生RDD的 partition,这样5秒产生的这个Dstream中的这个RDD的partition为25个,并行度就是25。如 果想提高并行度可以减少blockInterval的数值,但是最好不要低于50ms。
8.2. 基于direct stream的方法
-
基于direct stream的方法采用Kafka的简单消费者API,它的流程大大简化了。executor不再从Kafka中连续读取消息,也消除了receiver和WAL。还有一个改进就是Kafka分区与RDD分区是一一对应的,更可控。 -
driver进程只需要每次从Kafka获得批次消息的offset range,然后executor进程根据offset range去读取该批次对应的消息即可。由于offset在Kafka中能唯一确定一条消息,且在外部只 能被Streaming程序本身感知到,因此消除了不一致性,达到了exactly once。 -
由于它采用了简单消费者API,我们就需要自己来管理offset。 -
Direct模式的并行度是由读取的kafka中topic的partition数决定的。
8.3. 偏移量offset处理
- SparkStreaming Kafka 维护offset 官网有三种实现方式
- Checkpoints
- Kafka itself
- Your own data store 自定义
8.3.1. CheckPoint
- spark streaming里面管理偏移量的策略,默认的spark streaming它自带管理的offset的方式是通过checkpoint来记录每个批次的状态持久化到HDFS中,如果机器发生故障,或者程序故障停止, 下次启动时候,仍然可以从checkpoint的目录中读取故障时候rdd的状态,便能接着上次处理的数据继续处理。
- checkpoint方式最大的弊端是如果代码升级,新版本的jar不能复用旧版本的序列化状态,导致两个版本不能平滑过渡,结果就是要么丢数据,要么数据重复。
- checkpoint第一次持久化的时候会把整个相关的jar给序列化成一个二进制文件,每次重启都会从里面恢复,但是当你新的程序打包之后序列化加载的仍然是旧的序列化文件,这就会导致报错或者依旧执行旧代码。
- 一旦你删除了旧的checkpoint,新启动的程序,只能从kafka的smallest或者largest的偏移量消 费,默认是从最新的。
8.3.2. Kafka Itself
- kafka本身就有机制可以定时存储消费者分组的偏移量,但是这样会有重复消费的情况,还如果采用这种方式那么就是将kafka的offset全部交给kafka管理
- kafka的的数据其实也是内存和磁盘存储的,如果数据量上来了,无疑也是对kafka集群的一种压力。
- 因为我们项目在实际开发中的时候,遇到数据峰值很高的时候kafka集群的磁盘io是特别高的,这样是非常不安全的。
8.3.3. Your own data store
- 自己写代码管理offset,其实就是把每批次offset存储到一个外部的存储系统里面包括(Hbase,HDFS,Zookeeper,Kafka,DB等等)
- 当一个新的spark streaming+kafka的流式项目第一次启动的时候,这个时候发现外部的存储系统并没有记录任何有关这个topic所有分区的偏移量,所以就从 KafkaUtils.createDirectStream直接创建InputStream流,默认是从最新的偏移量消费,如果 是第一次其实最新和最旧的偏移量时相等的都是0,然后在以后的每个批次中都会把最新的offset给存储到外部存储系统中,不断的做更新。
- 当流式项目停止后再次启动,会首先从外部存储系统读取是否记录的有偏移量,如果有的话, 就读取这个偏移量,然后把偏移量集合传入到KafkaUtils.createDirectStream中进行构建 InputSteam,这样的话就可以接着上次停止后的偏移量继续处理,
9、数据的反压机制
? 数据流入的速度远高于数据处理的速度,对流处理系统构成巨大的负载压力,如果不能正确处理, 可能导致集群资源耗尽最终集群崩溃,因此有效的反压机制(backpressure)对保障流处理系统的稳定至关重要。
9.1. Storm反压
- 旧版本
- 开启了acker机制的storm程序,可以通过设置conf.setMaxSpoutPending参数来实现反压效果,如果下游组件(bolt)处理速度跟不上导致spout发送的tuple没有及时确认的数超过了参数设定的值,spout会停止发送数据 。
- 但是conf.setMaxSpoutPending参数的设置很难达到最好的反压效果
- 设小了会导致吞吐上不去
- 设大了会导致worker OOM;有震荡,数据流会处于一个颠簸状态,效果不如逐级反 压;
- 另外对于关闭acker机制的程序无效;
- 新版本
- storm自动反压机制(Automatic Back Pressure)通过监控bolt中的接收队列的情况,当超过高水位值时专门的线程会将反压信息写到 Zookeeper ,Zookeeper上的watch会通知该拓扑的所有Worker都进入反压状态,最后Spout降低tuple发送的速度。
9.2. Spark反压
-
旧版本
-
Spark Streaming程序中当计算过程中出现batch processing time > batch interval的情况时,(其中batch processing time为实际计算一个批次花费时间,batch interval为Streaming 应用设置的批处理间隔) -
意味着处理数据的速度小于接收数据的速度,如果这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题(如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟) -
可以通过设置参数spark.streaming.receiver.maxRate来限制Receiver的数据接收速率,此举 虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。 -
比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。 -
新版本 -
Spark Streaming 从v1.5开始引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力 。 -
Spark Streaming Backpressure: 根据JobScheduler反馈作业的执行信息来动态调整 Receiver数据接收率。 -
-
spark streaming的数据源方式有两种:
- 若是基于Receiver的数据源,可以通过设置spark.streaming.receiver.maxRate来控制最大输入速率;
- 若是基于Direct的数据源(如Kafka Direct Stream),则可以通过设置 spark.streaming.kafka.maxRatePerPartition来控制最大输入速率。
-
反压原理
- RateController 继承自接口StreamingListener,并实现了onBatchCompleted方 法。每一个Batch处理完成后都会调用此方法。记录批处理的处理时间与数量。
- RateEstimator 是速率估算器,主要用来估算最大处理速率。
- RateLimiter 是一个抽象类,实质上是一个限流器,也可以叫做令牌,
- 如果Executor中task每秒计算的速度大于该值则阻塞,如果小于该值则通过将流数据加入缓存中进行计算。这种机制也可以叫做令牌桶机制。
- 令牌桶机制: 大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。当进行某操作时需要令牌时会从令牌桶中取出相应的令牌数,如果获取到则继续操作,否则阻塞。用完之后不用放回。
- 注意:
- 反压机制真正起作用时需要至少处理一个批。
- 保证反压机制真正起作用前应用不会崩溃,需要控制每个批次最大摄入速率。
10、SparkStreaming事务
10.1. 逻辑处理
- Spark容错分为:Driver级别的容错和Executor级别的容错。
- 在Driver级别的容错具体为DAG生成的模板,即DStreamGraph,存储着RecevierTracker中存储的元数据信息和JobScheduler中存储的Job进行的进度情况等信息,只要通过checkpoint就可以了,每个Job生成之前进行checkpoint,在Job生成之后再进行checkpoint,如果出错的话就从checkpoint中恢复。
- 在Executor级别的容错具体为接收数据的安全性和任务执行的安全性。在接收数据安全性方面,一种方式是Spark Streaming接收到数据默认为MEMORY_AND_DISK_2的方式,另外一种方式是WAL(Write Ahead Log),在 数据到来时先通过WAL机制将数据进行日志记录,如果有问题则从日志记录中恢复。
- Spark Streaming的容错机制是基于RDD的容错机制。
- checkpoint 。
- 基于血统(lineage)的高度容错机制 。
- 出错了之后会从出错的位置从新计算,而不会导致重复计算。
|