IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark源码跟踪(二)stage划分 -> 正文阅读

[大数据]spark源码跟踪(二)stage划分

一,stage类图

在这里插入图片描述

二,源码跟踪

入口

val sparkConnf=new SparkConf().setAppName("wordCount").setMaster("local[3]")
val sparkContext=new SparkContext(sparkConnf)

val rdd=sparkContext.parallelize(Array("hello thank you thank you very much are you ok"),3)
val words_rdd=rdd.flatMap(word=> word.split(" "))
val keyValue_rdd=words_rdd.map(word=>(word,1))
val result=keyValue_rdd.reduceByKey((x,y)=>x+y)
result.collect().map(x=> println(x._1+":"+x._2))
sparkContext.stop()

点击result.collect() 跟踪方法调用。关键代码截图:

在这里插入图片描述
在这里插入图片描述
出现了一个名为eventThread的线程。

// Exposed for testing.
private[spark] val eventThread = new Thread(name) {
  setDaemon(true)


  override def run(): Unit = {
    try {
      while (!stopped.get) {
        val event = eventQueue.take()
        try {
          onReceive(event)
        } catch {
          case NonFatal(e) =>
            try {
              onError(e)
            } catch {
              case NonFatal(e) => logError("Unexpected error in " + name, e)
            }
        }
      }
    } catch {
      case ie: InterruptedException => // exit even if eventQueue is not empty
      case NonFatal(e) => logError("Unexpected error in " + name, e)
    }
  }
}

eventThread是一个守护线程,启动后从eventQueue中获取上文中post函数提交的对象,并通过子类的onReceive函数处理。

JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,Utils.cloneProperties(properties))

在这里插入图片描述
在这里插入图片描述

/**
 * Create a ResultStage associated with the provided jobId.
 */
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

val parents = getOrCreateParentStages(rdd, jobId)

/**
 * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
 * the provided firstJobId.
 */
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

getShuffleDependencies(rdd) 会从后往前查找rdd每条依赖链上的最后一级ShuffleDependency


/**
 * Returns shuffle dependencies that are immediate parents of the given RDD.
 *
 * This function will not return more distant ancestors.  For example, if C has a shuffle
 * dependency on B which has a shuffle dependency on A:
 *
 * A <-- B <-- C
 *
 * calling this function with rdd C will only return the B <-- C dependency.
 *
 * This function is scheduler-visible for the purpose of unit testing.
 */
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ListBuffer[RDD[_]]
  waitingForVisit += rdd
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.prepend(dependency.rdd)
      }
    }
  }
  parents
}

在这里插入图片描述
如上图的依赖关系会返回ShuffleDependency1,ShuffleDependency3组成的hashset。
继续跟踪getOrCreateShuffleMapStage(shuffleDep, firstJobId)

/**
 * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
 * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
 * addition to any missing ancestor shuffle map stages.
 */
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage


    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}

getMissingAncestorShuffleDependencies函数会根据依赖链以上文找到的最后一级ShuffleDependency的父RDD为起点从后往前追溯,获取所有的ShuffleDependency组成队列返回,prepend函数将新的元素插入到队列的开始位置,所以返回的ancestors队列中祖先的ShuffleDependency在队首,子ShuffleDependency在队尾,与DAG中顺序一致。
在这里插入图片描述

createShuffleMapStage函数会按照DAG中ShuffleDependency出现的顺序依次被调用创建ShuffleMapStage,并存储在名为stageIdToStage的HashMap[Int, Stage]中,key依次递增。如果有多个依赖分支,则一个一个分支创建,祖先ShuffleMapStage编号小于子ShuffleMapStage编号

private val nextStageId = new AtomicInteger(0)

private[scheduler] val stageIdToStage = new HashMap[Int, Stage]

/**
 * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a
 * previously run stage generated the same shuffle data, this function will copy the output
 * locations that are still available from the previous shuffle to avoid unnecessarily
 * regenerating data.
 */
def createShuffleMapStage[K, V, C](
    shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)


  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)


  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo(s"Registering RDD ${rdd.id} (${rdd.getCreationSite}) as input to " +
      s"shuffle ${shuffleDep.shuffleId}")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}

三,结论

ShuffleMapStage与ShuffleDependency的一一对应;
总stage的数量=ShuffleMapStage总数 + 1(一个ResultStage)。
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-22 11:00:15  更:2021-10-22 11:00:52 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 5:10:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码