1.任务提交分析
????????这里以org.apache.spark.examples.SparkPi为例。当执行reduce(_+_)方法时,其底层调用了sc.runJob方法。核心代码如下:
/**
* 注释:(rdd, func, partitions, callSite, resultHandler, properties)
* 1、应用程序调用 action 算子
* 2、sparkContext.runJob()
* 3、dagScheduler.runJob()
* 4、TaskScheduler.submitTasks(new TaskSet())
* 5、SchedulerBackEnd.driverEndpoint 提交任务
*/
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
? ? ? ? 其中runJob方法中执行的核心代码:
/**
* TODO 注释: 提交任务
* 参数解析:
* 1、rdd:要在其上运行任务的参数RDD目标RDD
* 2、func:在RDD的每个分区上运行的函数
* 3、partitions:要运行的分区的集;某些作业可能不希望在目标RDD的所有分区上进行计算,例如,对于 first() 之类的操作。
* 4、callSite:在用户程序中调用此作业的位置
* 5、resultHandler:回调函数,以将每个分区结果传递给Xxx
* 6、properties:要附加到此作业的scheduler属性,例如fair scheduler pool name
*
* rdd1.xx1().xx2().xx3().xx4() 这里的 rdd = rdd1.xx1().xx2().xx3()
*
*/
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
? ? ? ? 其内部核心代码为:
/**
* TODO 注释:
* 第一步:封装一个JobWaiter对象;
* 第二步:将 JobWaiter 对象赋值给 JobSubmitted 的 listener 属性,
* 并将 JobSubmitted(DAGSchedulerEvent事件)对象传递给 eventProcessLoop 事件循环处理器。
* eventProcessLoop 内部事件消息处理线程将会接收 JobSubmitted 事件,
* 并调用dagScheduler.handleJobSubmitted(...) 方法来处理事件;
* 第三步:返回 JobWaiter 对象。
*/
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
/**
* TODO 注释:这是提交任务运行
* eventProcessLoop 就是当初 DAGScheduler 在初始化的时候,创建的一个 DAGSchedulerEventProcessLoop
* 这个组件主要负责:任务的提交执行
* 把 JobSubmitted 这个消息,放入了 eventQueue 队列中
*/
eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))
// TODO 注释: 返回结果对象的引用
waiter
? ? ? ? 通过eventProcessLoop.post(...)将任务放入了eventQueue队列中,有通过eventProcessLoop.start()方法将任务提交。前面有见到以下代码是等待任务的提交,正好有对应上:
/**
* TODO 注释: driver 中,初始化了一个 dagSchedudelr
* 它里面又初始化了一个 eventThread 专门用来处理 JobSubmitted
*/
// Exposed for testing.
private[spark] val eventThread = new Thread(name) {
setDaemon(true)
override def run(): Unit = {
try {
while (!stopped.get) {
// TODO 注释:获取消息
// TODO 注释:一定要注意:当 sparkContext 还没有初始化好的时候,是不执行 sc.runJob 提交任务的。
// TODO 注释:当执行 sc.runJob(sc) 的时候,就会提交 Job 到这儿来。
val event = eventQueue.take()
try {
/**
* TODO 注释:根据事件的类型,调用不同的 handleXXX 方法来进行处理。
* 当接收到任务提交的时候: event = JobSubmitted
*/
onReceive(event)
} catch {
case NonFatal(e) => try {
onError(e)
} catch {
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
} catch {
case ie: InterruptedException => // exit even if eventQueue is not empty
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
? ? ? ? 在这里通过onReceive进行任务的提交,任务提交给了DAGScheduler
2.Stage切分与提交
2.1.Stage切分
? ? ? ? 任务提交核心代码如下,包含了2个方面,Stage切分与Task分发与执行。在DAGScheduler.handleJobSubmitted(...)中有如下代码:
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
????????这个finalRDD就是rdd链条中的最后一个RDD,也就是触发sc.runJob()方法执行的RDD。必然是针对某个RDD调用了一个action算子才触发执行的,则该RDD就是finalRDD。
? ? ? ? 注意以下概念:
- ShuffleMapStage ?+ ?ResultStage
- ShuffleMapTask + ResultTask
- ShuffleDependency + NarrowDependency
????????在createResultStage(...)方法,返回所有的ResultStage,其中包含了所有的父Stage。如下注释:
/** ?* TODO 注释: Stage 切分 ?* ? 这个 finalRDD 就是 rdd链条中的最后一个 RDD,也就是触发 sc.runJob() 方法执行的RDD ?* ? 必然是针对某个 RDD 调用了一个 action 算子才触发执行的,则该 RDD 就是 finallRDD ?* ?* ? stage切分的核心方法:createResultStage ?* ? 返回的是一个 ResultStage对象,但是这个对象中,会包含他的所有的 父stage ?* ? 这样做的最大好处:容错! ?* ? spark DAG引擎:血脉关系 ?RDD之间的依赖被构建成了 dependency ,也被构建成了 stage 之间的依赖关系 ?*/
? ? ? ? ?可查看下面的详细代码:
/**
* TODO Create a ResultStage associated with the provided jobId.
* 进行Stage切分的详细方法实现
*/
private def createResultStage(rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int,
callSite: CallSite): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
// TODO 注释:获得父stage,若没有shuffle则返回空List
// TODO 注释:获取当前Stage的parent Stage,这个方法是划分Stage的核心实现
// 所有的父类stage都已经构建完成并返回给parents。
val parents = getOrCreateParentStages(rdd, jobId)
// TODO 注释: finalStage=resultStage 的 stageID 这里返回的是最后一个stage的Id
val id = nextStageId.getAndIncrement()
// TODO 注释:创建当前最后的ResultStage
// TODO 注释:parents 所有的 父 stage
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
// TODO 注释:将 ResultStage 与 stageId 相关联, 保存在 map 中
// TODO 注释:stageIdToStage = new HashMap[Int, Stage]
stageIdToStage(id) = stage
// TODO 注释:更新该job中包含的stage
updateJobIdStageIdMaps(jobId, stage)
// TODO 注释:返回ResultStage
stage
}
? ? ? ? 在上面的getOrCreateParentStages(rdd, jobId)代码,获取父Stage,若没有父Stage,则返回空List。所有的父类stage都已经构建完成并返回给parents。调用的方法是以下代码:
getShuffleDependencies(rdd).map { shuffleDep => getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList
? ? ? ? 虽然只有一行代码,但是做了很多事情。
- 遍历 RDD 的依赖关系,找到第一个 ShuffleDependency (可能多个,也可能没有)。然后放入 HashSet 并返回
- 如果获取不到 ShuffleDependency,逻辑在此终止返回空 list
- 里面会创建当前 ShuffleDependency 的所有父ShuffleMapStage
? ? ? ? 1.getShuffleDependencies(rdd),这里的rdd是指finalRDD。其实这一步只会返回当前RDD的紧邻的父依赖Stage。具体的可以查看此方法的注释。
/**
* Returns shuffle dependencies that are immediate parents of the given RDD.
*
* This function will not return more distant ancestors.
* For example, if C has a shuffle dependency on B which has a shuffle dependency on A:
* A <-- B <-- C
* calling this function with rdd C will only return the B <-- C dependency.
* This function is scheduler-visible for the purpose of unit testing.
*
*/
? ? ? ? 2.getOrCreateShuffleMapStage(shuffleDep, firstJobId)这里用递归的方式,最终是将所有的Stage放在以下2个Map当中。
// TODO 注释:自增id对应stage
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
// TODO 注释:自增id对应ShuffleMapStage
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
? ? ? ? 具体的可以看下这个方法:
/**
* TODO 如果 shuffleIdToMapStage 中存在 shuffle,则获取 shuffle map stage。
* 否则,如果 shuffle map stage 不存在,该方法将创建 shuffle map stage
* 以及任何丢失的 parent shuffle map stage。
*
* 第一次来到这里传进来的是左到右首个shuffleDep,没有父stage,
* shuffleIdToMapStage也没有记录,会调 getMissingAncestorShuffleDependencies 并创建 stage
* 之后进来可直接根据 shuffleIdToMapStage 提取到 ShuffleMapStage
*
* Gets a shuffle map stage if one exists in shuffleIdToMapStage.
* Otherwise, if the shuffle map stage doesn't already exist,
* this method will create the shuffle map stage in addition to any missing ancestor shuffle map stages.
*/
private def getOrCreateShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int): ShuffleMapStage = {
// TODO 注释: 根据每个 ShuffleDep 的 shuffleID 来获取 Stage 对象
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
// TODO 注释:如果有,则直接返回
case Some(stage) => stage
// TODO 注释:如果还有 ShuffleDependency 没有构建 ShuffleMapStage 则创建一个
// TODO 注释:如果这个stage没有构建
// TODO 注释:第一件事:先把这个stage的所有父stage都构建出来
// TODO 注释:然后再次构建当前stage
case None =>
// TODO 注释:给当前 stage 的所有父stage 创建 ShuffleMapStage
// TODO 注释:举例:C <---B <-- A
// 这里使用了递归。将所有的父依赖,即stage都放入
// Create stages for all missing ancestor shuffle dependencies.
getMissingAncestorShuffleDependencies(shuffleDep.rdd)
.foreach { dep => // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
// that were not already in shuffleIdToMapStage, it's possible that by the time we
// get to a particular dependency in the foreach loop, it's been added to
// shuffleIdToMapStage by the stage creation process for an earlier dependency. See
// SPARK-13902 for more information.
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
createShuffleMapStage(dep, firstJobId)
}
}
/**
* TODO
* 注释:为指定的 ShuffleDependency 创建一个 ShuffleMapStage
* 构建当前Stage。
*/
// Finally, create a stage for the given shuffle dependency.
createShuffleMapStage(shuffleDep, firstJobId)
}
}
总结如下:
1、createResultStage(传入finalRDD获得ResultStage) ->2 2、getOrCreateParentStages(传入rdd获得父stage) ->3->4 ?? ?3、getShuffleDependencies(传入rdd获得宽依赖) ?? ?4、getOrCreateShuffleMapStage(传入宽依赖获得ShuffleMapStage) ->5->6 ?? ??? ?5、getMissingAncestorShuffleDependencies(传入一个rdd获得所有宽依赖) ->3 ?? ??? ?6、createShuffleMapStage(传入宽依赖获得ShuffleMapStage) ->2
2.2.Stage提交
? ? ? ? 核心代码:
|