IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SPARK push-based shuffle mapTask是怎么获取ESS列表信息 -> 正文阅读

[大数据]SPARK push-based shuffle mapTask是怎么获取ESS列表信息

背景

本文基于SPARK 3.2.1
之前的文章SPARK SHUFFLE中 ShuffleId BlockManagerId 以及 与ESS(External Shuffle Server)交互,我们只是讲了一下大概的shuffle流程,这次来分析一下push-based shuffle,
便于更好的理解spark shuffle中push-based shuffle.

分析

直接跳到ShuffleMapTask的RunTask方法:

override def runTask(context: TaskContext): MapStatus = {
    ...
    dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)

之后就会转到以下方法:

...
if (mapStatus.isDefined) {
        // Check if sufficient shuffle mergers are available now for the ShuffleMapTask to push
        if (dep.shuffleMergeAllowed && dep.getMergerLocs.isEmpty) {
          val mapOutputTracker = SparkEnv.get.mapOutputTracker
          val mergerLocs =
            mapOutputTracker.getShufflePushMergerLocations(dep.shuffleId)
          if (mergerLocs.nonEmpty) {
            dep.setMergerLocs(mergerLocs)
          }
        }
        // Initiate shuffle push process if push based shuffle is enabled
        // The map task only takes care of converting the shuffle data file into multiple
        // block push requests. It delegates pushing the blocks to a different thread-pool -
        // ShuffleBlockPusher.BLOCK_PUSHER_POOL.
        if (!dep.shuffleMergeFinalized) {
          manager.shuffleBlockResolver match {
            case resolver: IndexShuffleBlockResolver =>
              logInfo(s"Shuffle merge enabled with ${dep.getMergerLocs.size} merger locations " +
                s" for stage ${context.stageId()} with shuffle ID ${dep.shuffleId}")
              logDebug(s"Starting pushing blocks for the task ${context.taskAttemptId()}")
              val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
              new ShuffleBlockPusher(SparkEnv.get.conf)
                .initiateBlockPush(dataFile, writer.getPartitionLengths(), dep, partition.index)
            case _ =>
          }
        }
      }
...

首先是如果发现该shuffle阶段没有对应的merge server,就会从mapOutputTracker去获取对应的ShufflePushMergerLocation(整个数据流如下):

MapOutputTrackerWorker.getShufflePushMergerLocations
                 ||
                 \/
             向 MapOutputTrackerMaster发送 GetShufflePushMergerLocations消息
                 ||
                 \/
             MapOutputTrackerMaster 向MapOutputTrackerMaster 发送GetShufflePushMergersMessage消息
                 ||
                 \/
             返回shuffleStatuses.get(shuffleId).map(_.getShufflePushMergerLocations)

而shuffleStatuses的初始化是在DAGSchduler中的,整个的数据流入下:

DAGSchduler调用getOrCreateParentStages创建Stage
                ||
                \/
                createShuffleMapStage
                ||
                \/
                mapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length,shuffleDep.partitioner.numPartitions) //注册对应的shuffId
                ||
                \/
                shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps, numReduces)) //注册上shuffleId和ShuffleStatus的关系

而一开始ShuffleStatus里面的getShufflePushMergerLocations返回的是空列表,
shufflePushMergerLocations的值是在executor注册启动的时候被填充的,如下数据流:

   makeOffers
   val activeExecutors = executorDataMap.filterKeys(isExecutorActive) //executorDataMap的信息在executor启动的时候就会被填充
   scheduler.resourceOffers
             ||
             \/
             executorAdded(o.executorId, o.host) 
             ||
             \/
             dagScheduler.executorAdded(execId, host)
             ||
             \/
             dagScheduler向dagScheduler发动ExecutorAdded消息
             ||
             \/
             dagScheduler.handleExecutorAdded(execId, host)
             ||
             \/
             mapOutputTracker.registerShufflePushMergerLocations(stage.shuffleDep.shuffleId,
              stage.shuffleDep.getMergerLocs)    //stage.shuffleDep.getMergerLocs的信息是在提交任务的时候填充的,  =》  DAGSchudler.submitMissingTasks
                                                                                                                             ||
                                                                                                                             \/
                                                                                                                             prepareShuffleServicesForShuffleMapStage
                                                                                                                             ||
                                                                                                                             \/
                                                                                                                             getAndSetShufflePushMergerLocations
                                                                                                                             ||
                                                                                                                             \/
                                                                                                                             sc.schedulerBackend.getShufflePushMergerLocations
                                                                                                                             ||
                                                                                                                             \/
                                                                                                                             val mergerLocations = blockManagerMaster
      .getShufflePushMergerLocations(numMergersDesired, scheduler.excludedNodes()) //这里直接通过blockManagerMaster获取到对应的mergerLocations,而blockManagerMaster获取得到的BlockManagerId信息也是在blockMangerinitialize初始化的时候注册上的
                      
  

这样 在shuffleMapTask运行的时候能获得了mergerLocs位置信息。
再回到shuffleWriterProcessor.write方法中来,获得了mergerLocs以后,就会调用new ShuffleBlockPusher(SparkEnv.get.conf).initiateBlockPush方法进行推送,
推动方式是以一致的方式确定这个分组和相应的ESS目的地,从而将属于同一个shuffle分区的不同 mappers的块推到同一ESS。
至此push-based shuffle大概数据流就分析到此了。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-01 23:28:20  更:2022-04-01 23:28:28 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 14:37:13-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码