Flink Master Node Startup
ClusterEntrypoint: the cluster startup entry point
Flink has a master-slave architecture: master node JobManager + worker nodes TaskManager
The JobManager is the master node of a Flink cluster and mainly consists of three components:
1. ResourceManager
Flink's cluster resource manager. There is only one per cluster; it is responsible for managing and granting Slots, and it also provides heartbeat services.
2. Dispatcher
Receives the JobGraph submitted by the user and then starts a JobMaster for it; the JobMaster plays a role similar to the AppMaster in a YARN cluster. Internally it holds a persistent service, the JobGraphStore, which stores information about the Jobs submitted to the JobManager and is also used to recover Jobs after the master node goes down.
3. WebMonitorEndpoint (REST server = Netty server)
It maintains a large number of Handlers and starts a Netty server to receive external REST requests.
When a client submits a job to the Flink cluster via flink run, the request is ultimately
received and handled by the WebMonitorEndpoint, which resolves the route to decide which Handler executes it.
For example: submitJob ===> JobSubmitHandler
The Router has a large set of Handlers bound to it.
When a client submits a job to the cluster via REST (the client first builds the Job into a JobGraph object), the WebMonitorEndpoint receives it. Internally, the WebMonitorEndpoint uses a Router to resolve the route and find the corresponding Handler. Once handled, the job is handed over to the Dispatcher, which pulls up a JobMaster responsible for the Job's Slot resource requests and for deploying and executing its Tasks. The Slots a Job needs during execution are requested by the JobMaster from the ResourceManager.
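To make the Router-to-Handler dispatch concrete, here is a minimal, self-contained sketch of the idea; MiniRouter and its addRoute/route methods are invented for illustration and are not Flink's actual Router API:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy stand-in for a REST router: maps "METHOD path" to a handler.
public class MiniRouter {
    private final Map<String, Function<String, String>> routes = new HashMap<>();

    public void addRoute(String method, String path, Function<String, String> handler) {
        routes.put(method + " " + path, handler);
    }

    public String route(String method, String path, String body) {
        Function<String, String> handler = routes.get(method + " " + path);
        if (handler == null) {
            return "404 Not Found";
        }
        return handler.apply(body);
    }

    public static void main(String[] args) {
        MiniRouter router = new MiniRouter();
        // Analogous to: submitJob ===> JobSubmitHandler
        router.addRoute("POST", "/jobs", jobGraph -> "Job accepted: " + jobGraph);
        System.out.println(router.route("POST", "/jobs", "my-serialized-JobGraph"));
    }
}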
The main startup class of the JobManager is ClusterEntrypoint, because all of the different deployment combinations start the cluster by calling ClusterEntrypoint.runClusterEntrypoint(). The call chain is:
ClusterEntrypoint.runClusterEntrypoint(entrypoint)
ClusterEntrypoint.startCluster();
runCluster(configuration,pluginManager)
initializeServices(configuration,pluginManager)
createDispatcherResourceManagerComponentFactory(configuration);
clusterComponent = dispatcherResourceManagerComponentFactory.create(...)
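This chain is a template-method pattern: the abstract ClusterEntrypoint fixes the startup sequence, and each deployment mode plugs in its own component factories. A minimal sketch of that pattern with invented names (MiniClusterEntrypoint etc.), not Flink's real signatures:

// Simplified template-method sketch of the startup sequence.
abstract class MiniClusterEntrypoint {

    // Mirrors ClusterEntrypoint.startCluster() -> runCluster(...)
    public final void startCluster() {
        initializeServices();                       // step 1: shared services
        String factory = createComponentFactory();  // step 2: deployment-specific factories
        System.out.println("Creating components via " + factory); // step 3: create(...)
    }

    protected void initializeServices() {
        System.out.println("rpc service, HA services, blob server, metrics ...");
    }

    // Each deployment mode (standalone session, YARN, ...) supplies its own factory.
    protected abstract String createComponentFactory();
}

class MiniSessionEntrypoint extends MiniClusterEntrypoint {
    @Override
    protected String createComponentFactory() {
        return "DefaultDispatcherResourceManagerComponentFactory";
    }

    public static void main(String[] args) {
        // Analogous to runClusterEntrypoint(entrypoint)
        new MiniSessionEntrypoint().startCluster();
    }
}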
The code inside initializeServices() (step one):
// 1) Create the shared RPC service (Akka-based) used by the JobManager components
commonRpcService =
        AkkaRpcServiceUtils.createRemoteRpcService(
                configuration,
                configuration.getString(JobManagerOptions.ADDRESS),
                getRPCPortRange(configuration),
                configuration.getString(JobManagerOptions.BIND_HOST),
                configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));

JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

// Update the configuration used to create the high availability services
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

// 2) Thread pool for blocking I/O work
ioExecutor =
        Executors.newFixedThreadPool(
                ClusterEntrypointUtils.getPoolSize(configuration),
                new ExecutorThreadFactory("cluster-io"));

// 3) High availability services (e.g. ZooKeeper-based leader election / JobGraphStore)
haServices = createHaServices(configuration, ioExecutor);

// 4) Blob server for large artifacts such as user jars
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();

// 5) Heartbeat services shared by ResourceManager / JobMaster
heartbeatServices = createHeartbeatServices(configuration);

// 6) Metrics: registry, query service and process-level metric group
metricRegistry = createMetricRegistry(configuration, pluginManager);

final RpcService metricQueryServiceRpcService =
        MetricUtils.startRemoteMetricsRpcService(
                configuration, commonRpcService.getAddress());
metricRegistry.startQueryService(metricQueryServiceRpcService, null);

final String hostname = RpcUtils.getHostname(commonRpcService);

processMetricGroup =
        MetricUtils.instantiateProcessMetricGroup(
                metricRegistry,
                hostname,
                ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                        configuration));

// 7) Store for the execution graphs of finished jobs (served by the REST endpoint)
executionGraphInfoStore =
        createSerializableExecutionGraphStore(
                configuration, commonRpcService.getScheduledExecutor());
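A detail worth copying from the snippet above: the ioExecutor is built with an ExecutorThreadFactory so its threads carry a recognizable "cluster-io" prefix in thread dumps. A self-contained JDK-only sketch of the same pattern (the namedFactory helper below is hand-rolled, not Flink's class):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedPoolDemo {

    // Hand-rolled equivalent of a naming thread factory: prefixes pool thread names.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
    }

    public static void main(String[] args) throws Exception {
        // Analogous to: Executors.newFixedThreadPool(poolSize, new ExecutorThreadFactory("cluster-io"))
        ExecutorService ioExecutor = Executors.newFixedThreadPool(4, namedFactory("cluster-io"));
        ioExecutor.submit(() ->
                System.out.println("running on " + Thread.currentThread().getName())).get();
        ioExecutor.shutdown();
    }
}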
Step two, createDispatcherResourceManagerComponentFactory(configuration), initializes factory instances for a number of components:
1. DispatcherRunnerFactory, default implementation DefaultDispatcherRunnerFactory, which produces the DefaultDispatcherRunner
2. ResourceManagerFactory, default implementation StandaloneResourceManagerFactory, which produces the StandaloneResourceManager
3. RestEndpointFactory, default implementation SessionRestEndpointFactory, which produces the DispatcherRestEndpoint
Step three, dispatcherResourceManagerComponentFactory.create(...), creates the three key components:
1. DispatcherRunner, implemented by DefaultDispatcherRunner
2. ResourceManager, implemented by StandaloneResourceManager
3. WebMonitorEndpoint, implemented by DispatcherRestEndpoint
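The point of this two-step factory design is that the same create(...) call can assemble different cluster flavors depending on which factories were chosen in step two. A condensed sketch of the idea with simplified, invented interfaces:

// Condensed sketch: one factory per component, chosen by deployment mode.
interface ResourceManagerFactory { String createResourceManager(); }
interface RestEndpointFactory { String createRestEndpoint(); }

class ComponentFactoryDemo {
    public static void main(String[] args) {
        // Standalone session mode picks these defaults:
        ResourceManagerFactory rmFactory = () -> "StandaloneResourceManager";
        RestEndpointFactory restFactory = () -> "DispatcherRestEndpoint";

        // create(...) then asks each factory to produce its component.
        System.out.println(rmFactory.createResourceManager());
        System.out.println(restFactory.createRestEndpoint());
    }
}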
[Startup flow diagram omitted]
WebMonitorEndpoint Initialization and Startup
Entry point:
DispatcherResourceManagerComponentFactory.create(....)
The code executed is as follows:
webMonitorEndpoint =
restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
metricFetcher,
highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
fatalErrorHandler);
webMonitorEndpoint.start();
start(){
synchronized (lock) {
Preconditions.checkState(
state == State.CREATED, "The RestServerEndpoint cannot be restarted.");
log.info("Starting rest endpoint.");
final Router router = new Router();
final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
handlers = initializeHandlers(restAddressFuture);
Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
checkAllEndpointsAndHandlersAreUnique(handlers);
handlers.forEach(handler -> registerHandler(router, handler, log));
ChannelInitializer<SocketChannel> initializer =
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
RouterHandler handler = new RouterHandler(router, responseHeaders);
if (isHttpsEnabled()) {
ch.pipeline()
.addLast(
"ssl",
new RedirectingSslHandler(
restAddress,
restAddressFuture,
sslHandlerFactory));
}
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new FileUploadHandler(uploadDir))
.addLast(
new FlinkHttpObjectAggregator(
maxContentLength, responseHeaders))
.addLast(new ChunkedWriteHandler())
.addLast(handler.getName(), handler)
.addLast(new PipelineErrorHandler(log, responseHeaders));
}
};
NioEventLoopGroup bossGroup =
new NioEventLoopGroup(
1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
NioEventLoopGroup workerGroup =
new NioEventLoopGroup(
0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));
bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
Iterator<Integer> portsIterator;
try {
portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
} catch (IllegalConfigurationException e) {
throw e;
} catch (Exception e) {
throw new IllegalArgumentException(
"Invalid port range definition: " + restBindPortRange);
}
int chosenPort = 0;
while (portsIterator.hasNext()) {
try {
chosenPort = portsIterator.next();
final ChannelFuture channel;
if (restBindAddress == null) {
channel = bootstrap.bind(chosenPort);
} else {
channel = bootstrap.bind(restBindAddress, chosenPort);
}
serverChannel = channel.syncUninterruptibly().channel();
break;
} catch (final Exception e) {
if (!(e instanceof org.jboss.netty.channel.ChannelException
|| e instanceof java.net.BindException)) {
throw e;
}
}
}
if (serverChannel == null) {
throw new BindException(
"Could not start rest endpoint on any port in port range "
+ restBindPortRange);
}
log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);
final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
final String advertisedAddress;
if (bindAddress.getAddress().isAnyLocalAddress()) {
advertisedAddress = this.restAddress;
} else {
advertisedAddress = bindAddress.getAddress().getHostAddress();
}
final int port = bindAddress.getPort();
log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);
restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();
restAddressFuture.complete(restBaseUrl);
state = State.RUNNING;
startInternal();
}
}
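One detail worth isolating from start() is the port-selection loop: it walks the configured port range and keeps the first port that binds. The same pattern with plain JDK sockets (a simplified sketch; the real code also distinguishes a configured bind address and Netty-specific exceptions):

import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;

public class PortRangeBindDemo {

    // Try each port in [from, to] until one binds; mirrors the while (portsIterator.hasNext()) loop.
    static ServerSocket bindFirstFree(int from, int to) throws IOException {
        for (int port = from; port <= to; port++) {
            try {
                return new ServerSocket(port);
            } catch (BindException e) {
                // port in use -> keep trying, like the catch of BindException/ChannelException above
            }
        }
        throw new BindException("Could not bind on any port in range " + from + "-" + to);
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket socket = bindFirstFree(8081, 8090)) {
            System.out.println("rest-endpoint-style bind succeeded on port " + socket.getLocalPort());
        }
    }
}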
Startup workflow:
- Initialize a Router and a large number of Handlers; after deduplicating and sorting the handlers, register each of them with the Router
- Start a Netty server (see the sketch after this list)
- Start the internal services: take part in leader election and write the leader's address into ZooKeeper
- Start a periodic ExecutionGraph cache cleanup task
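As a reference for the Netty part, here is a minimal standalone HTTP server with the same pipeline shape (codec, aggregator, then a business handler). It is a toy sketch, not Flink code, and the port 8081 is arbitrary:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import java.nio.charset.StandardCharsets;

// Minimal Netty HTTP server: codec -> aggregator -> business handler.
public class MiniRestServer {
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline()
                                    .addLast(new HttpServerCodec())
                                    .addLast(new HttpObjectAggregator(1024 * 1024))
                                    .addLast(new SimpleChannelInboundHandler<FullHttpRequest>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
                                            // Plays the role of the RouterHandler: answers the request.
                                            FullHttpResponse resp = new DefaultFullHttpResponse(
                                                    HttpVersion.HTTP_1_1,
                                                    HttpResponseStatus.OK,
                                                    Unpooled.copiedBuffer("handled " + req.uri(), StandardCharsets.UTF_8));
                                            resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, resp.content().readableBytes());
                                            ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
                                        }
                                    });
                        }
                    });
            Channel serverChannel = bootstrap.bind(8081).sync().channel();
            serverChannel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}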
ResourceManager Initialization and Startup
Core entry point:
DispatcherResourceManagerComponentFactory.create(....)
resourceManager =
resourceManagerFactory.createResourceManager(
configuration,
ResourceID.generate(),
rpcService,
highAvailabilityServices,
heartbeatServices,
fatalErrorHandler,
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
metricRegistry,
hostname,
ioExecutor);
resourceManager.start();
ResourceManager.onStart(){
log.info("Starting the resource manager.");
startResourceManagerServices();
}
startResourceManagerServices() throws Exception {
try {
leaderElectionService =
highAvailabilityServices.getResourceManagerLeaderElectionService();
initialize();
leaderElectionService.start(this);
jobLeaderIdService.start(new JobLeaderIdActionsImpl());
registerMetrics();
} catch (Exception e) {
handleStartResourceManagerServicesException(e);
}
}
grantLeadership(newLeaderSessionID){
final CompletableFuture<Boolean> acceptLeadershipFuture =
clearStateFuture.thenComposeAsync(
(ignored) -> tryAcceptLeadership(newLeaderSessionID),
getUnfencedMainThreadExecutor());
final CompletableFuture<Void> confirmationFuture =
acceptLeadershipFuture.thenAcceptAsync(
(acceptLeadership) -> {
if (acceptLeadership) {
leaderElectionService.confirmLeadership(
newLeaderSessionID, getAddress());
}
},
ioExecutor);
confirmationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
onFatalError(ExceptionUtils.stripCompletionException(throwable));
}
});
}
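The method above is essentially a three-stage CompletableFuture chain. A stripped-down sketch of the same composition (names are illustrative; the real code also clears state and fences the session ID):

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LeadershipFutureDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        UUID newLeaderSessionId = UUID.randomUUID();

        // Stage 1: try to accept leadership on the "main thread" executor.
        CompletableFuture<Boolean> acceptLeadershipFuture =
                CompletableFuture.completedFuture(null) // stands in for clearStateFuture
                        .thenComposeAsync(
                                ignored -> CompletableFuture.completedFuture(true), // tryAcceptLeadership(...)
                                mainThreadExecutor);

        // Stage 2: confirm leadership on the I/O executor if accepted.
        CompletableFuture<Void> confirmationFuture =
                acceptLeadershipFuture.thenAcceptAsync(
                        accepted -> {
                            if (accepted) {
                                System.out.println("confirmLeadership(" + newLeaderSessionId + ")");
                            }
                        },
                        ioExecutor);

        // Stage 3: any failure along the chain becomes a fatal error.
        confirmationFuture.whenComplete(
                (ignored, throwable) -> {
                    if (throwable != null) {
                        System.err.println("onFatalError: " + throwable);
                    }
                });

        confirmationFuture.get();
        mainThreadExecutor.shutdown();
        ioExecutor.shutdown();
    }
}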
tryAcceptLeadership(newLeaderSessionID){
startServicesOnLeadership();
}
startServicesOnLeadership() {
startHeartbeatServices();
slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
onLeadership();
}
startHeartbeatServices() {
taskManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new TaskManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
jobManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new JobManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
}
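Conceptually, a heartbeat manager sender periodically pings its targets and reports the ones whose last response is too old to a listener. A toy version using only a ScheduledExecutorService (nothing below is Flink API):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MiniHeartbeatSender {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();
        long timeoutMillis = 5000;

        lastHeartbeat.put("taskmanager-1", System.currentTimeMillis());

        // Periodically "request heartbeats" and check for timed-out targets.
        scheduler.scheduleWithFixedDelay(() -> {
            long now = System.currentTimeMillis();
            lastHeartbeat.forEach((target, last) -> {
                if (now - last > timeoutMillis) {
                    System.out.println(target + " timed out -> notify listener");
                } else {
                    System.out.println("requesting heartbeat from " + target);
                }
            });
        }, 0, 1, TimeUnit.SECONDS);

        // A heartbeat response would refresh the timestamp:
        lastHeartbeat.put("taskmanager-1", System.currentTimeMillis());

        Thread.sleep(3000);
        scheduler.shutdown();
    }
}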
slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl()){
this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
resourceActions = Preconditions.checkNotNull(newResourceActions);
started = true;
taskManagerTimeoutsAndRedundancyCheck =
scheduledExecutor.scheduleWithFixedDelay(
() ->
mainThreadExecutor.execute(
() -> checkTaskManagerTimeoutsAndRedundancy()),
0L,
taskManagerTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
slotRequestTimeoutCheck =
scheduledExecutor.scheduleWithFixedDelay(
() -> mainThreadExecutor.execute(() -> checkSlotRequestTimeouts()),
0L,
slotRequestTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
}
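Both timers in slotManager.start() follow the same pattern: a scheduler thread fires, but the actual check is re-dispatched to the component's single main-thread executor so that SlotManager state is never touched concurrently. A minimal sketch of that hand-off:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MainThreadTimerDemo {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
        // A single-threaded executor plays the role of the RPC main thread.
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();

        // Mirrors: scheduledExecutor.scheduleWithFixedDelay(() -> mainThreadExecutor.execute(...), ...)
        scheduledExecutor.scheduleWithFixedDelay(
                () -> mainThreadExecutor.execute(
                        () -> System.out.println(
                                "checkTaskManagerTimeouts on " + Thread.currentThread().getName())),
                0L,
                500L,
                TimeUnit.MILLISECONDS);

        Thread.sleep(1500);
        scheduledExecutor.shutdown();
        mainThreadExecutor.shutdown();
    }
}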
Startup flow analysis:
1. ResourceManager is a subclass of RpcEndpoint, so once the ResourceManager object has been constructed, start() is called to start the RpcEndpoint, which then jumps into its onStart() method.
2. ResourceManager implements LeaderContender and takes part in leader election through the LeaderElectionService; if it wins the election, its isLeader() method is called back (a toy sketch of this contract follows below).
3. Starting the ResourceManager brings up the services it needs:
   - Two heartbeat services: heartbeats between ResourceManager and TaskExecutor, and between ResourceManager and JobMaster
   - Two timer services: checkTaskManagerTimeoutsAndRedundancy() checks for TaskExecutor timeouts, and checkSlotRequestTimeouts() checks for SlotRequest timeouts
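The contender/election contract in point 2 can be illustrated with a toy pair of classes (simplified and invented; real Flink fences the callbacks with session IDs and runs the election through ZooKeeper):

import java.util.UUID;

// Toy version of the LeaderContender callback contract.
interface MiniLeaderContender {
    void grantLeadership(UUID leaderSessionId);
}

class MiniLeaderElectionService {
    private MiniLeaderContender contender;

    // leaderElectionService.start(this) registers the contender ...
    void start(MiniLeaderContender contender) {
        this.contender = contender;
        // ... and in this toy version the sole contender always wins immediately.
        contender.grantLeadership(UUID.randomUUID());
    }

    public static void main(String[] args) {
        MiniLeaderElectionService service = new MiniLeaderElectionService();
        service.start(sessionId ->
                System.out.println("won election, starting services; session " + sessionId));
    }
}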
Dispatcher Initialization and Startup
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler,
new HaServicesJobGraphStoreFactory(highAvailabilityServices),
ioExecutor, rpcService, partialDispatcherServices
);
dispatcher = createDispatcher();
dispatcher.start();
nodeChanged();
leaderElectionEventHandler.onLeaderInformationChange();
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
isLeader();
leaderElectionEventHandler.onGrantLeadership();
leaderContender.grantLeadership(issuedLeaderSessionID);
startNewDispatcherLeaderProcess(leaderSessionID)
stopDispatcherLeaderProcess();
dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
newDispatcherLeaderProcess::start
startInternal()
onStart();
startServices();      // starts the JobGraphStore
recoverJobsAsync()    // recovers Jobs from the JobGraphStore
Startup flow:
- Start the JobGraphStore service
- Recover previously submitted Jobs from the JobGraphStore, which requires starting the Dispatcher
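An in-memory caricature of the JobGraphStore role, showing why recovery amounts to "read everything back and resubmit" (the real store persists JobGraphs in ZooKeeper or on a filesystem):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory caricature of the JobGraphStore used for master failover.
public class MiniJobGraphStore {
    private final Map<String, String> store = new HashMap<>();

    void putJobGraph(String jobId, String jobGraph) {
        store.put(jobId, jobGraph); // persisted on submission
    }

    // After the new leader comes up, recoverJobsAsync() re-reads all stored graphs.
    List<String> recoverJobGraphs() {
        return new ArrayList<>(store.values());
    }

    public static void main(String[] args) {
        MiniJobGraphStore store = new MiniJobGraphStore();
        store.putJobGraph("job-1", "JobGraph(WordCount)");
        // ...master crashes, new Dispatcher leader starts, then:
        store.recoverJobGraphs().forEach(g -> System.out.println("resubmitting " + g));
    }
}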