IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 从源码角度分区Yarn Cluster任务提交流程 -> 正文阅读

[大数据]从源码角度分区Yarn Cluster任务提交流程

Yarn Cluster任务提交分为三个部分:

  1. 用户编写好的Spark应用程序提交到Yarn上(截止到ApplicationMaster启动Driver那一步)
  2. Driver对用户的应用程序进行App->Job->Stage->Task划分
  3. Driver分发Task到Executor上

首先,我将自己总结的详细流程介绍一下,

然后,从源码部分一步一步解释上面的具体实现。

Yarn Cluster任务提交流程详细流程

先来一张图:

在这里插入图片描述

图上不全面,但可以作为参考。

(1)用户编写好的Spark应用程序提交到Yarn上

  1. 用户执行spark下的spark-submit提交程序

  2. SparkSubmit里的main方法开启yarnClient进程

  3. 在yarnClient中,首先与向Yarn服务器发送请求,ResourceManager生成新的作业id后返回给yarnClient

  4. 在yarnClient中,将Application的上下文信息封装到containerContext,然后通过yarnClient.submitApplication提交到Yarn服务器

  5. 当ResourceManager收到yarnClien的submitApplciation() 的请求时, 就将该请求发给调度器(scheduler), 调度器分配 container, 然后ResourceManager在该container内启动ApplicationMaster进程

    这里注意Cluster模式下,container启动的是ExecutorLauncher进程,但不管是什么方式,它实际上都是运行的ApplicationMaster进程,只不过client模式下是从ExecutorLauncher的main方法启动ApplicationMaster,而cluster执行是启动直接ApplicationMaster进程,并且cluster模式还会在ApplicationMaster启动driver线程

  6. 在ApplicationMaster进程中,会开启一个名为Driver的线程(等Executor注册后才会执行)。此外,ApplicationMaster还会作为client向ResourceManager进行注册以申请资源(即申请Executor),在申请资源时,ApplicationMaster首先获取已分配的资源详情(这里有机架感知和最近距离原则),然后比较应用需要的executor个数和正在运行的executor个数,如果应用需要的executor个数大于正在运行的executor个数,则通过ResourceManager在合适的Container上启动对应数量的CoarseGrainedExecutorBackend进程,以满足程序运行的需要。

  7. 在CoarseGrainedExecutorBackend进程中,会通过setupEndpoint方法命名一个Executor通信终端,通过这个通信终端与Driver进行交互。它首先执行onStart方法反向注册到Driver上,然后Driver就知道有哪些Executor提供服务了。

  8. 当所有的Executor全部注册完成后,Driver线程开始执行用户提交的应用程序的main方法

(2)Driver对用户的应用程序进行App->Job->Stage->Task划分(主要是在DAGScheduler类中执行的)

  1. Driver线程执行用户提交的应用程序的main方法,它会把当前提交的作业封装成JobSubmitted对应放到eventQueue里,然后,会开启线程从eventQueue获取JobSubmitted,然后调用onReceive方法进行处理。

  2. 在onReceive的doOnReceive中执行dagScheduler.handleJobSubmitted去处理提交的作业。

  3. 在handleJobSubmitted里面会对job进行阶段划分,具体如下:
    创建一个ResultStage,同时根据宽依赖来创建ShuffleMapStage,具体做法是:获取RDD最近的一个宽依赖(从后往前搜索的),再根据宽依赖创建ShufferMapStage,如果除了最近上级之外,还有祖先的宽依赖,那么把祖先宽依赖ShufferMapStage也创建了。现在所有的阶段都创建完了,然后划分Task。

  4. 划分Task的做法是这样的:首先获取最前面的阶段(没有父阶段的阶段),根据当前阶段的类型来创建任务,如果是ShuffleMapStage,则创建ShuffleMapTask,如果是ResultStage,则创建ResultTask。
    创建Task的时候,并不是一个阶段创建一个Task,而是根据当前阶段最后一个RDD的分区数遍历,根据分区来创建Task的,每一个分区创建一个Task(看上面的partitionsToCompute),

  5. 创建完任务后放进集合里面去,然后做判断,如果任务数大于0,说明有Task需要处理,则会调用submitTask进行处理

  6. submitTasks中传入的参数是TaskSet,因此会先将所有的Task封装成TaskSet,然后在执行submitTasks提交任务,TaskSet包含的信息如下:

    • 有哪些Task
    • 这些Task所属于的阶段
    • Task的优先级
  7. submitTasks方法是如何执行的呢?即Driver如何分发Task到Executor上的,具体看下面:

(3)Driver分发Task到Executor上(主要是在DAGScheduler类中执行的)

submitTasks是抽象方法,具体实现是在TaskSchedulerImpl

  1. 创建了TaskSetManager对TaskSet进行封装,为了方便做资源调度

  2. 将封装好的TaskSetManager放到资源调度器中,driver上的任务调度器有两种类型:

    • 先进先出:FIFOSchedulableBuilder(默认根据FIFO进行任务调度)
    • 公平:FairSchedulableBuilder
    • 注意:这是Spark本身调度机制,即driver上任务调度器,Yarn的资源调度在启动ApplicationMaster和ExecutorBackend的时候就已经调度完了
  3. 调用reviveOffers方法(该方法是抽象方法,在driver端底层通信类的实现是CoarseGrainedSchedulerBackend),在reviveOffers中通过执行driverEndpoint.send(ReviveOffers)给Driver终端发送一条标记位ReviveOffers的消息

  4. Driver终端会通过receive方法(即CoarseGrainedSchedulerBackend的receive方法),匹配到ReviveOffers后通过makeOffers进行处理,具体处理过程如下:

    • 获取可用的Executor
    • 将Task进行序列化,然后封装成LaunchTask对象(LaunchTask是一个样例类),发送给合适的Executor
  5. Executor端接收Task(在Executor端底层通信的类是CoarseGrainedExecutorBackend),具体处理如下:

    • Executor接收到LaunchTask,进行反序列化
    • 从线程池中获取线程,根据任务任务类型的不同,从而执行不同的任务,任务分为ShuffleMapTask和ResultTask

?

第一部分:用户编写好的Spark应用程序提交到Yarn上

需要注意的是,跟踪源码的时候添加一下依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

下面为了便于分析,这里只列出关键步骤。tab符号代表进入到方法里面,用-、*、>、~代表不同的方法层级。

(1)用户编写好应用程序执行提交命令:

bin/spark-submit \
--class com.wxler.spark.WordCount \
--master yarn \
--deploy-mode cluster \
WordCount.jar \
/input \
/output

(2)底层运行 bin/java org.apache.spark.deploy.SparkSubmit "$@"

"$@"是把所有的参数拿过来

(3)运行SparkSubmit(全类名为org/apache/spark/deploy/SparkSubmit.scala

-main方法
	//对参数进行封装
	*val appArgs = new SparkSubmitArguments(args)
	//提交
	*submit(appArgs)
		//准备提交环境
		//childMainClass有两种取值
		//cluster模式:childMainClass = "org.apache.spark.deploy.yarn.Client"
		//client模式:	childMainClass = "com.wxler.spark.WordCount"
		>val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
		
		>doRunMain()
			&runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
				//根据字符串类型的全类名获取类对象
				~ Class.forName(childMainClass)
				//根据类对象获取当前类的main方法
				~val mainMethod = mainClass.getMethod("main")
				//调用指定类的主方法
				~mainMethod.invoke(null, childArgs.toArray)	
				//下面进入org.apache.spark.deploy.yarn.Client执行main方法

(4)运行Client(全类名为org.apache.spark.deploy.yarn.Client)

-main方法
	//对参数进行封装
	*val args = new ClientArguments(argStrings)
	//运行run方法
	*new Client(args, sparkConf).run() //此时会yarnClient = YarnClient.createYarnClient
		//发送请求给Yarn服务器,获取AppId
		>this.appId = submitApplication() 
			//yarnClient向Yarn服务发送请求,ResourceManager分配新的作业id
            &val newApp = yarnClient.createApplication()
            &val newAppResponse = newApp.getNewApplicationResponse()
            &appId = newAppResponse.getApplicationId()
            //创建containerContext对象,containerContext里面封装了运行Application的上下文环境
			//这里注意在不同的模式下启动进行不同
			//cluster模式:org.apache.spark.deploy.yarn.ApplicationMaster
			//client模式:org.apache.spark.deploy.yarn.ExecutorLauncher
			&val containerContext = createContainerLaunchContext(newAppResponse)
			&val appContext = createApplicationSubmissionContext(newApp, containerContext)
			//提交用户应用程序到Yarn
			yarnClient.submitApplication(appContext)
			//里面会和ResourceManager通讯,申请启动ApplicationMaster,随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster进程,在ApplicationMaster启动Driver线程,具体看org.apache.spark.deploy.yarn.ApplicationMaster

(5)运行ApplicationMaster(全类名为org.apache.spark.deploy.yarn.ApplicationMaster

-main方法
	//对参数参数进行封装
	*val amArgs = new ApplicationMasterArguments(args)
	// master里维护了AMRMClient
	*master = new ApplicationMaster(amArgs, new YarnRMClient)
	*master.run()
		>runDriver(securityMgr)
			&userClassThread = startUserApplication()
                //userClass即WordCount
                ~mainMethod = userClassLoader.loadClass(args.userClass).getMethod("main")
                //开启driver线程
                ~userThread = new Thread
					#mainMethod.invoke
			&registerAM()
                //client(AM)注册到RM上,申请可用资源
                ~allocator=client.register
                //分配资源	
				~allocator.allocateResources()
					//获取需要多少资源
					+val allocatedContainers = allocateResponse.getAllocatedContainers()
            		//分配资源(也有最近距离原则和机架感知)
            		+ handleAllocatedContainers(allocatedContainers)
            			@runAllocatedContainers(containersToUse)
            				//判断是否需要启动Executor
            				>if (numExecutorsRunning < targetNumExecutors) 
            					>ExecutorRunnable.run
            						>startContainer()
            							>prepareCommand()
            								>bin/java org.apache.spark.executor.CoarseGrainedExecutorBackend (粗粒度的Executor)
            				
            //注意这里的join,阻塞主线程
            &userClassThread.join()
            	

targetNumExecutors是应用需要的executor个数,numExecutorsRunning是正在运行的executor个数。targetNumExecutors的个数可以由--num-executors(默认为2)指定。

(6)运行CoarseGrainedExecutorBackend(全类名org.apache.spark.executor.CoarseGrainedExecutorBackend

-main方法
	*run
		//Executor是ExecutorBackend进程上的一个通信终端名称
		env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend) 
			//在endpoint里面,通过执行onStart方法反向注册到Driver上,然后Driver就知道有哪些Executor提供服务了
			onStart
				logInfo("Connecting to driver: " + driverUrl)
				rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap{...}

再补充一点,driver收到executor的消息后会给excutor回复(具体在CoarseGrainedSchedulerBackend类里)

Executor收到Driver的回复,这就表示注册成功了,然后创建Executor对象,等drive将任务发过来后来执行task(具体在CoarseGrainedExecutorBackend)。

到这里,我们已经把所有需要的Executor都启动完了,并且开启了driver线程来运行用户程序,下面是Driver对用户的应用程序进行App->Job->Stage->Task划分

第二部分:Driver对用户的应用程序进行App->Job->Stage->Task划分

这里就不一一贴上代码了

一旦遇到一个行动算子,就会提交一个job,即执行runJob操作,它会把当前提交的作业封装成JobSubmitted对应放到eventQueue里,然后,会开启线程从eventQueue获取JobSubmitted进行处理,然后调用onReceive方法进行处理。

onReceive的doOnReceive中执行dagScheduler.handleJobSubmitted去处理提交的作业。

在handleJobSubmitted里面会对job进行阶段划分,具体如下:

创建一个ResultStage,同时根据宽依赖来创建ShuffleMapStage,具体做法是:获取RDD最近的一个宽依赖(从后往前搜索的),再根据宽依赖创建ShufferMapStage,如果除了最近上级之外,还有祖先的宽依赖,那么把祖先宽依赖ShufferMapStage也创建了。现在所有的阶段都创建完了,然后划分Task。

首先获取最前面的阶段(没有父阶段的阶段),根据当前阶段的类型来创建任务,如下:

//在DAGScheduler类下:
stage match {
case stage: ShuffleMapStage =>
  partitionsToCompute.map { id =>
	val locs = taskIdToLocations(id)
	val part = stage.rdd.partitions(id)
	new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
	  taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
	  Option(sc.applicationId), sc.applicationAttemptId)
  }

case stage: ResultStage =>
  partitionsToCompute.map { id =>
	val p: Int = stage.partitions(id)
	val part = stage.rdd.partitions(p)
	val locs = taskIdToLocations(id)
	new ResultTask(stage.id, stage.latestInfo.attemptId,
	  taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
	  Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
  }
}

如果是ShuffleMapStage,则创建ShuffleMapTask,如果是ResultStage,则创建ResultTask。

创建Task的时候,并不是一个阶段创建一个Task,而是根据当前阶段最后一个RDD的分区数遍历,根据分区来创建Task的,每一个分区创建一个Task(看上面的partitionsToCompute),

创建完任务后放进集合里面去,然后做判断,如果任务数大于0,说明有Task需要处理,则会调用submitTask进行处理(如下):

//在DAGScheduler.scala类下:
if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  stage.pendingPartitions ++= tasks.map(_.partitionId)
  logDebug("New pending partitions: " + stage.pendingPartitions)
  taskScheduler.submitTasks(new TaskSet(
	tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
}

submitTasks中传入的参数是TaskSet,因此会先将所有的Task封装成TaskSet,然后在执行submitTasks提交任务,TaskSet包含的信息如下:

private[spark] class TaskSet(
	val tasks: Array[Task[_]], //有哪些Task
	val stageId: Int,  //这些Task的阶段
	val stageAttemptId: Int, 
	val priority: Int,   //Task的优先级
	val properties: Properties) {
  val id: String = stageId + "." + stageAttemptId

  override def toString: String = "TaskSet " + id
}

上面这些操作都是在Driver端进行的,具体如何将任务分发到Executor执行呢?看下面Driver分发Task到Executor上,其实就是看submitTasks里面的执行细节

第三部分:Driver分发Task到Executor上

(1)Driver提交Task到Executor(在org.apache.spark.scheduler.DAGScheduler类里面)

-执行submitMissingTasks方法
	//每一个Task对应处理一个分区的数据,将多个Task放到TaskSet中进行提交
	//该方法是抽象方法,具体实现在org.apache.spark.scheduler.TaskSchedulerImpl
	*taskScheduler.submitTasks(new TaskSet)
		//在Task之前,创建了TaskSetManager对TaskSet进行封装,为了方便做资源调度
		>val manager = createTaskSetManager(taskSet, maxTaskFailures)
		//将封装好的TaskSetManager放到资源调度器中【这是Spark本身调度机制,即driver上任务调度器,Yarn的资源调度在启动ApplicationMaster和Executor的时候就已经调度完了】
		//manager.taskSet.properties有两种
		//先进先出: org.apache.spark.scheduler.FIFOSchedulableBuilder(默认根据FIFO进行任务调度)
		//公平:org.apache.spark.scheduler.FairSchedulableBuilder
		>schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
		
		//触发底层资源调度,该方法是抽象方法,在driver端底层通信的类是CoarseGrainedSchedulerBackend
		>backend.reviveOffers()
			//给Driver终端发送一条标记位ReviveOffers的消息,【一般是Task提交的时候,或者是资源变化的时候,会发送给driver重新做一些调度】
			&driverEndpoint.send(ReviveOffers)
			&Driver终端会通过receive的方法,接收消息并对其进行处理
				#case ReviveOffers =>makeOffers()	
					//(下面两行)获取可用的Executor
					~val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
					~val workOffers = activeExecutors.map
					
					//scheduler.resourceOffers(workOffers) 决定Task应该交给哪个Executor处理【具体分配到哪个Executor我们不用关心,现在关心的是怎么提交到Executor上去大的】
					~运行Task launchTasks(scheduler.resourceOffers(workOffers))
						//因为要将Task提交到Executor运行,所以需要进行序列化
						+val serializedTask = ser.serialize(task)
						//给Executor终端发送Task
						+executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

(2)Executor端接收Task并运行(在Executor端底层通信的类是CoarseGrainedExecutorBackend

-receive
	//对接收到的消息类型进行匹配,匹配LaunchTask
	*case LaunchTask(data)
		//对接收到的Task进行反序列化
		>val taskDesc = ser.deserialize[TaskDescription](data.value)
		//运行Task
		>executor.launchTask
			//从线程池中获取线程,执行task
			&threadPool.execute(tr)
				#TaskRunner.run
					~task.run
						+runTask(context)
							//根据任务类型的不同,从而执行不同的任务
							++ShuffleMapTask
							++ResultTask

思考:为什么要分不同的Task呢?
因为根据阶段的不同,运行的Task也不一样,比如一个job分两个阶段:stage0和stage1,中间要经过shuffle,
stage0处理后磁盘写数据,stage1向磁盘读数据进行处理,所以它们完成的功能不一样。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-03 11:16:33  更:2021-08-03 11:17:31 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/22 4:24:02-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码