Structure of Inbox:
private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
  extends Logging {

  inbox =>

  @GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]()

  @GuardedBy("this")
  private var stopped = false

  @GuardedBy("this")
  private var enableConcurrent = false

  @GuardedBy("this")
  private var numActiveThreads = 0

  inbox.synchronized {
    messages.add(OnStart)
  }
  ......
}
Additional note: in RPC communication, every endpoint has a lifecycle. As the comment in the source code puts it: constructor -> onStart -> receive* -> onStop
private[spark] trait RpcEndpoint {......}
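The Inbox structure and the endpoint lifecycle above can be illustrated with a small sketch. This is not Spark's implementation: `SimpleInbox`, `SimpleEndpoint`, `OneWayMessage` and `processAll` are hypothetical, simplified stand-ins, and the real Inbox also deals with concurrency flags, RPC replies and error handling. The sketch only shows why enqueuing OnStart in the constructor guarantees the order constructor -> onStart -> receive* -> onStop.

```scala
import scala.collection.mutable

// Hypothetical, simplified message types (the names mirror Spark's, the code does not).
sealed trait InboxMessage
case object OnStart extends InboxMessage
case object OnStop extends InboxMessage
final case class OneWayMessage(content: Any) extends InboxMessage

// Hypothetical stand-in for RpcEndpoint with the three lifecycle hooks.
trait SimpleEndpoint {
  def onStart(): Unit
  def receive(msg: Any): Unit
  def onStop(): Unit
}

class SimpleInbox(endpoint: SimpleEndpoint) {
  // Both fields are guarded by `this`, like the @GuardedBy("this") fields above.
  private val messages = mutable.Queue[InboxMessage]()
  private var stopped = false

  // OnStart is enqueued during construction, so it is always processed first.
  this.synchronized { messages.enqueue(OnStart) }

  // Messages posted after stop() are silently dropped in this sketch.
  def post(msg: InboxMessage): Unit = this.synchronized {
    if (!stopped) messages.enqueue(msg)
  }

  // Marks the inbox as stopped and enqueues OnStop as the final message.
  def stop(): Unit = this.synchronized {
    if (!stopped) {
      stopped = true
      messages.enqueue(OnStop)
    }
  }

  private def poll(): Option[InboxMessage] = this.synchronized {
    if (messages.isEmpty) None else Some(messages.dequeue())
  }

  // Drains the queue and dispatches each message to the matching lifecycle hook.
  def processAll(): Unit = {
    var next = poll()
    while (next.isDefined) {
      next.get match {
        case OnStart => endpoint.onStart()
        case OneWayMessage(content) => endpoint.receive(content)
        case OnStop => endpoint.onStop()
      }
      next = poll()
    }
  }
}
```

Taking the lock only while touching the queue, and not while calling into the endpoint, keeps a slow handler from blocking threads that want to post.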
On the Executor side, the OnStart message is handled by the onStart method of the CoarseGrainedExecutorBackend object:
override def onStart(): Unit = {
  logInfo("Connecting to driver: " + driverUrl)
  try {
    _resources = parseOrFindResources(resourcesFileOpt)
  } catch {
    case NonFatal(e) =>
      exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
  }
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    driver = Some(ref)
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
      extractAttributes, _resources, resourceProfile.id))
  }(ThreadUtils.sameThread).onComplete {
    case Success(_) =>
      self.send(RegisteredExecutor)
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}
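The onStart method above chains two asynchronous steps: it first resolves a reference to the driver, then asks it to register, and finally reacts to success or failure. The sketch below reproduces only that flatMap/onComplete shape with plain Scala Futures. `DriverRef`, `RegistrationSketch` and the returned strings are hypothetical; in Spark the success branch sends RegisteredExecutor to the endpoint itself and the failure branch calls exitExecutor.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

// Hypothetical stand-in for the driver's endpoint reference; `ask` returns a Future.
trait DriverRef {
  def ask(executorId: String): Future[Boolean]
}

object RegistrationSketch {
  // Looks up the driver, asks it to register this executor, and reports what the
  // backend would do next as a string, purely for illustration.
  def register(lookup: () => Future[DriverRef], executorId: String)
              (implicit ec: ExecutionContext): Future[String] = {
    val outcome = Promise[String]()
    lookup().flatMap { ref =>
      // Step 2 starts only after the driver reference has been resolved.
      ref.ask(executorId)
    }.onComplete {
      // Any successful reply counts as registered, as in the code above.
      case Success(_) => outcome.success("RegisteredExecutor")
      // A failure in either step (lookup or ask) ends up here.
      case Failure(e) => outcome.success(s"exit(1): ${e.getMessage}")
    }
    outcome.future
  }
}
```

Because flatMap short-circuits on failure, one Failure branch covers both an unreachable driver and a rejected registration.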
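After the Executor sends the message, the Driver receives it. The Driver, however, runs as a thread, so it receives the request through its context object, the SparkContext. The Driver handles the RegisterExecutor request and replies as follows: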
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
    attributes, resources, resourceProfileId) =>
  if (executorDataMap.contains(executorId)) {
    context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
  } else if (scheduler.nodeBlacklist.contains(hostname) ||
      isBlacklisted(executorId, hostname)) {
    logInfo(s"Rejecting $executorId as it has been blacklisted.")
    context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
  } else {
    val executorAddress = if (executorRef.address != null) {
      executorRef.address
    } else {
      context.senderAddress
    }
    logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
    addressToExecutorId(executorAddress) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val resourcesInfo = resources.map { case (k, v) =>
      (v.name,
        new ExecutorResourceInfo(v.name, v.addresses,
          taskResourceNumParts.getOrElse(v.name, 1)))
    }
    val data = new ExecutorData(executorRef, executorAddress, hostname,
      0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
      resourcesInfo, resourceProfileId)
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    context.reply(true)
  }
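The decision logic in the handler above comes down to three cases: reject a duplicate executor ID, reject a blacklisted host, otherwise record the executor and reply with success. A minimal sketch of that logic follows. `ExecutorRegistry`, `RegistrationReply`, `Accepted` and `Rejected` are hypothetical names standing in for `context.reply` and `context.sendFailure`, and the bookkeeping is reduced to a map and a core counter.

```scala
import scala.collection.mutable

// Hypothetical reply type standing in for context.reply(true) / context.sendFailure(...).
sealed trait RegistrationReply
case object Accepted extends RegistrationReply
final case class Rejected(reason: String) extends RegistrationReply

class ExecutorRegistry(blacklistedHosts: Set[String]) {
  // executorId -> (hostname, cores); a much reduced stand-in for executorDataMap.
  private val executorDataMap = mutable.HashMap[String, (String, Int)]()
  private var totalCores = 0

  def register(executorId: String, hostname: String, cores: Int): RegistrationReply =
    synchronized {
      if (executorDataMap.contains(executorId)) {
        // Case 1: the same ID must not be registered twice.
        Rejected(s"Duplicate executor ID: $executorId")
      } else if (blacklistedHosts.contains(hostname)) {
        // Case 2: executors on blacklisted hosts are refused.
        Rejected(s"Executor is blacklisted: $executorId")
      } else {
        // Case 3: record the executor, update the totals, and accept.
        executorDataMap.put(executorId, (hostname, cores))
        totalCores += cores
        Accepted
      }
    }

  def totalCoreCount: Int = synchronized(totalCores)
}
```

Rejections leave the totals untouched, which matches the original handler: the counters are only updated in the final else branch.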
Once the Driver has replied that registration succeeded, the Executor side receives the RegisteredExecutor message and creates the Executor object:
private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    bindAddress: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv,
    resourcesFileOpt: Option[String],
    resourceProfile: ResourceProfile)
  extends IsolatedRpcEndpoint with ExecutorBackend with Logging {
  ......
  override def receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
          resources = _resources)
        driver.get.send(LaunchedExecutor(executorId))
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }
    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        taskResources(taskDesc.taskId) = taskDesc.resources
        executor.launchTask(this, taskDesc)
      }
    ......
  }
}
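The receive method above is an ordinary Scala PartialFunction that pattern-matches on the message type, and LaunchTask is only valid once RegisteredExecutor has created the executor. The sketch below shows that dispatch style in isolation. `BackendSketch`, its `log` buffer and the simplified `LaunchTask(taskId)` message are hypothetical; the real backend decodes a TaskDescription and exits the process instead of logging.

```scala
import scala.collection.mutable

// Hypothetical, simplified messages (the real LaunchTask carries serialized task data).
sealed trait BackendMessage
case object RegisteredExecutor extends BackendMessage
final case class LaunchTask(taskId: Long) extends BackendMessage

class BackendSketch {
  // Stands in for `executor == null` versus a created Executor object.
  private var executorReady = false

  // Records what the backend would have done, so the behaviour can be inspected.
  val log: mutable.ListBuffer[String] = mutable.ListBuffer[String]()

  val receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      // Registration succeeded: this is where the Executor would be created.
      executorReady = true
      log += "executor created"
    case LaunchTask(taskId) =>
      if (!executorReady) {
        // The real backend calls exitExecutor here; the sketch only logs.
        log += "exit: executor was null"
      } else {
        log += s"launch task $taskId"
      }
  }
}
```

Messages that match no case are simply not defined for the partial function, which a caller can check with `isDefinedAt` before dispatching.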
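At this point the Spark computing environment has been created successfully: the NodeManager, ResourceManager, ApplicationMaster, Driver, and Executor have all been created.
Finally, the submission flow chart is attached: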