Spark 版本:3.0.0
找到入口
- 理解一个项目最好的切入点就是找到一个入口。
比如跟 Spark 的交互方式,如何把任务提交到 Spark 集群。通常使用 spark-submit 脚本来提交任务。它在 Spark 源码的 bin 目录下
bin 文件夹中在存放了 spark-shell 等其他入口方式。
- 我们来看一下
spark-submit 的具体内容。
可以看到是一个shell脚本,加载的类是 org.apache.spark.deploy.SparkSubmit 。 $@ 把执行的参数带进去。接下来我们找到具体的类 SparkSubmit ,在 deploy 这个包下。
SparkSubmit Object
上一步的 spark-submit 会创建一个Java进程,执行SparkSumbit.scala 文件的逻辑,这个文件有1300多行代码,一上来看哪里呢?还是抓主线,找入口,我们找到 main 函数。
main 方法是JVM执行的入口。
override def main(args: Array[String]): Unit = {
val uninitLog = initializeLogIfNecessary(true, silent = true)
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
printStream.println(appArgs)
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
在main 函数中,主要做了这几件事
1. 准备参数
这部分会 parse 命令行中的参数,如 --master local[*] 转成 SparkSubmitArguments 对象中的成员变量。同时检查参数输入的正不正确等逻辑。
2. 执行任务
如果是提交任务,着重看submit 函数。
@tailrec
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
if (args.isStandaloneCluster && args.useRest) {
try {
...
doRunMain()
} catch {
...
}
} else {
doRunMain()
}
返回的四元组分别是:
childArgs :the arguments for the child process 子进程参数childClasspath :a list of classpath entries for the child 子进程classpath列表sparkConf :a map of system properties 系统属性mapchildMainClass :the main class for the child 子进程main方法
着重看一下 childMainClass 。
2.1 部署模式的差异
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
childClasspath += localPrimaryResource
}
if (localJars != null) { childClasspath ++= localJars.split(",") }
}
...
if (args.isStandaloneCluster) {
if (args.useRest) {
childMainClass = REST_CLUSTER_SUBMIT_CLASS
childArgs += (args.primaryResource, args.mainClass)
} else {
childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
...
if (isYarnCluster) {
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
if (args.isPython) {
-
如果部署模式为client ,直接运行我们设置的主类的名字
- 比如
com.chixiaodou.wordcount -
如果是StandaloneCluster ,且使用了rest 风格,
REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName() -
如果是StandaloneCluster , 没有使用rest
STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName() -
如果是Yarn 集群上运行,则为
YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"
代码后面还有其他的部署模式,不再罗列了。
2.2 runMain方法
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
...
}
} else {
runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
}
}
接下来执行doRunMain , 把准备好的参数传递到runMain 方法中。
runMain 方法执行 child class 的main 方法,使用 submit 函数中准备好的四元组。
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sparkConf: SparkConf,
childMainClass: String,
verbose: Boolean): Unit = {
...
var mainClass: Class[_] = null
try {
mainClass = Utils.classForName(childMainClass)
} catch {...}
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.newInstance().asInstanceOf[SparkApplication]
} else {
if (classOf[scala.App].isAssignableFrom(mainClass)) {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
new JavaMainApplication(mainClass)
}
...
try {
app.start(childArgs.toArray, sparkConf)
} catch {
case t: Throwable =>
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)
case t: Throwable =>
throw t
}
}
}
在这一步,生成 Application,调用 Start 方法。最后这里逻辑比较复杂,我们来一点点掰开梳理
- 假设我们以
StandaloneCluster , 没有使用 rest 的方式启动集群,那么mainClass = classOf[ClientApp].getName() ,这个可以看上面 2.1 提到的详解 - 所以这里
app.start(childArgs.toArray, sparkConf) 调用的应该是ClientApp , 跳到Client.scala 文件看看start() 都做了什么
代码执行到这里,Spark-Submit的任务就完成了。我们看一下上面提及的所有调用的 UML 图。接下来我们以 Standalone Client为例,接下去展开分析。
3. Driver - Rpc 环境准备
private[spark] class ClientApp extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
val driverArgs = new ClientArguments(args)
if (!conf.contains("spark.rpc.askTimeout")) {
conf.set("spark.rpc.askTimeout", "10s")
}
Logger.getRootLogger.setLevel(driverArgs.logLevel)
val rpcEnv =
RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
rpcEnv.awaitTermination()
}
}
这段代码具体做了如下几件事:
- 创建RPC通信环境
rpcEnv - 获取 Master 的通信邮箱 RpcEndpointRef,用于和Master通信
- 注册
RpcEndpoint ,调用onStart 方法。让 clientEndpoint 可以和 masterEndpoint 通信
代码中涉及 Spark RPC 通信的知识,RPC是一种进程间的通信方式。如果对这部分知识没有了解的话,会看得有点一头雾水。这部分知识也非常庞大,需要另外开一个文档来详细介绍。
3.1 Spark RPC 设计
这里就先简单介绍一下,让这段代码看起来不那么尴尬。
Spark 3.0 的 RPC 框架是基于 Netty,Netty 使用经典的 Actor 模型 做消息传递。图中画了若干个组建。
RpcEnv :为RpcEndpoint提供了一个处理消息的环境,负责Endpoint的生命周期管理,注册Endpoint,消息间的路由,停止Endpoint等。RpcEndpoint :代表具体的通信节点。例如Master,Worker,DriverEndpoint等,它们都实现该接口。一个RpcEndpoint的生命周期是create -> onStart -> receive* -> onStop 。receive 和receiveAndReply 分别用来接收其他 endpoint 发过来的 send 和 ask 消息。RpcEndpointRef :顾名思义,是一个RpcEndpoint的引用。当我们想向master,worker发送消息的时候,需要先获取到如 master Endpoint的引用。RpcAddress :表示远程 RpcEndpointRef 的地址,包含 Host 和 Port。
Spark是一个分布式的系统 ,想要跟其他的端点通信,需要按照如上的步骤,建立通信环境,通信地址及引用。接下来才可以通过发送消息请求的方式,在其他 Endpoint 执行如 launchDriver ,registerWorker 等动作。
4. 启动 Driver
接上面 ClientApp start 继续往下看。在 RpcEnv 中构建一个 ClientEndpoint 。
private[spark] class ClientApp extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
...
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
...}
}
而我们前面又讲过,一个RpcEndpoint的会调用onStart() 。进入 ClientEndpoint 的 onStart() 进行查看。
4.1 向 Master 发送请求
private class ClientEndpoint(
override val rpcEnv: RpcEnv,
driverArgs: ClientArguments,
masterEndpoints: Seq[RpcEndpointRef],
conf: SparkConf)
extends ThreadSafeRpcEndpoint with Logging {
...
override def onStart(): Unit = {
driverArgs.cmd match {
case "launch" =>
....
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
...
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
driverArgs.memory,
driverArgs.cores,
driverArgs.supervise,
command)
asyncSendToMasterAndForwardReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription))
...
在onStart() 中匹配到 launch 这个动作,并向 MasterEndpoint 发送 RequestSubmitDriver 消息。
4.2 Master
接下来我们跳到 Master.scala (package org.apache.spark.deploy.master) 去看看 Master 节点如何处理这个请求。
在Spark的Rpc设计中,receive 和 receiveAndReply 分别用来接收 send 和 ask 类型的消息。
Sends a one-way asynchronous message. Fire-and-forget semantics.
def send(message: Any): Unit
Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to receive the reply within the specified timeout.
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] :
- 发送消息出去之后,返回一个 [[Future]],并在规定的timeout时间内接收回复。
5. Master 接收消息并回复
RequestSubmitDriver 在下面这个方法中调用。这里调用了我们上面提到的 ask() 方法。也就是说会触发 Master 的 receiveAndReply() 。
private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = { for (masterEndpoint <- masterEndpoints) { masterEndpoint.ask[T](message).onComplete { case Success(v) => self.send(v) case Failure(e) => logWarning(s"Error sending messages to master $masterEndpoint", e) }(forwardMessageExecutionContext) } }
Master 的 receiveAndReply() 方法。
private[deploy] class Master( override val rpcEnv: RpcEnv, address: RpcAddress, webUiPort: Int, val securityMgr: SecurityManager, val conf: SparkConf) extends ThreadSafeRpcEndpoint with Logging with LeaderElectable { ....
- 构建 DriverInfo,保存后续 Driver 启动所需的信息
- driver append 到 waiting list,方便后续有 worker 资源了再去创建
- 执行
schedule()
把Driver 信息准备好之后,最后来到 schedule() 方法,该方法中,会把所有的 waitingDrivers 等待被分配的driver一一分配出去,分配到alive的worker中。
private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) {
return
}
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) {
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
startExecutorsOnWorkers()
}
我们来看 launchDriver() ,又是一个远程调用。
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
worker.addDriver(driver)
driver.worker = Some(worker)
worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
driver.state = DriverState.RUNNING
}
这里调用了worker endpoint 的 send 方法,因此要看接收方的 receive() 详情。同理,因为这个是往 Worker 发消息,那就去 Worker.scala 看看 Worker 怎么处理这个消息。
6. Worker 接收消息
终于扒到这个非常关键的信息了。终于要开始构建我们的Driver了。
private[deploy] class Worker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
...
val securityMgr: SecurityManager)
extends ThreadSafeRpcEndpoint with Logging {
...
override def receive: PartialFunction[Any, Unit] = synchronized {
case LaunchDriver(driverId, driverDesc) =>
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(...)
drivers(driverId) = driver
driver.start()
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
接下去就是比较细节的启动工作。Driver 启动的详细信息又可以单开一个篇幅来讲了。
至此,我们从找到 Spark 的入口 , spark 提交任务开始,准备参数,执行submit任务,根据特定的部署模式,启动相应的 Spark Application ,又通过了解Spark的远程调用设计,摸索到 Client ,Master ,Worker 之间如何发送启动 Driver 的信息。一步一步找到 launchDriver 的入口。
最后还是画一张 UML 图来更加清晰的展现整体交互。
常驻小尾巴
都看到这里了,不关注一下嘛 👇 👇 👇
我的免费星球,欢迎来看我的日常碎碎念
参考资料
-
陈凯 https://zhuanlan.zhihu.com/p/84506391 -
简书 https://www.jianshu.com/p/4d4964c505fe?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation -
尚硅谷 P131 源码阅读 https://www.bilibili.com/video/BV11A411L7CK?p=131&spm_id_from=pageDriver -
Solve https://cloud.tencent.com/developer/article/1574348
|