IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark3.0源码分析-数据本地化优先级及延迟调度策略 -> 正文阅读

[大数据]Spark3.0源码分析-数据本地化优先级及延迟调度策略

getPreferredLocations

在了解spark本地话和延迟调度之前先来看下RDD的五个重要属性

image-20220621115129375

其中getPreferredLocations用来获取rdd每一个分区的优先级列表位置,由不同的子类RDD分别实现,仅从字面意思就知道这个函数将和数据本地化密切相关:

此函数返回类型为 Seq[String],其实对应的是 Seq[TaskLocation],在返回前都会执行 TaskLocation#toString 方法。TaskLocation 是一个 trait,共有以三种实现,分别代表数据存储在不同的位置:

val inMemoryLocationTag = "hdfs_cache_"

// Identify locations of executors with this prefix.
val executorLocationTag = "executor_"

/**
 * 代表数据存储在 executor 的内存中,也就是这个 partition 被 cache到内存了
 */
private [spark]
case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
  extends TaskLocation {
  override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
}

/**
 * 代表数据存储在 host 这个节点的磁盘上
 */
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = host
}

/**
 * 代表数据存储在 hdfs 上
 */
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = TaskLocation.inMemoryLocationTag + host
}
  • ExecutorCacheTaskLocation: 代表 partition 数据已经被 cache 到内存,比如使用cache函数到内存,其 toString 方法返回的格式如 executor_$host_$executorId
  • HostTaskLocation:代表 partition 数据存储在某个节点的磁盘上,其 toString 方法直接返回 host
  • HDFSCacheTaskLocation:代表 partition 数据存储在 hdfs 上,比如从 hdfs 上加载而来的 HadoopRDD 的 partition,其 toString 方法返回的格式如 hdfs_cache_$host

这样,我们就知道不同的 RDD 会有不同的优先位置,并且不同的位置字符串的格式是不同的,这在之后 TaskSetManager 计算 tasks 的最优本地性会有作用。

那返回值为何会是一个Seq集合?其实很好理解,比方说一个hadoopRDD只有一个分区,读取一个hdfs上的文件,且此文件大小正好是一个block块,且副本为三,则在集群中,此hadoopRDD的唯一分区应该有三个位置:Seq[1.5、1.6、1.7]

image-20220701143021335

为了更好的理解,接下来看四个具有代表性的RDD的getPreferredLocations函数实现:

HadoopRDD :对应sc.textFile(“hdfs://xxxxx”)

MapPartitionsRDD:对应map/filter 等转换操作

ShuffledRDD:对应reduceByKey / groupByKey 等shuffle

JdbcRDD:对应jdbc等需要远程获取的rdd

HadoopRDD

直接调用hadoop包下的InputSplitWithLocationInfo的getLocations方法获得所在的位置

再经过convertSplitLocationInfo函数进行转换成spark的TaskLocation:

根据hadoop返回的分片类中的状态创建spark的HDFSCacheTaskLocation、HostTaskLocation,最终都调用toString返回字符串

MapPartitionsRDD

**可以看出MapPartitionsRDD没有实现getPreferredLocations函数,默认是父类RDD的getPreferredLocations = Nil; **

这是由于map/filter这类操作不存在位置偏好,在spark调度时发现getPreferredLocations = Nil后会不断向上查找血缘,直到找到实现getPreferredLocations的RDD;

image-20220701140551138

ShuffledRDD

shuffledRdd比较特殊,这是由于shuffle过程中涉及到了数据spill到磁盘的过程,故下游reduce算子需要找到

map-write阶段溢写到磁盘的位置,但这个位置只靠血缘是有可能找不到,因为DAG会划分stage阶段,以及任务重试等可能导致下游stage无法知晓上游shuffle-write的位置;

那么spark是通过一个mapOutputTracker组件统一解决shuffle问题:

MapOutputTracker是spark环境的主要组件之一,其功能是管理各个shuffleMapTask的输出数据。reduce任务就是根据MapOutputTracker提供的信息决定从哪些executor获取需要的map输出数据。关于MapOutputTracker的实现这里不过多赘述,只需了解ShuffledRDD的位置偏好获取即可;

image-20220701141036116

JdbcRDD

像这类JDBC操作的rdd,内部没有实现getPreferredLocations函数,默认是父类RDD的getPreferredLocations = Nil;

这么做其实很好理解,jdbc这类算子是远程访问,故其所在位置哪里都一样

image-20220701142539570

submitMissingTasks

了解了以上内容我们再来看DAGScheduler 生成 taskSet过程

DAGScheduler 通过调用 submitStage 来提交一个 stage 对应的 tasks,submitStage 会找到还未执行的missTasks,然后调用submitMissingTasks

image-20220701145027201

在函数中首先会根据stage的类型,找到未执行的partition的位置偏好,如下图:

image-20220701145027201

接下来看getPreferredLocs函数,发现是调用getPreferredLocsInternal函数,在getPreferredLocsInternal函数中会根据不同的情况调用rdd的preferredLocations 函数,比如如果有缓存则获取缓存级别的TaskLocation,如果不是缓存则调用rdd的优先级函数;

并且在方法的最后还会判断是否是窄依赖,这里是因为例如mapRDD没有位置偏好,故寻找父类rdd然后递归调用getPreferredLocsInternal函数,直到父类rdd的preferredLocations 函数返回值;

image-20220701150403830

rdd的preferredLocations函数其实是先判断是否使用了checkpoint检查点,如果有检查点直接从检查点获取位置偏好,因为数据在检查点上,如果是没有设置检查点,则直接调用开头部分的 getPreferredLocations函数获取位置集合

image-20220701150708011

这里深入一下,如果有缓存调用的函数:这个函数中如果从blocKManager中能够获取到缓存,则构建的TaskLocation为ExecutorCacheTaskLocation级别

image-20220701162850472
image-20220701162946170

然后我们再回到submitMissingTasks函数中,当我们获取到了val taskIdToLocations: Map[Int, Seq[TaskLocation]] 所有待执行的task的分区id和位置偏好后,在下面构建Task类的时候将会有作用,如下图:
image-20220701151128443

至此DAGScheduler构建Tasks阶段结束,接下来就是将task构建成TaskSet,然后交给TaskScheduler,如下图:

image-20220701151503406

TaskSetManager

由TaskSchedulerImpl来实现submitTask函数,TaskSchedulerImpl 会为每个 taskSet 创建一个 TaskSetManager 对象,该对象包含taskSet 所有 tasks,并管理这些 tasks 的执行,其中就包括计算 taskSetManager 中的 tasks 都有哪些locality levels,以便在调度和延迟调度 tasks 时发挥作用。

image-20220701151803061

image-20220701152033908

而在构建TaskSetManager时会通过addPendingTasks函数将所有的task 调用addPendingTask函数,addPendingTask函数其目的是用于将待处理Task的按照Task的偏好位置,添加到不同的缓存队列中。

image-20220701152719410

五种优先级

addPendingTask函数中会根据各个task不同的TaskLocation来添加到不同的队列中,先来说一下spark的默认五种优先级别:

image-20220620174501515

PROCESS_LOCAL: 数据在同一个 JVM 中,即同一个 executor 上。这是最佳数据 locality。

NODE_LOCAL: 数据在同一个节点上。比如数据在同一个节点的另一个 executor上;或在 HDFS 上,恰好有 block 在同一个节点上。速度比 PROCESS_LOCAL 稍慢,因为数据需要在不同进程之间传递或从文件中读取

NO_PREF: 数据从哪里访问都一样快,不需要位置优先

RACK_LOCAL: 数据在同一机架的不同节点上。需要通过网络传输数据及文件 IO,比 NODE_LOCAL 慢

ANY: 数据在非同一机架的网络上,速度最慢

接下来看一下再TaskSetManager中将不同的优先级队列,如下图:

可以看出部分是map结构,部分是buffer结构,这是因为noPrefs、all没有最佳位置,故只存放了分区id

而forExecutor的key是executorID, forHost的key是host,forRack的key是机架id

value之所以是buffer集合类型,是因为有可能一个多个task的优先级都在一个Executor上

private[scheduler] class PendingTasksByLocality {

  // 对应PROCESS_LOCAL
  val forExecutor = new HashMap[String, ArrayBuffer[Int]]
  // 对应NODE_LOCAL
  val forHost = new HashMap[String, ArrayBuffer[Int]]
  // 对应NO_PREF
  val noPrefs = new ArrayBuffer[Int]
  // 对应RACK_LOCAL
  val forRack = new HashMap[String, ArrayBuffer[Int]]
  // 对应ANY
  val all = new ArrayBuffer[Int]
}
addPendingTask

接下来再看addPendingTask函数,可以看出如果是ExecutorCacheTaskLocation类型[比如此rdd-cache过],就放入forExecutor队列中

如果是HDFSCacheTaskLocation类型,并且host下存在此ExecutorsId,则放入forExecutor队列中

重点:下面可以发现前面无论放不放到forExecutor队列,都会放入到forHost队列中,如果有forRack存在还会放入到forRack队列中

如果preferredLocations == Nil 代表无偏好,也会放入到fangrudaonoPrefs,并且最终还会放入到all队列中

这是因为这五种Locality级别存在包含关系,RACK_LOCAL包含NODE_LOCAL,NODE_LOCAL包含PROCESS_LOCAL,然而ANY包含其他所有四种。比如,一个Task的preferredLocations指定了在Executor-2上执行,那么它属于Executor-2对应的PROCESS_LOCAL类别,同时也把他加入到Executor-2所在的主机对应的NODE_LOCAL类别,Executor-2所在的主机的机架对应的RACK_LOCAL类别中,以及ANY类别,这样在调度执行时,满足不了PROCESS_LOCAL,就逐步退化到NODE_LOCAL,RACK_LOCAL,ANY

image-20220701153312038

computeValidLocalityLevels

那么在构建好了各种队列后,TaskSetManager还调用了一个computeValidLocalityLevels函数,其目的是对外开放 taskSetManager 中的 tasks 都有哪些locality levels,以便在后续TaskSechduler调度和延迟调度 tasks 时发挥作用。

image-20220701165225872

可以看到下面的computeValidLocalityLevels函数实现并不复杂,显示判断队列是否为空,然后分别判断isExecutorAlive、hasExecutorsAliveOnHost、hasHostAliveOnRack;

这三个函数的目的是为了确保executor / host / rack是存在的,因为在上面addPendingTask函数中只是判断位置偏好后就放入了不同的队列中,有可能某些executor已经不在了,故这里多了一层判断

image-20220701165457983

这里我们挑sched.isExecutorAlive的实现为例子:executorIdToRunningTaskIds保存着所有可运行的executor的key

image-20220701165821630

image-20220701165848597

至此TaskSetManager的 addPendingTask 、 computeValidLocalityLevels函数已经了解了,接下来就到延迟调度策略了

延迟调度:getAllowedLocalityLevel

spark的延迟调度策略主要是getAllowedLocalityLevel函数实现的: 用来返回当前该 taskSetManager 中未执行的 tasks 的最高可能 locality level

private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
    // Remove the scheduled or finished tasks lazily
    def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
      var indexOffset = pendingTaskIds.size
      while (indexOffset > 0) {
        indexOffset -= 1
        val index = pendingTaskIds(indexOffset)
        if (copiesRunning(index) == 0 && !successful(index)) {
          return true
        } else {
          pendingTaskIds.remove(indexOffset)
        }
      }
      false
    }
    // Walk through the list of tasks that can be scheduled at each location and returns true
    // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
    // already been scheduled.
    def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
      val emptyKeys = new ArrayBuffer[String]
      val hasTasks = pendingTasks.exists {
        case (id: String, tasks: ArrayBuffer[Int]) =>
          if (tasksNeedToBeScheduledFrom(tasks)) {
            true
          } else {
            emptyKeys += id
            false
          }
      }
      // The key could be executorId, host or rackId
      emptyKeys.foreach(id => pendingTasks.remove(id))
      hasTasks
    }

    while (currentLocalityIndex < myLocalityLevels.length - 1) {
      val moreTasks = myLocalityLevels(currentLocalityIndex) match {
        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasks.forExecutor)
        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasks.forHost)
        case TaskLocality.NO_PREF => pendingTasks.noPrefs.nonEmpty
        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasks.forRack)
      }
      if (!moreTasks) {
        // This is a performance optimization: if there are no more tasks that can
        // be scheduled at a particular locality level, there is no point in waiting
        // for the locality wait timeout (SPARK-4939).
        lastLaunchTime = curTime
        logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
          s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
        currentLocalityIndex += 1
      } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
        // Jump to the next locality level, and reset lastLaunchTime so that the next locality
        // wait timer doesn't immediately expire
        lastLaunchTime += localityWaits(currentLocalityIndex)
        logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
          s"${localityWaits(currentLocalityIndex)}ms")
        currentLocalityIndex += 1
      } else {
        return myLocalityLevels(currentLocalityIndex)
      }
    }
    myLocalityLevels(currentLocalityIndex)
  }

函数中又包含了两个小函数和一个while循环,循环条件为while (currentLocalityIndex < myLocalityLevels.length - 1)
其中myLocalityLevels: Array[TaskLocality.TaskLocality]是当前 TaskSetManager 的所有 tasks 所包含的本地性(locality)集合,本地性越高的 locality level 在 myLocalityLevels 中的下标越小,上文中已经介绍过了

可以看到除了noPref以外每一个级别都会调用moreTasksToRunIn函数:

val moreTasks = myLocalityLevels(currentLocalityIndex) match {
        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasks.forExecutor)
        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasks.forHost)
        case TaskLocality.NO_PREF => pendingTasks.noPrefs.nonEmpty
        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasks.forRack)
      }

moreTasksToRunIn主要作用是用来判断不同等级队列中是否有需要执行的作业!这是因为有些task可能正在运行或已经运行完毕了,具体实现为:

  1. 对于不同等级的 locality level 的 tasks 列表,将已经成功执行的或正在执行的该 locality level 的 task 从对应的列表中移除
  2. 判断对应的 locality level 的 task 是否还要等待执行的,若有则返回 true,否则返回 false
// 删除正在执行的团任务或完成的任务
    def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
      var indexOffset = pendingTaskIds.size
      while (indexOffset > 0) {
        indexOffset -= 1
        val index = pendingTaskIds(indexOffset)
        if (copiesRunning(index) == 0 && !successful(index)) {
          return true
        } else {
          pendingTaskIds.remove(indexOffset)
        }
      }
      false
    }
    // 遍历调用tasksNeedToBeScheduledFrom函数,如果有需要调用的则返回ture, 如果没有则删除的executorId or host or rackId
    def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
      val emptyKeys = new ArrayBuffer[String]
      val hasTasks = pendingTasks.exists {
        case (id: String, tasks: ArrayBuffer[Int]) =>
          if (tasksNeedToBeScheduledFrom(tasks)) {
            true
          } else {
            emptyKeys += id
            false
          }
      }
      // The key could be executorId, host or rackId
      emptyKeys.foreach(id => pendingTasks.remove(id))
      hasTasks
    }

看完了这两个函数后我们主要看while循环中的逻辑:

while (currentLocalityIndex < myLocalityLevels.length - 1) {
      val moreTasks = myLocalityLevels(currentLocalityIndex) match {
       // 判断 myLocalityLevels(currentLocalityIndex) 这个级别的本地性对应的待执行 tasks 集合中是否还有待执行的 task
        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasks.forExecutor)
        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasks.forHost)
        case TaskLocality.NO_PREF => pendingTasks.noPrefs.nonEmpty
        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasks.forRack)
      }
      if (!moreTasks) {
        // 若无可执行的task:则将 `currentLocalityIndex += 1` 进行下一次循环,即将 locality level 降低一级回到第1步
        lastLaunchTime = curTime
        logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
          s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
        currentLocalityIndex += 1
      } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
        // 若有,且当前时间与lastLaunchTime时间间隔大于localityWaits(currentLocalityIndex)时间,代表超过了延迟调度的极限,即将 locality level 降低一级回到第1步
        lastLaunchTime += localityWaits(currentLocalityIndex)
        logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
          s"${localityWaits(currentLocalityIndex)}ms")
        currentLocalityIndex += 1
      } else {
        return myLocalityLevels(currentLocalityIndex)
      }
    }

整个循环体都在做这几个事情:

1、判断 myLocalityLevels(currentLocalityIndex) 这个级别的本地性对应的待执行 tasks 集合中是否还有待执行的 task

2、若无:则将 currentLocalityIndex += 1 进行下一次循环,即将 locality level 降低一级回到第1步

3、若有,且当前时间与lastLaunchTime【第一次是当前时间,第二次则是上次getAllowedLocalityLevel返回 myLocalityLevels(currentLocalityIndex)]】时间间隔大于localityWaits(currentLocalityIndex)时间,代表超过了延迟调度的极限,则将 currentLocalityIndex += 1 进行下一次循环,即将 locality level 降低一级回到第1步;

这里看一下localityWaits函数:

image-20220701174827170

image-20220701174840277

image-20220701174902145

通过上图可以看出不同的级别对应着不同的时间,参照官网查看默认时间:发现都是默认3s

image-20220701175027010

4、若有,且当前时间在3s的延迟调度等待区间,则返回此等级

函数总结:比如上次以 TaskLocality返回process级别的 taskSetManager 启动 task 失败,说明taskSetManager 中 tasks 对应 TaskLocality 的节点均没有空闲资源来启动 task,但 Spark 这次调用此时仍然会以 TaskLocality 来为 taskSetManager 启动 task。为什么要这样做?一般来说,task 执行耗时相对于网络传输/文件IO 要小得多,调度器多等待1 2秒可能就可以以更好的本地性执行 task,避免了更耗时的网络传输或文件IO,task 整体执行时间会降低

至此getAllowedLocalityLevel函数介绍完毕,看完getAllowedLocalityLevel函数后可能会有疑问,这个函数是谁调用的呢?

使用顺序

接下来我们从上到下看一下调用顺序:首先在 taskScheduler 调用 submitTasks 函数中会为每个 taskSet 创建一个 TaskSetManager,用于管理 taskSet。然后向调度池中添加该 TaskSetManager,最后会调用 backend.reviveOffers() 方法向executor发送请求。

下面主要看 backend.reviveOffers() 这个方法,在提交模式是 yarn-cluster 模式下,实际上是调用 YarnClusterSchedulerBackend 的 reviveOffers 方法,实则调用的是其父类 CoarseGrainedSchedulerBackend 的 reviveOffers 方法,这个方法是executor接收到请求后再向 driverEndpoint 发送一个 ReviveOffers 消息。

image-20220702172536162

DriverEndpoint 收到信息后会调用 makeOffers 方法来准备让所有的task执行:

接下来会调用TaskSchedulerImpl的resourceOffers

在resourceOffers函数中,会循环所有的TaskSet并调用resourceOfferSingleTaskSet函数

image-20220702165829246

而在resourceOfferSingleTaskSet函数中终于到了taskSetManager的resourceOffer函数用来获取可以执行的task

image-20220702170003390

在resourceOffer函数中调用了getAllowedLocalityLevel函数获取当前首选优先级别,并且将优先级给到dequeueTask函数

image-20220702170341231

而在dequeueTask函数中,则是根据本地策略去forExecutor,forHost等队列中获取对应的分区下标等参数构建成一个tuple类型

image-20220702170512445

返回tuple类型后我们回到TaskSetManager的resourceOffer函数,这里会根据返回的tuple类型构建成一个TaskDescription类返回给上层的TaskSchedulerImpl

image-20220702170902546

而TaskSchedulerImpl在拿到TaskDescription信息后会根据当前dirver端记录的executor的数量和资源来判断是否能够执行任务,假如资源不够则会重新执行调用,就又回到了geAllowedLocalityLevel函数中,循环往复直到可以执行任务;

至此spark的数据本地化以及延迟调度策略介绍完毕,建议参考源码进行对照查看,此内容源码为spark3.0版本

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-07-04 22:59:50  更:2022-07-04 23:02:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 2:04:02-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码