getPreferredLocations
在了解spark本地话和延迟调度之前先来看下RDD的五个重要属性
其中getPreferredLocations用来获取rdd每一个分区的优先级列表位置,由不同的子类RDD分别实现,仅从字面意思就知道这个函数将和数据本地化密切相关:
此函数返回类型为 Seq[String] ,其实对应的是 Seq[TaskLocation] ,在返回前都会执行 TaskLocation#toString 方法。TaskLocation 是一个 trait,共有以三种实现,分别代表数据存储在不同的位置:
val inMemoryLocationTag = "hdfs_cache_"
val executorLocationTag = "executor_"
private [spark]
case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
extends TaskLocation {
override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
}
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
override def toString: String = host
}
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
override def toString: String = TaskLocation.inMemoryLocationTag + host
}
- ExecutorCacheTaskLocation: 代表 partition 数据已经被 cache 到内存,比如使用cache函数到内存,其 toString 方法返回的格式如
executor_$host_$executorId - HostTaskLocation:代表 partition 数据存储在某个节点的磁盘上,其 toString 方法直接返回 host
- HDFSCacheTaskLocation:代表 partition 数据存储在 hdfs 上,比如从 hdfs 上加载而来的 HadoopRDD 的 partition,其 toString 方法返回的格式如
hdfs_cache_$host
这样,我们就知道不同的 RDD 会有不同的优先位置,并且不同的位置字符串的格式是不同的,这在之后 TaskSetManager 计算 tasks 的最优本地性会有作用。
那返回值为何会是一个Seq集合?其实很好理解,比方说一个hadoopRDD只有一个分区,读取一个hdfs上的文件,且此文件大小正好是一个block块,且副本为三,则在集群中,此hadoopRDD的唯一分区应该有三个位置:Seq[1.5、1.6、1.7]
为了更好的理解,接下来看四个具有代表性的RDD的getPreferredLocations函数实现:
HadoopRDD :对应sc.textFile(“hdfs://xxxxx”)
MapPartitionsRDD:对应map/filter 等转换操作
ShuffledRDD:对应reduceByKey / groupByKey 等shuffle
JdbcRDD:对应jdbc等需要远程获取的rdd
HadoopRDD
直接调用hadoop包下的InputSplitWithLocationInfo的getLocations方法获得所在的位置
再经过convertSplitLocationInfo函数进行转换成spark的TaskLocation:
根据hadoop返回的分片类中的状态创建spark的HDFSCacheTaskLocation、HostTaskLocation,最终都调用toString返回字符串
MapPartitionsRDD
**可以看出MapPartitionsRDD没有实现getPreferredLocations函数,默认是父类RDD的getPreferredLocations = Nil; **
这是由于map/filter这类操作不存在位置偏好,在spark调度时发现getPreferredLocations = Nil后会不断向上查找血缘,直到找到实现getPreferredLocations的RDD;
ShuffledRDD
shuffledRdd比较特殊,这是由于shuffle过程中涉及到了数据spill到磁盘的过程,故下游reduce算子需要找到
map-write阶段溢写到磁盘的位置,但这个位置只靠血缘是有可能找不到,因为DAG会划分stage阶段,以及任务重试等可能导致下游stage无法知晓上游shuffle-write的位置;
那么spark是通过一个mapOutputTracker组件统一解决shuffle问题:
MapOutputTracker是spark环境的主要组件之一,其功能是管理各个shuffleMapTask的输出数据。reduce任务就是根据MapOutputTracker提供的信息决定从哪些executor获取需要的map输出数据。关于MapOutputTracker的实现这里不过多赘述,只需了解ShuffledRDD的位置偏好获取即可;
JdbcRDD
像这类JDBC操作的rdd,内部没有实现getPreferredLocations函数,默认是父类RDD的getPreferredLocations = Nil;
这么做其实很好理解,jdbc这类算子是远程访问,故其所在位置哪里都一样
submitMissingTasks
了解了以上内容我们再来看DAGScheduler 生成 taskSet过程
DAGScheduler 通过调用 submitStage 来提交一个 stage 对应的 tasks,submitStage 会找到还未执行的missTasks,然后调用submitMissingTasks
在函数中首先会根据stage的类型,找到未执行的partition的位置偏好,如下图:
接下来看getPreferredLocs函数,发现是调用getPreferredLocsInternal函数,在getPreferredLocsInternal函数中会根据不同的情况调用rdd的preferredLocations 函数,比如如果有缓存则获取缓存级别的TaskLocation,如果不是缓存则调用rdd的优先级函数;
并且在方法的最后还会判断是否是窄依赖,这里是因为例如mapRDD没有位置偏好,故寻找父类rdd然后递归调用getPreferredLocsInternal函数,直到父类rdd的preferredLocations 函数返回值;
rdd的preferredLocations函数其实是先判断是否使用了checkpoint检查点,如果有检查点直接从检查点获取位置偏好,因为数据在检查点上,如果是没有设置检查点,则直接调用开头部分的 getPreferredLocations函数获取位置集合
这里深入一下,如果有缓存调用的函数:这个函数中如果从blocKManager中能够获取到缓存,则构建的TaskLocation为ExecutorCacheTaskLocation级别
然后我们再回到submitMissingTasks函数中,当我们获取到了val taskIdToLocations: Map[Int, Seq[TaskLocation]] 所有待执行的task的分区id和位置偏好后,在下面构建Task类的时候将会有作用,如下图:
至此DAGScheduler构建Tasks阶段结束,接下来就是将task构建成TaskSet,然后交给TaskScheduler,如下图:
TaskSetManager
由TaskSchedulerImpl来实现submitTask函数,TaskSchedulerImpl 会为每个 taskSet 创建一个 TaskSetManager 对象,该对象包含taskSet 所有 tasks,并管理这些 tasks 的执行,其中就包括计算 taskSetManager 中的 tasks 都有哪些locality levels,以便在调度和延迟调度 tasks 时发挥作用。
而在构建TaskSetManager时会通过addPendingTasks函数将所有的task 调用addPendingTask函数,addPendingTask函数其目的是用于将待处理Task的按照Task的偏好位置,添加到不同的缓存队列中。
五种优先级
addPendingTask函数中会根据各个task不同的TaskLocation来添加到不同的队列中,先来说一下spark的默认五种优先级别:
PROCESS_LOCAL : 数据在同一个 JVM 中,即同一个 executor 上。这是最佳数据 locality。
NODE_LOCAL : 数据在同一个节点上。比如数据在同一个节点的另一个 executor上;或在 HDFS 上,恰好有 block 在同一个节点上。速度比 PROCESS_LOCAL 稍慢,因为数据需要在不同进程之间传递或从文件中读取
NO_PREF : 数据从哪里访问都一样快,不需要位置优先
RACK_LOCAL : 数据在同一机架的不同节点上。需要通过网络传输数据及文件 IO,比 NODE_LOCAL 慢
ANY : 数据在非同一机架的网络上,速度最慢
接下来看一下再TaskSetManager中将不同的优先级队列,如下图:
可以看出部分是map结构,部分是buffer结构,这是因为noPrefs、all没有最佳位置,故只存放了分区id
而forExecutor的key是executorID, forHost的key是host,forRack的key是机架id
value之所以是buffer集合类型,是因为有可能一个多个task的优先级都在一个Executor上
private[scheduler] class PendingTasksByLocality {
val forExecutor = new HashMap[String, ArrayBuffer[Int]]
val forHost = new HashMap[String, ArrayBuffer[Int]]
val noPrefs = new ArrayBuffer[Int]
val forRack = new HashMap[String, ArrayBuffer[Int]]
val all = new ArrayBuffer[Int]
}
addPendingTask
接下来再看addPendingTask函数,可以看出如果是ExecutorCacheTaskLocation类型[比如此rdd-cache过],就放入forExecutor队列中
如果是HDFSCacheTaskLocation类型,并且host下存在此ExecutorsId,则放入forExecutor队列中
重点:下面可以发现前面无论放不放到forExecutor队列,都会放入到forHost队列中,如果有forRack存在还会放入到forRack队列中
如果preferredLocations == Nil 代表无偏好,也会放入到fangrudaonoPrefs,并且最终还会放入到all队列中
这是因为这五种Locality级别存在包含关系,RACK_LOCAL包含NODE_LOCAL,NODE_LOCAL包含PROCESS_LOCAL,然而ANY包含其他所有四种。比如,一个Task的preferredLocations指定了在Executor-2上执行,那么它属于Executor-2对应的PROCESS_LOCAL类别,同时也把他加入到Executor-2所在的主机对应的NODE_LOCAL类别,Executor-2所在的主机的机架对应的RACK_LOCAL类别中,以及ANY类别,这样在调度执行时,满足不了PROCESS_LOCAL,就逐步退化到NODE_LOCAL,RACK_LOCAL,ANY
computeValidLocalityLevels
那么在构建好了各种队列后,TaskSetManager还调用了一个computeValidLocalityLevels函数,其目的是对外开放 taskSetManager 中的 tasks 都有哪些locality levels,以便在后续TaskSechduler调度和延迟调度 tasks 时发挥作用。
可以看到下面的computeValidLocalityLevels函数实现并不复杂,显示判断队列是否为空,然后分别判断isExecutorAlive、hasExecutorsAliveOnHost、hasHostAliveOnRack;
这三个函数的目的是为了确保executor / host / rack是存在的,因为在上面addPendingTask函数中只是判断位置偏好后就放入了不同的队列中,有可能某些executor已经不在了,故这里多了一层判断
这里我们挑sched.isExecutorAlive的实现为例子:executorIdToRunningTaskIds保存着所有可运行的executor的key
至此TaskSetManager的 addPendingTask 、 computeValidLocalityLevels函数已经了解了,接下来就到延迟调度策略了
延迟调度:getAllowedLocalityLevel
spark的延迟调度策略主要是getAllowedLocalityLevel函数实现的: 用来返回当前该 taskSetManager 中未执行的 tasks 的最高可能 locality level
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
var indexOffset = pendingTaskIds.size
while (indexOffset > 0) {
indexOffset -= 1
val index = pendingTaskIds(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return true
} else {
pendingTaskIds.remove(indexOffset)
}
}
false
}
def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
val emptyKeys = new ArrayBuffer[String]
val hasTasks = pendingTasks.exists {
case (id: String, tasks: ArrayBuffer[Int]) =>
if (tasksNeedToBeScheduledFrom(tasks)) {
true
} else {
emptyKeys += id
false
}
}
emptyKeys.foreach(id => pendingTasks.remove(id))
hasTasks
}
while (currentLocalityIndex < myLocalityLevels.length - 1) {
val moreTasks = myLocalityLevels(currentLocalityIndex) match {
case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasks.forExecutor)
case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasks.forHost)
case TaskLocality.NO_PREF => pendingTasks.noPrefs.nonEmpty
case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasks.forRack)
}
if (!moreTasks) {
lastLaunchTime = curTime
logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
currentLocalityIndex += 1
} else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
lastLaunchTime += localityWaits(currentLocalityIndex)
logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
s"${localityWaits(currentLocalityIndex)}ms")
currentLocalityIndex += 1
} else {
return myLocalityLevels(currentLocalityIndex)
}
}
myLocalityLevels(currentLocalityIndex)
}
函数中又包含了两个小函数和一个while循环,循环条件为while (currentLocalityIndex < myLocalityLevels.length - 1) , 其中myLocalityLevels: Array[TaskLocality.TaskLocality] 是当前 TaskSetManager 的所有 tasks 所包含的本地性(locality)集合,本地性越高的 locality level 在 myLocalityLevels 中的下标越小,上文中已经介绍过了
可以看到除了noPref以外每一个级别都会调用moreTasksToRunIn函数:
val moreTasks = myLocalityLevels(currentLocalityIndex) match {
case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasks.forExecutor)
case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasks.forHost)
case TaskLocality.NO_PREF => pendingTasks.noPrefs.nonEmpty
case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasks.forRack)
}
moreTasksToRunIn主要作用是用来判断不同等级队列中是否有需要执行的作业!这是因为有些task可能正在运行或已经运行完毕了,具体实现为:
- 对于不同等级的 locality level 的 tasks 列表,将已经成功执行的或正在执行的该 locality level 的 task 从对应的列表中移除
- 判断对应的 locality level 的 task 是否还要等待执行的,若有则返回 true,否则返回 false
def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
var indexOffset = pendingTaskIds.size
while (indexOffset > 0) {
indexOffset -= 1
val index = pendingTaskIds(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return true
} else {
pendingTaskIds.remove(indexOffset)
}
}
false
}
def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
val emptyKeys = new ArrayBuffer[String]
val hasTasks = pendingTasks.exists {
case (id: String, tasks: ArrayBuffer[Int]) =>
if (tasksNeedToBeScheduledFrom(tasks)) {
true
} else {
emptyKeys += id
false
}
}
emptyKeys.foreach(id => pendingTasks.remove(id))
hasTasks
}
看完了这两个函数后我们主要看while循环中的逻辑:
while (currentLocalityIndex < myLocalityLevels.length - 1) {
val moreTasks = myLocalityLevels(currentLocalityIndex) match {
case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasks.forExecutor)
case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasks.forHost)
case TaskLocality.NO_PREF => pendingTasks.noPrefs.nonEmpty
case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasks.forRack)
}
if (!moreTasks) {
lastLaunchTime = curTime
logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
currentLocalityIndex += 1
} else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
lastLaunchTime += localityWaits(currentLocalityIndex)
logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
s"${localityWaits(currentLocalityIndex)}ms")
currentLocalityIndex += 1
} else {
return myLocalityLevels(currentLocalityIndex)
}
}
整个循环体都在做这几个事情:
1、判断 myLocalityLevels(currentLocalityIndex) 这个级别的本地性对应的待执行 tasks 集合中是否还有待执行的 task
2、若无:则将 currentLocalityIndex += 1 进行下一次循环,即将 locality level 降低一级回到第1步
3、若有,且当前时间与lastLaunchTime【第一次是当前时间,第二次则是上次getAllowedLocalityLevel返回 myLocalityLevels(currentLocalityIndex) ]】时间间隔大于localityWaits(currentLocalityIndex)时间,代表超过了延迟调度的极限,则将 currentLocalityIndex += 1 进行下一次循环,即将 locality level 降低一级回到第1步;
这里看一下localityWaits函数:
通过上图可以看出不同的级别对应着不同的时间,参照官网查看默认时间:发现都是默认3s
4、若有,且当前时间在3s的延迟调度等待区间,则返回此等级
函数总结:比如上次以 TaskLocality返回process级别的 taskSetManager 启动 task 失败,说明taskSetManager 中 tasks 对应 TaskLocality 的节点均没有空闲资源来启动 task,但 Spark 这次调用此时仍然会以 TaskLocality 来为 taskSetManager 启动 task。为什么要这样做?一般来说,task 执行耗时相对于网络传输/文件IO 要小得多,调度器多等待1 2秒可能就可以以更好的本地性执行 task,避免了更耗时的网络传输或文件IO,task 整体执行时间会降低
至此getAllowedLocalityLevel函数介绍完毕,看完getAllowedLocalityLevel函数后可能会有疑问,这个函数是谁调用的呢?
使用顺序
接下来我们从上到下看一下调用顺序:首先在 taskScheduler 调用 submitTasks 函数中会为每个 taskSet 创建一个 TaskSetManager,用于管理 taskSet。然后向调度池中添加该 TaskSetManager,最后会调用 backend.reviveOffers() 方法向executor发送请求。
下面主要看 backend.reviveOffers() 这个方法,在提交模式是 yarn-cluster 模式下,实际上是调用 YarnClusterSchedulerBackend 的 reviveOffers 方法,实则调用的是其父类 CoarseGrainedSchedulerBackend 的 reviveOffers 方法,这个方法是executor接收到请求后再向 driverEndpoint 发送一个 ReviveOffers 消息。
DriverEndpoint 收到信息后会调用 makeOffers 方法来准备让所有的task执行:
接下来会调用TaskSchedulerImpl的resourceOffers
在resourceOffers函数中,会循环所有的TaskSet并调用resourceOfferSingleTaskSet函数
而在resourceOfferSingleTaskSet函数中终于到了taskSetManager的resourceOffer函数用来获取可以执行的task
在resourceOffer函数中调用了getAllowedLocalityLevel函数获取当前首选优先级别,并且将优先级给到dequeueTask函数
而在dequeueTask函数中,则是根据本地策略去forExecutor,forHost等队列中获取对应的分区下标等参数构建成一个tuple类型
返回tuple类型后我们回到TaskSetManager的resourceOffer函数,这里会根据返回的tuple类型构建成一个TaskDescription类返回给上层的TaskSchedulerImpl
而TaskSchedulerImpl在拿到TaskDescription信息后会根据当前dirver端记录的executor的数量和资源来判断是否能够执行任务,假如资源不够则会重新执行调用,就又回到了geAllowedLocalityLevel函数中,循环往复直到可以执行任务;
至此spark的数据本地化以及延迟调度策略介绍完毕,建议参考源码进行对照查看,此内容源码为spark3.0版本
|