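The previous two installments mainly introduced Flink's internal components rm (ResourceManager), jm (JobMaster), and Dispatcher. The startup of rm and jm, however, has not been analyzed yet: two installments ago we only covered how rm is created, and in the last one how jm is created. This installment ties up those loose ends and covers how rm and jm are actually started.
We start from the function analyzed earlier, namely create in DefaultDispatcherResourceManagerComponentFactory.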
1.DefaultDispatcherResourceManagerComponentFactory#create
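// called in DefaultDispatcherResourceManagerComponentFactory#create
resourceManager.start();
// which, through RpcEndpoint#start, calls
rpcServer.start();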
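This starts the RPC service: a START message is sent to the underlying AkkaRpcActor so that it switches to the started state, after which the onStart method of ResourceManager is executed.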
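The hand-off from "start the RPC server" to "onStart runs" can be sketched as a tiny state machine. This is only an illustration: ToyEndpoint, ToyRpcActor, and the Control enum are hypothetical names invented for the sketch, not Flink's classes, and the real actor handles far more than these two messages.

```java
import java.util.ArrayList;
import java.util.List;

class RpcStartSketch {
    /** Toy stand-in for an RPC endpoint; only the onStart hook matters here. */
    interface ToyEndpoint {
        void onStart();
    }

    /** Toy actor with two states (hypothetical, not Flink's AkkaRpcActor). */
    static class ToyRpcActor {
        enum State { STOPPED, STARTED }
        enum Control { START, STOP }

        private final ToyEndpoint endpoint;
        private State state = State.STOPPED;

        ToyRpcActor(ToyEndpoint endpoint) {
            this.endpoint = endpoint;
        }

        /** Handles a control message; onStart fires only on STOPPED -> STARTED. */
        void tell(Control message) {
            if (message == Control.START && state == State.STOPPED) {
                state = State.STARTED;
                endpoint.onStart();
            } else if (message == Control.STOP) {
                state = State.STOPPED;
            }
        }
    }

    /** Sends START twice and returns the callbacks that were recorded. */
    static List<String> run() {
        List<String> calls = new ArrayList<>();
        ToyRpcActor actor = new ToyRpcActor(() -> calls.add("onStart"));
        actor.tell(ToyRpcActor.Control.START);
        actor.tell(ToyRpcActor.Control.START); // already started, so ignored
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onStart]
    }
}
```

The point of the sketch is that the endpoint never calls onStart itself; the callback is a side effect of the state switch triggered by the message.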
2.ResourceManager#onStart->ResourceManager#startResourceManagerServices
private void startResourceManagerServices() throws Exception {
    try {
        // obtain the ResourceManager's leader election service from the HA services
        leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
        // subclass-specific initialization, followed in step 3
        initialize();
        // take part in leader election, followed in step 4
        leaderElectionService.start(this);
        jobLeaderIdService.start(new JobLeaderIdActionsImpl());
        registerTaskExecutorMetrics();
    } catch (Exception e) {
        handleStartResourceManagerServicesException(e);
    }
}
3.ActiveResourceManager#initialize->AbstractResourceManagerDriver#initialize->YarnResourceManagerDriver#initializeInternal
protected void initializeInternal() throws Exception {
    final YarnContainerEventHandler yarnContainerEventHandler = new YarnContainerEventHandler();
    try {
        // create, initialize and start the client that talks to the YARN ResourceManager
        resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient(
            yarnHeartbeatIntervalMillis,
            yarnContainerEventHandler);
        resourceManagerClient.init(yarnConfig);
        resourceManagerClient.start();
        // register this ApplicationMaster and recover containers from previous attempts
        final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
        getContainersFromPreviousAttempts(registerApplicationMasterResponse);
        taskExecutorProcessSpecContainerResourcePriorityAdapter =
            new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
                registerApplicationMasterResponse.getMaximumResourceCapability(),
                ExternalResourceUtils.getExternalResources(flinkConfig, YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
    } catch (Exception e) {
        throw new ResourceManagerException("Could not start resource manager client.", e);
    }
    // create, initialize and start the client that talks to the YARN NodeManagers
    nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
    nodeManagerClient.init(yarnConfig);
    nodeManagerClient.start();
}
4.StandaloneLeaderElectionService#start->ResourceManager#grantLeadership->ResourceManager#tryAcceptLeadership->ResourceManager#startServicesOnLeadership
private void startServicesOnLeadership() {
    startHeartbeatServices();
    slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
    onLeadership();
}
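The call chain in step 4 starts at the election service and ends in the ResourceManager's own services, which is easy to miss when reading it as one long arrow. A minimal sketch of that shape, assuming a standalone setup in which whoever starts the election service is granted leadership right away: all names here (ToyStandaloneElection, ToyResourceManager, FIXED_LEADER_ID) are hypothetical and only mirror the order of calls shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class StandaloneElectionSketch {
    /** Toy contender interface: the election service calls back into it. */
    interface ToyLeaderContender {
        void grantLeadership(UUID leaderSessionId);
    }

    /** Toy election service without a real election: the contender always wins. */
    static class ToyStandaloneElection {
        static final UUID FIXED_LEADER_ID = new UUID(0L, 0L); // hypothetical constant

        void start(ToyLeaderContender contender) {
            contender.grantLeadership(FIXED_LEADER_ID);
        }
    }

    /** Toy resource manager that records which services it starts, in order. */
    static class ToyResourceManager implements ToyLeaderContender {
        final List<String> started = new ArrayList<>();
        UUID fencingToken;

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            fencingToken = leaderSessionId; // accept leadership
            startServicesOnLeadership();
        }

        private void startServicesOnLeadership() {
            started.add("heartbeatServices");
            started.add("slotManager");
            started.add("onLeadership");
        }
    }

    /** Starts the toy election with a toy resource manager as the contender. */
    static ToyResourceManager run() {
        ToyResourceManager rm = new ToyResourceManager();
        new ToyStandaloneElection().start(rm);
        return rm;
    }

    public static void main(String[] args) {
        System.out.println(run().started);
    }
}
```

The sketch runs everything synchronously on the calling thread for readability; it makes no claim about which thread does the work in Flink.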
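rm startup overview
An overview of the source code involved in this part is shown below:
As for the startup of jm, we pick up from the following point in the previous installment: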
1.JobManagerRunnerImpl#start->StandaloneLeaderElectionService#start->JobManagerRunnerImpl#grantLeadership->JobManagerRunnerImpl#verifyJobSchedulingStatusAndStartJobManager->JobManagerRunnerImpl#startJobMaster->JobMaster#start
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
    // make sure we receive RPC and async calls
    start();
    return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}
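JobMaster#start returns a future and hands the real work to startJobExecution, which validates that it runs in the main thread (see step 2). A minimal sketch of that "submit to one dedicated thread, return a future" pattern follows. ToyJobMaster and the thread name toy-main-thread are hypothetical, and a plain single-thread executor stands in for the endpoint's main thread; it is not how Flink implements callAsyncWithoutFencing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncStartSketch {
    static class ToyJobMaster {
        // Single daemon thread standing in for the endpoint's main thread (assumption).
        private final ExecutorService mainThread = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "toy-main-thread");
            t.setDaemon(true);
            return t;
        });
        private volatile String executedOn;

        /** Returns immediately; the actual start logic runs on the dedicated thread. */
        CompletableFuture<String> start(String jobMasterId) {
            return CompletableFuture.supplyAsync(() -> startJobExecution(jobMasterId), mainThread);
        }

        private String startJobExecution(String jobMasterId) {
            executedOn = Thread.currentThread().getName();
            return "ack:" + jobMasterId;
        }

        String executedOn() {
            return executedOn;
        }
    }

    /** Starts the toy job master and returns {acknowledgement, executing thread name}. */
    static String[] run() {
        ToyJobMaster jm = new ToyJobMaster();
        String ack = jm.start("jm-1").join(); // block only for the demo
        return new String[] {ack, jm.executedOn()};
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println(result[0] + " on " + result[1]);
    }
}
```

Keeping all state changes on one thread is what lets the start logic skip locking; the caller only ever sees the future.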
2.JobMaster#startJobExecution
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
    validateRunsInMainThread();
    checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
    // already running under this id: nothing to do
    if (Objects.equals(getFencingToken(), newJobMasterId)) {
        log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
        return Acknowledge.get();
    }
    setNewFencingToken(newJobMasterId);
    // heartbeats, slot pool and ResourceManager connection, followed in step 3
    startJobMasterServices();
    log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
    resetAndStartScheduler();
    return Acknowledge.get();
}
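The early return on an unchanged fencing token makes startJobExecution idempotent for repeated calls with the same id. A minimal sketch of just that guard, with a plain String standing in for JobMasterId and a counter standing in for the scheduler start; ToyFencedJobMaster is a hypothetical name and none of the other startup work is modelled.

```java
import java.util.Objects;

class FencingGuardSketch {
    static class ToyFencedJobMaster {
        private String fencingToken; // null until the first start
        private int schedulerStarts;

        /** Returns true if services were (re)started, false if the id was already active. */
        boolean startJobExecution(String newJobMasterId) {
            Objects.requireNonNull(newJobMasterId, "The new JobMasterId must not be null.");
            if (Objects.equals(fencingToken, newJobMasterId)) {
                return false; // same leader session: do not start anything twice
            }
            fencingToken = newJobMasterId;
            schedulerStarts++; // stands in for starting services and the scheduler
            return true;
        }

        int schedulerStarts() {
            return schedulerStarts;
        }
    }

    /** Starts twice under the same id, then once under a new id; returns the start count. */
    static int run() {
        ToyFencedJobMaster jm = new ToyFencedJobMaster();
        jm.startJobExecution("session-a");
        jm.startJobExecution("session-a"); // ignored by the guard
        jm.startJobExecution("session-b"); // new token, so services start again
        return jm.schedulerStarts();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 2
    }
}
```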
3.JobMaster#startJobMasterServices
private void startJobMasterServices() throws Exception {
    startHeartbeatServices();
    // start the slot pool make sure the slot pool now accepts messages for this leader
    slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
    //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
    // try to reconnect to previously known leader
    reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
    // job is ready to go, try to establish connection with resource manager
    //   - activate leader retrieval for the resource manager
    //   - on notification of the leader, the connection will be established and
    //     the slot pool will start requesting slots
    resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}
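The comments in startJobMasterServices describe a listener pattern: nothing connects until the retriever reports a leader. A minimal sketch of that pattern, under the assumption that the listener simply reacts to an address string: ToyLeaderRetriever, ToyLeaderListener, the event strings, and the address rm@host-a are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class LeaderRetrievalSketch {
    /** Toy listener: notified whenever a leader address becomes known. */
    interface ToyLeaderListener {
        void notifyLeaderAddress(String leaderAddress);
    }

    /** Toy retriever that forwards leader announcements to its listener. */
    static class ToyLeaderRetriever {
        private ToyLeaderListener listener;
        private String knownLeader;

        void start(ToyLeaderListener listener) {
            this.listener = listener;
            if (knownLeader != null) {
                listener.notifyLeaderAddress(knownLeader); // replay an already known leader
            }
        }

        void publishLeader(String address) {
            knownLeader = address;
            if (listener != null) {
                listener.notifyLeaderAddress(address);
            }
        }
    }

    /** Toy job master: connects and lets the slot pool request slots once a leader is known. */
    static class ToyJobMaster implements ToyLeaderListener {
        final List<String> events = new ArrayList<>();

        @Override
        public void notifyLeaderAddress(String leaderAddress) {
            events.add("connect:" + leaderAddress);
            events.add("slotPool:requestSlots");
        }
    }

    /** Starts retrieval first, then announces a leader; returns the recorded events. */
    static List<String> run() {
        ToyLeaderRetriever retriever = new ToyLeaderRetriever();
        ToyJobMaster jm = new ToyJobMaster();
        retriever.start(jm);                 // no leader yet, so nothing happens
        retriever.publishLeader("rm@host-a"); // hypothetical address
        return jm.events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```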
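jm startup overview
The flow chart of the source code involved in this part is shown below:
The next installment covers the startup of YarnTaskExecutorRunner. See you then!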