IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark Streaming官网重点整理 -> 正文阅读

[大数据]Spark Streaming官网重点整理

一.Spark Streaming

1.Input DStreams and Receivers

Receiver:每个输入DStream(除了文件流)都与一个Receiver对象相关联,这个对象接收来自源的数据,并将其存储在Spark的内存中进行处理。
运行Receiver需要增加一个虚拟核,即 setMaster(“local[2]”)。其实虚拟核就对应一个线程。

文件流不需要运行Receiver,因此不需要为接收文件数据分配任何core。

  • Basic sources:HDFS,socket
val lines = ssc.textFileStream("/imooc-workspace/tmp/ss")

val lines = ssc.socketTextStream("hadoop000", 9527)
  • Advanced source:Kafka
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

2.Receiver Reliability

根据数据源的可靠性,可以有两种数据源。数据源(如Kafka)允许确认传输的数据。如果从这些可靠来源接收数据的系统正确地确认了接收到的数据,就可以确保不会因为任何类型的故障而丢失数据。这就导致了两种类型的Receiver:

  • Reliable Receiver:当数据被接收并存储在Spark中并进行复制时,一个可靠的接收方会正确地向一个可靠的源发送确认。
  • Unreliable Receiver:不可靠的接收方不向源发送确认信息。这可以用于不支持确认的源,甚至当不希望或不需要了解确认的复杂性时,也可以用于可靠的源。

3.Transform Operation

DStream join RDD,常见于过滤黑名单场景。

    val data = List("pk")
    val dataRDD = ssc.sparkContext.parallelize(data).map(x => (x,true))

    // 这里的编程模型是DStream   DStream join RDD 
    lines.map(x => (x.split(",")(1), x))     //20221212,pk => (pk , 20221212,pk )
      .transform(y => {
        y.leftOuterJoin(dataRDD)
          .filter(x => {
            x._2._2.getOrElse(false) != true
          }).map(x=>x._2._1)
      }).print()

4.UpdateStateByKey Operation

使用updateStateByKey需要配置checkpoint目录,来保存以前的状态。
这个操作会产生大量的文件,可以不用这个算子,用redis来存储,然后累加。

ssc.checkpoint("pk-ss")  

val result = lines.flatMap(_.split(",")).map((_, 1))
  .updateStateByKey[Int](updateFunction _)
result.print()

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {

  // 使用新值结合已有的老的值进行fun的操作
  val current: Int = newValues.sum
  val old: Int = runningCount.getOrElse(0)

  Some(current + old)
}

5.Output Operations on DStreams

错误写法一:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

因为这需要connection对象被序列化,并从驱动程序发送到worker。这样的connection对象很少可以跨机器转移。这个错误可能表现为序列化错误(connection对象不能序列化)、初始化错误(connection对象需要在worker上初始化),等等。正确的解决方案是在worker上创建连接对象。

错误写法二:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

为每个记录创建和销毁connection对象可能会导致不必要的高开销,并会显著降低系统的总吞吐量。

最好的写法:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

6.DataFrame and SQL Operations

每个RDD转化为DataFrame再进行操作。

word.foreachRDD(rdd => {

  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame =
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
})

7.Checkpointing

流式应用程序必须 24/7 全天候运行,因此必须能够应对与应用程序逻辑无关的故障(例如,系统故障、JVM 崩溃等)。 为此,Spark Streaming 需要检查点足够的信息到容错存储系统,以便它可以从故障中恢复。
检查点有两种类型的数据:

  • Metadata checkpointing:将定义流计算的信息保存到 HDFS 等容错存储中。 这用于从运行流应用程序驱动程序的节点故障中恢复(稍后详细讨论)。 元数据包括:
  1. Configuration:用于创建流应用程序的配置。
  2. DStream operations:定义流应用程序的DStream操作集。
  3. Incomplete batches:作业已排队但尚未完成的批。
  • Data checkpointing:将生成的 RDD 保存到可靠的存储中。 这在一些跨多个批次组合数据的有状态转换中是必要的。 在这样的转换中,生成的 RDD 依赖于前一批的 RDD,这导致依赖链的长度随着时间的推移而不断增加。 为了避免恢复时间的无限制增加(与依赖链成正比),有状态转换的中间 RDD 会定期检查点到可靠存储(例如 HDFS)以切断依赖链

符合下列要求的应用程序必须启用检查点:

  • Usage of stateful transformations:如果在应用程序中使用updateStateByKey或reduceByKeyAndWindow(带有逆函数),那么必须提供检查点目录来允许周期性的RDD检查点。
  • Recovering from failures of the driver running the application:元数据检查点用于恢复进度信息

当程序第一次启动时,它将创建一个新的StreamingContext,设置所有的流,然后调用start()。
当程序在失败后重新启动时,它将从检查点目录中的检查点数据重新创建一个StreamingContext

请注意,RDD 的检查点会产生保存到可靠存储的成本。 这可能会导致 RDD 获得检查点的那些批次的处理时间增加。 因此,需要仔细设置检查点的间隔。 在小批量(比如 1 秒)下,每批检查点可能会显着降低操作吞吐量。 相反,检查点太少会导致沿袭和任务大小增加,这可能会产生不利影响。 对于需要 RDD 检查点的有状态转换,默认间隔是至少 10 秒的批处理间隔的倍数。 它可以通过使用 dstream.checkpoint(checkpointInterval) 来设置。 通常,一个 DStream 的 5 - 10 个滑动间隔的检查点间隔是一个很好的尝试设置。

8.Performance Tuning

在高层次上,你需要考虑两件事:

  1. 有效利用集群资源,减少每批数据的处理时间。
  2. 设置正确的批处理大小,以便在接收到数据时处理这些数据批(也就是说,数据处理与数据摄入保持同步)。
  • Reducing the Batch Processing Times

  • Setting the Right Batch Interval
    为了使在集群上运行的Spark Streaming应用程序稳定,系统应该能够以接收数据的速度处理数据。换句话说,批量数据的处理速度应该与生成数据的速度一样快。对于一个应用程序来说,这是否正确,可以通过监控流web UI中的处理时间来发现,其中批处理时间应该小于批处理间隔。

  • Memory Tuning

9.Fault-tolerance Semantics

Semantics of Received Data

  1. With Files
    如果所有的输入数据已经存在于像HDFS这样的容错文件系统中,那么Spark Streaming总是可以从任何故障中恢复并处理所有的数据。这提供了exactly-once的语义,这意味着无论什么失败,所有的数据都将精确地处理一次。

  2. With Receiver-based Sources
    1.Reliable Receiver - 这些Receiver只有在确保接收到的数据已被复制后才会向数据源发出确认可靠的信息。如果这样的Receiver失败,数据源将不会收到对缓冲(未复制)数据的确认。因此,如果Receiver重新启动,数据源会重新发送数据,不会因为失败而丢失数据。
    2.Unreliable Receiver - 这样的Receiver不发送确认,因此当它们由于工人或驱动故障而失败时,可能会丢失数据。

为了避免丢失过去接收到的数据,Spark 1.2引入了提前写日志,将接收到的数据保存到容错存储中。通过启用预写日志和可靠的Receiver,数据不会丢失。在语义方面,它提供了at-least once保证。
拥有Reliable Receiver,worker挂了没事(因为会重传),driver挂了,所有Receiver都会丢失过去的数据,所以加入write-ahead logs,可实现零数据丢失,提供了at-least once保证。

Semantics of output operations

  1. 幂等更新:多次尝试总是写入相同的数据。例如,saveAs***Files总是将相同的数据写入生成的文件。
  2. 事务性更新:所有更新都以事务性方式进行,因此更新仅以原子性方式进行一次。使用下面两个步骤。
  • 使用批量时间(foreachRDD中可用)和RDD的分区索引创建标识符。这个标识符唯一地标识流应用程序中的一个blob数据。
  • 使用该标识符以事务方式(即仅一次原子方式)使用该blob更新外部系统。也就是说,如果标识符还没有提交,则原子地提交分区数据和标识符。否则,如果已经提交,则跳过更新。
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}

二.Spark Streaming + Kafka Integration Guide

Kafka 0.10的Spark Streaming集成提供了简单的并行性,Kafka分区和Spark分区之间1:1的对应,以及对偏移量和元数据的访问。然而,由于新的集成使用了新的Kafka消费者API而不是简单的API,所以在使用上有显著的差异。

1.LocationStrategies

新 Kafka 消费者 API 会将消息预先拉取到缓冲区中, Spark 会在 executor 上缓存消费者(而不是在每个批次上(batch)创建消费者),并且优先在 有更合适的消费者 所在的主机上安排分区。

  1. 实际中, 大多数情况下使用如上所示的 LocationStrategies.PreferConsistent, 将在可用的 executors 上均匀分布分区。

  2. 如果 executor 与 Kafka 的代理节点在同一台物理机上,使用 PreferBrokers,会更倾向于在该节点上安排 KafkaLeader 对应的分区。

  3. 如果发生分区之间数据负载倾斜,使用 PreferFixed。可以指定分区和主机之间的映射(任何未指定的分区将使用相同的位置)

为消费者提供的最大缓存数为 64,如果希望有处理超过(64*executor 的数量)的 kafka 的分区,可以使用 spark.streaming.kafka.consumer.cache.maxCapacity 参数修改

如果不希望应用 Kafka 消费者的缓存策略,可以将 spark.streanig.kafka.consumer.cache.enabled 设置为 false。

缓存是将 topic 的分区和 groupid 作为 key,因此每次调用 createDirectStream 都需要使用一个单独的 group.id。

2.Obtaining Offsets

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}

请注意,对HasOffsetRanges的类型转换只有在createDirectStream的结果上调用第一个方法时才会成功,而不是在后面的方法链中。注意RDD分区和Kafka分区之间的一对一映射在shuffle或repartition方法之后不会保留,例如reduceByKey()或window()。

3.Storing Offsets

失败情况下的 Kafka 交付语义取决于偏移量的存储方式和时间。 Spark 输出操作是at-least-once。 因此,如果您想要exactly-once语义,您必须要么在幂等输出之后存储偏移量要么将偏移量存储在与业务输出一起的原子事务中。 通过这种集成,您有 3 个选项,以提高可靠性(和代码复杂性)的顺序来存储偏移量。

Checkpoints
如果启用 Spark 检查点,偏移量将存储在检查点中。 这很容易启用,但也有缺点。 你的输出操作必须是幂等的,因为你会得到重复的输出; 事务不是一种选择。 此外,如果您的应用程序代码已更改,您将无法从检查点恢复。 对于计划中的升级,您可以通过在运行旧代码的同时运行新代码来缓解这种情况(因为无论如何输出都需要是幂等的,它们不应该发生冲突)。 但是对于需要更改代码的计划外故障,除非您有另一种方法来识别已知的良好起始偏移量,否则您将丢失数据。

Kafka itself
Kafka 有一个偏移提交 API,它将偏移存储在一个特殊的 Kafka 主题中。 默认情况下,新消费者将定期自动提交偏移量。 这几乎肯定不是您想要的,因为消费者成功轮询的消息可能尚未导致 Spark 输出操作,从而导致未定义的语义。 这就是上面的流示例将“enable.auto.commit”设置为 false 的原因。 但是,在知道输出已存储后,您可以使用 commitAsync API 将偏移量提交到 Kafka。 与检查点相比的好处是,无论您的应用程序代码如何更改,Kafka 都是一个持久存储。 但是,Kafka 不是事务性的,因此您的输出仍然必须是幂等的

Your own data store
对于支持事务的数据存储,将偏移量保存在与结果相同的事务中可以使两者保持同步,即使在失败情况下也是如此。 如果您小心检测重复或跳过的偏移范围,回滚事务可防止重复或丢失的消息影响结果。 这给出了完全一次语义的等价物。 即使对于聚合产生的输出,也可以使用这种策略,这通常很难做到幂等

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-11 22:17:06  更:2022-03-11 22:20:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 18:39:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码