Spark Source Code Analysis: Job Submission
1: Submitting the Code
### The steps for submitting a Spark job are:
### 1) Package the Scala/Java code written in the IDE into a jar.
### 2) Upload the jar to the server.
### 3) Run a shell command such as `./spark-submit --class org.apache.spark.examples.SparkPi --master yarn ./examples/jars/spark-examples_2.12-3.0.0.jar`, which hands our code to the Spark cluster through the spark-submit script.
### So let's start from that script. The content of spark-submit is:
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
As you can see, this shell script simply delegates to the spark-class script, passing in org.apache.spark.deploy.SparkSubmit as the class to run.
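Incidentally, the same submission can also be triggered from code instead of a shell, via the launcher API that ships with Spark. A minimal sketch (the paths and class names below are only illustrative):

import org.apache.spark.launcher.SparkLauncher

object SubmitFromCode {
  def main(args: Array[String]): Unit = {
    val process = new SparkLauncher()
      .setSparkHome("/opt/spark") // illustrative install location
      .setAppResource("/opt/spark/examples/jars/spark-examples_2.12-3.0.0.jar")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setMaster("yarn")
      .launch()       // forks the spark-submit process described above
    process.waitFor() // wait for the submission process to finish
  }
}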
The spark-class script:
#!/usr/bin/env bash
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi
# Load the Spark environment
. "${SPARK_HOME}"/bin/load-spark-env.sh
# Determine the java command to use
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi
# Find the Spark jars
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi
# Run org.apache.spark.launcher.Main to build the final java command,
# then append the launcher's own exit code as a NUL-terminated field
build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}
# Turn off posix mode, which does not allow process substitution
set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$CMD_START_FLAG" == "true" ]; then
    CMD+=("$ARG")
  else
    if [ "$ARG" == $'\0' ]; then
      # After the NUL marker, switch the delimiter and read the command words
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}
# If the exit code is not a number, something failed inside the launcher;
# its output is error text, so print it and bail out
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi
# Strip the exit code and exec the assembled java command
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
2: The SparkSubmit Class
From the two scripts we can see that, in the end, an assembled java command runs the org.apache.spark.deploy.SparkSubmit class.
Let's look at its main method, the entry point:
override def main(args: Array[String]): Unit = {
val submit = new SparkSubmit() {
self =>
override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
new SparkSubmitArguments(args) {
override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
}
}
override protected def logInfo(msg: => String): Unit = printMessage(msg)
override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
override def doSubmit(args: Array[String]): Unit = {
try {
super.doSubmit(args)
} catch {
case e: SparkUserAppException => exitFn(e.exitCode)
}
}
}
submit.doSubmit(args)
}
It calls SparkSubmit's doSubmit method:
def doSubmit(args: Array[String]): Unit = {
val uninitLog = initializeLogIfNecessary(true, silent = true)
val appArgs = parseArguments(args)
if (appArgs.verbose) {
logInfo(appArgs.toString)
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
Since we are tracing the submission flow, we follow submit(appArgs, uninitLog):
@tailrec private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser, UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(args, uninitLog)
}
})
} catch {
case e: Exception =>
if (e.getStackTrace().length == 0) {
error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
} else {
throw e
}
}
} else {
runMain(args, uninitLog)
}
}
if (args.isStandaloneCluster && args.useRest) {
try {
logInfo("Running Spark using the REST application submission protocol.")
doRunMain()
} catch {
case e: SubmitRestConnectionException => logWarning(
s"Master endpoint ${args.master} was not a REST server. " + "Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args, false)
}
} else {
doRunMain()
}
}
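One detail worth pausing on: when --proxy-user is given, the submission runs inside doAs of a proxy UGI, which is the standard Hadoop impersonation pattern. Reduced to a sketch (runAsProxy is our own helper name, not Spark's):

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

// Run `body` as `user`, impersonated by the currently logged-in (real) user.
// The cluster must allow this via hadoop.proxyuser.<realUser>.hosts/groups.
def runAsProxy[T](user: String)(body: => T): T = {
  val ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getCurrentUser)
  ugi.doAs(new PrivilegedExceptionAction[T] {
    override def run(): T = body
  })
}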
Whichever branch is taken, the runMain() method is eventually called:
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
if (uninitLog) {
Logging.uninitialize()
}
if (args.verbose) {
logInfo(s"Main class:\n$childMainClass")
logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
logInfo("\n")
}
val loader = if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
new ChildFirstURLClassLoader(new Array[URL](0), Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0), Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
var mainClass: Class[_] = null
try {
mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException => logWarning(s"Failed to load $childMainClass.", e)
if (childMainClass.contains("thriftserver")) {
logInfo(s"Failed to load main class $childMainClass.")
logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
}
throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError => logWarning(s"Failed to load $childMainClass: ${e.getMessage()}")
if (e.getMessage.contains("org/apache/hadoop/hive")) {
logInfo(s"Failed to load hive class.")
logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
}
throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
}
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.newInstance().asInstanceOf[SparkApplication]
} else {
if (classOf[scala.App].isAssignableFrom(mainClass)) {
logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
new JavaMainApplication(mainClass)
}
@tailrec def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException => if (e.getCause() != null) findCause(e.getCause()) else e
case e: InvocationTargetException => if (e.getCause() != null) findCause(e.getCause()) else e
case e: Throwable => e
}
try {
app.start(childArgs.toArray, sparkConf)
} catch {
case t: Throwable => throw findCause(t)
}
}
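Before calling app.start(), runMain installs a classloader for the user's jars; with spark.driver.userClassPathFirst enabled it is a child-first loader, so the user's copy of a library shadows Spark's. The child-first idea, boiled down to a sketch (ChildFirst is our illustration, not Spark's ChildFirstURLClassLoader):

import java.net.{URL, URLClassLoader}

// Look in this loader's own URLs before asking the real parent,
// the reverse of the JVM's default parent-first delegation.
class ChildFirst(urls: Array[URL], realParent: ClassLoader)
  extends URLClassLoader(urls, null) { // null parent = bootstrap only
  override def loadClass(name: String, resolve: Boolean): Class[_] =
    try {
      super.loadClass(name, resolve) // our URLs (and bootstrap) first
    } catch {
      case _: ClassNotFoundException => realParent.loadClass(name) // then the parent
    }
}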
The prepareSubmitEnvironment() method resolves the arguments and builds the submission environment:
private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None): (Seq[String], Seq[String], SparkConf, String) = {
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sparkConf = new SparkConf()
var childMainClass = ""
val clusterManager: Int = args.master match {
case "yarn" => YARN
case "yarn-client" | "yarn-cluster" => logWarning(
s"Master ${args.master} is deprecated since 2.0." + " Please use master \"yarn\" with specified deploy mode instead.")
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ => error("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
}
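// (note) The cluster-manager constants above are bit flags, which is why the
// OptionAssigner loop further down can test them with a bitwise AND:
// (clusterManager & opt.clusterManager) != 0.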
var deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ => error("Deploy mode must be either client or cluster")
-1
}
if (clusterManager == YARN) {
(args.master, args.deployMode) match {
case ("yarn-cluster", null) => deployMode = CLUSTER
args.master = "yarn"
case ("yarn-cluster", "client") => error("Client deploy mode is not compatible with master \"yarn-cluster\"")
case ("yarn-client", "cluster") => error("Cluster deploy mode is not compatible with master \"yarn-client\"")
case (_, mode) => args.master = "yarn"
}
if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
error("Could not load YARN classes. " + "This copy of Spark may not have been compiled with YARN support.")
}
}
if (clusterManager == KUBERNETES) {
args.master = Utils.checkAndGetK8sMasterUrl(args.master)
if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
error("Could not load KUBERNETES classes. " + "This copy of Spark may not have been compiled with KUBERNETES support.")
}
}
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython => error(
"Cluster deploy mode is currently not supported for python " + "applications on standalone clusters.")
case (STANDALONE, CLUSTER) if args.isR => error(
"Cluster deploy mode is currently not supported for R " + "applications on standalone clusters.")
case (LOCAL, CLUSTER) => error("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) => error("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) => error("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) => error("Cluster deploy mode is not applicable to Spark Thrift server.")
case _ =>
}
(args.deployMode, deployMode) match {
case (null, CLIENT) => args.deployMode = "client"
case (null, CLUSTER) => args.deployMode = "cluster"
case _ =>
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
val isMesosClient = clusterManager == MESOS && deployMode == CLIENT
if (!isMesosCluster && !isStandAloneCluster) {
val resolvedMavenCoordinates = DependencyUtils
.resolveMavenDependencies(args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath, args.ivySettingsPath)
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython || isInternal(args.primaryResource)) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
}
}
if (args.isR && !StringUtils.isBlank(args.jars)) {
RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
}
}
args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
val targetDir = Utils.createTempDir()
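// (note) If --principal/--keytab were given, log in to Kerberos up front so
// that the file-system access below (downloading dependencies) is authenticated.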
if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) {
if (args.principal != null) {
if (args.keytab != null) {
require(new File(args.keytab).exists(),
s"Keytab file: ${args.keytab} does not exist")
sparkConf.set(KEYTAB, args.keytab)
sparkConf.set(PRINCIPAL, args.principal)
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}
}
args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
args.files = Option(args.files).map(resolveGlobPaths(_, hadoopConf)).orNull
args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull
args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull
lazy val secMgr = new SecurityManager(sparkConf)
var localPrimaryResource: String = null
var localJars: String = null
var localPyFiles: String = null
if (deployMode == CLIENT) {
localPrimaryResource = Option(args.primaryResource).map {
downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
localJars = Option(args.jars).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
localPyFiles = Option(args.pyFiles).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
}
if (clusterManager == YARN) {
val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
def shouldDownload(scheme: String): Boolean = {
forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) || Try {
FileSystem.getFileSystemClass(scheme, hadoopConf)
}.isFailure
}
def downloadResource(resource: String): String = {
val uri = Utils.resolveURI(resource)
uri.getScheme match {
case "local" | "file" => resource
case e if shouldDownload(e) => val file = new File(targetDir, new Path(uri).getName)
if (file.exists()) {
file.toURI.toString
} else {
downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
}
case _ => uri.toString
}
}
args.primaryResource = Option(args.primaryResource).map {
downloadResource
}.orNull
args.files = Option(args.files).map { files =>
Utils.stringToSeq(files).map(downloadResource).mkString(",")
}.orNull
args.pyFiles = Option(args.pyFiles).map { pyFiles =>
Utils.stringToSeq(pyFiles).map(downloadResource).mkString(",")
}.orNull
args.jars = Option(args.jars).map { jars =>
Utils.stringToSeq(jars).map(downloadResource).mkString(",")
}.orNull
args.archives = Option(args.archives).map { archives =>
Utils.stringToSeq(archives).map(downloadResource).mkString(",")
}.orNull
}
if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
} else {
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
}
if (clusterManager != YARN) {
args.files = mergeFileLists(args.files, args.pyFiles)
}
}
if (localPyFiles != null) {
sparkConf.set("spark.submit.pyFiles", localPyFiles)
}
if (args.isR && clusterManager == YARN) {
val sparkRPackagePath = RUtils.localSparkRPackagePath
if (sparkRPackagePath.isEmpty) {
error("SPARK_HOME does not exist for R application in YARN mode.")
}
val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
if (!sparkRPackageFile.exists()) {
error(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
}
val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString
args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")
if (!RUtils.rPackages.isEmpty) {
val rPackageFile = RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
if (!rPackageFile.exists()) {
error("Failed to zip all the built R packages.")
}
val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
}
}
if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
error("Distributing R packages with standalone cluster is not supported.")
}
if (args.isR && clusterManager == MESOS && !RUtils.rPackages.isEmpty) {
error("Distributing R packages with mesos cluster is not supported.")
}
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
args.mainClass = "org.apache.spark.api.r.RBackend"
} else {
args.mainClass = "org.apache.spark.deploy.RRunner"
args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
args.files = mergeFileLists(args.files, args.primaryResource)
}
}
if (isYarnCluster && args.isR) {
args.files = mergeFileLists(args.files, args.primaryResource)
}
sys.props("SPARK_SUBMIT") = "true"
val options = List[OptionAssigner](
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.submit.deployMode"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.driver.extraLibraryPath"),
OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.repositories"),
OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.ivy"),
OptionAssigner(args.packagesExclusions, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.excludes"),
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES, confKey = "spark.executor.instances"),
OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives"),
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.files"),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, confKey = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, confKey = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER, confKey = "spark.driver.supervise"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),
OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars"))
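// (note) Each OptionAssigner routes one argument either to a SparkConf key
// (confKey) or to a child-process option (clOption), and only applies when
// the current cluster manager and deploy mode match its bit masks; the
// for-loop below walks the list and applies every assigner that matches.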
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
childClasspath += localPrimaryResource
}
if (localJars != null) {
childClasspath ++= localJars.split(",")
}
}
if (isYarnCluster) {
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
if (args.jars != null) {
childClasspath ++= args.jars.split(",")
}
}
if (deployMode == CLIENT) {
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}
for (opt <- options) {
if (opt.value != null && (deployMode & opt.deployMode) != 0 && (clusterManager & opt.clusterManager) != 0) {
if (opt.clOption != null) {
childArgs += (opt.clOption, opt.value)
}
if (opt.confKey != null) {
sparkConf.set(opt.confKey, opt.value)
}
}
}
if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
sparkConf.set(UI_SHOW_CONSOLE_PROGRESS, true)
}
if (!isYarnCluster && !args.isPython && !args.isR) {
var jars = sparkConf.getOption("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sparkConf.set("spark.jars", jars.mkString(","))
}
if (args.isStandaloneCluster) {
if (args.useRest) {
childMainClass = REST_CLUSTER_SUBMIT_CLASS
childArgs += (args.primaryResource, args.mainClass)
} else {
childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
if (args.supervise) {
childArgs += "--supervise"
}
Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
}
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}
if (clusterManager == YARN) {
if (args.isPython) {
sparkConf.set("spark.yarn.isPython", "true")
}
}
if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
setRMPrincipal(sparkConf)
}
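// (note) In yarn-cluster mode the user's class is not launched directly:
// childMainClass becomes YARN_CLUSTER_SUBMIT_CLASS
// (org.apache.spark.deploy.yarn.YarnClusterApplication), which submits the
// application to the ResourceManager; the user class travels as "--class".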
if (isYarnCluster) {
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
if (args.isPython) {
childArgs += ("--primary-py-file", args.primaryResource)
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
val mainFile = new Path(args.primaryResource).getName
childArgs += ("--primary-r-file", mainFile)
childArgs += ("--class", "org.apache.spark.deploy.RRunner")
} else {
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
}
if (isMesosCluster) {
assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
childMainClass = REST_CLUSTER_SUBMIT_CLASS
if (args.isPython) {
childArgs += (args.primaryResource, "")
if (args.pyFiles != null) {
sparkConf.set("spark.submit.pyFiles", args.pyFiles)
}
} else if (args.isR) {
childArgs += (args.primaryResource, "")
} else {
childArgs += (args.primaryResource, args.mainClass)
}
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}
if (isKubernetesCluster) {
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
if (args.isPython) {
childArgs ++= Array("--primary-py-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
if (args.pyFiles != null) {
childArgs ++= Array("--other-py-files", args.pyFiles)
}
} else if (args.isR) {
childArgs ++= Array("--primary-r-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
} else {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
}
} else {
childArgs ++= Array("--main-class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += ("--arg", arg)
}
}
}
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
}
if (deployMode == CLUSTER) {
sparkConf.remove("spark.driver.host")
}
val pathConfigs = Seq("spark.jars", "spark.files", "spark.yarn.dist.files", "spark.yarn.dist.archives", "spark.yarn.dist.jars")
pathConfigs.foreach { config =>
sparkConf.getOption(config).foreach { oldValue =>
sparkConf.set(config, Utils.resolveURIs(oldValue))
}
}
sparkConf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
val resolvedPyFiles = Utils.resolveURIs(pyFiles)
val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
} else {
resolvedPyFiles
}
sparkConf.set("spark.submit.pyFiles", formattedPyFiles)
}
(childArgs, childClasspath, sparkConf, childMainClass)
}
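To make the four return values concrete, for a yarn-cluster submission they come out roughly as follows (reconstructed by hand from the code above, not captured from a real run):

// spark-submit --master yarn --deploy-mode cluster \
//   --class org.apache.spark.examples.SparkPi spark-examples_2.12-3.0.0.jar 100
//
// childMainClass : "org.apache.spark.deploy.yarn.YarnClusterApplication"
// childArgs      : --jar .../spark-examples_2.12-3.0.0.jar
//                  --class org.apache.spark.examples.SparkPi
//                  --arg 100
// childClasspath : .../spark-examples_2.12-3.0.0.jar
// sparkConf      : spark.master=yarn, spark.submit.deployMode=cluster, ...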
Back in runMain: through reflection, our code is wrapped into SparkApplication, Spark's application abstraction; if the main class does not already implement it, the class is wrapped in a JavaMainApplication to run:
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.newInstance().asInstanceOf[SparkApplication]
} else {
if (classOf[scala.App].isAssignableFrom(mainClass)) {
logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
new JavaMainApplication(mainClass)
}
JavaMainApplication is an implementation of the SparkApplication trait and implements its start() method, which obtains our main method via reflection and then invokes it:
override def start(args: Array[String], conf: SparkConf): Unit = {
val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
val sysProps = conf.getAll.toMap
sysProps.foreach { case (k, v) => sys.props(k) = v }
mainMethod.invoke(null, args)
}
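So an ordinary application only needs a static main method: SparkSubmit wraps the class in JavaMainApplication and invokes that method reflectively. This is also why the code above warns against scala.App, whose body runs through DelayedInit rather than a plain main. A minimal application of the kind this path executes:

import org.apache.spark.sql.SparkSession

// A plain object with a main method compiles to a static main,
// which is exactly what JavaMainApplication looks up and invokes.
object MyApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MyApp").getOrCreate()
    spark.range(100).selectExpr("sum(id)").show()
    spark.stop()
  }
}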
And that is the whole execution flow of submitting a Spark job.