
[Big Data] Spark Source Code Analysis Series (1): Job Submission

Spark source code analysis: job submission

1: Submitting the code

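### The steps for submitting a Spark job are:

### 1) Package the Scala/Java code written in the IDE into a jar.

### 2) Upload the jar to the server.

### 3) Run it with spark-submit, using a command such as "./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn ./examples/jars/spark-examples_2.12-3.0.0.jar" (run from the Spark home directory). This shell command goes through a script that submits our code to the Spark cluster.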

### So let's start with the script itself. The contents of spark-submit are:
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

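As you can see, this shell script simply calls the spark-class script. It passes org.apache.spark.deploy.SparkSubmit as the first argument, followed by all of the original arguments ("$@").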

The spark-class script:

#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi
# Load the Spark environment by sourcing load-spark-env.sh
. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  # Initialize the java command
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$CMD_START_FLAG" == "true" ]; then
    CMD+=("$ARG")
  else
    if [ "$ARG" == $'\0' ]; then
      # After NULL character is consumed, change the delimiter and consume command string.
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")

COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
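The end of spark-class depends on a small output protocol between the Java launcher (org.apache.spark.launcher.Main) and the shell loop. The Scala sketch below models that protocol as the script's own comments describe it. It is for illustration only, and the exact byte layout is an assumption:

- diagnostic lines come first, one per line;
- a line holding a single NUL character marks the start of the command;
- every command token is terminated by NUL;
- build_command appends the launcher's exit code, followed by one more NUL.

`LauncherOutputParser`, `Parsed` and `parse` are hypothetical names, not Spark APIs.

```scala
// Hypothetical helper that mimics how spark-class splits the launcher's output.
object LauncherOutputParser {
  val Nul: Char = 0.toChar

  final case class Parsed(messages: Seq[String], command: Seq[String], exitCode: Int)

  def parse(output: String): Either[String, Parsed] = {
    // Assumed marker: a line consisting of a single NUL character.
    val marker = s"$Nul\n"
    val start = output.indexOf(marker)
    if (start < 0) return Left("launcher output has no command marker")

    // Lines before the marker are echoed by spark-class; empty lines are skipped.
    val messages = output.substring(0, start).split('\n').toSeq.filter(_.nonEmpty)

    // After the marker every token ends with NUL; split drops the trailing empty token.
    val tokens = output.substring(start + marker.length).split(Nul).toSeq
    if (tokens.isEmpty) return Left("launcher printed no command")

    // The last token is the exit code appended by build_command (printf "%d\0" $?).
    val code = tokens.last
    if (code.isEmpty || !code.forall(_.isDigit))
      Left(s"unexpected launcher output: ${tokens.mkString(" ")}")
    else
      Right(Parsed(messages, tokens.init, code.toInt))
  }
}
```

The script behaves the same way:

- a non-numeric last token is treated as a JVM failure, so spark-class prints the output and exits with 1;
- a non-zero code is propagated;
- otherwise the command, minus its trailing exit code, is what gets exec'd.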

2: The SparkSubmit class

From the scripts above we can see that spark-class has the launcher assemble a java command. That command ultimately runs the org.apache.spark.deploy.SparkSubmit class.

Let's look at its entry point, the main method:

    override def main(args: Array[String]): Unit = {
        val submit = new SparkSubmit() {
            self =>
            
            override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
                new SparkSubmitArguments(args) {
                    override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
                    
                    override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
                }
            }
            
            override protected def logInfo(msg: => String): Unit = printMessage(msg)
            
            override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
            
          
            override def doSubmit(args: Array[String]): Unit = {
                try {
                    /**
                     *   Still delegates to the parent class's doSubmit method
                     */
                    super.doSubmit(args)
                } catch {
                    case e: SparkUserAppException => exitFn(e.exitCode)
                }
            }
            
        }
        
        /**
         *   Call doSubmit
         */
        submit.doSubmit(args)
    }
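In main, `self =>` names the anonymous SparkSubmit instance. Inside the nested anonymous SparkSubmitArguments, an unqualified `logInfo(msg)` would resolve to the inner override itself and recurse. Writing `self.logInfo(msg)` instead forwards to the outer override, which prints through printMessage.

Below is a minimal sketch of the same aliasing pattern. The hypothetical `ArgsParser` and `Submitter` classes stand in for SparkSubmitArguments and SparkSubmit.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for SparkSubmitArguments: logs while it parses.
class ArgsParser(args: Seq[String]) {
  protected def logInfo(msg: => String): Unit = println(msg)

  def parse(): Map[String, String] = {
    logInfo(s"parsing ${args.size} arguments")
    args.grouped(2).collect { case Seq(k, v) => k -> v }.toMap
  }
}

// Hypothetical stand-in for SparkSubmit; `self` aliases this instance.
class Submitter { self =>
  val log = ArrayBuffer.empty[String]

  def logInfo(msg: => String): Unit = log += s"INFO: $msg"

  // Like SparkSubmit.parseArguments: the anonymous subclass routes its logging to `self`.
  def parseArguments(args: Seq[String]): Map[String, String] =
    new ArgsParser(args) {
      override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
    }.parse()
}
```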

This calls SparkSubmit's doSubmit method:

    def doSubmit(args: Array[String]): Unit = {
        // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
        // be reset before the application starts.
        val uninitLog = initializeLogIfNecessary(true, silent = true)
        
        val appArgs = parseArguments(args)
        if (appArgs.verbose) {
            logInfo(appArgs.toString)
        }
        
        /**
         *   Decide which action to run based on the spark-submit command-line arguments
         */
        appArgs.action match {
            // Submit the app to run
            case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
            // Kill the job
            case SparkSubmitAction.KILL => kill(appArgs)
            // Request the job status
            case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
            // Print the version number
            case SparkSubmitAction.PRINT_VERSION => printVersion()
        }
    }
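appArgs.action is filled in while SparkSubmitArguments parses the command line:

- --kill and --status select KILL and REQUEST_STATUS, and may not be combined;
- --version selects PRINT_VERSION;
- if none of these options appears, the action defaults to SUBMIT.

The sketch below models that rule with a hypothetical `Action` ADT and `resolveAction` function. It deliberately ignores everything else the real parser validates, including the fact that tokens after the primary resource belong to the application.

```scala
// Hypothetical, simplified model of how SparkSubmitArguments ends up with an action.
sealed trait Action
object Action {
  case object Submit extends Action
  final case class Kill(submissionId: String) extends Action
  final case class RequestStatus(submissionId: String) extends Action
  case object PrintVersion extends Action
}

object ActionResolver {
  import Action._

  def resolveAction(args: List[String]): Either[String, Action] = {
    @annotation.tailrec
    def loop(rest: List[String], current: Option[Action]): Either[String, Action] = rest match {
      case Nil => Right(current.getOrElse(Submit)) // no action flag: default to SUBMIT
      case "--kill" :: id :: tail =>
        if (current.isDefined) Left(s"Action cannot be both ${current.get} and Kill")
        else loop(tail, Some(Kill(id)))
      case "--status" :: id :: tail =>
        if (current.isDefined) Left(s"Action cannot be both ${current.get} and RequestStatus")
        else loop(tail, Some(RequestStatus(id)))
      case "--version" :: tail => loop(tail, Some(PrintVersion)) // --version simply overrides
      case _ :: tail => loop(tail, current) // any other token leaves the action unchanged
    }
    loop(args, None)
  }
}
```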

Since we are following the submission flow, we step into submit(appArgs, uninitLog):

  /**
     * Submit the application using the provided parameters, ensuring to first wrap
     * in a doAs when --proxy-user is specified.
     */
    @tailrec private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
        
        def doRunMain(): Unit = {
            if (args.proxyUser != null) {
                val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser, UserGroupInformation.getCurrentUser())
                try {
                    proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
                        override def run(): Unit = {
                            runMain(args, uninitLog)
                        }
                    })
                } catch {
                    case e: Exception => // Hadoop's AuthorizationException suppresses the exception's stack trace, which
                        // makes the message printed to the output by the JVM not very helpful. Instead,
                        // detect exceptions with empty stack traces here, and treat them differently.
                        if (e.getStackTrace().length == 0) {
                            error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
                        } else {
                            throw e
                        }
                }
            } else {
               
                runMain(args, uninitLog)
            }
        }
        
        // In standalone cluster mode, there are two submission gateways:
        //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
        //   (2) The new REST-based gateway introduced in Spark 1.3
        // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
        // to use the legacy gateway if the master endpoint turns out to be not a REST server.
        if (args.isStandaloneCluster && args.useRest) {
            try {
             
                logInfo("Running Spark using the REST application submission protocol.")
                doRunMain()
            } catch {
                // Fail over to use the legacy submission gateway
                case e: SubmitRestConnectionException => logWarning(
                    s"Master endpoint ${args.master} was not a REST server. " + "Falling back to legacy submission gateway instead.")
                    args.useRest = false
                    submit(args, false)
            } // In all other modes, just run the main class as prepared
        } else {
         
            doRunMain()
        }
    }
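The standalone-cluster branch is a retry with fallback. It first tries the REST gateway. If the master turns out not to be a REST server, it turns useRest off and calls submit again. Because that recursive call is the last thing the method does, @tailrec can compile it into a loop.

The sketch below shows the same shape as a pure function. `NotRestServer`, `SubmitFallback` and the two stub gateways are hypothetical, and a flag argument replaces Spark's mutation of args.useRest.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for SubmitRestConnectionException.
final class NotRestServer(msg: String) extends Exception(msg)

object SubmitFallback {
  // Returns a log describing which gateway handled the submission.
  @tailrec
  def submitWithFallback(useRest: Boolean, restAvailable: Boolean, log: Vector[String] = Vector.empty): Vector[String] =
    if (useRest) {
      Try(restSubmit(restAvailable)) match {
        case Success(msg) => log :+ msg
        // Fail over to the legacy gateway; the recursive call is in tail position.
        case Failure(_: NotRestServer) =>
          submitWithFallback(useRest = false, restAvailable, log :+ "fallback to legacy gateway")
        case Failure(other) => throw other
      }
    } else log :+ legacySubmit()

  private def restSubmit(available: Boolean): String =
    if (available) "submitted via REST" else throw new NotRestServer("master is not a REST server")

  private def legacySubmit(): String = "submitted via legacy RPC gateway"
}
```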
    

We can see that whichever branch is taken, runMain() is eventually called:

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
        
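        /**
         * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
         *   Note: prepare the launch environment by setting up the appropriate classpath, system properties and application arguments, so that the child main class runs according to the cluster manager and deploy mode.
         */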
        // Let the main class re-initialize the logging system once it starts.
        val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
        if (uninitLog) {
            Logging.uninitialize()
        }
        
        if (args.verbose) {
            logInfo(s"Main class:\n$childMainClass")
            logInfo(s"Arguments:\n${childArgs.mkString("\n")}") // sysProps may contain sensitive information, so redact before printing
            logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
            logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
            logInfo("\n")
        }
        
        val loader = if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
            new ChildFirstURLClassLoader(new Array[URL](0), Thread.currentThread.getContextClassLoader)
        } else {
            new MutableURLClassLoader(new Array[URL](0), Thread.currentThread.getContextClassLoader)
        }
        Thread.currentThread.setContextClassLoader(loader)
        
        for (jar <- childClasspath) {
            addJarToClasspath(jar, loader)
        }
        
        var mainClass: Class[_] = null
        
        try {
            mainClass = Utils.classForName(childMainClass)
        } catch {
            case e: ClassNotFoundException => logWarning(s"Failed to load $childMainClass.", e)
                if (childMainClass.contains("thriftserver")) {
                    logInfo(s"Failed to load main class $childMainClass.")
                    logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
                }
                throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
            case e: NoClassDefFoundError => logWarning(s"Failed to load $childMainClass: ${e.getMessage()}")
                if (e.getMessage.contains("org/apache/hadoop/hive")) {
                    logInfo(s"Failed to load hive class.")
                    logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
                }
                throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
        }
        
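        /**
         * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
         *   Note: for an ordinary user main class (e.g. in client mode) this effectively becomes:
         *   val app: SparkApplication = new JavaMainApplication(mainClass)
         *   i.e. the main class is wrapped in a SparkApplication. (In YARN cluster mode childMainClass is
         *   org.apache.spark.deploy.yarn.YarnClusterApplication, which already implements SparkApplication.)
         */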
        val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
            mainClass.newInstance().asInstanceOf[SparkApplication]
        } else {
            // SPARK-4170
            if (classOf[scala.App].isAssignableFrom(mainClass)) {
                logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
            }
            
            // TODO_MA Note: run the main class
            new JavaMainApplication(mainClass)
        }
        
        @tailrec def findCause(t: Throwable): Throwable = t match {
            case e: UndeclaredThrowableException => if (e.getCause() != null) findCause(e.getCause()) else e
            case e: InvocationTargetException => if (e.getCause() != null) findCause(e.getCause()) else e
            case e: Throwable => e
        }
        
        /**
         * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
         *   Note: start the application
         */
        try {
            app.start(childArgs.toArray, sparkConf)
        } catch {
            case t: Throwable => throw findCause(t)
        }
    }
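runMain picks its class loader based on spark.driver.userClassPathFirst (the DRIVER_USER_CLASS_PATH_FIRST entry):

- **Enabled:** it uses ChildFirstURLClassLoader, so classes in the user's jars take precedence over the copies Spark ships.
- **Disabled:** it uses MutableURLClassLoader. This loader follows normal parent-first delegation and only exposes addURL, so addJarToClasspath can add jars later.

The hypothetical `ChildFirstLoader` below sketches only the child-first idea. Spark's real class differs in detail.

```scala
import java.net.{URL, URLClassLoader}

// Hypothetical child-first loader: search our own URLs (plus the bootstrap loader) first,
// and ask the real parent only when that fails.
final class ChildFirstLoader(urls: Array[URL], realParent: ClassLoader)
    extends URLClassLoader(urls, null) { // null parent: only the bootstrap loader sits above us

  // Mirrors MutableURLClassLoader: make addURL public so jars can be added after construction.
  override def addURL(url: URL): Unit = super.addURL(url)

  override protected def loadClass(name: String, resolve: Boolean): Class[_] =
    try super.loadClass(name, resolve)
    catch { case _: ClassNotFoundException => realParent.loadClass(name) }
}
```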

The prepareSubmitEnvironment() method parses the arguments and prepares the submission environment:
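While reading the method, it helps to know that YARN, STANDALONE, MESOS, LOCAL, KUBERNETES, CLIENT and CLUSTER are integer bit flags in the SparkSubmit companion object. That is why the OptionAssigner rules can name sets such as STANDALONE | MESOS. It is also why the final loop tests `(deployMode & opt.deployMode) != 0 && (clusterManager & opt.clusterManager) != 0`.

The sketch below re-creates the master and deploy-mode resolution in standalone form, including the legacy yarn-client / yarn-cluster handling. `SubmitModes` and its methods are hypothetical. The concrete flag values (1, 2, 4, 8, 16) are an assumption; only the fact that each flag is a distinct bit matters. The fail-fast checks, such as rejecting local cluster mode, are left out.

```scala
// Hypothetical, simplified re-creation of how --master and --deploy-mode are resolved.
object SubmitModes {
  // Assumed values; each flag is a distinct bit so rules can combine them with |.
  val YARN = 1
  val STANDALONE = 2
  val MESOS = 4
  val LOCAL = 8
  val KUBERNETES = 16
  val ALL_CLUSTER_MGRS: Int = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES

  val CLIENT = 1
  val CLUSTER = 2
  val ALL_DEPLOY_MODES: Int = CLIENT | CLUSTER

  /** Maps a --master value to a cluster-manager flag. */
  def clusterManager(master: String): Either[String, Int] = master match {
    case "yarn" | "yarn-client" | "yarn-cluster" => Right(YARN) // last two: deprecated since 2.0
    case m if m.startsWith("spark") => Right(STANDALONE)
    case m if m.startsWith("mesos") => Right(MESOS)
    case m if m.startsWith("k8s") => Right(KUBERNETES)
    case m if m.startsWith("local") => Right(LOCAL)
    case _ => Left("Master must either be yarn or start with spark, mesos, k8s, or local")
  }

  /** Resolves the cluster manager and deploy mode together, inferring cluster mode from "yarn-cluster". */
  def resolve(master: String, deployModeArg: String): Either[String, (Int, Int)] =
    for {
      manager <- clusterManager(master)
      mode <- deployModeArg match {
        case "client" | null => Right(CLIENT) // client is the default
        case "cluster" => Right(CLUSTER)
        case _ => Left("Deploy mode must be either client or cluster")
      }
      finalMode <- (master, deployModeArg) match {
        case ("yarn-cluster", null) => Right(CLUSTER) // legacy master implies cluster mode
        case ("yarn-cluster", "client") => Left("Client deploy mode is not compatible with master \"yarn-cluster\"")
        case ("yarn-client", "cluster") => Left("Cluster deploy mode is not compatible with master \"yarn-client\"")
        case _ => Right(mode)
      }
    } yield (manager, finalMode)

  /** The check used when applying an OptionAssigner-style rule: both bitmasks must overlap. */
  def applies(ruleManagers: Int, ruleModes: Int, manager: Int, mode: Int): Boolean =
    (manager & ruleManagers) != 0 && (mode & ruleModes) != 0
}
```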

private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments,
        conf: Option[HadoopConfiguration] = None): (Seq[String], Seq[String], SparkConf, String) = {
        // Return values
        val childArgs = new ArrayBuffer[String]()
        val childClasspath = new ArrayBuffer[String]()
        val sparkConf = new SparkConf()
        var childMainClass = ""
        
        /**
         *   Note: set the cluster manager. This match shows which cluster managers Spark currently supports: YARN, STANDALONE, MESOS, KUBERNETES and LOCAL,
         *   selected with the --master option of spark-submit.
         */
        // Set the cluster manager
        val clusterManager: Int = args.master match {
            case "yarn" => YARN
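            // TODO_MA Note: before Spark 2.0, yarn-client and yarn-cluster could be used as the --master value. They are deprecated since 2.0, so here they are converted to yarn automatically and a warning is logged.
            case "yarn-client" | "yarn-cluster" => logWarning(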
                s"Master ${args.master} is deprecated since 2.0." + " Please use master \"yarn\" with specified deploy mode instead.")
                YARN
            case m if m.startsWith("spark") => STANDALONE
            case m if m.startsWith("mesos") => MESOS
            case m if m.startsWith("k8s") => KUBERNETES
            case m if m.startsWith("local") => LOCAL
            case _ => error("Master must either be yarn or start with spark, mesos, k8s, or local")
                -1
        }
        
        /**
         *   Set the deploy mode (--deploy-mode); the default is client mode.
         */
        // Set the deploy mode; default is client mode
        var deployMode: Int = args.deployMode match {
            case "client" | null => CLIENT
            case "cluster" => CLUSTER
            case _ => error("Deploy mode must be either client or cluster")
                -1
        }
        
        /**
         * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
         *   Note: the deprecated "yarn-cluster" and "yarn-client" values encapsulate both --master and --deploy-mode.
         *   If only one of the two is specified, the logic below infers the other; if they conflict, we exit early.
         */
        // Because the deprecated way of specifying "yarn-cluster" and "yarn-client" encapsulate both
        // the master and deploy mode, we have some logic to infer the master and deploy mode
        // from each other if only one is specified, or exit early if they are at odds.
        if (clusterManager == YARN) {
            (args.master, args.deployMode) match {
                case ("yarn-cluster", null) => deployMode = CLUSTER
                    args.master = "yarn"
                case ("yarn-cluster", "client") => error("Client deploy mode is not compatible with master \"yarn-cluster\"")
                case ("yarn-client", "cluster") => error("Cluster deploy mode is not compatible with master \"yarn-client\"")
                case (_, mode) => args.master = "yarn"
            }
            
            // Make sure YARN is included in our build if we're trying to use it
            if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
                error("Could not load YARN classes. " + "This copy of Spark may not have been compiled with YARN support.")
            }
        }
        
        if (clusterManager == KUBERNETES) {
            args.master = Utils.checkAndGetK8sMasterUrl(args.master) // Make sure KUBERNETES is included in our build if we're trying to use it
            if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
                error("Could not load KUBERNETES classes. " + "This copy of Spark may not have been compiled with KUBERNETES support.")
            }
        }
        
        // Fail fast, the following modes are not supported or applicable
        (clusterManager, deployMode) match {
            case (STANDALONE, CLUSTER) if args.isPython => error(
                "Cluster deploy mode is currently not supported for python " + "applications on standalone clusters.")
            case (STANDALONE, CLUSTER) if args.isR => error(
                "Cluster deploy mode is currently not supported for R " + "applications on standalone clusters.")
            case (LOCAL, CLUSTER) => error("Cluster deploy mode is not compatible with master \"local\"")
            case (_, CLUSTER) if isShell(args.primaryResource) => error("Cluster deploy mode is not applicable to Spark shells.")
            case (_, CLUSTER) if isSqlShell(args.mainClass) => error("Cluster deploy mode is not applicable to Spark SQL shell.")
            case (_, CLUSTER) if isThriftServer(args.mainClass) => error("Cluster deploy mode is not applicable to Spark Thrift server.")
            case _ =>
        }
        
        // Update args.deployMode if it is null. It will be passed down as a Spark property later.
        (args.deployMode, deployMode) match {
            case (null, CLIENT) => args.deployMode = "client"
            case (null, CLUSTER) => args.deployMode = "cluster"
            case _ =>
        }
        val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
        val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
        val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
        val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
        val isMesosClient = clusterManager == MESOS && deployMode == CLIENT
        
        if (!isMesosCluster && !isStandAloneCluster) {
            // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
            // too for packages that include Python code
            val resolvedMavenCoordinates = DependencyUtils
                .resolveMavenDependencies(args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath, args.ivySettingsPath)
            
            if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
                args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
                if (args.isPython || isInternal(args.primaryResource)) {
                    args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
                }
            }
            
            // install any R packages that may have been passed through --jars or --packages.
            // Spark Packages may contain R source code inside the jar.
            if (args.isR && !StringUtils.isBlank(args.jars)) {
                RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
            }
        }
        
        args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
        val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
        val targetDir = Utils.createTempDir()
        
        // assure a keytab is available from any place in a JVM
        if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) {
            if (args.principal != null) {
                if (args.keytab != null) {
                    require(new File(args.keytab).exists(),
                        s"Keytab file: ${args.keytab} does not exist") // Add keytab and principal configurations in sysProps to make them available
                    // for later use; e.g. in spark sql, the isolated class loader used to talk
                    // to HiveMetastore will use these settings. They will be set as Java system
                    // properties and then loaded by SparkConf
                    sparkConf.set(KEYTAB, args.keytab)
                    sparkConf.set(PRINCIPAL, args.principal)
                    UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
                }
            }
        }
        
        // Resolve glob path for different resources.
        args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
        args.files = Option(args.files).map(resolveGlobPaths(_, hadoopConf)).orNull
        args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull
        args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull
        
        lazy val secMgr = new SecurityManager(sparkConf)
        
        // In client mode, download remote files.
        var localPrimaryResource: String = null
        var localJars: String = null
        var localPyFiles: String = null
        if (deployMode == CLIENT) {
            localPrimaryResource = Option(args.primaryResource).map {
                downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
            }.orNull
            localJars = Option(args.jars).map {
                downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
            }.orNull
            localPyFiles = Option(args.pyFiles).map {
                downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
            }.orNull
        }
        
        // When running in YARN, for some remote resources with scheme:
        //   1. Hadoop FileSystem doesn't support them.
        //   2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
        // We will download them to local disk prior to add to YARN's distributed cache.
        // For yarn client mode, since we already download them with above code, so we only need to
        // figure out the local path and replace the remote one.
        if (clusterManager == YARN) {
            val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
            
            def shouldDownload(scheme: String): Boolean = {
                forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) || Try {
                    FileSystem.getFileSystemClass(scheme, hadoopConf)
                }.isFailure
            }
            
            def downloadResource(resource: String): String = {
                val uri = Utils.resolveURI(resource)
                uri.getScheme match {
                    case "local" | "file" => resource
                    case e if shouldDownload(e) => val file = new File(targetDir, new Path(uri).getName)
                        if (file.exists()) {
                            file.toURI.toString
                        } else {
                            downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
                        }
                    case _ => uri.toString
                }
            }
            
            args.primaryResource = Option(args.primaryResource).map {
                downloadResource
            }.orNull
            args.files = Option(args.files).map { files =>
                Utils.stringToSeq(files).map(downloadResource).mkString(",")
            }.orNull
            args.pyFiles = Option(args.pyFiles).map { pyFiles =>
                Utils.stringToSeq(pyFiles).map(downloadResource).mkString(",")
            }.orNull
            args.jars = Option(args.jars).map { jars =>
                Utils.stringToSeq(jars).map(downloadResource).mkString(",")
            }.orNull
            args.archives = Option(args.archives).map { archives =>
                Utils.stringToSeq(archives).map(downloadResource).mkString(",")
            }.orNull
        }
        
        // If we're running a python app, set the main class to our specific python runner
        if (args.isPython && deployMode == CLIENT) {
            if (args.primaryResource == PYSPARK_SHELL) {
                args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
            } else {
                // If a python file is provided, add it to the child arguments and list of files to deploy.
                // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
                args.mainClass = "org.apache.spark.deploy.PythonRunner"
                args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
            }
            if (clusterManager != YARN) {
                // The YARN backend handles python files differently, so don't merge the lists.
                args.files = mergeFileLists(args.files, args.pyFiles)
            }
        }
        
        if (localPyFiles != null) {
            sparkConf.set("spark.submit.pyFiles", localPyFiles)
        }
        
        // In YARN mode for an R app, add the SparkR package archive and the R package
        // archive containing all of the built R libraries to archives so that they can
        // be distributed with the job
        if (args.isR && clusterManager == YARN) {
            val sparkRPackagePath = RUtils.localSparkRPackagePath
            if (sparkRPackagePath.isEmpty) {
                error("SPARK_HOME does not exist for R application in YARN mode.")
            }
            val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
            if (!sparkRPackageFile.exists()) {
                error(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
            }
            val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString
            
            // Distribute the SparkR package.
            // Assigns a symbol link name "sparkr" to the shipped package.
            args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")
            
            // Distribute the R package archive containing all the built R packages.
            if (!RUtils.rPackages.isEmpty) {
                val rPackageFile = RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
                if (!rPackageFile.exists()) {
                    error("Failed to zip all the built R packages.")
                }
                
                val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString // Assigns a symbol link name "rpkg" to the shipped package.
                args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
            }
        }
        
        // TODO: Support distributing R packages with standalone cluster
        if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
            error("Distributing R packages with standalone cluster is not supported.")
        }
        
        // TODO: Support distributing R packages with mesos cluster
        if (args.isR && clusterManager == MESOS && !RUtils.rPackages.isEmpty) {
            error("Distributing R packages with mesos cluster is not supported.")
        }
        
        // If we're running an R app, set the main class to our specific R runner
        if (args.isR && deployMode == CLIENT) {
            if (args.primaryResource == SPARKR_SHELL) {
                args.mainClass = "org.apache.spark.api.r.RBackend"
            } else {
                // If an R file is provided, add it to the child arguments and list of files to deploy.
                // Usage: RRunner <main R file> [app arguments]
                args.mainClass = "org.apache.spark.deploy.RRunner"
                args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
                args.files = mergeFileLists(args.files, args.primaryResource)
            }
        }
        
        if (isYarnCluster && args.isR) {
            // In yarn-cluster mode for an R app, add primary resource to files
            // that can be distributed with the job
            args.files = mergeFileLists(args.files, args.primaryResource)
        }
        
        // Special flag to avoid deprecation warnings at the client
        sys.props("SPARK_SUBMIT") = "true"
        
        // A list of rules to map each argument to system properties or command-line options in
        // each deploy mode; we iterate through these below
        val options = List[OptionAssigner]( // All cluster managers
            OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
            OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.submit.deployMode"),
            OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
            OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
            OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.driver.memory"),
            OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.driver.extraClassPath"),
            OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.driver.extraJavaOptions"),
            OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.driver.extraLibraryPath"),
            
            // Propagate attributes for dependency resolution at the driver side
            OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
            OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.repositories"),
            OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.ivy"),
            OptionAssigner(args.packagesExclusions, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.excludes"),
            
            // Yarn only
            OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
            OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES, confKey = "spark.executor.instances"),
            OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"),
            OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"),
            OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"),
            OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives"),
            OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
            OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),
            
            // Other options
            OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.executor.cores"),
            OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.executor.memory"),
            OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.cores.max"),
            OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.files"),
            OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
            OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.jars"),
            OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, confKey = "spark.driver.memory"),
            OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, confKey = "spark.driver.cores"),
            OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER, confKey = "spark.driver.supervise"),
            OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),
            
            // An internal option used only for spark-shell to add user jars to repl's classloader,
            // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
            // remote jars, so adding a new option to only specify local jars for spark-shell internally.
            OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars"))
        
        // In client mode, launch the application main class directly
        // In addition, add the main application jar and any added jars (if any) to the classpath
        if (deployMode == CLIENT) {
            childMainClass = args.mainClass
            if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
                childClasspath += localPrimaryResource
            }
            if (localJars != null) {
                childClasspath ++= localJars.split(",")
            }
        }
        // Add the main application jar and any added jars to classpath in case YARN client
        // requires these jars.
        // This assumes both primaryResource and user jars are local jars, or already downloaded
        // to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be
        // added to the classpath of YARN client.
        if (isYarnCluster) {
            if (isUserJar(args.primaryResource)) {
                childClasspath += args.primaryResource
            }
            if (args.jars != null) {
                childClasspath ++= args.jars.split(",")
            }
        }
        
        if (deployMode == CLIENT) {
            if (args.childArgs != null) {
                childArgs ++= args.childArgs
            }
        }
        
        // Map all arguments to command-line options or system properties for our chosen mode
        for (opt <- options) {
            if (opt.value != null && (deployMode & opt.deployMode) != 0 && (clusterManager & opt.clusterManager) != 0) {
                if (opt.clOption != null) {
                    childArgs += (opt.clOption, opt.value)
                }
                if (opt.confKey != null) {
                    sparkConf.set(opt.confKey, opt.value)
                }
            }
        }
        
        // In case of shells, spark.ui.showConsoleProgress can be true by default or by user.
        if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
            sparkConf.set(UI_SHOW_CONSOLE_PROGRESS, true)
        }
        
        // Add the application jar automatically so the user doesn't have to call sc.addJar
        // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
        // For python and R files, the primary resource is already distributed as a regular file
        if (!isYarnCluster && !args.isPython && !args.isR) {
            var jars = sparkConf.getOption("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
            if (isUserJar(args.primaryResource)) {
                jars = jars ++ Seq(args.primaryResource)
            }
            sparkConf.set("spark.jars", jars.mkString(","))
        }
        
        // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
        // All Spark parameters are expected to be passed to the client through system properties.
        if (args.isStandaloneCluster) {
            if (args.useRest) {
                childMainClass = REST_CLUSTER_SUBMIT_CLASS
                childArgs += (args.primaryResource, args.mainClass)
            } else {
                // In legacy standalone cluster mode, use Client as a wrapper around the user class
                childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
                if (args.supervise) {
                    childArgs += "--supervise"
                }
                Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
                Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
                childArgs += "launch"
                childArgs += (args.master, args.primaryResource, args.mainClass)
            }
            if (args.childArgs != null) {
                childArgs ++= args.childArgs
            }
        }
        
        // Let YARN know it's a pyspark app, so it distributes needed libraries.
        if (clusterManager == YARN) {
            if (args.isPython) {
                sparkConf.set("spark.yarn.isPython", "true")
            }
        }
        
        if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
            setRMPrincipal(sparkConf)
        }
        
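        /**
         * When the job is submitted to YARN in cluster mode (isYarnCluster), the class that
         * actually runs is the one named by YARN_CLUSTER_SUBMIT_CLASS, not our own main class;
         * our class is handed to it through the --class argument.
         *
         * YARN and K8S modes will be covered in detail in a later post.
         */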
        // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
        if (isYarnCluster) {
            childMainClass = YARN_CLUSTER_SUBMIT_CLASS
            if (args.isPython) {
                childArgs += ("--primary-py-file", args.primaryResource)
                childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
            } else if (args.isR) {
                val mainFile = new Path(args.primaryResource).getName
                childArgs += ("--primary-r-file", mainFile)
                childArgs += ("--class", "org.apache.spark.deploy.RRunner")
            } else {
                if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
                    childArgs += ("--jar", args.primaryResource)
                }
                childArgs += ("--class", args.mainClass)
            }
            if (args.childArgs != null) {
                args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
            }
        }
        
        if (isMesosCluster) {
            assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
            childMainClass = REST_CLUSTER_SUBMIT_CLASS
            if (args.isPython) {
                // Second argument is main class
                childArgs += (args.primaryResource, "")
                if (args.pyFiles != null) {
                    sparkConf.set("spark.submit.pyFiles", args.pyFiles)
                }
            } else if (args.isR) {
                // Second argument is main class
                childArgs += (args.primaryResource, "")
            } else {
                childArgs += (args.primaryResource, args.mainClass)
            }
            if (args.childArgs != null) {
                childArgs ++= args.childArgs
            }
        }
        
        if (isKubernetesCluster) {
            childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
            if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
                if (args.isPython) {
                    childArgs ++= Array("--primary-py-file", args.primaryResource)
                    childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
                    if (args.pyFiles != null) {
                        childArgs ++= Array("--other-py-files", args.pyFiles)
                    }
                } else if (args.isR) {
                    childArgs ++= Array("--primary-r-file", args.primaryResource)
                    childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
                } else {
                    childArgs ++= Array("--primary-java-resource", args.primaryResource)
                    childArgs ++= Array("--main-class", args.mainClass)
                }
            } else {
                childArgs ++= Array("--main-class", args.mainClass)
            }
            if (args.childArgs != null) {
                args.childArgs.foreach { arg =>
                    childArgs += ("--arg", arg)
                }
            }
        }
        
        // Load any properties specified through --conf and the default properties file
        for ((k, v) <- args.sparkProperties) {
            sparkConf.setIfMissing(k, v)
        }
        
        // Ignore invalid spark.driver.host in cluster modes.
        if (deployMode == CLUSTER) {
            sparkConf.remove("spark.driver.host")
        }
        
        // Resolve paths in certain spark properties
        val pathConfigs = Seq("spark.jars", "spark.files", "spark.yarn.dist.files", "spark.yarn.dist.archives", "spark.yarn.dist.jars")
        pathConfigs.foreach { config => // Replace old URIs with resolved URIs, if they exist
            sparkConf.getOption(config).foreach { oldValue =>
                sparkConf.set(config, Utils.resolveURIs(oldValue))
            }
        }
        
        // Resolve and format python file paths properly before adding them to the PYTHONPATH.
        // The resolving part is redundant in the case of --py-files, but necessary if the user
        // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
        sparkConf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
            val resolvedPyFiles = Utils.resolveURIs(pyFiles)
            val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
                PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
            } else {
                // Ignoring formatting python path in yarn and mesos cluster mode, these two modes
                // support dealing with remote python files, they could distribute and add python files
                // locally.
                resolvedPyFiles
            }
            sparkConf.set("spark.submit.pyFiles", formattedPyFiles)
        }
        
        (childArgs, childClasspath, sparkConf, childMainClass)
    }
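The mapping loop near the end of prepareSubmitEnvironment works because cluster managers and deploy modes are bit flags: an OptionAssigner is applied only when its value is set and both bitwise ANDs are non-zero. Below is a minimal Scala sketch of that filter. The flag values are assumed to mirror the private constants in the SparkSubmit object, and `Assigner` / `resolve` are hypothetical simplifications (the real OptionAssigner also carries a command-line option name).

```scala
// Flag values assumed to mirror the private constants in the SparkSubmit object.
val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8; val KUBERNETES = 16
val CLIENT = 1; val CLUSTER = 2
val ALL_DEPLOY_MODES = CLIENT | CLUSTER

// Hypothetical, simplified OptionAssigner: only the fields the mapping loop reads.
final case class Assigner(value: String, clusterManager: Int, deployMode: Int, confKey: String)

// Same test as the loop in prepareSubmitEnvironment: keep an option only if it is set
// and both the deploy-mode and cluster-manager masks overlap with the current ones.
def resolve(options: Seq[Assigner], clusterManager: Int, deployMode: Int): Map[String, String] =
  options.collect {
    case o if o.value != null &&
        (deployMode & o.deployMode) != 0 &&
        (clusterManager & o.clusterManager) != 0 =>
      o.confKey -> o.value
  }.toMap

val options = Seq(
  Assigner("4", YARN, ALL_DEPLOY_MODES, "spark.executor.instances"),
  Assigner("2g", STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, "spark.driver.memory"),
  Assigner("a.jar", LOCAL, CLIENT, "spark.jars")
)

// For yarn-cluster, the first two entries survive; the spark.jars entry is LOCAL-only.
println(resolve(options, YARN, CLUSTER))
```

Encoding both dimensions as bit masks lets a single table row such as `STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER` cover many combinations without nested if-statements.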

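The scattered assignments to childMainClass above boil down to one decision per cluster manager and deploy mode. A rough sketch of that decision follows, assuming plain strings for the cluster manager and returning the names of the SparkSubmit constants rather than the class names they hold; `selectChildMainClass` is a hypothetical helper, not a Spark API.

```scala
// Hypothetical helper: returns the user's class in client mode, otherwise the *name*
// of the SparkSubmit constant that prepareSubmitEnvironment assigns to childMainClass.
def selectChildMainClass(
    clusterManager: String,
    clusterMode: Boolean,
    useRest: Boolean,
    userMainClass: String): String =
  if (!clusterMode) {
    // Client mode: the driver runs inside spark-submit's own JVM, so the user's
    // main class is launched directly.
    userMainClass
  } else {
    clusterManager match {
      case "standalone" =>
        if (useRest) "REST_CLUSTER_SUBMIT_CLASS" else "STANDALONE_CLUSTER_SUBMIT_CLASS"
      case "yarn" => "YARN_CLUSTER_SUBMIT_CLASS"
      case "mesos" =>
        // Same precondition as the assert in the Mesos branch above.
        require(useRest, "Mesos cluster mode is only supported through the REST submission API")
        "REST_CLUSTER_SUBMIT_CLASS"
      case "kubernetes" => "KUBERNETES_CLUSTER_SUBMIT_CLASS"
      case other =>
        throw new IllegalArgumentException(s"Cluster deploy mode is not handled for $other")
    }
  }

println(selectChildMainClass("yarn", clusterMode = true, useRest = false, "org.apache.spark.examples.SparkPi"))
```

In every cluster-mode branch the user's class name is not lost: it travels to the submit class inside childArgs (for example `--class` for YARN, `--main-class` for Kubernetes).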
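Next, the class named by childMainClass is loaded via reflection and turned into a SparkApplication, Spark's trait for launchable applications. If the class already implements SparkApplication, it is instantiated directly; otherwise (for example, our own main class in client mode) it is wrapped in a JavaMainApplication and run: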

val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
            mainClass.newInstance().asInstanceOf[SparkApplication]
        } else {
            // SPARK-4170
            if (classOf[scala.App].isAssignableFrom(mainClass)) {
                logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
            }
            
            // Otherwise wrap the plain main class so it can be started like any SparkApplication
            new JavaMainApplication(mainClass)
        }
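The isAssignableFrom branch can be tried out without Spark on the classpath. In the sketch below, `SparkApplicationLike` and `MainMethodWrapper` are hypothetical stand-ins for SparkApplication and JavaMainApplication, simplified by dropping the SparkConf parameter.

```scala
// Hypothetical stand-in for org.apache.spark.deploy.SparkApplication; the real trait's
// start() also receives a SparkConf.
trait SparkApplicationLike {
  def start(args: Array[String]): Unit
}

// Hypothetical stand-in for JavaMainApplication: adapts a class that only has main().
class MainMethodWrapper(val klass: Class[_]) extends SparkApplicationLike {
  // Looks up main(String[]) and calls it with a null receiver; the static-method check
  // that JavaMainApplication performs is left out of this sketch.
  def start(args: Array[String]): Unit =
    klass.getMethod("main", classOf[Array[String]]).invoke(null, args)
}

// Mirrors the branch above: a class that already implements the trait is instantiated
// through its public no-arg constructor; anything else is wrapped.
def toApplication(mainClass: Class[_]): SparkApplicationLike =
  if (classOf[SparkApplicationLike].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplicationLike]
  } else {
    new MainMethodWrapper(mainClass)
  }

// java.lang.String does not implement the trait, so it gets wrapped, not instantiated.
println(toApplication(classOf[String]).isInstanceOf[MainMethodWrapper]) // true
```

Note the direction of the check: `classOf[Trait].isAssignableFrom(candidate)` asks whether `candidate` is a subtype of the trait, which is the same question Spark asks about the loaded main class.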

JavaMainApplication implements the SparkApplication trait and its start() method. start() uses reflection to obtain our main method and then invokes it:

override def start(args: Array[String], conf: SparkConf): Unit = {
        
        /** Use reflection to get the main method of our own main class (klass, e.g. SparkPi).
         *
         *   new Array[String](0).getClass is the runtime class of String[], i.e. main's parameter type.
         */
        val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
            throw new IllegalStateException("The main method in the given main class must be static")
        }
        
        val sysProps = conf.getAll.toMap
        sysProps.foreach { case (k, v) => sys.props(k) = v }
        
        /**
         *   Only at this point does execution jump into the main method of our own code.
         */
        mainMethod.invoke(null, args)
    }
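The lookup / static check / `invoke(null, ...)` sequence can be exercised on a JDK method. This sketch assumes a static JDK method (Integer.parseInt) is an acceptable substitute for a user main, because a Scala script cannot easily declare a Java-style static main; `invokeStatic` is a hypothetical helper. It also confirms that `new Array[String](0).getClass` is simply the class of `String[]`.

```scala
import java.lang.reflect.Modifier

// Hypothetical helper repeating the three steps of JavaMainApplication.start():
// look the method up by name and parameter type, insist that it is static, then
// invoke it with a null receiver (legal only for static methods).
def invokeStatic(klass: Class[_], name: String, paramType: Class[_], arg: AnyRef): AnyRef = {
  val method = klass.getMethod(name, paramType)
  if (!Modifier.isStatic(method.getModifiers)) {
    throw new IllegalStateException(s"The method $name in ${klass.getName} must be static")
  }
  method.invoke(null, arg)
}

// Spark passes new Array[String](0).getClass, which is just the class of String[].
val stringArrayClass: Class[_] = new Array[String](0).getClass
println(stringArrayClass == classOf[Array[String]]) // true
println(stringArrayClass.getName)                   // [Ljava.lang.String;

// Integer.parseInt(String) is static, so it can be called the same way main is.
println(invokeStatic(classOf[java.lang.Integer], "parseInt", classOf[String], "42")) // 42
```

An instance method such as `String.concat` would fail the static check with IllegalStateException, which is exactly the error JavaMainApplication raises for a non-static main.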

And that is the whole flow Spark follows to submit a job and start our code.

Added: 2021-09-23 11:32:08  Updated: 2021-09-23 11:34:12