上一期指路
第五期
上一期已经分析到了执行YarnJobClusterEntrypoint中的main方法,我们深入其runClusterEntrypoint继续分析。
1.ClusterEntrypoint#runClusterEntrypoint->
ClusterEntrypoint#startCluster->ClusterEntrypoint#runCluster
/**
 * Initializes the cluster services and creates the dispatcher/resource-manager component
 * while holding {@code lock}.
 *
 * <p>After the services are initialized, the address and port of {@code commonRpcService} are
 * written into the configuration. A completion callback is then registered on the component's
 * shut-down future so that the entrypoint shuts down together with the component.
 *
 * @param configuration cluster configuration; updated in place with the RPC address and port
 * @param pluginManager plugin manager passed on to {@code initializeServices}
 * @throws Exception if service initialization or component creation fails
 */
private void runCluster(Configuration configuration, PluginManager pluginManager) throws Exception {
    synchronized (lock) {
        initializeServices(configuration, pluginManager);

        // Record the host information of the common RPC service in the configuration.
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        final DispatcherResourceManagerComponentFactory componentFactory =
            createDispatcherResourceManagerComponentFactory(configuration);

        clusterComponent = componentFactory.create(
            configuration,
            ioExecutor,
            commonRpcService,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            archivedExecutionGraphStore,
            new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
            this);

        clusterComponent.getShutDownFuture().whenComplete(
            (ApplicationStatus finalStatus, Throwable failure) -> {
                if (failure == null) {
                    // General shutdown path. If a separate, more specific shutdown was
                    // already triggered, this will do nothing.
                    shutDownAsync(finalStatus, null, true);
                    return;
                }

                // The component failed: report an unknown status together with the stack trace.
                shutDownAsync(
                    ApplicationStatus.UNKNOWN,
                    ExceptionUtils.stringifyException(failure),
                    false);
            });
    }
}
2.DefaultDispatcherResourceManagerComponentFactory#create
/**
 * Creates the {@link DispatcherResourceManagerComponent}: the REST endpoint, the
 * ResourceManager, the dispatcher runner and the two leader retrieval services.
 *
 * <p>Order of operations as written below: obtain the leader retrievers, build the gateway
 * retrievers, create and start the REST endpoint, create the ResourceManager, create the
 * dispatcher runner, start the ResourceManager, and finally start both leader retrieval
 * services. If any step throws an {@link Exception}, everything created so far is stopped or
 * closed and the failure is rethrown wrapped in a {@link FlinkException}.
 *
 * @param configuration configuration read for REST and metric-fetcher options and passed to the created components
 * @param ioExecutor executor passed to the ResourceManager, history server archivist and dispatcher
 * @param rpcService RPC service used by the gateway retrievers, the ResourceManager and the dispatcher runner
 * @param highAvailabilityServices source of the leader retrievers and leader election services
 * @param blobServer blob server passed to the REST endpoint and dispatcher; its port is put into the ClusterInformation
 * @param heartbeatServices heartbeat services passed to the ResourceManager and the dispatcher
 * @param metricRegistry metric registry passed to the ResourceManager and the dispatcher
 * @param archivedExecutionGraphStore store passed to the dispatcher services
 * @param metricQueryServiceRetriever retriever passed to the metric fetcher
 * @param fatalErrorHandler handler passed to every created component
 * @return the component bundling the created services
 * @throws Exception a {@link FlinkException} wrapping the original failure if creation fails
 */
public DispatcherResourceManagerComponent create(
Configuration configuration,
Executor ioExecutor,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
FatalErrorHandler fatalErrorHandler) throws Exception {
// Declared outside the try block (initially null) so the catch block can clean up
// exactly those components that were already created.
LeaderRetrievalService dispatcherLeaderRetrievalService = null;
LeaderRetrievalService resourceManagerRetrievalService = null;
WebMonitorEndpoint<?> webMonitorEndpoint = null;
ResourceManager<?> resourceManager = null;
DispatcherRunner dispatcherRunner = null;
try {
// Leader retrievers for the dispatcher and the ResourceManager; they are started at the end.
dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
// Gateway retrievers for both components, each with its own
// ExponentialBackoffRetryStrategy(12, 10 ms, 50 ms).
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
DispatcherGateway.class,
DispatcherId::fromUuid,
new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));
final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
ResourceManagerGateway.class,
ResourceManagerId::fromUuid,
new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));
// Executor for the REST endpoint; thread count and priority come from RestOptions.
final ScheduledExecutorService executor = WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint");
// An update interval of 0 selects the VoidMetricFetcher instead of a real fetcher.
final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher = updateInterval == 0
? VoidMetricFetcher.INSTANCE
: MetricFetcherImpl.fromConfiguration(
configuration,
metricQueryServiceRetriever,
dispatcherGatewayRetriever,
executor);
// Create and immediately start the REST endpoint. Its base URL is handed to the
// ResourceManager below, so it has to exist first.
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
metricFetcher,
highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
fatalErrorHandler);
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();
final String hostname = RpcUtils.getHostname(rpcService);
// The ResourceManager is only created here; it is started after the dispatcher runner
// has been created (see resourceManager.start() below).
resourceManager = resourceManagerFactory.createResourceManager(
configuration,
ResourceID.generate(),
rpcService,
highAvailabilityServices,
heartbeatServices,
fatalErrorHandler,
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
metricRegistry,
hostname,
ioExecutor);
final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint, ioExecutor);
// Bundle of services passed to the dispatcher runner factory.
final PartialDispatcherServices partialDispatcherServices = new PartialDispatcherServices(
configuration,
highAvailabilityServices,
resourceManagerGatewayRetriever,
blobServer,
heartbeatServices,
() -> MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, hostname),
archivedExecutionGraphStore,
fatalErrorHandler,
historyServerArchivist,
metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
ioExecutor);
// NOTE(review): the log message says "Starting", so creating the runner presumably also
// starts the dispatcher's leader election; that is not visible in this method.
log.debug("Starting Dispatcher.");
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
highAvailabilityServices.getDispatcherLeaderElectionService(),
fatalErrorHandler,
new HaServicesJobGraphStoreFactory(highAvailabilityServices),
ioExecutor,
rpcService,
partialDispatcherServices);
log.debug("Starting ResourceManager.");
resourceManager.start();
// Start leader retrieval last, feeding the results into the gateway retrievers.
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
return new DispatcherResourceManagerComponent(
dispatcherRunner,
DefaultResourceManagerService.createFor(resourceManager),
dispatcherLeaderRetrievalService,
resourceManagerRetrievalService,
webMonitorEndpoint,
fatalErrorHandler);
} catch (Exception exception) {
// clean up all started components
// Failures during clean-up are merged into the original exception via
// ExceptionUtils.firstOrSuppressed so that clean-up continues with the next component.
if (dispatcherLeaderRetrievalService != null) {
try {
dispatcherLeaderRetrievalService.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
}
if (resourceManagerRetrievalService != null) {
try {
resourceManagerRetrievalService.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
}
// The remaining components are closed asynchronously; collect their termination futures.
final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
if (webMonitorEndpoint != null) {
terminationFutures.add(webMonitorEndpoint.closeAsync());
}
if (resourceManager != null) {
terminationFutures.add(resourceManager.closeAsync());
}
if (dispatcherRunner != null) {
terminationFutures.add(dispatcherRunner.closeAsync());
}
// Block on the combined termination future before rethrowing.
final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures);
try {
terminationFuture.get();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
}
}
①highAvailabilityServices.getDispatcherLeaderRetriever()
highAvailabilityServices.getResourceManagerLeaderRetriever()
高可用相关,对应的leader寻回器
②new RpcGatewayRetriever
网关相关,对应组件的网关寻回器
③WebMonitorEndpoint.createExecutorService
构建executor
④getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL)
web ui使用指标获取器的更新间隔,默认为10000L
⑤restEndpointFactory.createRestEndpoint、webMonitorEndpoint.start()
web监控终端启动
⑥resourceManagerFactory.createResourceManager
创建rm(flink内部,非yarn中rm)
⑦HistoryServerArchivist.createHistoryServerArchivist
创建history server相关
⑧dispatcherRunnerFactory.createDispatcherRunner
创建并启动Dispatcher ,其中dispatcher会创建和启动JobManager(JobMaster)
⑨resourceManager.start()
启动ResourceManager
⑩resourceManagerRetrievalService.start
dispatcherLeaderRetrievalService.start
启动组件寻回服务
3.ResourceManagerFactory#createResourceManager->ActiveResourceManagerFactory#createResourceManager->YarnResourceManagerFactory#createResourceManagerDriver
/**
 * Creates the {@link ResourceManagerDriver} used for YARN deployments.
 *
 * <p>The driver configuration is built from the current process environment
 * ({@link System#getenv()}), the RPC address and the web interface URL.
 *
 * @param configuration configuration handed to the driver
 * @param webInterfaceUrl web interface URL stored in the driver configuration
 * @param rpcAddress RPC address stored in the driver configuration
 * @return a new {@code YarnResourceManagerDriver} using the default YARN client factories
 */
protected ResourceManagerDriver<YarnWorkerNode> createResourceManagerDriver(
        Configuration configuration,
        String webInterfaceUrl,
        String rpcAddress) {
    return new YarnResourceManagerDriver(
        configuration,
        new YarnResourceManagerDriverConfiguration(System.getenv(), rpcAddress, webInterfaceUrl),
        DefaultYarnResourceManagerClientFactory.getInstance(),
        DefaultYarnNodeManagerClientFactory.getInstance());
}
其中new YarnResourceManagerDriver
其实就是yarn部署模式下ResourceManagerDriver的实现。
点击ResourceManagerDriver进入,
其作用是负责向特定的外部ResourceManager请求和释放资源。
4.一系列跳转,如下
DefaultDispatcherRunnerFactory#createDispatcherRunner->DefaultDispatcherRunner#create->DispatcherRunnerLeaderElectionLifecycleManager#createFor->DispatcherRunnerLeaderElectionLifecycleManager的构造方法->StandaloneLeaderElectionService#start->DefaultDispatcherRunner#grantLeadership->DefaultDispatcherRunner#startNewDispatcherLeaderProcess
/**
 * Replaces the current dispatcher leader process with a new one for the given leader session.
 *
 * <p>The current process is stopped first. The new process is started only once
 * {@code previousDispatcherLeaderProcessTerminationFuture} has completed.
 *
 * @param leaderSessionID leader session id passed to {@code createNewDispatcherLeaderProcess}
 */
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
    stopDispatcherLeaderProcess();

    // Keep a final reference for the method reference below, then publish it to the field.
    final DispatcherLeaderProcess freshLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
    dispatcherLeaderProcess = freshLeaderProcess;

    FutureUtils.assertNoException(
        previousDispatcherLeaderProcessTerminationFuture.thenRun(freshLeaderProcess::start));
}
5.AbstractDispatcherLeaderProcess#start->AbstractDispatcherLeaderProcess#startInternal->JobDispatcherLeaderProcess#onStart->DefaultDispatcherGatewayServiceFactory#create
/**
 * Creates a {@link Dispatcher}, starts it, and wraps it in a gateway service.
 *
 * @param fencingToken dispatcher id passed to the dispatcher factory
 * @param recoveredJobs job graphs passed to the dispatcher factory
 * @param jobGraphWriter writer combined with the partial dispatcher services
 * @return gateway service created from the started dispatcher
 * @throws FlinkRuntimeException if the dispatcher factory throws an {@link Exception}
 */
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        JobGraphWriter jobGraphWriter) {
    final Dispatcher createdDispatcher;
    try {
        createdDispatcher = dispatcherFactory.createDispatcher(
            rpcService,
            fencingToken,
            recoveredJobs,
            // The bootstrap factory ignores its arguments and always yields a no-op bootstrap.
            (ignoredGateway, ignoredExecutor, ignoredErrorHandler) -> new NoOpDispatcherBootstrap(),
            PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter));
    } catch (Exception creationFailure) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", creationFailure);
    }

    createdDispatcher.start();
    return DefaultDispatcherGatewayService.from(createdDispatcher);
}
我们点击dispatcher.start(),发现其内部调用的是rpcServer.start(),即启动rpc服务:
发消息通知底层的 AkkaRpcActor 切换为 START 状态,之后该端点的onStart方法会被回调,因此直接看Dispatcher的onStart方法。
6.Dispatcher#onStart
/**
 * Start-up hook of the dispatcher.
 *
 * <p>Starts the dispatcher services, then the recovered jobs, and finally creates the
 * dispatcher bootstrap. If starting the services throws, the failure is wrapped in a
 * {@link DispatcherException}, reported through {@code onFatalError}, and rethrown.
 *
 * @throws Exception the {@link DispatcherException} described above, or any exception raised by the later steps
 */
public void onStart() throws Exception {
    try {
        startDispatcherServices();
    } catch (Throwable cause) {
        final String message = String.format("Could not start the Dispatcher %s", getAddress());
        final DispatcherException startupFailure = new DispatcherException(message, cause);
        onFatalError(startupFailure);
        throw startupFailure;
    }

    startRecoveredJobs();

    dispatcherBootstrap = dispatcherBootstrapFactory.create(
        getSelfGateway(DispatcherGateway.class),
        getRpcService().getScheduledExecutor(),
        this::onFatalError);
}
①startDispatcherServices
启动Dispatcher服务
②startRecoveredJobs
开始处理恢复之前上传到HDFS的作业图
这一期分析到了Dispatcher的创建与启动,后面几期会涉及到JobMaster的创建与启动。
总览
这一期涉及的源码流程图如下:
|