worker向master注册
-
worker在启动之后,就会向master进行注册 -
对于worker状态为DEAD的,过滤掉。在compeleRecover()中,master在一定时间期限已经完成了恢复,但是发现其中的worker为UNKNOWN的情况下,对worker进行remove,将worker的状态设置为DEAD,如果过了很长时间worker又莫名其妙的向master进行注册的情况下,直接过滤掉。 -
对于worker状态为UNKNOWN,master会将旧的worker信息给清理掉,替换成新的worker信息。比如说master刚启动的时候,会向worker发送新的地址的时候,master会将该worker状态设置为unknown,worker向master返回注册信息的时候,master会将旧的worker信息给清理掉,替换成新的worker信息 -
将worker加入内存缓存中(HashMap),用持久化引擎将worker信息持久化,可能是文件系统,可能是zookeeper -
调用schedule()方法进行调度
进去源码看看
找到register相关的代码
private def registerWorker(worker: WorkerInfo): Boolean = {
workers.filter { w =>
(w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
}.foreach { w =>
workers -= w
}
val workerAddress = worker.endpoint.address
if (addressToWorker.contains(workerAddress)) {
val oldWorker = addressToWorker(workerAddress)
if (oldWorker.state == WorkerState.UNKNOWN) {
removeWorker(oldWorker)
} else {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
}
}
workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
if (reverseProxy) {
webUi.addProxyTargets(worker.id, worker.webUiAddress)
}
true
}
driver向master注册
- 用spark-submit提交sparkApplication的时候,dirver首先就会向master进行注册,将driver信息放入到内存缓存中,也就是hashmap中
- 加入等待调度队列,也就是ArrayBuffer
- 用持久化引擎将driver信息持久化,可能是文件系统,可能是zookeeper
- 调用schedule()方法进行调度
case RequestSubmitDriver(description) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
logInfo("Driver submitted " + description.command.mainClass)
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
schedule()
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
private def createDriver(desc: DriverDescription): DriverInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
new DriverInfo(now, newDriverId(date), desc, date)
}
application向master进行注册(registerApplication()方法)
- Driver启动好之后,会执行我们的application代码,执行sparkContext的初始化,底层的SparkDeploySchedulerBackend,会通过AppClient内部的线程,ClientEndPoint发送RegisterAppliction,到master进行Application进行注册。
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
for (masterAddress <- masterRpcAddresses) yield {
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = try {
if (registered.get) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
masterRef.send(RegisterApplication(appDescription, self))
} catch {
case ie: InterruptedException =>
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
})
}
}
在Master中找到RegisterApplication这个东西
case RegisterApplication(description, driver) =>
if (state == RecoveryState.STANDBY) {
} else {
logInfo("Registering app " + description.name)
val app = createApplication(description, driver)
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
persistenceEngine.addApplication(app)
driver.send(RegisteredApplication(app.id, self))
schedule()
}
- 将application信息放入到内存缓存中,也就是hashmap中
- 将application加入等待的调度队列,也就是ArrayBuffer
- 用持久化引擎将application信息持久化,可能是文件系统,可能是zookeeper
进到createApplication
private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
val appId = newApplicationId(date)
new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
}
看看registerApplication
private def registerApplication(app: ApplicationInfo): Unit = {
val appAddress = app.driver.address
if (addressToApp.contains(appAddress)) {
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}
applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
waitingApps += app
if (reverseProxy) {
webUi.addProxyTargets(app.id, app.desc.appUiUrl)
}
}
|