IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark 源码阅读 02:从 Spark-Submit 到 Driver 启动 -> 正文阅读

[大数据]Spark 源码阅读 02:从 Spark-Submit 到 Driver 启动

Spark 版本:3.0.0

找到入口

  • 理解一个项目最好的切入点就是找到一个入口。
    比如跟 Spark 的交互方式,如何把任务提交到 Spark 集群。通常使用 spark-submit 脚本来提交任务。它在 Spark 源码的 bin 目录下

在这里插入图片描述

bin文件夹中在存放了 spark-shell 等其他入口方式。

  • 我们来看一下 spark-submit 的具体内容。

在这里插入图片描述

可以看到是一个shell脚本,加载的类是 org.apache.spark.deploy.SparkSubmit$@ 把执行的参数带进去。接下来我们找到具体的类 SparkSubmit,在 deploy 这个包下。

SparkSubmit Object

上一步的 spark-submit 会创建一个Java进程,执行SparkSumbit.scala 文件的逻辑,这个文件有1300多行代码,一上来看哪里呢?还是抓主线,找入口,我们找到 main 函数。

main方法是JVM执行的入口。

在这里插入图片描述

  override def main(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    val uninitLog = initializeLogIfNecessary(true, silent = true)

    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

main函数中,主要做了这几件事

  • 准备参数
  • 匹配任务的执行类型

1. 准备参数

在这里插入图片描述

这部分会 parse 命令行中的参数,如 --master local[*] 转成 SparkSubmitArguments 对象中的成员变量。同时检查参数输入的正不正确等逻辑。

SparkSubmitArguments的继承关系

2. 执行任务

如果是提交任务,着重看submit函数。

  @tailrec
  private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    // 该方法准备执行任务所需的一些环境变量等,返回四元祖
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

    if (args.isStandaloneCluster && args.useRest) {
      try {
        ...
        doRunMain()
      } catch {
        // Fail over to use the legacy submission gateway
        ...
      }
    // In all other modes, just run the main class as prepared
    } else {
      doRunMain()
    }

返回的四元组分别是:

  • childArgs:the arguments for the child process 子进程参数
  • childClasspath:a list of classpath entries for the child 子进程classpath列表
  • sparkConf:a map of system properties 系统属性map
  • childMainClass:the main class for the child 子进程main方法

着重看一下 childMainClass

2.1 部署模式的差异

// 模式 1  
if (deployMode == CLIENT) {
      childMainClass = args.mainClass
      if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
        childClasspath += localPrimaryResource
      }
      if (localJars != null) { childClasspath ++= localJars.split(",") }
    }
...
  if (args.isStandaloneCluster) {
      // 模式 2
      if (args.useRest) {
        childMainClass = REST_CLUSTER_SUBMIT_CLASS
        childArgs += (args.primaryResource, args.mainClass)
      } else {
        // 模式 3
        // In legacy standalone cluster mode, use Client as a wrapper around the user class
        childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
        
...
// 模式 4
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
  if (isYarnCluster) {
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
      if (args.isPython) {
  • 如果部署模式为client,直接运行我们设置的主类的名字

    • 比如 com.chixiaodou.wordcount
  • 如果是StandaloneCluster,且使用了rest风格,

    • REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
  • 如果是StandaloneCluster, 没有使用rest

    • STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
  • 如果是Yarn集群上运行,则为

    • YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"

代码后面还有其他的部署模式,不再罗列了。

2.2 runMain方法

    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
            }
          })
        } catch {
          case e: Exception =>
            ...
        }
      } else {
        runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
      }
    }

接下来执行doRunMain, 把准备好的参数传递到runMain方法中。

runMain方法执行 child class 的main 方法,使用 submit函数中准备好的四元组。

  private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sparkConf: SparkConf,
      childMainClass: String,
      verbose: Boolean): Unit = {

...
    
    var mainClass: Class[_] = null

    try {
      // 上一步准备environment的时候,选择了一个childMainClass,在
      // 这里加载这个 childMainClass 类
      // 通过一个类的字符串名称,构建一个类,也就是反射的方式创建一个类
      mainClass = Utils.classForName(childMainClass)
    } catch {...}

    // 根据 mainClass,构造 SparkApplication 对象
    // 两种情况,1. mainClass有没有去继承 classOf[SparkApplication],如果有的话,反射创建一个实例并转换成asInstanceOf[SparkApplication]
    // 2. mainClass没有继承,通过new JavaMainApplication(mainClass) 构造
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.newInstance().asInstanceOf[SparkApplication]
    } else {
      // SPARK-4170
      if (classOf[scala.App].isAssignableFrom(mainClass)) {
        printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
      }
      new JavaMainApplication(mainClass)
    }

    ...

    try {
      // 关键,调用 SparkApplication.start()
      app.start(childArgs.toArray, sparkConf)
    } catch {
      case t: Throwable =>
        findCause(t) match {
          case SparkUserAppException(exitCode) =>
            System.exit(exitCode)

          case t: Throwable =>
            throw t
        }
    }
  }

在这一步,生成 Application,调用 Start 方法。最后这里逻辑比较复杂,我们来一点点掰开梳理

  • 假设我们以 StandaloneCluster, 没有使用 rest 的方式启动集群,那么mainClass = classOf[ClientApp].getName(),这个可以看上面 2.1 提到的详解
  • 所以这里app.start(childArgs.toArray, sparkConf) 调用的应该是ClientApp, 跳到Client.scala文件看看start()都做了什么

代码执行到这里,Spark-Submit的任务就完成了。我们看一下上面提及的所有调用的 UML 图。接下来我们以 Standalone Client为例,接下去展开分析。

在这里插入图片描述

3. Driver - Rpc 环境准备

private[spark] class ClientApp extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val driverArgs = new ClientArguments(args)

    if (!conf.contains("spark.rpc.askTimeout")) {
      conf.set("spark.rpc.askTimeout", "10s")
    }
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

    rpcEnv.awaitTermination()
  }

}

这段代码具体做了如下几件事:

  • 创建RPC通信环境 rpcEnv
  • 获取 Master 的通信邮箱 RpcEndpointRef,用于和Master通信
  • 注册RpcEndpoint,调用onStart方法。让 clientEndpoint 可以和 masterEndpoint 通信

代码中涉及 Spark RPC 通信的知识,RPC是一种进程间的通信方式。如果对这部分知识没有了解的话,会看得有点一头雾水。这部分知识也非常庞大,需要另外开一个文档来详细介绍。

3.1 Spark RPC 设计

这里就先简单介绍一下,让这段代码看起来不那么尴尬。

在这里插入图片描述

Spark 3.0 的 RPC 框架是基于 Netty,Netty 使用经典的 Actor 模型做消息传递。图中画了若干个组建。

  • RpcEnv:为RpcEndpoint提供了一个处理消息的环境,负责Endpoint的生命周期管理,注册Endpoint,消息间的路由,停止Endpoint等。
  • RpcEndpoint:代表具体的通信节点。例如Master,Worker,DriverEndpoint等,它们都实现该接口。一个RpcEndpoint的生命周期是create -> onStart -> receive* -> onStopreceivereceiveAndReply 分别用来接收其他 endpoint 发过来的 sendask 消息。
  • RpcEndpointRef:顾名思义,是一个RpcEndpoint的引用。当我们想向master,worker发送消息的时候,需要先获取到如 master Endpoint的引用。
  • RpcAddress:表示远程 RpcEndpointRef 的地址,包含 Host 和 Port。

Spark是一个分布式的系统,想要跟其他的端点通信,需要按照如上的步骤,建立通信环境,通信地址及引用。接下来才可以通过发送消息请求的方式,在其他 Endpoint 执行如 launchDriverregisterWorker 等动作。

4. 启动 Driver

接上面 ClientApp start 继续往下看。在 RpcEnv 中构建一个 ClientEndpoint

private[spark] class ClientApp extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    ...
		rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
    ...}
}

而我们前面又讲过,一个RpcEndpoint的会调用onStart()。进入 ClientEndpointonStart() 进行查看。

4.1 向 Master 发送请求

private class ClientEndpoint(
    override val rpcEnv: RpcEnv,
    driverArgs: ClientArguments,
    masterEndpoints: Seq[RpcEndpointRef],
    conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging {
    ...
    override def onStart(): Unit = {
    driverArgs.cmd match {
      // 匹配到 launch driver
      case "launch" =>
      
      ....
      		// 敲重点,这里重点记忆一下,DriverWrapper 作为 mainClass
      		val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
          ...
      		// 准备通信的请求信息
          val driverDescription = new DriverDescription(
          	driverArgs.jarUrl,
          	driverArgs.memory,
          	driverArgs.cores,
          	driverArgs.supervise,
          	command)
         //向master发送提交启动 Driver 的请求
        asyncSendToMasterAndForwardReply[SubmitDriverResponse](
          RequestSubmitDriver(driverDescription))
      ...
      
  

onStart()中匹配到 launch这个动作,并向 MasterEndpoint 发送 RequestSubmitDriver消息。

4.2 Master

接下来我们跳到 Master.scala(package org.apache.spark.deploy.master) 去看看 Master 节点如何处理这个请求。

在Spark的Rpc设计中,receivereceiveAndReply分别用来接收 sendask 类型的消息。

Sends a one-way asynchronous message. Fire-and-forget semantics.

def send(message: Any): Unit

  • 发送单向的异步消息,发送了就不再返回任何东西。

Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to receive the reply within the specified timeout.

def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]:

  • 发送消息出去之后,返回一个 [[Future]],并在规定的timeout时间内接收回复。

5. Master 接收消息并回复

RequestSubmitDriver 在下面这个方法中调用。这里调用了我们上面提到的 ask()方法。也就是说会触发 Master 的 receiveAndReply()

  /**   * Send the message to master and forward the reply to self asynchronously.   */  private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {    for (masterEndpoint <- masterEndpoints) {      masterEndpoint.ask[T](message).onComplete {        case Success(v) => self.send(v)        case Failure(e) =>          logWarning(s"Error sending messages to master $masterEndpoint", e)      }(forwardMessageExecutionContext)    }  }

Master 的 receiveAndReply()方法。

private[deploy] class Master(    override val rpcEnv: RpcEnv,    address: RpcAddress,    webUiPort: Int,    val securityMgr: SecurityManager,    val conf: SparkConf)  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {    ....      // 重点  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {    case RequestSubmitDriver(description) =>      ...        // 根据请求的description创建一个 DriverInfo        val driver = createDriver(description)        persistenceEngine.addDriver(driver)        // 重点:先把driver存一份到 waiting list        waitingDrivers += driver        drivers.add(driver)        // 重点关注        schedule()
  • 构建 DriverInfo,保存后续 Driver 启动所需的信息
  • driver append 到 waiting list,方便后续有 worker 资源了再去创建
  • 执行 schedule()

把Driver 信息准备好之后,最后来到 schedule()方法,该方法中,会把所有的 waitingDrivers等待被分配的driver一一分配出去,分配到alive的worker中。

private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Drivers take strict precedence over executors
    // 打散 worker
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { 
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          // 关键:启动 driver
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    startExecutorsOnWorkers()
  }

我们来看 launchDriver(),又是一个远程调用。

  private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    worker.addDriver(driver)
    driver.worker = Some(worker)
    // 远程调用
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    driver.state = DriverState.RUNNING
  }

这里调用了worker endpoint 的 send 方法,因此要看接收方的 receive() 详情。同理,因为这个是往 Worker 发消息,那就去 Worker.scala 看看 Worker 怎么处理这个消息。

6. Worker 接收消息

终于扒到这个非常关键的信息了。终于要开始构建我们的Driver了。

private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
		...
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging {
    ...
      override def receive: PartialFunction[Any, Unit] = synchronized {
            // 启动 Driver
            case LaunchDriver(driverId, driverDesc) =>
              logInfo(s"Asked to launch driver $driverId")
        			// 创建 DriverRunner
              val driver = new DriverRunner(...)
              drivers(driverId) = driver
              driver.start()
							// 记录 core 和 memory 使用了多少
              coresUsed += driverDesc.cores
              memoryUsed += driverDesc.mem

接下去就是比较细节的启动工作。Driver 启动的详细信息又可以单开一个篇幅来讲了。

至此,我们从找到 Spark 的入口, spark 提交任务开始,准备参数,执行submit任务,根据特定的部署模式,启动相应的 Spark Application,又通过了解Spark的远程调用设计,摸索到 ClientMasterWorker 之间如何发送启动 Driver 的信息。一步一步找到 launchDriver 的入口。

最后还是画一张 UML 图来更加清晰的展现整体交互。

在这里插入图片描述

常驻小尾巴

都看到这里了,不关注一下嘛 👇 👇 👇

在这里插入图片描述

我的免费星球,欢迎来看我的日常碎碎念

在这里插入图片描述

参考资料

  • 陈凯 https://zhuanlan.zhihu.com/p/84506391

  • 简书 https://www.jianshu.com/p/4d4964c505fe?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation

  • 尚硅谷 P131 源码阅读 https://www.bilibili.com/video/BV11A411L7CK?p=131&spm_id_from=pageDriver

  • Solve https://cloud.tencent.com/developer/article/1574348

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-30 12:07:06  更:2021-08-30 12:07:30 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 15:50:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码