Spark has several deployment modes: Yarn Cluster, Yarn Client, and Standalone Cluster/Client. We start with the most classic one.

Yarn-Cluster mode:
In Cluster mode, the Driver module that monitors and schedules the job runs inside the Yarn cluster. This is the mode normally used in production.
1) Executing the submit script actually starts a SparkSubmit JVM process;
2) The main method of SparkSubmit reflectively loads YarnClusterApplication, instantiates it, and calls its start method;
3) YarnClusterApplication creates a Yarn client (the yarnClient object wrapped inside the Client class) and sends the ResourceManager the launch command bin/java ApplicationMaster;
4) The ResourceManager allocates a container and starts the ApplicationMaster on a suitable NodeManager;
5) The ApplicationMaster starts the Driver thread, which runs the user's job;
6) The AM registers itself with the RM and requests resources;
7) Once resources are granted, the AM sends NodeManagers the command bin/java YarnCoarseGrainedExecutorBackend, which starts the ExecutorBackend;
8) Each ExecutorBackend performs reverse registration with the Driver and then instantiates its Executor object, which waits for tasks;
9) Once all Executors are registered, the Driver continues running the user's main function; when an action operator is reached, a Job is triggered, stages are split at wide (shuffle) dependencies, each stage produces a TaskSet, and the tasks are dispatched to the Executors for execution. (A typical submit command is sketched below.)
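
For reference, step 1) corresponds to a submit invocation along these lines (class name and jar path are placeholders, not from the original text):

bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.example.WordCount \
  ./wordcount.jar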

Source code walkthrough:
1.1 The SparkSubmit entry point
def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit()
  submit.doSubmit(args)
}
spark-submit is the submission program. In main, val submit = new SparkSubmit() creates the submitter, and submit.doSubmit(args) is called with the command-line arguments such as --master and --class.
How are these arguments used?
doSubmit contains the step appArgs = parseArguments(args).
parseArguments does a single thing: it constructs a SparkSubmitArguments object (SparkSubmitArguments.scala).
SparkSubmitArguments calls parse(args.asJava), which parses the incoming arguments with regular expressions: each argument is split into an opt and a value, i.e. the option and its content, and the pair is then pattern-matched in the handle method to record the parsed settings. (A minimal sketch of this pattern follows.)
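
For intuition, here is a minimal, self-contained sketch of this regex-split-then-handle pattern. It is not Spark's actual code; the option names and the handle body are invented for illustration:

object ArgParseSketch {
  // e.g. "--master=yarn" splits into opt = "--master", value = "yarn"
  private val eqSeparatedOpt = "(--[^=]+)=(.+)".r

  private def handle(opt: String, value: String): Unit = opt match {
    case "--master" => println(s"master = $value")
    case "--class"  => println(s"mainClass = $value")
    case other      => println(s"unhandled option: $other")
  }

  def main(args: Array[String]): Unit =
    Seq("--master=yarn", "--class=com.example.WordCount").foreach {
      case eqSeparatedOpt(opt, value) => handle(opt, value)
      case other                      => println(s"not an --opt=value argument: $other")
    }
}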
Right below appArgs = parseArguments(args) comes the action dispatch:
val appArgs = parseArguments(args)
if (appArgs.verbose) {
  logInfo(appArgs.toString)
}
appArgs.action match {
  case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
  case SparkSubmitAction.KILL => kill(appArgs)
  case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  case SparkSubmitAction.PRINT_VERSION => printVersion()
}
Where does action get its value? Again in SparkSubmitArguments.scala there is the following:
action = Option(action).getOrElse(SUBMIT)
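
That is, if no action was parsed from the arguments, it defaults to SUBMIT. A two-line illustration of the idiom (values invented):

val action: String = null
Option(action).getOrElse("SUBMIT")   // yields "SUBMIT", because action is null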
With action defaulting to SUBMIT, the match above calls submit(appArgs, uninitLog). Inside submit():

if (args.isStandaloneCluster && args.useRest) {
  ......
} else {
  doRunMain()
}

and doRunMain() falls through to runMain:

def doRunMain(): Unit = {
  if (args.proxyUser != null) {
    ......
  } else {
    runMain(args, uninitLog)
  }
}
1.2 Submitting the application to Yarn
runMain prepares the submit environment, then loads and starts the application class:

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  if (uninitLog) {
    Logging.uninitialize()
  }
  val loader = getSubmitClassLoader(sparkConf)
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }
  var mainClass: Class[_] = null
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    ......
  }
  // If childMainClass implements SparkApplication, instantiate it reflectively;
  // otherwise wrap its static main method in a JavaMainApplication.
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }
  ......
  try {
    app.start(childArgs.toArray, sparkConf)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
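
As a minimal illustration of this load-then-start pattern, here is a runnable sketch. All names (MiniApplication, HelloApp) are invented stand-ins, not Spark's classes:

trait MiniApplication { def start(args: Array[String]): Unit }

class HelloApp extends MiniApplication {
  override def start(args: Array[String]): Unit =
    println(s"started with args: ${args.mkString(", ")}")
}

object ReflectStartSketch {
  def main(args: Array[String]): Unit = {
    val mainClass = Class.forName("HelloApp")   // plays the role of childMainClass
    val app =
      if (classOf[MiniApplication].isAssignableFrom(mainClass))
        mainClass.getConstructor().newInstance().asInstanceOf[MiniApplication]
      else
        sys.error("not a MiniApplication")
    app.start(Array("demo"))                    // plays the role of app.start(childArgs, sparkConf)
  }
}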
Next, prepareSubmitEnvironment, which decides what childMainClass will be:
val clusterManager: Int = args.master match {
  case "yarn" => YARN
  case m if m.startsWith("spark") => STANDALONE
  case m if m.startsWith("mesos") => MESOS
  case m if m.startsWith("k8s") => KUBERNETES
  case m if m.startsWith("local") => LOCAL
  case _ =>
    error("Master must either be yarn or start with spark, mesos, k8s, or local")
    -1
}
......
if (isYarnCluster) {
  // YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
  ......
}
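
For context, the SparkApplication interface that runMain checked for is a small trait (org.apache.spark.deploy.SparkApplication) with a single entry point, roughly:

private[spark] trait SparkApplication {
  def start(args: Array[String], conf: SparkConf): Unit
}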
So in Yarn-cluster mode, app.start() lands here:

private[spark] class YarnClusterApplication extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    conf.remove(JARS)
    conf.remove(FILES)
    new Client(new ClientArguments(args), conf, null).run()
  }
}

1.3 Yarn cluster analysis
Client wraps a Hadoop Yarn client:

private val yarnClient = YarnClient.createYarnClient

public static YarnClient createYarnClient() {
  YarnClient client = new YarnClientImpl();
  return client;
}

YarnClientImpl in turn holds the RPC handle to the ResourceManager:

protected ApplicationClientProtocol rmClient;

Back in YarnClusterApplication.start, new Client(new ClientArguments(args), conf, null).run() enters run():
def run(): Unit = {
  this.appId = submitApplication()
  if (!launcherBackend.isConnected() && fireAndForget) {
    val report = getApplicationReport(appId)
    val state = report.getYarnApplicationState
    logInfo(s"Application report for $appId (state: $state)")
    logInfo(formatReportDetails(report))
    if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
      throw new SparkException(s"Application $appId finished with status: $state")
    }
  } else {
    val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
    if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
      diags.foreach { err =>
        logError(s"Application diagnostics message: $err")
      }
      throw new SparkException(s"Application $appId finished with failed status")
    }
    if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
      throw new SparkException(s"Application $appId is killed")
    }
    if (finalState == FinalApplicationStatus.UNDEFINED) {
      throw new SparkException(s"The final status of application $appId is undefined")
    }
  }
}
submitApplication() is where the actual Yarn submission happens:

def submitApplication(): ApplicationId = {
  ResourceRequestHelper.validateResources(sparkConf)
  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()
    ......
    // Ask the RM for a new application and take its id.
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()
    ......
    // Build the container launch context (the AM's java command) and the submission context.
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)
    ......
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)
    appId
  }
Drilling into createContainerLaunchContext(newAppResponse), the AM's launch command is assembled:

val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++ amArgs ++   // amArgs carries the AM class: org.apache.spark.deploy.yarn.ApplicationMaster in cluster mode
  Seq(
    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)
......
amContainer
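
Expanded, the command Yarn eventually runs for the AM container looks roughly like this (illustrative only; the real javaOpts and paths are longer):

{{JAVA_HOME}}/bin/java -server <javaOpts> \
  org.apache.spark.deploy.yarn.ApplicationMaster \
  --class com.example.WordCount --jar app.jar \
  1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr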

1.4 ApplicationMaster startup
def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  val amArgs = new ApplicationMasterArguments(args)
  val sparkConf = new SparkConf()
  if (amArgs.propertiesFile != null) {
    Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
      sparkConf.set(k, v)
    }
  }
  val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
  master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
  // Inside ApplicationMaster, a client for talking to the RM is created:
  //   private val client = new YarnRMClient()
  ugi.doAs(new PrivilegedExceptionAction[Unit]() {
    override def run(): Unit = System.exit(master.run())
  })
}
master.run() branches on the deploy mode:

if (isClusterMode) {
  runDriver()
} else {
  runExecutorLauncher()
}
private def runDriver(): Unit = {
  addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
  userClassThread = startUserApplication()

  // Block here until the user code has created the SparkContext environment object;
  // otherwise the AM process cannot move forward.
  logInfo("Waiting for spark context initialization...")
  val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
  try {
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      val rpcEnv = sc.env.rpcEnv
      val userConf = sc.getConf
      val host = userConf.get(DRIVER_HOST_ADDRESS)
      val port = userConf.get(DRIVER_PORT)
      registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
      val driverRef = rpcEnv.setupEndpointRef(
        RpcAddress(host, port),
        YarnSchedulerBackend.ENDPOINT_NAME)
      createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
    } else {
      throw new IllegalStateException("User did not initialize spark context!")
    }
    resumeDriver()
    userClassThread.join()
  }
startUserApplication starts the user class, i.e. the Driver:

private def startUserApplication(): Thread = {
  logInfo("Starting the user application in a separate Thread")
  ...
  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])
  val userThread = new Thread {
    override def run(): Unit = {
      try {
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          logError(s"Could not find static main method in object ${args.userClass}")
          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
        } else {
          mainMethod.invoke(null, userArgs.toArray)
          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
          logDebug("Done running user class")
        }
        ......
      }
    }
  }
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start()
  userThread
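
The takeaway: in cluster mode the "Driver" is literally a thread named Driver that reflectively invokes the user's main. A stripped-down, runnable sketch of the same idea (UserApp is an invented stand-in for args.userClass):

object UserApp {
  def main(args: Array[String]): Unit =
    println("user code running on thread: " + Thread.currentThread().getName)
}

object DriverThreadSketch {
  def main(args: Array[String]): Unit = {
    val mainMethod = Class.forName("UserApp").getMethod("main", classOf[Array[String]])
    val userThread = new Thread {
      override def run(): Unit = mainMethod.invoke(null, Array.empty[String])
    }
    userThread.setName("Driver")   // this is why stack traces show a "Driver" thread
    userThread.start()
    userThread.join()
  }
}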

Note the distinction: the Driver is a thread, while the ApplicationMaster is a process (in the original flow diagram, not reproduced here, they were drawn in yellow and green respectively).
Inside createAllocator (shown in runDriver above), the YarnAllocator asks the RM for containers:

allocator.allocateResources()
Inside allocateResources, the RM's response is handled:

val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()
allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)
if (allocatedContainers.size > 0) {
  logDebug(("Allocated containers: %d. Current executor count: %d. " +
    "Launching executor count: %d. Cluster resources: %s.")
    .format(
      allocatedContainers.size,
      runningExecutors.size,
      numExecutorsStarting.get,
      allocateResponse.getAvailableResources))
  handleAllocatedContainers(allocatedContainers.asScala)
}
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
  val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
  ......
  runAllocatedContainers(containersToUse)
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
  for (container <- containersToUse) {
    ......
    if (runningExecutors.size() < targetNumExecutors) {
      numExecutorsStarting.incrementAndGet()
      if (launchContainers) {
        // One ExecutorRunnable per container, launched from a thread pool.
        launcherPool.execute(() => {
          try {
            new ExecutorRunnable(
              Some(container),
              conf,
              sparkConf,
              driverUrl,
              executorId,
              executorHostname,
              executorMemory,
              executorCores,
              appAttemptId.getApplicationId.toString,
              securityMgr,
              localResources,
              ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
            ).run()
            ......
ExecutorRunnable talks to the NodeManager through an NMClient:

def run(): Unit = {
  logDebug("Starting Executor Container")
  nmClient = NMClient.createNMClient()
  nmClient.init(conf)
  nmClient.start()
  startContainer()
}

def startContainer(): java.util.Map[String, ByteBuffer] = {
  ......
  val commands = prepareCommand()
  ctx.setCommands(commands.asJava)
  ......
  try {
    nmClient.startContainer(container.get, ctx)
  }
  ......
prepareCommand assembles the executor JVM's command line:

YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++
  Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
    "--driver-url", masterAddress,
    "--executor-id", executorId,
    "--hostname", hostname,
    "--cores", executorCores.toString,
    "--app-id", appId,
    "--resourceProfileId", resourceProfileId.toString) ++
Now for the final two steps: starting YarnCoarseGrainedExecutorBackend and registering it with the Driver.
def main(args: Array[String]): Unit = {
  val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
    CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
    new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
      arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
      arguments.resourcesFileOpt, resourceProfile)
  }
  val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
    this.getClass.getCanonicalName.stripSuffix("$"))
  CoarseGrainedExecutorBackend.run(backendArgs, createFn)
  System.exit(0)
}
def run(
    arguments: Arguments,
    backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
      CoarseGrainedExecutorBackend): Unit = {
  ......
  val executorConf = new SparkConf
  val fetcher = RpcEnv.create(
    "driverPropsFetcher",
    arguments.bindAddress,
    arguments.hostname,
    -1,
    executorConf,
    new SecurityManager(executorConf),
  ......
  val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
    arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
  env.rpcEnv.setupEndpoint("Executor",
    backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  ......
  var messageLoop: MessageLoop = null
  try {
    messageLoop = endpoint match {
      case e: IsolatedRpcEndpoint =>
        new DedicatedMessageLoop(name, e, this)
      case _ =>
        sharedLoop.register(name, endpoint)
        sharedLoop
    }
    endpoints.put(name, messageLoop)
  } catch {
    case NonFatal(e) =>
      endpointRefs.remove(endpoint)
      throw e
  }
  endpointRef
}
private class DedicatedMessageLoop(
    name: String,
    endpoint: IsolatedRpcEndpoint,
    dispatcher: Dispatcher)
  extends MessageLoop(dispatcher) {

  private val inbox = new Inbox(name, endpoint)

  override protected val threadpool = if (endpoint.threadCount() > 1) {
    ThreadUtils.newDaemonCachedThreadPool(s"dispatcher-$name", endpoint.threadCount())
  } else {
    ThreadUtils.newDaemonSingleThreadExecutor(s"dispatcher-$name")
  }
  ......
private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
  extends Logging {

  inbox =>

  @GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]()
  @GuardedBy("this")
  private var stopped = false
  @GuardedBy("this")
  private var enableConcurrent = false
  @GuardedBy("this")
  private var numActiveThreads = 0

  // The constructor immediately enqueues an OnStart message.
  inbox.synchronized {
    messages.add(OnStart)
  }
  ......
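
Why does this matter? Because OnStart is already in the queue when the message loop starts draining it, the endpoint's onStart() is guaranteed to run before any other message is processed. A toy sketch of that design (not Spark's code; all names here are invented):

import java.util.LinkedList

sealed trait InboxMessage
case object OnStart extends InboxMessage
case class Rpc(body: String) extends InboxMessage

// The constructor enqueues OnStart, so it is always processed first.
class MiniInbox(onStart: () => Unit, onMessage: String => Unit) {
  private val messages = new LinkedList[InboxMessage]()
  messages.add(OnStart)

  def post(m: InboxMessage): Unit = synchronized { messages.add(m) }

  def process(): Unit = synchronized {
    while (!messages.isEmpty) messages.poll() match {
      case OnStart   => onStart()
      case Rpc(body) => onMessage(body)
    }
  }
}

object MiniInboxDemo {
  def main(args: Array[String]): Unit = {
    val inbox = new MiniInbox(() => println("onStart runs first"), m => println(s"then: $m"))
    inbox.post(Rpc("RegisteredExecutor"))
    inbox.process()
  }
}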
When the message loop delivers OnStart, CoarseGrainedExecutorBackend connects to the Driver and asks to register:

private[spark] class CoarseGrainedExecutorBackend(
  ......
  override def onStart(): Unit = {
    logInfo("Connecting to driver: " + driverUrl)
    try {
      _resources = parseOrFindResources(resourcesFileOpt)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      driver = Some(ref)
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
        extractAttributes, _resources, resourceProfile.id))
    }(ThreadUtils.sameThread).onComplete {
      case Success(_) =>
        self.send(RegisteredExecutor)
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }
On the Driver side, the scheduler backend's endpoint receives RegisterExecutor and confirms:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
      attributes, resources, resourceProfileId) =>
    ......
    context.reply(true)
}
Back on the executor, a successful reply means RegisteredExecutor is delivered to itself, and the actual Executor object is created:

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
        resources = _resources)
      driver.get.send(LaunchedExecutor(executorId))
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }

and on the Driver, LaunchedExecutor marks the executor's cores as free and offers it tasks:

  case LaunchedExecutor(executorId) =>
    executorDataMap.get(executorId).foreach { data =>
      data.freeCores = data.totalCores
    }
    makeOffers(executorId)
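
To summarize the reverse-registration handshake, here is a toy single-process simulation of the message order. The message names match the walkthrough above, but the logic is heavily simplified and the method bodies are invented:

object HandshakeSketch {
  sealed trait Msg
  case class RegisterExecutor(id: String) extends Msg
  case object RegisteredExecutor extends Msg
  case class LaunchedExecutor(id: String) extends Msg

  // driver side: receiveAndReply
  def driverReceiveAndReply(msg: Msg): Boolean = msg match {
    case RegisterExecutor(id) => println(s"driver: executor $id registered"); true
    case _                    => false
  }

  // executor side: receive
  def executorReceive(msg: Msg): Unit = msg match {
    case RegisteredExecutor =>
      println("executor: creating Executor object")   // new Executor(...)
      driverReceive(LaunchedExecutor("1"))            // driver.get.send(LaunchedExecutor)
    case _ =>
  }

  // driver side: receive
  def driverReceive(msg: Msg): Unit = msg match {
    case LaunchedExecutor(id) => println(s"driver: makeOffers($id), tasks may now be scheduled")
    case _ =>
  }

  def main(args: Array[String]): Unit = {
    if (driverReceiveAndReply(RegisterExecutor("1")))  // ask[Boolean](RegisterExecutor(...))
      executorReceive(RegisteredExecutor)              // self.send(RegisteredExecutor)
  }
}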

Finally, back in the ApplicationMaster's runDriver: with the executors in place, resumeDriver() lets the user code continue past SparkContext initialization, and the AM waits for the Driver thread to finish:

private def runDriver(): Unit = {
  ......
  resumeDriver()
  userClassThread.join()
  ......
An analysis of Client mode will follow in a later update.