IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark源码解析(七)Action算子解析 -> 正文阅读

[大数据]Spark源码解析(七)Action算子解析

1.任务提交分析

????????这里以org.apache.spark.examples.SparkPi为例。当执行reduce(_+_)方法时,其底层调用了sc.runJob方法。核心代码如下:

/**
  *   注释:(rdd, func, partitions, callSite, resultHandler, properties)
  *   1、应用程序调用 action 算子
  *   2、sparkContext.runJob()
  *   3、dagScheduler.runJob()
  *   4、TaskScheduler.submitTasks(new TaskSet())
  *   5、SchedulerBackEnd.driverEndpoint 提交任务
  */
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

? ? ? ? 其中runJob方法中执行的核心代码:

/**
 * TODO  注释: 提交任务
 *   参数解析:
 *   1、rdd:要在其上运行任务的参数RDD目标RDD
 *   2、func:在RDD的每个分区上运行的函数
 *   3、partitions:要运行的分区的集;某些作业可能不希望在目标RDD的所有分区上进行计算,例如,对于 first() 之类的操作。
 *   4、callSite:在用户程序中调用此作业的位置
 *   5、resultHandler:回调函数,以将每个分区结果传递给Xxx
 *   6、properties:要附加到此作业的scheduler属性,例如fair scheduler pool name
 *
 *   rdd1.xx1().xx2().xx3().xx4() 这里的 rdd = rdd1.xx1().xx2().xx3()
 *   
 */
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

? ? ? ? 其内部核心代码为:

 /**
  * TODO 注释:
  *   第一步:封装一个JobWaiter对象;
  *   第二步:将 JobWaiter 对象赋值给 JobSubmitted 的 listener 属性,
  *          并将 JobSubmitted(DAGSchedulerEvent事件)对象传递给 eventProcessLoop 事件循环处理器。
  *          eventProcessLoop 内部事件消息处理线程将会接收 JobSubmitted 事件,
  *          并调用dagScheduler.handleJobSubmitted(...) 方法来处理事件;
  *   第三步:返回 JobWaiter 对象。
  */
 val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
 /**
  * TODO 注释:这是提交任务运行
  *   eventProcessLoop 就是当初 DAGScheduler 在初始化的时候,创建的一个 DAGSchedulerEventProcessLoop
  *   这个组件主要负责:任务的提交执行
  *   把 JobSubmitted 这个消息,放入了 eventQueue 队列中
  */
 eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))
 // TODO 注释: 返回结果对象的引用
 waiter

? ? ? ? 通过eventProcessLoop.post(...)将任务放入了eventQueue队列中,有通过eventProcessLoop.start()方法将任务提交。前面有见到以下代码是等待任务的提交,正好有对应上:

/**
 * TODO 注释: driver 中,初始化了一个  dagSchedudelr
 *   它里面又初始化了一个 eventThread 专门用来处理  JobSubmitted
 */
// Exposed for testing.
private[spark] val eventThread = new Thread(name) {
	setDaemon(true)
	
	override def run(): Unit = {
		try {
			while (!stopped.get) {
				// TODO 注释:获取消息
				// TODO 注释:一定要注意:当 sparkContext 还没有初始化好的时候,是不执行 sc.runJob 提交任务的。
				// TODO 注释:当执行 sc.runJob(sc) 的时候,就会提交 Job 到这儿来。
				val event = eventQueue.take()
				try {
					/**
					 * TODO 注释:根据事件的类型,调用不同的 handleXXX 方法来进行处理。
					 *   当接收到任务提交的时候: event = JobSubmitted
					 */
					onReceive(event)
				} catch {
					case NonFatal(e) => try {
						onError(e)
					} catch {
						case NonFatal(e) => logError("Unexpected error in " + name, e)
					}
				}
			}
		} catch {
			case ie: InterruptedException => // exit even if eventQueue is not empty
			case NonFatal(e) => logError("Unexpected error in " + name, e)
		}
	}
}

? ? ? ? 在这里通过onReceive进行任务的提交,任务提交给了DAGScheduler

2.Stage切分与提交

2.1.Stage切分

? ? ? ? 任务提交核心代码如下,包含了2个方面,Stage切分与Task分发与执行。在DAGScheduler.handleJobSubmitted(...)中有如下代码:

finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)

????????这个finalRDD就是rdd链条中的最后一个RDD,也就是触发sc.runJob()方法执行的RDD。必然是针对某个RDD调用了一个action算子才触发执行的,则该RDD就是finalRDD。

? ? ? ? 注意以下概念:

  • ShuffleMapStage ?+ ?ResultStage
  • ShuffleMapTask + ResultTask
  • ShuffleDependency + NarrowDependency

????????在createResultStage(...)方法,返回所有的ResultStage,其中包含了所有的父Stage。如下注释:

/**
?* TODO 注释: Stage 切分
?* ? 这个 finalRDD 就是 rdd链条中的最后一个 RDD,也就是触发 sc.runJob() 方法执行的RDD
?* ? 必然是针对某个 RDD 调用了一个 action 算子才触发执行的,则该 RDD 就是 finallRDD
?*
?* ? stage切分的核心方法:createResultStage
?* ? 返回的是一个 ResultStage对象,但是这个对象中,会包含他的所有的 父stage
?* ? 这样做的最大好处:容错!
?* ? spark DAG引擎:血脉关系 ?RDD之间的依赖被构建成了 dependency ,也被构建成了 stage 之间的依赖关系
?*/

? ? ? ? ?可查看下面的详细代码:

/**
 * TODO Create a ResultStage associated with the provided jobId.
 *  进行Stage切分的详细方法实现
 */
private def createResultStage(rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int,
	callSite: CallSite): ResultStage = {

	checkBarrierStageWithDynamicAllocation(rdd)
	checkBarrierStageWithNumSlots(rdd)
	checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)

	// TODO 注释:获得父stage,若没有shuffle则返回空List
	// TODO 注释:获取当前Stage的parent Stage,这个方法是划分Stage的核心实现
	// 所有的父类stage都已经构建完成并返回给parents。
	val parents = getOrCreateParentStages(rdd, jobId)

	// TODO 注释: finalStage=resultStage 的 stageID  这里返回的是最后一个stage的Id
	val id = nextStageId.getAndIncrement()

	// TODO 注释:创建当前最后的ResultStage
	// TODO 注释:parents 所有的 父 stage
	val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)

	// TODO 注释:将 ResultStage 与 stageId 相关联, 保存在 map 中
	// TODO 注释:stageIdToStage = new HashMap[Int, Stage]
	stageIdToStage(id) = stage

	// TODO 注释:更新该job中包含的stage
	updateJobIdStageIdMaps(jobId, stage)

	// TODO 注释:返回ResultStage
	stage
}

? ? ? ? 在上面的getOrCreateParentStages(rdd, jobId)代码,获取父Stage,若没有父Stage,则返回空List。所有的父类stage都已经构建完成并返回给parents。调用的方法是以下代码:

getShuffleDependencies(rdd).map { shuffleDep => getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList

? ? ? ? 虽然只有一行代码,但是做了很多事情。

  1. 遍历 RDD 的依赖关系,找到第一个 ShuffleDependency (可能多个,也可能没有)。然后放入 HashSet 并返回
  2. 如果获取不到 ShuffleDependency,逻辑在此终止返回空 list
  3. 里面会创建当前 ShuffleDependency 的所有父ShuffleMapStage

? ? ? ? 1.getShuffleDependencies(rdd),这里的rdd是指finalRDD。其实这一步只会返回当前RDD的紧邻的父依赖Stage。具体的可以查看此方法的注释。

/**
 * Returns shuffle dependencies that are immediate parents of the given RDD.
 *
 * This function will not return more distant ancestors.
 * For example, if C has a shuffle dependency on B which has a shuffle dependency on A:
 * A <-- B <-- C
 * calling this function with rdd C will only return the B <-- C dependency.
 * This function is scheduler-visible for the purpose of unit testing.
 *
 */

? ? ? ? 2.getOrCreateShuffleMapStage(shuffleDep, firstJobId)这里用递归的方式,最终是将所有的Stage放在以下2个Map当中。

// TODO 注释:自增id对应stage
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
// TODO 注释:自增id对应ShuffleMapStage
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]

? ? ? ? 具体的可以看下这个方法:

/**
 * TODO 如果 shuffleIdToMapStage 中存在 shuffle,则获取 shuffle map stage。
 *  否则,如果 shuffle map stage 不存在,该方法将创建 shuffle map stage
 *  以及任何丢失的 parent shuffle map stage。
 *
 *  第一次来到这里传进来的是左到右首个shuffleDep,没有父stage,
 *  shuffleIdToMapStage也没有记录,会调 getMissingAncestorShuffleDependencies 并创建 stage
 *  之后进来可直接根据 shuffleIdToMapStage 提取到 ShuffleMapStage
 *
 * Gets a shuffle map stage if one exists in shuffleIdToMapStage.
 * Otherwise, if the shuffle map stage doesn't already exist,
 * this method will create the shuffle map stage in addition to any missing ancestor shuffle map stages.
 */
private def getOrCreateShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int): ShuffleMapStage = {

	// TODO 注释: 根据每个 ShuffleDep 的 shuffleID 来获取 Stage 对象
	shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
		// TODO 注释:如果有,则直接返回
		case Some(stage) => stage

		// TODO 注释:如果还有 ShuffleDependency 没有构建 ShuffleMapStage 则创建一个
		// TODO 注释:如果这个stage没有构建
		// TODO 注释:第一件事:先把这个stage的所有父stage都构建出来
		// TODO 注释:然后再次构建当前stage
		case None =>
			// TODO 注释:给当前 stage 的所有父stage 创建 ShuffleMapStage
			// TODO 注释:举例:C <---B <-- A
			// 这里使用了递归。将所有的父依赖,即stage都放入
			// Create stages for all missing ancestor shuffle dependencies.
			getMissingAncestorShuffleDependencies(shuffleDep.rdd)
				.foreach { dep => // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
					// that were not already in shuffleIdToMapStage, it's possible that by the time we
					// get to a particular dependency in the foreach loop, it's been added to
					// shuffleIdToMapStage by the stage creation process for an earlier dependency. See
					// SPARK-13902 for more information.
					if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
						createShuffleMapStage(dep, firstJobId)
					}
				}

			/**
			 * TODO
			 *   注释:为指定的 ShuffleDependency 创建一个 ShuffleMapStage
			 *      构建当前Stage。
			 */
			// Finally, create a stage for the given shuffle dependency.
			createShuffleMapStage(shuffleDep, firstJobId)
	}
}

总结如下:

1、createResultStage(传入finalRDD获得ResultStage) ->2
2、getOrCreateParentStages(传入rdd获得父stage) ->3->4
?? ?3、getShuffleDependencies(传入rdd获得宽依赖)
?? ?4、getOrCreateShuffleMapStage(传入宽依赖获得ShuffleMapStage) ->5->6
?? ??? ?5、getMissingAncestorShuffleDependencies(传入一个rdd获得所有宽依赖) ->3
?? ??? ?6、createShuffleMapStage(传入宽依赖获得ShuffleMapStage) ->2

2.2.Stage提交

? ? ? ? 核心代码:

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-05 17:25:10  更:2021-08-05 17:25:16 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 20:14:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码