
[Big Data] Spark CoarseGrainedExecutorBackend Startup Process

Original post: https://dongkelun.com/2020/12/26/sparkCoarseGrainedExecutorBackendStartedProcess/

Preface

I have recently been studying Spark's task-scheduling source code. At first the startup process of CoarseGrainedExecutorBackend was not clear to me, so I kept digging with that question in mind until I finally understood how CoarseGrainedExecutorBackend gets started, which also deepened my understanding of the task-scheduling code. This post summarizes the main steps of the CoarseGrainedExecutorBackend startup flow in standalone mode, with that question as the thread; it does not attempt a deep, detailed analysis of every piece of the source.

SparkContext Initialization

We start from SparkContext initialization: creating a SparkContext is usually the first thing we do when writing Spark code. During initialization the following is executed:

    // Create and start the scheduler. The `master` string was resolved in SparkSubmit's main method.
    // In standalone mode this returns (StandaloneSchedulerBackend, TaskSchedulerImpl).
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
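
For context, a minimal driver program that exercises this path might look like the sketch below (the app name and master URL are illustrative). Creating the SparkContext with a spark:// master URL is what leads into the standalone branch of createTaskScheduler shown next.

  import org.apache.spark.{SparkConf, SparkContext}

  object MinimalApp {
    def main(args: Array[String]): Unit = {
      // "spark://host:7077" matches SPARK_REGEX below, so createTaskScheduler
      // returns a (StandaloneSchedulerBackend, TaskSchedulerImpl) pair.
      val conf = new SparkConf().setAppName("minimal-app").setMaster("spark://host:7077")
      val sc = new SparkContext(conf)  // createTaskScheduler runs inside this constructor
      sc.parallelize(1 to 100).count() // any action will exercise the scheduler
      sc.stop()
    }
  }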

createTaskScheduler pattern-matches on the master URL and returns the corresponding pair:

/**
   * Create a task scheduler based on a given master URL.
   * Return a 2-tuple of the scheduler backend and the task scheduler.
   */
  private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    // Ensure that default executor's resources satisfies one or more tasks requirement.
    // This function is for cluster managers that don't set the executor cores config, for
    // others its checked in ResourceProfile.
    def checkResourcesPerTask(executorCores: Int): Unit = {
      val taskCores = sc.conf.get(CPUS_PER_TASK)
      if (!sc.conf.get(SKIP_VALIDATE_CORES_TESTING)) {
        validateTaskCpusLargeEnough(sc.conf, executorCores, taskCores)
      }
      val defaultProf = sc.resourceProfileManager.defaultResourceProfile
      ResourceUtils.warnOnWastedResources(defaultProf, sc.conf, Some(executorCores))
    }

    master match {
      case "local" =>
        checkResourcesPerTask(1)
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)
        ......
      // standalone mode
      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numWorkers, coresPerWorker, memoryPerWorker) =>
        checkResourcesPerTask(coresPerWorker.toInt)
        // Check to make sure memory requested <= memoryPerWorker. Otherwise Spark will just hang.
        val memoryPerWorkerInt = memoryPerWorker.toInt
        if (sc.executorMemory > memoryPerWorkerInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MiB RAM / worker but requested %d MiB/worker".format(
              memoryPerWorkerInt, sc.executorMemory))
        }

        // For host local mode setting the default of SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED
        // to false because this mode is intended to be used for testing and in this case all the
        // executors are running on the same host. So if host local reading was enabled here then
        // testing of the remote fetching would be secondary as setting this config explicitly to
        // false would be required in most of the unit test (despite the fact that remote fetching
        // is much more frequent in production).
        sc.conf.setIfMissing(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, false)

        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numWorkers.toInt, coresPerWorker.toInt, memoryPerWorkerInt, sc.conf)
        val masterUrls = localCluster.start()
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)

Note that TaskSchedulerImpl is currently the only implementation of the TaskScheduler trait.
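
One detail worth noting in the standalone branch: the capture group of SPARK_REGEX may contain a comma-separated list of masters (for a HA setup), which is why the code does sparkUrl.split(",").map("spark://" + _). A quick sketch (the regex is my paraphrase of SparkMasterRegex):

  // Paraphrased from SparkMasterRegex: the capture group is everything after "spark://".
  val SPARK_REGEX = """spark://(.*)""".r

  "spark://host1:7077,host2:7077" match {
    case SPARK_REGEX(sparkUrl) =>
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      // masterUrls: Array("spark://host1:7077", "spark://host2:7077")
      println(masterUrls.mkString(", "))
  }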

After that, SparkContext executes:

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

new StandaloneSchedulerBackend()

When val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) runs, the parent class CoarseGrainedSchedulerBackend first executes:

val driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint())
protected def createDriverEndpoint(): DriverEndpoint = new DriverEndpoint()

DriverEndpoint's onStart method:

 override def onStart(): Unit = {
      // Periodically revive offers to allow delay scheduling to work
      // The interval at which the scheduler revives worker resource offers in order to run tasks.
      val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)
      // Every reviveIntervalMs (1 s by default), send a ReviveOffers message to self; it is handled by receive.
      reviveThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
        Option(self).foreach(_.send(ReviveOffers))
      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
    }
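
The revive interval is configurable: SCHEDULER_REVIVE_INTERVAL corresponds to the spark.scheduler.revive.interval property, and the getOrElse(1000L) above means it defaults to one second when unset. A sketch of overriding it (the time-string value format is an assumption; check your version's docs):

  // Assumption: revive offers every 500 ms instead of the 1 s default.
  val conf = new SparkConf().set("spark.scheduler.revive.interval", "500ms")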

Its receive method:

override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data, resources) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
              val rpId = executorInfo.resourceProfileId
              val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
              val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
              executorInfo.freeCores += taskCpus
              resources.foreach { case (k, v) =>
                executorInfo.resourcesInfo.get(k).foreach { r =>
                  r.release(v.addresses)
                }
              }
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                s"from unknown executor with ID $executorId")
          }
        }

      case ReviveOffers =>
        makeOffers()

In other words, makeOffers is called over and over:

// Make fake resource offers on all executors
    //  Logically, this turns every executor into a provider of compute resources.
    //  makeOffers() is the key method on the path to submitting a TaskSet for execution: it is
    //  invoked by DriverEndpoint once per revive interval, and any task-producing action such as
    //  runJob() gets dispatched to the nodes through it. Since we are still in initialization,
    //  just remember that this method fires repeatedly; step back into it once runJob() is called.
    private def makeOffers(): Unit = {
      // Make sure no executor is killed while some task is launching on it
      val taskDescs = withLock {

        // executorDataMap: a HashMap[String, ExecutorData] mapping executorId -> ExecutorData.
        // Entries are injected when CoarseGrainedExecutorBackend, in its RpcEndpoint onStart
        // method, sends a RegisterExecutor message to DriverEndpoint.
        // Filter out executors under killing: keep only the executors currently active.
        // CoarseGrainedExecutorBackend's initialization is itself triggered by
        // StandaloneSchedulerBackend.start().
        val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
        val workOffers = activeExecutors.map {
          case (id, executorData) =>
            // create one WorkerOffer for each active executor
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
              Some(executorData.executorAddress.hostPort),
              executorData.resourcesInfo.map { case (rName, rInfo) =>
                (rName, rInfo.availableAddrs.toBuffer)
              }, executorData.resourceProfileId)
        }.toIndexedSeq
        // Called with the cluster's resources: offers resources on the slaves. Tasks are handed
        // out in round-robin order according to TaskSet priority to keep the load balanced.
        // TaskSchedulerImpl.resourceOffers builds the two-dimensional assignment structure
        // Seq[ArrayBuffer[TaskDescription]](o.cores), calling resourceOfferSingleTaskSet along the way.
        scheduler.resourceOffers(workOffers, true)
      }
      if (taskDescs.nonEmpty) {
        launchTasks(taskDescs)
      }
    }

TaskSchedulerImpl.start()

  override def start(): Unit = {
    // `backend` was injected via scheduler.initialize(backend); in standalone mode it is the StandaloneSchedulerBackend
    backend.start()
    // spark.speculation: if set to "true", tasks are executed speculatively, i.e. when one or more tasks run slowly in a stage, they may be relaunched.
    if (!isLocal && conf.get(SPECULATION_ENABLED)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleWithFixedDelay(
        () => Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() },
        SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }
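
Speculative execution is disabled by default. SPECULATION_ENABLED and SPECULATION_INTERVAL_MS map to the spark.speculation and spark.speculation.interval properties; a hedged configuration sketch:

  val conf = new SparkConf()
    .set("spark.speculation", "true")           // re-launch slow tasks speculatively
    .set("spark.speculation.interval", "100ms") // how often checkSpeculatableTasks() runs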

backend.start()

  override def start(): Unit = {
    super.start()

    // SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
    // mode. In cluster mode, the code that submits the application to the Master needs to connect
    // to the launcher instead.
    if (sc.deployMode == "client") {
      launcherBackend.connect()
    }

    // The endpoint for executors to talk to us: build the driver's RPC URL.
    // spark.driver.host and spark.driver.port were set when SparkEnv was created.
    // Format: s"spark://$name@${rpcAddress.host}:${rpcAddress.port}", i.e.
    // spark://CoarseGrainedScheduler@DRIVER_HOST_ADDRESS:DRIVER_PORT
    val driverUrl = RpcEndpointAddress(
      sc.conf.get(config.DRIVER_HOST_ADDRESS),
      sc.conf.get(config.DRIVER_PORT),
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    // These args are consumed by CoarseGrainedExecutorBackend. How do we know? They are attached
    // below in val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", ...)
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.get(config.EXECUTOR_JAVA_OPTIONS)
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.get(config.EXECUTOR_CLASS_PATH)
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.get(config.EXECUTOR_LIBRARY_PATH)
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains(IS_TESTING.key)) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    // sc.ui is the SparkUI; this yields its HTTP address, http://host:4040, where job info for the running application is displayed
    val webUrl = sc.ui.map(_.webUrl).getOrElse("")
    // the number of cores per CoarseGrainedExecutorBackend: --executor-cores or spark.executor.cores
    val coresPerExecutor = conf.getOption(config.EXECUTOR_CORES.key).map(_.toInt)
    // If we're using dynamic allocation, set our initial executor limit to 0 for now.
    // ExecutorAllocationManager will send the real initial limit to the Master later.
    val initialExecutorLimit =
      if (Utils.isDynamicAllocationEnabled(conf)) {
        Some(0)
      } else {
        None
      }
    val executorResourceReqs = ResourceUtils.parseResourceRequirements(conf,
      config.SPARK_EXECUTOR_PREFIX)
    val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit,
      resourceReqsPerExecutor = executorResourceReqs)
    // Create the StandaloneAppClient with the launch parameters assembled above; as we will see,
    // this is what gets org.apache.spark.executor.CoarseGrainedExecutorBackend started: a message
    // is sent to the Worker, which launches it via the JDK's ProcessBuilder.start()
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }

The key lines in the start method above are:

val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
The command is then placed into the ApplicationDescription:
val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit,
      resourceReqsPerExecutor = executorResourceReqs)
//Create the StandaloneAppClient with these launch parameters; this is what eventually starts
//org.apache.spark.executor.CoarseGrainedExecutorBackend: a message goes to the Worker, which launches it via the JDK's ProcessBuilder.start()
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()      
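
Note that {{EXECUTOR_ID}}, {{HOSTNAME}}, {{CORES}}, {{APP_ID}} and {{WORKER_URL}} in args are placeholders. They are only filled in on the Worker side when ExecutorRunner builds the process; a sketch of that substitution (paraphrasing ExecutorRunner.substituteVariables, with the fields taken from the surrounding class, not verbatim source):

  // Sketch of the Worker-side placeholder substitution (paraphrased).
  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}"  => workerUri       // this Worker's RPC URL
    case "{{EXECUTOR_ID}}" => execId.toString // assigned by the Master
    case "{{HOSTNAME}}"    => host
    case "{{CORES}}"       => cores.toString
    case "{{APP_ID}}"      => appId
    case other             => other           // e.g. --driver-url is already concrete
  }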

client.start()

def start(): Unit = {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

ClientEndpoint's onStart()

 override def onStart(): Unit = {
      try {
        // Send the app registration message to the master (in fact it is sent to all masters;
        // once one connection succeeds, the attempts to the others are cancelled):
        // masterRef.send(RegisterApplication(appDescription, self))
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }

registerWithMaster()


    /**
     * Register with all masters asynchronously. It will call `registerWithMaster` every
     * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
     * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
     *
     * nthRetry means this is the nth attempt to register with master.
     * Concretely: every 20 seconds (REGISTRATION_TIMEOUT_SECONDS), up to 3 times
     * (REGISTRATION_RETRIES). Once connected to a master, all scheduling threads are cancelled.
     * nthRetry is the index of the current registration attempt.
     */
    private def registerWithMaster(nthRetry: Int): Unit = {
      // tryRegisterAllMasters() sends the registration message to every master; as soon as one
      // connection succeeds, the connections to the other masters are cancelled.
      // registerMasterFutures: AtomicReference[Array[JFuture[_]]], stored so the threads can be cancelled
      registerMasterFutures.set(tryRegisterAllMasters())
      // registrationRetryTimer: AtomicReference[JScheduledFuture[_]]; once a master is connected,
      // the remaining threads are cancelled via ScheduledFuture.cancel().
      // Delay 20 s first, then effectively run every 20 s. The initial delay gives the
      // tryRegisterAllMasters() pool time to register; on success `registered` becomes true.
      registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
        override def run(): Unit = {
          // registered: AtomicBoolean(false), i.e. false by default.
          // When the tryRegisterAllMasters() pool registers successfully, it sets registered to
          // true; the pending futures are then cancelled and the thread pool is shut down.
          if (registered.get) {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerMasterThreadPool.shutdownNow()
          } else if (nthRetry >= REGISTRATION_RETRIES) {
            // After 3 failed attempts, assume every master is dead; markDead will end up stopping the SparkContext
            markDead("All masters are unresponsive! Giving up.")
          } else {
            // nthRetry is still below 3: cancel the outstanding tryRegisterAllMasters futures, then call ourselves again with nthRetry + 1
            registerMasterFutures.get.foreach(_.cancel(true))
            registerWithMaster(nthRetry + 1)
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
    }
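
The net effect is a register-then-check loop: attempt all masters immediately, then every REGISTRATION_TIMEOUT_SECONDS (20 s) either stop (registered), give up (nthRetry reached REGISTRATION_RETRIES = 3), or cancel the attempts and retry. A stripped-down sketch of the same pattern (tryRegisterOnce and giveUp are illustrative stand-ins, not Spark's code):

  import java.util.concurrent.{Executors, Future => JFuture, TimeUnit}
  import java.util.concurrent.atomic.AtomicBoolean

  val registered = new AtomicBoolean(false)
  val timer = Executors.newSingleThreadScheduledExecutor()

  def register(nthRetry: Int): Unit = {
    val attempt: JFuture[_] = tryRegisterOnce() // async attempt; sets `registered` on success
    timer.schedule(new Runnable {
      override def run(): Unit =
        if (registered.get) attempt.cancel(true)              // success: stop retrying
        else if (nthRetry >= 3) giveUp()                      // exhausted: mark the master dead
        else { attempt.cancel(true); register(nthRetry + 1) } // cancel and try again
    }, 20, TimeUnit.SECONDS)
  }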

tryRegisterAllMasters()


    /**
     *  Register with all masters asynchronously and return an array of `Future`s for cancellation.
     *
     * tryRegisterAllMasters() submits its work to a cached thread pool (Spark's own
     * ThreadPoolExecutor wrapper). The pool uses daemon threads, so it exits with the main thread.
     */
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        // Spark re-implements the cached thread pool as a daemon pool, so it dies when the main thread exits (the JDK's cached pool is likewise a reconfigured ThreadPoolExecutor)
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            // Get a ref to the master's RpcEndpoint, then send it the registration message.
            // Our RpcEnv was created in SparkEnv; the Master endpoint lives in the Master's own RpcEnv.
            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            // Note: appDescription carries the full app info, including the command whose main class is CoarseGrainedExecutorBackend; `self` here is the ClientEndpoint itself
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }

Here Master.ENDPOINT_NAME is:

val ENDPOINT_NAME = "Master"

Master

private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {

As you can see, Master itself is also an RpcEndpoint.
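
The lifecycle contract of an RpcEndpoint is constructor -> onStart -> receive* -> onStop, which is why both ClientEndpoint and DriverEndpoint do their setup inside onStart. A minimal sketch of an endpoint (note these RPC classes are private[spark], so code like this only compiles inside Spark's own source tree):

  import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}

  // Minimal endpoint sketch: lifecycle is constructor -> onStart -> receive* -> onStop.
  class EchoEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
    override def onStart(): Unit = println("started")  // register, schedule timers, etc.
    override def receive: PartialFunction[Any, Unit] = {
      case msg => println(s"one-way message: $msg")    // messages sent via send() land here
    }
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case msg => context.reply(msg)                   // messages sent via ask() land here
    }
    override def onStop(): Unit = println("stopped")
  }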

Master.receive()

override def receive: PartialFunction[Any, Unit] = {
    case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }

Master.schedule()


  /**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule(): Unit = {
    // bail out unless the master is in the ALIVE state
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Drivers take strict precedence over executors
    // shuffle the workers so drivers are not concentrated on the same few
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var isClusterIdle = true
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmpty // are this worker's drivers and executors both empty?
        numWorkersVisited += 1
        if (canLaunchDriver(worker, driver.desc)) { // enough memory, CPU cores and resources to launch the driver?
          val allocated = worker.acquireResources(driver.desc.resourceReqs)
          driver.withResources(allocated)
          launchDriver(worker, driver) // launch the Driver on this worker
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
      if (!launched && isClusterIdle) {
        logWarning(s"Driver ${driver.id} requires more resource than any of Workers could have.")
      }
    }
    // launch executors on the workers
    startExecutorsOnWorkers()
  }

Master.startExecutorsOnWorkers()


  /**
   * Schedule and launch executors on workers
   */
  private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    // waitingApps mainly carries the core and memory requirements passed in on the command line.
    // startExecutorsOnWorkers' job is to schedule waitingApps, i.e. to place those cores and
    // that memory on concrete Workers.
    for (app <- waitingApps) {
      val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
      // If the cores left is less than the coresPerExecutor,the cores left will not be allocated
      if (app.coresLeft >= coresPerExecutor) {
        // Filter out workers that don't have enough resources to launch an executor
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canLaunchExecutor(_, app.desc))
          .sortBy(_.coresFree).reverse
        val appMayHang = waitingApps.length == 1 &&
          waitingApps.head.executors.isEmpty && usableWorkers.isEmpty
        if (appMayHang) {
          logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
        }
        // the number of cores assigned on each worker
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

        // Now that we've decided how many cores to allocate on each worker, let's allocate them
        // with assignedCores decided, start executors on the corresponding worker nodes
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          allocateWorkerResourceToExecutors(
            app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
        }
      }
    }
  }
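
scheduleExecutorsOnWorkers (not shown here) decides how many cores each usable worker gets. With spark.deploy.spreadOut left at its default of true, cores are handed out round-robin, one coresPerExecutor chunk at a time; with it off, each worker is filled before moving to the next. A toy re-implementation of the round-robin idea (illustrative, not Spark's actual algorithm):

  def spreadOutAssign(coresWanted: Int, freeCores: Array[Int], perExec: Int): Array[Int] = {
    val assigned = Array.fill(freeCores.length)(0)
    var left = coresWanted
    var pos = 0
    while (left >= perExec && freeCores.exists(_ >= perExec)) {
      if (freeCores(pos) >= perExec) {   // give this worker one executor's worth of cores
        freeCores(pos) -= perExec; assigned(pos) += perExec; left -= perExec
      }
      pos = (pos + 1) % freeCores.length // then move on to the next worker
    }
    assigned
  }
  // spreadOutAssign(6, Array(8, 8, 8), 2) == Array(2, 2, 2): one 2-core executor per worker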

Master.allocateWorkerResourceToExecutors()


  /**
   * Allocate a worker's resources to one or more executors.
   * @param app the info of the application which the executors belong to
   * @param assignedCores number of cores on this worker for this application
   * @param coresPerExecutor number of cores per executor
   * @param worker the worker info
   */
  private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    // If the executor core count is specified, cores are divided evenly: numExecutors = assignedCores / coresPerExecutor
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
      val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)
      val exec = app.addExecutor(worker, coresToAssign, allocated)
      // launch the executor
      launchExecutor(worker, exec)
      app.state = ApplicationState.RUNNING
    }
  }
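
The arithmetic in the two val lines above is worth spelling out. With coresPerExecutor = Some(2) and assignedCores = 7, integer division yields 3 executors of 2 cores each (one core goes unused); with coresPerExecutor = None, a single executor grabs all 7. A tiny worked example:

  def plan(assignedCores: Int, coresPerExecutor: Option[Int]): (Int, Int) = {
    val numExecutors  = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    (numExecutors, coresToAssign)
  }
  // plan(7, Some(2)) == (3, 2)  -- three 2-core executors, 1 core left over
  // plan(7, None)    == (1, 7)  -- one executor takes every assigned core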

Master.launchExecutor()

  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    // send a LaunchExecutor message to the worker
    worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,
      exec.application.desc, exec.cores, exec.memory, exec.resources))
    // send the executor info to the Driver, for display on the driver's port-4040 UI
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }

Worker.receive()

 override def receive: PartialFunction[Any, Unit] = synchronized {
     ......
     case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else if (decommissioned) {
        logWarning("Asked to launch an executor while decommissioned. Not launching executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

          // Create the executor's working directory
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }

          // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
          val appLocalDirs = appDirectories.getOrElse(appId, {
            val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
            val dirs = localRootDirs.flatMap { dir =>
              try {
                val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                Utils.chmod700(appDir)
                Some(appDir.getAbsolutePath())
              } catch {
                case e: IOException =>
                  logWarning(s"${e.getMessage}. Ignoring this directory.")
                  None
              }
            }.toSeq
            if (dirs.isEmpty) {
              throw new IOException("No subfolder can be created in " +
                s"${localRootDirs.mkString(",")}.")
            }
            dirs
          })
          appDirectories(appId) = appLocalDirs
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            webUi.scheme,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs,
            ExecutorState.LAUNCHING,
            resources_)
          executors(appId + "/" + execId) = manager
          // run ExecutorRunner's start method
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          addResourcesUsed(resources_)
        } catch {
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }

manager.start() (ExecutorRunner)

  private[worker] def start(): Unit = {
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run(): Unit = { fetchAndRunExecutor() }
    }
    workerThread.start()
    // Shutdown hook that kills actors on shutdown.
    shutdownHook = ShutdownHookManager.addShutdownHook { () =>
      // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
      // be `ExecutorState.LAUNCHING`. In this case, we should set `state` to `FAILED`.
      if (state == ExecutorState.LAUNCHING) {
        state = ExecutorState.FAILED
      }
      killProcess(Some("Worker shutting down")) }
  }

ExecutorRunner.fetchAndRunExecutor()

As mentioned earlier, appDesc.command contains the class name org.apache.spark.executor.CoarseGrainedExecutorBackend,
and builder.start() executes the assembled OS command, launching CoarseGrainedExecutorBackend.

  /**
   * Download and run the executor described in our ApplicationDescription
   * fetchAndRunExecutor assembles the received information into a command line and executes it
   * with ProcessBuilder, starting CoarseGrainedExecutorBackend.
   */
  private def fetchAndRunExecutor(): Unit = {
    try {
      val resourceFileOpt = prepareResourcesFile(SPARK_EXECUTOR_PREFIX, resources, executorDir)
      // Launch the process
      val arguments = appDesc.command.arguments ++ resourceFileOpt.map(f =>
        Seq("--resourcesFile", f.getAbsolutePath)).getOrElse(Seq.empty)
      val subsOpts = appDesc.command.javaOpts.map {
        Utils.substituteAppNExecIds(_, appId, execId.toString)
      }
      // assemble the launch command
      val subsCommand = appDesc.command.copy(arguments = arguments, javaOpts = subsOpts)
      val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
        memory, sparkHome.getAbsolutePath, substituteVariables)
      val command = builder.command()
      val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala.toSeq)
        .mkString("\"", "\" \"", "\"")
      logInfo(s"Launch command: $redactedCommand")

      builder.directory(executorDir)
      builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

      // Add webUI log urls
      val baseUrl =
        if (conf.get(UI_REVERSE_PROXY)) {
          s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
        } else {
          s"$webUiScheme$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
        }
      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

      // execute the command as a child process
      process = builder.start()
      val header = "Spark Executor Command: %s\n%s\n\n".format(
        redactedCommand, "=" * 40)

      // Redirect its stdout and stderr to files
      val stdout = new File(executorDir, "stdout")
      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

      val stderr = new File(executorDir, "stderr")
      Files.write(header, stderr, StandardCharsets.UTF_8)
      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

      state = ExecutorState.RUNNING
      worker.send(ExecutorStateChanged(appId, execId, state, None, None))
      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
      // or with nonzero exit code
      val exitCode = process.waitFor()
      state = ExecutorState.EXITED
      val message = "Command exited with code " + exitCode
      worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
    } catch {
      case interrupted: InterruptedException =>
        logInfo("Runner thread for executor " + fullId + " interrupted")
        state = ExecutorState.KILLED
        killProcess(None)
      case e: Exception =>
        logError("Error running executor", e)
        state = ExecutorState.FAILED
        killProcess(Some(e.toString))
    }
  }
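
What builder.start() ultimately runs looks roughly like the line below, as logged by the "Launch command:" message above (paths, hosts, ports and IDs here are illustrative, not taken from a real run):

  "java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M"
    "org.apache.spark.executor.CoarseGrainedExecutorBackend"
    "--driver-url" "spark://CoarseGrainedScheduler@192.168.1.10:41397"
    "--executor-id" "0" "--hostname" "192.168.1.11" "--cores" "4"
    "--app-id" "app-20201226120000-0000" "--worker-url" "spark://Worker@192.168.1.11:35001"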

CoarseGrainedExecutorBackend.main()

Finally, CoarseGrainedExecutorBackend is launched and its main method runs:

 def main(args: Array[String]): Unit = {
    val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
      new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
        env, arguments.resourcesFileOpt, resourceProfile)
    }
    run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
    System.exit(0)
  }
  def parseArguments(args: Array[String], classNameForEntry: String): Arguments = {
    var driverUrl: String = null
    var executorId: String = null
    var bindAddress: String = null
    var hostname: String = null
    var cores: Int = 0
    var resourcesFileOpt: Option[String] = None
    var appId: String = null
    var workerUrl: Option[String] = None
    val userClassPath = new mutable.ListBuffer[URL]()
    var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID

    var argv = args.toList
    while (!argv.isEmpty) {
      argv match {
        case ("--driver-url") :: value :: tail =>
          driverUrl = value
          argv = tail
        case ("--executor-id") :: value :: tail =>
          executorId = value
          argv = tail
        case ("--bind-address") :: value :: tail =>
          bindAddress = value
          argv = tail
        case ("--hostname") :: value :: tail =>
          hostname = value
          argv = tail
        case ("--cores") :: value :: tail =>
          cores = value.toInt
          argv = tail
        case ("--resourcesFile") :: value :: tail =>
          resourcesFileOpt = Some(value)
          argv = tail
        case ("--app-id") :: value :: tail =>
          appId = value
          argv = tail
        case ("--worker-url") :: value :: tail =>
          // Worker url is used in spark standalone mode to enforce fate-sharing with worker
          workerUrl = Some(value)
          argv = tail
        case ("--user-class-path") :: value :: tail =>
          userClassPath += new URL(value)
          argv = tail
        case ("--resourceProfileId") :: value :: tail =>
          resourceProfileId = value.toInt
          argv = tail
        case Nil =>
        case tail =>
          // scalastyle:off println
          System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
          // scalastyle:on println
          printUsageAndExit(classNameForEntry)
      }
    }

    if (hostname == null) {
      hostname = Utils.localHostName()
      log.info(s"Executor hostname is not provided, will use '$hostname' to advertise itself")
    }

    if (driverUrl == null || executorId == null || cores <= 0 || appId == null) {
      printUsageAndExit(classNameForEntry)
    }

    if (bindAddress == null) {
      bindAddress = hostname
    }

    Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
      userClassPath, resourcesFileOpt, resourceProfileId)
  }
def run(
      arguments: Arguments,
      backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
        CoarseGrainedExecutorBackend): Unit = {

    Utils.initDaemon(log)

    SparkHadoopUtil.get.runAsSparkUser { () =>
      // Debug code
      Utils.checkHost(arguments.hostname)

      // Bootstrap to fetch the driver's Spark properties.
      val executorConf = new SparkConf
      val fetcher = RpcEnv.create(
        "driverPropsFetcher",
        arguments.bindAddress,
        arguments.hostname,
        -1,
        executorConf,
        new SecurityManager(executorConf),
        numUsableCores = 0,
        clientMode = true)

      var driver: RpcEndpointRef = null
      val nTries = 3
      for (i <- 0 until nTries if driver == null) {
        try {
          driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
        } catch {
          case e: Throwable => if (i == nTries - 1) {
            throw e
          }
        }
      }

      val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId))
      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
      fetcher.shutdown()

      // Create SparkEnv using properties we fetched from the driver.
      val driverConf = new SparkConf()
      for ((key, value) <- props) {
        // this is required for SSL in standalone mode
        if (SparkConf.isExecutorStartupConf(key)) {
          driverConf.setIfMissing(key, value)
        } else {
          driverConf.set(key, value)
        }
      }

      cfg.hadoopDelegationCreds.foreach { tokens =>
        SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
      }

      driverConf.set(EXECUTOR_ID, arguments.executorId)
      val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
        arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
      // register the CoarseGrainedExecutorBackend endpoint under the name "Executor"
      env.rpcEnv.setupEndpoint("Executor",
        backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
      // register a WorkerWatcher, used to shut down the CoarseGrainedExecutorBackend process if the worker goes away
      arguments.workerUrl.foreach { url =>
        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
      }
      env.rpcEnv.awaitTermination()
    }
  }


Related reading

https://dongkelun.com/2020/12/23/sparkRPC/
