Background
This article is based on Spark 3.2.1. In the earlier articles on Spark shuffle, covering ShuffleId, BlockManagerId, and the interaction with the ESS (External Shuffle Service), we only walked through the shuffle flow in broad strokes. This time we analyze push-based shuffle, to get a better understanding of this part of Spark shuffle.
Analysis
Jump straight to the runTask method of ShuffleMapTask:
override def runTask(context: TaskContext): MapStatus = {
  ...
  dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
That call then leads into the following code (ShuffleWriteProcessor.write):
...
if (mapStatus.isDefined) {
  // Check if sufficient shuffle mergers are available now for the ShuffleMapTask to push
  if (dep.shuffleMergeAllowed && dep.getMergerLocs.isEmpty) {
    val mapOutputTracker = SparkEnv.get.mapOutputTracker
    val mergerLocs =
      mapOutputTracker.getShufflePushMergerLocations(dep.shuffleId)
    if (mergerLocs.nonEmpty) {
      dep.setMergerLocs(mergerLocs)
    }
  }
  // Initiate shuffle push process if push based shuffle is enabled
  // The map task only takes care of converting the shuffle data file into multiple
  // block push requests. It delegates pushing the blocks to a different thread-pool -
  // ShuffleBlockPusher.BLOCK_PUSHER_POOL.
  if (!dep.shuffleMergeFinalized) {
    manager.shuffleBlockResolver match {
      case resolver: IndexShuffleBlockResolver =>
        logInfo(s"Shuffle merge enabled with ${dep.getMergerLocs.size} merger locations " +
          s" for stage ${context.stageId()} with shuffle ID ${dep.shuffleId}")
        logDebug(s"Starting pushing blocks for the task ${context.taskAttemptId()}")
        val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
        new ShuffleBlockPusher(SparkEnv.get.conf)
          .initiateBlockPush(dataFile, writer.getPartitionLengths(), dep, partition.index)
      case _ =>
    }
  }
}
...
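As a side note, this branch is only exercised when push-based shuffle is actually enabled. A minimal, hedged example of the client-side settings in Spark 3.2 (on YARN, with the External Shuffle Service already running; see the official docs for the ESS-side server configuration):

import org.apache.spark.SparkConf

// Hedged example: client-side configuration for push-based shuffle in Spark 3.2.
// Assumes a YARN deployment where the External Shuffle Service (ESS) is running.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true") // the ESS is a prerequisite
  .set("spark.shuffle.push.enabled", "true")    // turn on block push on the map-task side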
First, if the shuffle dependency does not yet have merger locations assigned for this shuffle, the task fetches the shuffle push merger locations from the mapOutputTracker. The full data flow is as follows (a paraphrased sketch of the executor-side call follows the flow):
MapOutputTrackerWorker.getShufflePushMergerLocations
||
\/
sends a GetShufflePushMergerLocations message to the driver's MapOutputTrackerMasterEndpoint
||
\/
MapOutputTrackerMasterEndpoint posts a GetShufflePushMergersMessage to the MapOutputTrackerMaster
||
\/
returns shuffleStatuses.get(shuffleId).map(_.getShufflePushMergerLocations)
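As a rough, paraphrased sketch of the executor side of this round trip (not the verbatim Spark 3.2.1 source; field and helper names may differ slightly), the worker first checks its local cache and only then asks the driver:

// Paraphrased sketch of MapOutputTrackerWorker.getShufflePushMergerLocations.
override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = {
  // Use the locally cached merger locations if this shuffle was already resolved,
  // otherwise ask the driver's MapOutputTrackerMasterEndpoint via RPC and cache the answer.
  shufflePushMergerLocations.getOrElse(shuffleId, {
    val fetched = askTracker[Seq[BlockManagerId]](GetShufflePushMergerLocations(shuffleId))
    if (fetched.nonEmpty) {
      shufflePushMergerLocations(shuffleId) = fetched
    }
    fetched
  })
}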
shuffleStatuses itself is initialized in the DAGScheduler; the full data flow is as follows (a small sketch of ShuffleStatus follows the flow):
The DAGScheduler calls getOrCreateParentStages to create the stages
||
\/
createShuffleMapStage
||
\/
mapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length, shuffleDep.partitioner.numPartitions) // register the corresponding shuffleId
||
\/
shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps, numReduces)) // record the mapping from shuffleId to ShuffleStatus
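For reference, here is a minimal sketch of the merger-location bookkeeping inside ShuffleStatus (heavily simplified; the real class also tracks map/merge output statuses, guards its state with locks, and the setter's exact name may differ):

// Simplified sketch of the parts of ShuffleStatus relevant to push-based shuffle.
private[spark] class ShuffleStatus(numPartitions: Int, numReducers: Int) {
  // Empty until the driver registers merger locations for this shuffle.
  private var shufflePushMergerLocations: Seq[BlockManagerId] = Seq.empty

  // Invoked (via MapOutputTrackerMaster.registerShufflePushMergerLocations) once mergers are chosen.
  def addMergerLocations(mergerLocs: Seq[BlockManagerId]): Unit = synchronized {
    shufflePushMergerLocations = mergerLocs
  }

  // This is what the worker's RPC request ultimately returns; an empty list at first.
  def getShufflePushMergerLocations: Seq[BlockManagerId] = synchronized {
    shufflePushMergerLocations
  }
}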
Initially, getShufflePushMergerLocations on a ShuffleStatus returns an empty list; shufflePushMergerLocations is only filled in once executors have registered at startup, via the data flow below (a paraphrased sketch of its last steps follows the flow):
makeOffers
val activeExecutors = executorDataMap.filterKeys(isExecutorActive) // executorDataMap is populated when executors start up and register
scheduler.resourceOffers
||
\/
executorAdded(o.executorId, o.host)
||
\/
dagScheduler.executorAdded(execId, host)
||
\/
the DAGScheduler posts an ExecutorAdded event to its own event loop
||
\/
dagScheduler.handleExecutorAdded(execId, host)
||
\/
mapOutputTracker.registerShufflePushMergerLocations(stage.shuffleDep.shuffleId,
  stage.shuffleDep.getMergerLocs) // stage.shuffleDep.getMergerLocs is populated when the stage is submitted, in DAGScheduler.submitMissingTasks, whose path continues below
||
\/
prepareShuffleServicesForShuffleMapStage
||
\/
getAndSetShufflePushMergerLocations
||
\/
sc.schedulerBackend.getShufflePushMergerLocations
||
\/
val mergerLocations = blockManagerMaster
  .getShufflePushMergerLocations(numMergersDesired, scheduler.excludedNodes()) // the merger locations are obtained directly from the blockManagerMaster; the BlockManagerId entries it knows about were registered during BlockManager initialization (BlockManager.initialize)
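Putting the tail of this flow into code, a minimal sketch of the driver side might look like the following (paraphrased from the calls named above; the real DAGScheduler additionally handles retries, a minimum-merger threshold, and logging):

// Paraphrased sketch of the DAGScheduler side: ask the scheduler backend for merger
// candidates and, if any come back, remember them on the shuffle dependency so the
// upcoming ShuffleMapTasks can push to them.
private def getAndSetShufflePushMergerLocations(stage: ShuffleMapStage): Seq[BlockManagerId] = {
  val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
    stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
  if (mergerLocs.nonEmpty) {
    stage.shuffleDep.setMergerLocs(mergerLocs)
  }
  mergerLocs
}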
With all of this in place, a ShuffleMapTask can obtain the mergerLocs location information when it runs. Coming back to the shuffleWriterProcessor.write method: once mergerLocs is available, it calls new ShuffleBlockPusher(SparkEnv.get.conf).initiateBlockPush to push the shuffle data. The pushing determines block groups and their corresponding ESS destinations in a consistent (deterministic) way, so that blocks belonging to the same shuffle partition but produced by different mappers are pushed to the same ESS; a toy illustration of this follows. That wraps up the overall data flow of push-based shuffle.
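To make the "consistent grouping" concrete, here is a toy illustration (a hypothetical helper, not Spark's actual formula; the real ShuffleBlockPusher also batches blocks into size-limited push requests): as long as every mapper applies the same deterministic function from reduce partition to merger, all blocks of a given shuffle partition land on the same ESS.

import org.apache.spark.storage.BlockManagerId

// Toy illustration (hypothetical helper, not Spark API): every mapper evaluates the same
// deterministic mapping, so reduce partition i is always pushed to the same merger/ESS.
def mergerFor(reducePartitionId: Int, mergerLocs: Seq[BlockManagerId]): BlockManagerId = {
  // Spark's ShuffleBlockPusher uses a proportional formula; modulo is enough to show the idea.
  mergerLocs(reducePartitionId % mergerLocs.length)
}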