
[Big Data] Spark submission flow and source code walkthrough in Yarn Cluster mode

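This article covers Yarn Cluster mode. Spark can also run in Yarn Client mode and in Standalone Cluster and Client modes; we start with the most classic case.

Yarn-Cluster mode:

In Cluster mode, the Driver module, which handles monitoring and scheduling, is launched inside the Yarn cluster's own resources. This mode is generally used in production environments.

1) Running the submit script launches a SparkSubmit JVM process;
2) The main method of the SparkSubmit class instantiates YarnClusterApplication via reflection and calls its start method;
3) YarnClusterApplication creates a Yarn client (the yarnClient object wrapped in the Client class) and sends the ResourceManager (RM) the launch command: bin/java ApplicationMaster;
4) The ResourceManager then allocates a container and starts the ApplicationMaster (AM) on a suitable NodeManager (NM);
5) The ApplicationMaster starts the Driver thread, which runs the user's job;
6) The AM registers with the RM and requests resources;
7) Once resources are granted, the AM sends the NM the command bin/java YarnCoarseGrainedExecutorBackend to start the ExecutorBackend;
8) After starting, the ExecutorBackend registers back with the Driver ("reverse registration"), instantiates the Executor object, and waits for tasks;
9) Once the Executors have registered, the Driver continues executing the main function. When execution reaches an Action operator, a Job is triggered, stages are split at wide dependencies, each stage produces a corresponding TaskSet, and the tasks are dispatched to the Executors for execution.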

[Figure: image not preserved]

Source code walkthrough:

1.1 Starting point: SparkSubmit

  def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit()
    submit.doSubmit(args)
  }

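spark-submit is the submission program. Its main method creates an instance with val submit = new SparkSubmit() and then calls submit.doSubmit(args), where args holds the command-line arguments such as --master and --class.

How are these arguments used?

doSubmit contains the step appArgs = parseArguments(args).

parseArguments does just one thing: it constructs a SparkSubmitArguments object (the orange box in the figure above).

SparkSubmitArguments calls parse(args.asJava) to parse the arguments it was given. Parsing uses a regular expression to split each argument into an opt part and a value part, that is, the option name and its content, and then matches on the option (the handle method) to finish processing it.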
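The opt/value split described above can be illustrated with a minimal sketch. This is not Spark's parser: ArgSketch and parse are hypothetical names, the regular expression is only illustrative, and the sketch assumes just the --opt=value, --opt value, and bare-switch forms.

```scala
object ArgSketch {
  // Matches the "--opt=value" form; an illustrative pattern for this sketch.
  private val EqSeparatedOpt = "(--[^=]+)=(.+)".r

  /** Splits raw command-line arguments into opt -> value pairs. */
  def parse(args: List[String]): Map[String, String] = args match {
    case EqSeparatedOpt(opt, value) :: rest =>
      parse(rest) + (opt -> value)
    case opt :: value :: rest if opt.startsWith("--") && !value.startsWith("--") =>
      // The "--opt value" form: the next token is the value.
      parse(rest) + (opt -> value)
    case opt :: rest if opt.startsWith("--") =>
      // A switch without a value, such as --verbose.
      parse(rest) + (opt -> "true")
    case _ :: rest =>
      // Anything that is not an option is ignored in this sketch.
      parse(rest)
    case Nil =>
      Map.empty
  }
}
```

A real parser would then dispatch each pair to a handler (the role handle plays in the text); here the pairs are simply collected in a Map.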

Right after appArgs = parseArguments(args), doSubmit continues with:

	val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }

Where does action come from?

It is set in the SparkSubmitArguments class:

action = Option(action).getOrElse(SUBMIT) // no action is set on a normal submission, so this yields SUBMIT, i.e. appArgs.action = SUBMIT
    // The match above therefore takes case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog), and submission begins.
    // The submit method starts with a check:
    if (args.isStandaloneCluster && args.useRest) {
        ......
    }
    else {
      doRunMain()
    }

// doRunMain() has a check of its own:
if (args.proxyUser != null) { // was a proxy user passed on the command line? Not in this walkthrough.

}
else {
        runMain(args, uninitLog)
}
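The defaulting of action relies on Option turning null into None. A minimal sketch, with hypothetical names (ActionSketch, resolve) and only two of the actions modelled:

```scala
object ActionSketch {
  sealed trait Action
  case object Submit extends Action
  case object Kill extends Action

  // Option(x) is None when x is null, so getOrElse supplies the default.
  def resolve(action: Action): Action = Option(action).getOrElse(Submit)
}
```

Calling ActionSketch.resolve(null) yields Submit, while an explicitly chosen action is passed through unchanged.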

// Inside runMain:
 private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    // Note: childMainClass, returned here by prepareSubmitEnvironment, is very important.
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    // Let the main class re-initialize the logging system once it starts.
    if (uninitLog) {
      Logging.uninitialize()
    }
     

1.2 Submitting the application to Yarn

 val loader = getSubmitClassLoader(sparkConf)
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

    var mainClass: Class[_] = null

    try {
      mainClass = Utils.classForName(childMainClass) // reflection: load the class by its name, childMainClass
    } catch {
      ......
    }

    // If mainClass extends SparkApplication, instantiate it through its no-arg constructor and
    // cast it to SparkApplication; otherwise wrap it in a JavaMainApplication(mainClass).
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      new JavaMainApplication(mainClass)
    }

......
    
    try {
      app.start(childArgs.toArray, sparkConf) // either way, the resulting application is started here
    } catch {
      case t: Throwable =>
        throw findCause(t)
    }
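The "instantiate directly or wrap" decision above can be tried out with plain JDK classes. In this sketch CharSequence is only a stand-in for SparkApplication, and LaunchSketch, Direct, Wrapped, and prepare are hypothetical names; none of this is Spark code.

```scala
object LaunchSketch {
  sealed trait Launch
  // The class implements the expected interface: use an instance of it as is.
  case class Direct(instance: CharSequence) extends Launch
  // The class does not: keep the Class object and wrap it,
  // which is the role JavaMainApplication plays in the excerpt.
  case class Wrapped(cls: Class[_]) extends Launch

  /** Loads a class by name and decides how it would be started. */
  def prepare(className: String): Launch = {
    val cls = Class.forName(className)
    if (classOf[CharSequence].isAssignableFrom(cls)) {
      // Requires a public no-arg constructor, as in the excerpt.
      Direct(cls.getConstructor().newInstance().asInstanceOf[CharSequence])
    } else {
      Wrapped(cls)
    }
  }
}
```

For example, "java.lang.StringBuilder" implements CharSequence and is instantiated directly, while "java.lang.Object" does not and ends up wrapped.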

// Next, what exactly is childMainClass? (It comes from the return value of prepareSubmitEnvironment(args) mentioned earlier.)

Now for the prepareSubmitEnvironment method:

// It returns (childArgs, childClasspath, sparkConf, childMainClass).

// The relevant code:
val clusterManager: Int = args.master match { // determines which cluster manager is in use
      case "yarn" => YARN 
      case m if m.startsWith("spark") => STANDALONE
      case m if m.startsWith("mesos") => MESOS
      case m if m.startsWith("k8s") => KUBERNETES
      case m if m.startsWith("local") => LOCAL 
      case _ =>
        error("Master must either be yarn or start with spark, mesos, k8s, or local")
        -1
    }

......
    
if (isYarnCluster) { // around line 714
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
          // whose value is:
          // YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"
      ......

    }
// So the childMainClass we were looking for is org.apache.spark.deploy.yarn.YarnClusterApplication. It can be located after adding the Yarn dependency spark-yarn_2.12:
// The code found there:
private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)

    new Client(new ClientArguments(args), conf, null).run() // creates the client object and runs it
  }

}
// So the earlier call mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication] creates a YarnClusterApplication object (the orange box in the lower part of the figure).
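How childMainClass ends up pointing at YarnClusterApplication can be condensed into a small sketch. It assumes isYarnCluster means "master is yarn and deploy mode is cluster", and it deliberately collapses every other case into returning the user's class, which is a simplification for illustration. SubmitClassSketch and its parameters are hypothetical names.

```scala
object SubmitClassSketch {
  val YarnClusterSubmitClass = "org.apache.spark.deploy.yarn.YarnClusterApplication"

  /** Picks the class that the submit process will start. */
  def childMainClass(master: String, deployMode: String, userClass: String): String = {
    // Assumption of this sketch: only the yarn + cluster combination is special-cased.
    val isYarnCluster = master == "yarn" && deployMode == "cluster"
    if (isYarnCluster) YarnClusterSubmitClass else userClass
  }
}
```

The point of the indirection is that in Yarn Cluster mode the submitting JVM never runs the user's class itself; it only starts the application that talks to Yarn.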

[Figure: image not preserved]

1.3 Inside the Yarn cluster:

// Picking up where the previous code left off:
// The Client class has a very important field:
    private val yarnClient = YarnClient.createYarnClient
        // Inside YarnClient.createYarnClient:
    public static YarnClient createYarnClient() {
        YarnClient client = new YarnClientImpl();
        return client;
    }
// Inside the YarnClientImpl class,
// there is this field declaration:
protected ApplicationClientProtocol rmClient; // (Resource Manager client)

// So yarnClient now exists, and rmClient is its handle for talking to the ResourceManager.
// Back to the earlier Client code:
new Client(new ClientArguments(args), conf, null).run() // step into run
    
    def run(): Unit = {
    this.appId = submitApplication() // submits the application and returns its global Yarn application ID, which can later be used to query and manage the application in Yarn
    if (!launcherBackend.isConnected() && fireAndForget) {
      val report = getApplicationReport(appId)
      val state = report.getYarnApplicationState
      logInfo(s"Application report for $appId (state: $state)")
      logInfo(formatReportDetails(report))
      if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
        throw new SparkException(s"Application $appId finished with status: $state")
      }
    } else {
      val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
      if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
        diags.foreach { err =>
          logError(s"Application diagnostics message: $err")
        }
        throw new SparkException(s"Application $appId finished with failed status")
      }
      if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
        throw new SparkException(s"Application $appId is killed")
      }
      if (finalState == FinalApplicationStatus.UNDEFINED) {
        throw new SparkException(s"The final status of application $appId is undefined")
      }
    }
  }

// Inside submitApplication(): (step 1)
    ResourceRequestHelper.validateResources(sparkConf)

    var appId: ApplicationId = null
    try {
      launcherBackend.connect()
      yarnClient.init(hadoopConf)
      yarnClient.start()  // starts yarnClient
          ......
       // Get a new application from our RM
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse()
      appId = newAppResponse.getApplicationId() // the application ID assigned by the RM
          ......
       // Set up the appropriate contexts to launch our AM
      // containerContext mainly carries the commands that yarnClient passes along; the RM has a NodeManager run them in a container to start the ApplicationMaster. (step 2)
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)   // builds the container launch context and the submission context
          ......
      // Finally, submit and monitor the application
      logInfo(s"Submitting application $appId to ResourceManager")
      yarnClient.submitApplication(appContext) // submits the application to the RM through yarnClient (and its rmClient)
      launcherBackend.setAppId(appId.toString)
      reportLauncherState(SparkAppHandle.State.SUBMITTED)

      appId // return the application ID
    }
    }

// A brief look at step 2 in the figure:
 val containerContext = createContainerLaunchContext(newAppResponse)

     // Around line 1000 there is this step:
     val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
     // "/bin/java" here means a Java process is going to be launched; per step 3 of the overview, that process is the ApplicationMaster.
     // TODO: it would be nicer to just make sure there are no null commands here
      val printableCommands = commands.map(s => if (s == null) "null" else s).toList
      amContainer.setCommands(printableCommands.asJava)
     // The commands are wrapped into amContainer:
     ......
     amContainer // returned and later sent to the ResourceManager, which has a NodeManager run the wrapped commands in a container, starting the ApplicationMaster
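To make the shape of that command concrete, here is a minimal sketch that assembles a comparable token list from plain strings. AmCommandSketch, build, and all argument values are hypothetical; the real code takes them from the Yarn environment and the Spark configuration.

```scala
object AmCommandSketch {
  /** Builds the launch command for the AM container as a list of tokens. */
  def build(
      javaHome: String,
      javaOpts: Seq[String],
      amArgs: Seq[String],
      logDir: String): List[String] = {
    val commands =
      Seq(javaHome + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      // Redirect stdout and stderr of the launched process into the log directory.
      Seq("1>", logDir + "/stdout", "2>", logDir + "/stderr")
    // Same null guard as in the excerpt, so a missing token stays visible.
    commands.map(s => if (s == null) "null" else s).toList
  }
}
```

The result is just a list of strings; nothing is executed until a NodeManager runs it inside a container.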
     

[Figure: image not preserved]

ApplicationMaster startup

// In the companion object of org.apache.spark.deploy.yarn.ApplicationMaster, find the main method:
def main(args: Array[String]): Unit = {
    SignalUtils.registerLogger(log)
    val amArgs = new ApplicationMasterArguments(args) // wraps the command-line arguments
    val sparkConf = new SparkConf()
    if (amArgs.propertiesFile != null) {
      Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
        sparkConf.set(k, v)
      }
    }
    
    // ApplicationMasterArguments(args) has this method: private def parseArgs(inputArgs: List[String]): Unit = {...}
   // As before, args is parsed by pattern matching.

    // Back to ApplicationMaster:
    // further down in main:
    val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
    master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
    
        // Inside the ApplicationMaster class,
        // we find:
        private val client = new YarnRMClient() // the client that connects the ApplicationMaster to the RM (shown in the figure)

    // Back to main in ApplicationMaster:
        ugi.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = System.exit(master.run()) 
    })
  }
// Inside the run method,
// around line 263:
     if (isClusterMode) {
        runDriver() // the key call
      } else {
        runExecutorLauncher()
      }

// Inside runDriver():
private def runDriver(): Unit = {
    addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
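    // Starts the user application. Combined with ThreadUtils.awaitResult below, this means the
    // SparkContext must become ready, otherwise this thread stays blocked and cannot proceed.
    userClassThread = startUserApplication()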
    // This a bit hacky, but we need to wait until the spark.driver.port property has
    // been set by the Thread executing the user class.
    logInfo("Waiting for spark context initialization...")
    val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
    try {
      val sc = ThreadUtils.awaitResult(sparkContextPromise.future, // blocks this thread until the SparkContext is available
        Duration(totalWaitTime, TimeUnit.MILLISECONDS))
      if (sc != null) {
        val rpcEnv = sc.env.rpcEnv

        val userConf = sc.getConf
        val host = userConf.get(DRIVER_HOST_ADDRESS)
        val port = userConf.get(DRIVER_PORT)
        registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

        val driverRef = rpcEnv.setupEndpointRef(
          RpcAddress(host, port),
          YarnSchedulerBackend.ENDPOINT_NAME)
        createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
      } else {
        // Sanity check; should never happen in normal operation, since sc should only be null
        // if the user app did not create a SparkContext.
        throw new IllegalStateException("User did not initialize spark context!")
      }
      resumeDriver()
      userClassThread.join()
    }
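The blocking wait in runDriver follows a common Promise pattern: one thread completes a promise, another waits on its future with a timeout. A minimal sketch using only the Scala standard library; DriverWaitSketch and startAndAwait are hypothetical names, and a String stands in for the SparkContext.

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object DriverWaitSketch {
  /** Starts a "Driver" thread and blocks until it publishes its context. */
  def startAndAwait(initMillis: Long, maxWaitMillis: Long): String = {
    // Plays the role of sparkContextPromise in the excerpt.
    val contextPromise = Promise[String]()

    val userThread = new Thread {
      override def run(): Unit = {
        Thread.sleep(initMillis) // pretend to initialise the context
        contextPromise.success("context-ready")
      }
    }
    userThread.setName("Driver")
    userThread.start()

    // Blocks the calling thread; throws a TimeoutException if the wait expires.
    val ctx = Await.result(
      contextPromise.future,
      Duration(maxWaitMillis, TimeUnit.MILLISECONDS))
    userThread.join()
    ctx
  }
}
```

If the user code never creates its context within the allowed time, the wait fails instead of hanging forever, which is why the excerpt reads a maximum wait time from the configuration.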
    
    // Inside startUserApplication():
    private def startUserApplication(): Thread = {
    logInfo("Starting the user application in a separate Thread")
       ...
    // The main method. userClass is a field of ApplicationMasterArguments; tracing it back shows it holds the value passed with --class.
    val mainMethod = userClassLoader.loadClass(args.userClass)
      .getMethod("main", classOf[Array[String]])
       
    val userThread = new Thread {
      override def run(): Unit = {   
        try {
          if (!Modifier.isStatic(mainMethod.getModifiers)) { // is the main method static?
            logError(s"Could not find static main method in object ${args.userClass}") // not static: report an error
            finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
          } else {
            // Static: invoke main. Running it initializes the SparkContext, which ends the blocking wait described above so that runDriver can continue.
            mainMethod.invoke(null, userArgs.toArray)
            finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
            logDebug("Done running user class")
          }
            ......
       }
    }
                 
    userThread.setContextClassLoader(userClassLoader)
    userThread.setName("Driver") // the thread is named Driver
    userThread.start() // step 3: this triggers the run method overridden above
    userThread    
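The static check before invoking main can be reproduced with ordinary Java reflection. The sketch below looks up a method that takes a single String rather than a main(Array[String]), purely to keep the example small; StaticInvokeSketch and invokeStatic are hypothetical names.

```scala
import java.lang.reflect.Modifier

object StaticInvokeSketch {
  /** Looks up a public method taking one String and calls it only if it is static. */
  def invokeStatic(className: String, methodName: String, arg: String): Either[String, AnyRef] = {
    val method = Class.forName(className).getMethod(methodName, classOf[String])
    if (!Modifier.isStatic(method.getModifiers)) {
      Left(s"Could not find static method $methodName in $className")
    } else {
      // A static method has no receiver, so the first argument to invoke is null.
      Right(method.invoke(null, arg))
    }
  }
}
```

For instance, java.lang.Integer.parseInt is static and gets invoked, whereas java.lang.String.concat is an instance method and is rejected.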
                 

[Figure: image not preserved]

In the figure, the yellow Driver box is a thread and the green ApplicationMaster box is a process.

// Continuing in runDriver, after the blocking wait:
try {
      val sc = ThreadUtils.awaitResult(sparkContextPromise.future, // blocks this thread until the SparkContext is available
        Duration(totalWaitTime, TimeUnit.MILLISECONDS))
      if (sc != null) {
        val rpcEnv = sc.env.rpcEnv // the RPC (communication) environment

        val userConf = sc.getConf
        val host = userConf.get(DRIVER_HOST_ADDRESS)
        val port = userConf.get(DRIVER_PORT)
        registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId) // step 4: register the AM and request resources

        val driverRef = rpcEnv.setupEndpointRef(
          RpcAddress(host, port),
          YarnSchedulerBackend.ENDPOINT_NAME)
        createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf) // creates the allocator
      } else {
        // Sanity check; should never happen in normal operation, since sc should only be null
        // if the user app did not create a SparkContext.
        throw new IllegalStateException("User did not initialize spark context!")
      }
      resumeDriver()
      userClassThread.join()
    }

// Inside createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
// there is this line:
allocator.allocateResources() // step 5: Yarn returns the resources that can be used

    // Inside allocator.allocateResources():
    val allocateResponse = amClient.allocate(progressIndicator) // the allocation response from the RM

    val allocatedContainers = allocateResponse.getAllocatedContainers() // the containers that were allocated

    allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

    if (allocatedContainers.size > 0) { // at least one container was allocated
      logDebug(("Allocated containers: %d. Current executor count: %d. " +
        "Launching executor count: %d. Cluster resources: %s.")
        .format(
          allocatedContainers.size,
          runningExecutors.size,
          numExecutorsStarting.get,
          allocateResponse.getAvailableResources))

      handleAllocatedContainers(allocatedContainers.asScala) // the key call
    }

// Inside handleAllocatedContainers(allocatedContainers.asScala):
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = { // decides which of the allocated containers to use
    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

    // Match incoming requests by host (same machine)
        ......

    // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
        ......

        ......// (once matching is done)

        runAllocatedContainers(containersToUse) // runs the chosen containers; around line 481
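The "host first, then rack" ordering can be sketched with a much simpler data model. This is an illustration of the idea only, not Spark's matching logic: Box is a hypothetical stand-in for a Yarn container, and the sketch assumes the preferred hosts and racks are already known as plain sets.

```scala
object PlacementSketch {
  // Hypothetical container descriptor; a real Yarn Container carries far more.
  case class Box(id: Int, host: String, rack: String)

  /** Picks containers host-first, then by rack, then anywhere, up to `needed`. */
  def choose(
      offered: Seq[Box],
      wantedHosts: Set[String],
      wantedRacks: Set[String],
      needed: Int): Seq[Box] = {
    // Best locality: the container sits on a host we asked for.
    val (onHost, rest) = offered.partition(b => wantedHosts.contains(b.host))
    // Next best: same rack as a requested location.
    val (onRack, offRack) = rest.partition(b => wantedRacks.contains(b.rack))
    (onHost ++ onRack ++ offRack).take(needed)
  }
}
```

Ordering candidates by locality before truncating means a worse-placed container is only used when no better-placed one is left.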
        
        // Inside runAllocatedContainers(containersToUse):
        private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
    for (container <- containersToUse) { // iterate over the containers
        ......

      if (runningExecutors.size() < targetNumExecutors) {
          // fewer Executors are running than the target, so launch this container
        numExecutorsStarting.incrementAndGet()
        if (launchContainers) {
          launcherPool.execute(() => { // runs on a thread pool
            try {
              new ExecutorRunnable(  // launches the Executor
                Some(container),
                conf,
                sparkConf,
                driverUrl,
                executorId,
                executorHostname,
                executorMemory,
                executorCores,
                appAttemptId.getApplicationId.toString,
                securityMgr,
                localResources,
                ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
              ).run()
              ......
                  // Inside run():
  def run(): Unit = {
    logDebug("Starting Executor Container")
    nmClient = NMClient.createNMClient() // creates nmClient, the client for talking to the NodeManager
    nmClient.init(conf) // initialize it
    nmClient.start() // start nmClient
    startContainer() // start the container (passing in the environment and commands)
  }
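The "launch on a pool until the target is reached" idea can be sketched with the JDK's executor service. Unlike the excerpt, this sketch also counts containers that are still starting when comparing against the target, which is a design choice of the sketch; LaunchPoolSketch and launch are hypothetical names, and "launching" is reduced to incrementing a counter.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object LaunchPoolSketch {
  /** Launches at most (target - running) of the offered containers; returns how many ran. */
  def launch(containerIds: Seq[Int], running: Int, target: Int): Int = {
    val pool = Executors.newCachedThreadPool()
    val starting = new AtomicInteger(0)
    val launched = new AtomicInteger(0)
    for (_ <- containerIds) {
      if (running + starting.get < target) {
        starting.incrementAndGet()
        pool.execute(new Runnable {
          // Stands in for building and running an ExecutorRunnable.
          override def run(): Unit = launched.incrementAndGet()
        })
      }
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    launched.get
  }
}
```

Handing each launch to a pool keeps the allocation loop from blocking on slow container start-up.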
        
   // What does starting the container actually do?
   // Inside startContainer():
                def startContainer(): java.util.Map[String, ByteBuffer] = {
                    ......
                    val commands = prepareCommand() // around line 101: build the command

                    ctx.setCommands(commands.asJava) // attach the command to the launch context
                    ......
                     try {
                      nmClient.startContainer(container.get, ctx) // around line 124: start the container
                    }
                    ......
   // Which command is passed in?
   // Inside prepareCommand():
    YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts) // around line 203
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++ // another /bin/java
      javaOpts ++
      // (step 6) This line is important: the process launched by /bin/java is the Executor's communication backend.
      Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId,
        "--resourceProfileId", resourceProfileId.toString) ++
                      

Now for the last two steps:
[Figure: image not preserved]

// Following up on org.apache.spark.executor.YarnCoarseGrainedExecutorBackend from step 6: search for that name and open its companion object.
def main(args: Array[String]): Unit = {
    val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
      new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
        arguments.resourcesFileOpt, resourceProfile)
    }
    val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
      this.getClass.getCanonicalName.stripSuffix("$"))
    CoarseGrainedExecutorBackend.run(backendArgs, createFn) // the line to focus on: both the backend factory function and the arguments are passed in here
    System.exit(0)
  }
// Inside CoarseGrainedExecutorBackend.run(backendArgs, createFn):
def run(
    arguments: Arguments,
    backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
      // backendCreateFn is used further down. Its result type is CoarseGrainedExecutorBackend, and the instance it returns is created with new YarnCoarseGrainedExecutorBackend.
      CoarseGrainedExecutorBackend): Unit = {
    ......
    val executorConf = new SparkConf
      val fetcher = RpcEnv.create( // within the RPC environment, sets up the connection to the Driver
        "driverPropsFetcher",
        arguments.bindAddress,
        arguments.hostname,
        -1,
        executorConf,
        new SecurityManager(executorConf),
          ......
      val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
        arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false) // creates the Executor environment

       // Installs an RPC endpoint named "Executor" in the RPC environment.
       // As explained above, backendCreateFn returns a YarnCoarseGrainedExecutorBackend, which serves as the endpoint, so the YarnCoarseGrainedExecutorBackend in the figure is in fact the Executor endpoint.
       env.rpcEnv.setupEndpoint("Executor",
        backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))

          // What setupEndpoint actually does:
  override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    dispatcher.registerRpcEndpoint(name, endpoint) // registers the RPC endpoint; the arguments are its name ("Executor") and the endpoint object
  }

        // Inside registerRpcEndpoint:
  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name) // the endpoint's address
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv) // a reference to the endpoint
        ......
    var messageLoop: MessageLoop = null // the message loop
      try {
        messageLoop = endpoint match {
          case e: IsolatedRpcEndpoint =>
            new DedicatedMessageLoop(name, e, this) // an isolated endpoint gets a dedicated message loop of its own
          case _ =>
            sharedLoop.register(name, endpoint)
            sharedLoop
        }
        endpoints.put(name, messageLoop)
      } catch {
        case NonFatal(e) =>
          endpointRefs.remove(endpoint)
          throw e
      }
    }
    endpointRef
  }    
        
        // What is the message loop? Look inside DedicatedMessageLoop(name, e, this):
private class DedicatedMessageLoop(
    name: String,
    endpoint: IsolatedRpcEndpoint,
    dispatcher: Dispatcher)
  extends MessageLoop(dispatcher) { // extends MessageLoop

  private val inbox = new Inbox(name, endpoint) // creates an Inbox, examined below

  override protected val threadpool = if (endpoint.threadCount() > 1) { // the thread pool
    ThreadUtils.newDaemonCachedThreadPool(s"dispatcher-$name", endpoint.threadCount())
  } else {
    ThreadUtils.newDaemonSingleThreadExecutor(s"dispatcher-$name")
  }
      ......

          // Inside Inbox:
private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
  extends Logging { // the inbox

  inbox =>  // Give this an alias so we can use it more clearly in closures.

  @GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]() // the queued messages

  /** True if the inbox (and its associated endpoint) is stopped. */
  @GuardedBy("this")
  private var stopped = false

  /** Allow multiple threads to process messages at the same time. */
  @GuardedBy("this")
  private var enableConcurrent = false

  /** The number of threads processing messages for this inbox. */
  @GuardedBy("this")
  private var numActiveThreads = 0

  // OnStart should be the first message to process
  inbox.synchronized {
    // The inbox queues an OnStart message for its own endpoint. The reason is that an RpcEndpoint,
    // as a communication endpoint, has the lifecycle constructor -> onStart -> receive -> onStop.
    messages.add(OnStart)
  }
          ......
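The lifecycle constructor -> onStart -> receive -> onStop falls out naturally once OnStart is queued at construction time. A minimal single-threaded sketch; InboxSketch and its members are hypothetical names, and the real Inbox additionally handles concurrency, errors, and more message types.

```scala
import scala.collection.mutable

object InboxSketch {
  sealed trait Msg
  case object OnStart extends Msg
  case object OnStop extends Msg
  case class Rpc(body: String) extends Msg

  // Hypothetical endpoint that records the order in which its hooks fire.
  class Endpoint {
    val log = mutable.ArrayBuffer.empty[String]
    def onStart(): Unit = log += "onStart"
    def receive(body: String): Unit = log += s"receive($body)"
    def onStop(): Unit = log += "onStop"
  }

  class Inbox(endpoint: Endpoint) {
    private val messages = new java.util.LinkedList[Msg]()
    // OnStart is queued at construction, so it is always processed first.
    messages.add(OnStart)

    def post(m: Msg): Unit = synchronized { messages.add(m) }

    /** Drains the queue on the calling thread, dispatching each message. */
    def process(): Unit = synchronized {
      while (!messages.isEmpty) {
        messages.poll() match {
          case OnStart   => endpoint.onStart()
          case Rpc(body) => endpoint.receive(body)
          case OnStop    => endpoint.onStop()
        }
      }
    }
  }
}
```

Because the endpoint only ever reacts to queued messages, its start-up hook is guaranteed to run before any message a remote peer sends to it.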
              
              // Back in the CoarseGrainedExecutorBackend class discussed earlier:
private[spark] class CoarseGrainedExecutorBackend(
                ......
  override def onStart(): Unit = { // the matching onStart method
    logInfo("Connecting to driver: " + driverUrl)
    try {
      _resources = parseOrFindResources(resourcesFileOpt)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref) // we now hold a reference to the driver
      // Step 7: register the Executor by sending a RegisterExecutor request to the Driver through that reference.
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
        extractAttributes, _resources, resourceProfile.id))
      }(ThreadUtils.sameThread).onComplete {
      case Success(_) =>
        self.send(RegisteredExecutor)
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }
              
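              // Per step 7, the Driver receives the RegisterExecutor message. The Driver itself is a thread, so the message is received through its context object, SparkContext.
             // Search for SparkContext and open its Scala source.
              // Around line 205 there is: private var _schedulerBackend: SchedulerBackend = _, which is the communication backend.
              // Open that type and use Ctrl+H to find its cluster-mode implementation, CoarseGrainedSchedulerBackend:
              // around line 140: override def onStart(): Unit = {
              // around line 149: override def receive: PartialFunction[Any, Unit] = {
              // around line 205:
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { // receives the RegisterExecutor message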

      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
          attributes, resources, resourceProfileId) =>
          ......
          context.reply(true) // step 8: reply that registration succeeded
        }
              
             // Back in the CoarseGrainedExecutorBackend class:

              // around line 95:
    }(ThreadUtils.sameThread).onComplete {
      case Success(_) =>
        self.send(RegisteredExecutor) // sends itself a RegisteredExecutor message (remember that this CoarseGrainedExecutorBackend is the YarnCoarseGrainedExecutorBackend in the figure, i.e. the endpoint object)
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }
              // around line 147:
  override def receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver") // the backend has received its own RegisteredExecutor message
      try {
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false, // step 9: the endpoint creates the Executor
          resources = _resources)
        driver.get.send(LaunchedExecutor(executorId)) // sends LaunchedExecutor(executorId) to the Driver
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }
              // Back in CoarseGrainedSchedulerBackend, search for LaunchedExecutor:
      // around line 198 (a case inside the receive method):
      case LaunchedExecutor(executorId) =>
        executorDataMap.get(executorId).foreach { data =>
          data.freeCores = data.totalCores  // mark all of the executor's cores as free
        }
        makeOffers(executorId) // offer this executor's resources for scheduling
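Steps 7 to 9 form a small handshake: register, get a positive reply, create the Executor, report it as launched. The sketch below replays that sequence synchronously with trimmed-down messages. HandshakeSketch, Driver, and Backend are hypothetical names, and the sketch assumes that no cores are on offer until LaunchedExecutor arrives.

```scala
import scala.collection.mutable

object HandshakeSketch {
  // Message names follow the excerpts; payloads are trimmed for illustration.
  sealed trait Message
  case class RegisterExecutor(executorId: String, cores: Int) extends Message
  case object RegisteredExecutor extends Message
  case class LaunchedExecutor(executorId: String) extends Message

  // Hypothetical driver-side bookkeeping, standing in for the scheduler backend.
  class Driver {
    val totalCores = mutable.Map.empty[String, Int]
    val freeCores = mutable.Map.empty[String, Int]

    /** Request/response style, like receiveAndReply: returns the reply. */
    def ask(msg: RegisterExecutor): Boolean = {
      totalCores(msg.executorId) = msg.cores
      freeCores(msg.executorId) = 0 // assumption: nothing is offered before launch
      true
    }

    /** One-way style, like receive. */
    def send(msg: Message): Unit = msg match {
      case LaunchedExecutor(id) =>
        totalCores.get(id).foreach { n => freeCores(id) = n }
      case _ => ()
    }
  }

  // Hypothetical executor-side endpoint.
  class Backend(executorId: String, cores: Int, driver: Driver) {
    var executorCreated = false

    def onStart(): Unit = {
      // Step 7 and 8: ask the driver to register us and act on a positive reply.
      if (driver.ask(RegisterExecutor(executorId, cores))) receive(RegisteredExecutor)
    }

    def receive(msg: Message): Unit = msg match {
      case RegisteredExecutor =>
        executorCreated = true // step 9: the Executor object would be created here
        driver.send(LaunchedExecutor(executorId))
      case _ => ()
    }
  }
}
```

Only after the final message does the driver treat the executor's cores as schedulable, so tasks cannot be sent to an Executor that does not exist yet.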

[Figure: image not preserved]

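// Back in ApplicationMaster.scala,
// find the runDriver method:
private def runDriver(): Unit = {
    ......// (the thread creation, waiting, and other steps covered earlier) this part is environment preparation and resource scheduling
   resumeDriver() // lets the Driver thread continue, i.e. run the user logic that follows SparkContext creation; this part is the computation
   userClassThread.join()
        ......
        // The source shows that resource acquisition and computation logic proceed along two separate lines.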
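The "two separate lines" can be replayed with two threads that hand control back and forth. This sketch uses CountDownLatch as a stand-in for whatever signalling the ApplicationMaster uses internally, so it shows the ordering only, not Spark's mechanism. ResumeSketch and simulate are hypothetical names.

```scala
import java.util.concurrent.CountDownLatch
import scala.collection.mutable.ArrayBuffer

object ResumeSketch {
  /** Runs the pause/resume hand-off once and returns the observed event order. */
  def simulate(): List[String] = {
    val events = ArrayBuffer.empty[String]
    def record(e: String): Unit = events.synchronized { events += e }

    val contextReady = new CountDownLatch(1) // stands in for the context becoming available
    val resume = new CountDownLatch(1)       // stands in for the resumeDriver() signal

    val driver = new Thread {
      override def run(): Unit = {
        record("driver: context initialised")
        contextReady.countDown()
        resume.await() // paused while resources are being prepared
        record("driver: job logic runs")
      }
    }
    driver.setName("Driver")
    driver.start()

    // The calling thread plays the ApplicationMaster side.
    contextReady.await()
    record("am: registered and requested resources")
    resume.countDown() // the resumeDriver() moment
    driver.join()
    events.synchronized { events.toList }
  }
}
```

The user code pauses right after its context exists, resource set-up happens on the other thread, and only then does the job logic continue.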

A walkthrough of Client mode will follow in a later update.

Added: 2021-08-06 09:53:50  Updated: 2021-08-06 09:56:51
 