
[Big Data] Flink Source Code Series (Creating the ResourceManager [Flink-internal, not the YARN RM], Creating and Starting the Dispatcher) - Part 6

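Previous installment

Part 5

The previous installment traced execution as far as the main method of YarnJobClusterEntrypoint. Here we step into its runClusterEntrypoint call and continue the analysis.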

1.ClusterEntrypoint#runClusterEntrypoint->ClusterEntrypoint#startCluster->ClusterEntrypoint#runCluster

	private void runCluster(Configuration configuration, PluginManager pluginManager) throws Exception {
		synchronized (lock) {

			initializeServices(configuration, pluginManager);

			// write host information into configuration
			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

			final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

			clusterComponent = dispatcherResourceManagerComponentFactory.create(
				configuration,
				ioExecutor,
				commonRpcService,
				haServices,
				blobServer,
				heartbeatServices,
				metricRegistry,
				archivedExecutionGraphStore,
				new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
				this);

			clusterComponent.getShutDownFuture().whenComplete(
				(ApplicationStatus applicationStatus, Throwable throwable) -> {
					if (throwable != null) {
						shutDownAsync(
							ApplicationStatus.UNKNOWN,
							ExceptionUtils.stringifyException(throwable),
							false);
					} else {
						// This is the general shutdown path. If a separate more specific shutdown was
						// already triggered, this will do nothing
						shutDownAsync(
							applicationStatus,
							null,
							true);
					}
				});
		}
	}
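
The callback registered on getShutDownFuture() at the end of runCluster has two branches: an exceptional completion is reported with ApplicationStatus.UNKNOWN plus the stringified exception, while a normal completion passes the received status through. Below is a minimal sketch of that branching using only java.util.concurrent. The class ShutdownCallbackSketch, the enum Status and the method describeShutdown are hypothetical stand-ins rather than Flink types, and the sketch uses handle instead of whenComplete so that the decision can be returned and inspected.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the shutdown handling in runCluster; not Flink code.
class ShutdownCallbackSketch {

    // Simplified stand-in for Flink's ApplicationStatus.
    enum Status { SUCCEEDED, FAILED, UNKNOWN }

    // Mirrors the two branches of the callback: an exceptional completion
    // maps to UNKNOWN with diagnostics, a normal one keeps its status.
    static CompletableFuture<String> describeShutdown(CompletableFuture<Status> shutDownFuture) {
        return shutDownFuture.handle((status, throwable) -> {
            if (throwable != null) {
                return "status=" + Status.UNKNOWN + ", diagnostics=" + throwable.getMessage();
            }
            return "status=" + status + ", diagnostics=null";
        });
    }

    public static void main(String[] args) {
        // Normal path: the component reports its own final status.
        CompletableFuture<Status> ok = new CompletableFuture<>();
        CompletableFuture<String> okDecision = describeShutdown(ok);
        ok.complete(Status.SUCCEEDED);
        System.out.println(okDecision.join());

        // Failure path: the future completes exceptionally.
        CompletableFuture<Status> failed = new CompletableFuture<>();
        CompletableFuture<String> failedDecision = describeShutdown(failed);
        failed.completeExceptionally(new IllegalStateException("component crashed"));
        System.out.println(failedDecision.join());
    }
}
```

The point of the pattern is that the entrypoint never blocks waiting for the cluster component; it only reacts once the component's shutdown future completes.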

2.DefaultDispatcherResourceManagerComponentFactory#create

	public DispatcherResourceManagerComponent create(
			Configuration configuration,
			Executor ioExecutor,
			RpcService rpcService,
			HighAvailabilityServices highAvailabilityServices,
			BlobServer blobServer,
			HeartbeatServices heartbeatServices,
			MetricRegistry metricRegistry,
			ArchivedExecutionGraphStore archivedExecutionGraphStore,
			MetricQueryServiceRetriever metricQueryServiceRetriever,
			FatalErrorHandler fatalErrorHandler) throws Exception {

		LeaderRetrievalService dispatcherLeaderRetrievalService = null;
		LeaderRetrievalService resourceManagerRetrievalService = null;
		WebMonitorEndpoint<?> webMonitorEndpoint = null;
		ResourceManager<?> resourceManager = null;
		DispatcherRunner dispatcherRunner = null;

		try {
			dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

			resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

			final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
				rpcService,
				DispatcherGateway.class,
				DispatcherId::fromUuid,
				new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));

			final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
				rpcService,
				ResourceManagerGateway.class,
				ResourceManagerId::fromUuid,
				new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));

			final ScheduledExecutorService executor = WebMonitorEndpoint.createExecutorService(
				configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
				configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
				"DispatcherRestEndpoint");

			final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
			final MetricFetcher metricFetcher = updateInterval == 0
				? VoidMetricFetcher.INSTANCE
				: MetricFetcherImpl.fromConfiguration(
					configuration,
					metricQueryServiceRetriever,
					dispatcherGatewayRetriever,
					executor);

			webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
				configuration,
				dispatcherGatewayRetriever,
				resourceManagerGatewayRetriever,
				blobServer,
				executor,
				metricFetcher,
				highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
				fatalErrorHandler);

			log.debug("Starting Dispatcher REST endpoint.");
			webMonitorEndpoint.start();

			final String hostname = RpcUtils.getHostname(rpcService);

			resourceManager = resourceManagerFactory.createResourceManager(
				configuration,
				ResourceID.generate(),
				rpcService,
				highAvailabilityServices,
				heartbeatServices,
				fatalErrorHandler,
				new ClusterInformation(hostname, blobServer.getPort()),
				webMonitorEndpoint.getRestBaseUrl(),
				metricRegistry,
				hostname,
				ioExecutor);

			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint, ioExecutor);

			final PartialDispatcherServices partialDispatcherServices = new PartialDispatcherServices(
				configuration,
				highAvailabilityServices,
				resourceManagerGatewayRetriever,
				blobServer,
				heartbeatServices,
				() -> MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, hostname),
				archivedExecutionGraphStore,
				fatalErrorHandler,
				historyServerArchivist,
				metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
				ioExecutor);

			log.debug("Starting Dispatcher.");
			dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
				highAvailabilityServices.getDispatcherLeaderElectionService(),
				fatalErrorHandler,
				new HaServicesJobGraphStoreFactory(highAvailabilityServices),
				ioExecutor,
				rpcService,
				partialDispatcherServices);

			log.debug("Starting ResourceManager.");
			resourceManager.start();

			resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
			dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

			return new DispatcherResourceManagerComponent(
				dispatcherRunner,
				DefaultResourceManagerService.createFor(resourceManager),
				dispatcherLeaderRetrievalService,
				resourceManagerRetrievalService,
				webMonitorEndpoint,
				fatalErrorHandler);

		} catch (Exception exception) {
			// clean up all started components
			if (dispatcherLeaderRetrievalService != null) {
				try {
					dispatcherLeaderRetrievalService.stop();
				} catch (Exception e) {
					exception = ExceptionUtils.firstOrSuppressed(e, exception);
				}
			}

			if (resourceManagerRetrievalService != null) {
				try {
					resourceManagerRetrievalService.stop();
				} catch (Exception e) {
					exception = ExceptionUtils.firstOrSuppressed(e, exception);
				}
			}

			final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

			if (webMonitorEndpoint != null) {
				terminationFutures.add(webMonitorEndpoint.closeAsync());
			}

			if (resourceManager != null) {
				terminationFutures.add(resourceManager.closeAsync());
			}

			if (dispatcherRunner != null) {
				terminationFutures.add(dispatcherRunner.closeAsync());
			}

			final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures);

			try {
				terminationFuture.get();
			} catch (Exception e) {
				exception = ExceptionUtils.firstOrSuppressed(e, exception);
			}

			throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
		}
	}
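
The catch block of create follows a common cleanup idiom: every component that was already created is stopped, and any exception thrown during cleanup is attached to the original failure instead of replacing it. The sketch below illustrates that idiom in isolation. CleanupSketch and closeAll are hypothetical names, and the local firstOrSuppressed helper reflects our assumption about how a helper of that name behaves (keep the earlier exception and add the newer one as suppressed); it is not copied from Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "clean up everything, keep the first error"; not Flink code.
class CleanupSketch {

    // Assumed behaviour: the earlier exception stays primary and the newer
    // one is attached to it as a suppressed exception.
    static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        if (previous == null || previous == newException) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    // Closes every resource even if some of them fail, and returns the
    // original exception enriched with any cleanup failures.
    static Exception closeAll(Exception original, List<AutoCloseable> resources) {
        Exception result = original;
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // this component was never created
            }
            try {
                resource.close();
            } catch (Exception e) {
                result = firstOrSuppressed(e, result);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<AutoCloseable> resources = new ArrayList<>();
        resources.add(() -> { throw new IllegalStateException("retrieval service failed to stop"); });
        resources.add(null);
        resources.add(() -> System.out.println("endpoint closed"));

        Exception combined = closeAll(new Exception("startup failed"), resources);
        System.out.println(combined.getMessage() + ", suppressed: " + combined.getSuppressed().length);
    }
}
```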

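①highAvailabilityServices.getDispatcherLeaderRetriever()

highAvailabilityServices.getResourceManagerLeaderRetriever()

High-availability related: these obtain the leader retrieval services for the Dispatcher and the ResourceManager.

②new RpcGatewayRetriever

Gateway related: creates the gateway retriever for each of the two components.

③WebMonitorEndpoint.createExecutorService

Builds the executor used by the REST endpoint.

④getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL)

The update interval of the metric fetcher used by the web UI; the default is 10000L (milliseconds). As the code shows, a value of 0 selects VoidMetricFetcher.INSTANCE instead of a real fetcher.

⑤restEndpointFactory.createRestEndpoint

webMonitorEndpoint.start()

Creates and starts the web monitor endpoint.

⑥resourceManagerFactory.createResourceManager

Creates the ResourceManager (the Flink-internal one, not the YARN RM).

⑦HistoryServerArchivist.createHistoryServerArchivist

Creates the history server archivist.

⑧dispatcherRunnerFactory.createDispatcherRunner

Creates and starts the Dispatcher; the Dispatcher in turn creates and starts the JobManager (JobMaster).

⑨resourceManager.start()

Starts the ResourceManager.

⑩resourceManagerRetrievalService.start

dispatcherLeaderRetrievalService.start

Starts the leader retrieval services for the two components.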
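
Both gateway retrievers in step ② are built with new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)). To get a feel for what those three numbers mean, here is a small sketch that computes the resulting waiting times. It assumes that the delay doubles after every attempt and is capped at the maximum delay; BackoffSketch and retryDelays are hypothetical names, and the doubling rule is our assumption about the strategy, not code taken from Flink.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of an exponential backoff schedule; not Flink code.
class BackoffSketch {

    // Assumption: each retry doubles the previous delay, capped at maxDelay.
    static List<Duration> retryDelays(int maxRetries, Duration initialDelay, Duration maxDelay) {
        List<Duration> schedule = new ArrayList<>();
        Duration current = initialDelay;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            schedule.add(current);
            long doubled = Math.min(current.toMillis() * 2, maxDelay.toMillis());
            current = Duration.ofMillis(doubled);
        }
        return schedule;
    }

    public static void main(String[] args) {
        // Same arguments as in the create method above.
        List<Duration> schedule = retryDelays(12, Duration.ofMillis(10), Duration.ofMillis(50));
        long total = 0;
        for (Duration delay : schedule) {
            total += delay.toMillis();
            System.out.println(delay.toMillis() + " ms");
        }
        System.out.println("total wait: " + total + " ms");
    }
}
```

Under that assumption the 12 attempts wait 10, 20 and 40 ms and then 50 ms nine times, roughly 520 ms in total, so a gateway lookup gives up quickly if the leader is not reachable.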

3.ResourceManagerFactory#createResourceManager->ActiveResourceManagerFactory#createResourceManager->YarnResourceManagerFactory#createResourceManagerDriver

	protected ResourceManagerDriver<YarnWorkerNode> createResourceManagerDriver(Configuration configuration, String webInterfaceUrl, String rpcAddress) {
		final YarnResourceManagerDriverConfiguration yarnResourceManagerDriverConfiguration = new YarnResourceManagerDriverConfiguration(System.getenv(), rpcAddress, webInterfaceUrl);

		return new YarnResourceManagerDriver(
			configuration,
			yarnResourceManagerDriverConfiguration,
			DefaultYarnResourceManagerClientFactory.getInstance(),
			DefaultYarnNodeManagerClientFactory.getInstance());
	}

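Here, new YarnResourceManagerDriver creates the ResourceManagerDriver implementation used in the YARN deployment mode.

Opening ResourceManagerDriver shows what it is for: it is responsible for requesting resources from, and releasing them back to, a specific external resource manager.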
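
To make the driver's role concrete, the following sketch models the "request and release resources from an external resource manager" contract with a deliberately tiny interface. Everything in it is hypothetical: ResourceDriver, InMemoryDriver and the int memoryMb parameter are simplifications invented for illustration, and the in-memory set merely stands in for an external system such as YARN. The real ResourceManagerDriver interface has more methods and different parameter types.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical, heavily simplified model of a resource manager driver; not Flink code.
class DriverSketch {

    // The two responsibilities named above: request and release.
    interface ResourceDriver<W> {
        CompletableFuture<W> requestResource(int memoryMb);
        void releaseResource(W worker);
    }

    // In-memory stand-in for an external resource manager such as YARN.
    static class InMemoryDriver implements ResourceDriver<String> {
        private final Set<String> allocated = new HashSet<>();
        private int counter = 0;

        @Override
        public CompletableFuture<String> requestResource(int memoryMb) {
            // A real driver would ask the external system and complete the
            // future later; here the allocation succeeds immediately.
            String worker = "container-" + (++counter) + "-" + memoryMb + "mb";
            allocated.add(worker);
            return CompletableFuture.completedFuture(worker);
        }

        @Override
        public void releaseResource(String worker) {
            allocated.remove(worker);
        }

        int allocatedCount() {
            return allocated.size();
        }
    }

    public static void main(String[] args) {
        InMemoryDriver driver = new InMemoryDriver();
        String worker = driver.requestResource(1024).join();
        System.out.println("allocated " + worker + ", in use: " + driver.allocatedCount());
        driver.releaseResource(worker);
        System.out.println("released " + worker + ", in use: " + driver.allocatedCount());
    }
}
```

Separating the driver from the ResourceManager in this way keeps the Flink-internal slot bookkeeping independent of the concrete cluster framework.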

4.A chain of calls, as follows

DefaultDispatcherRunnerFactory#createDispatcherRunner->DefaultDispatcherRunner#create->DispatcherRunnerLeaderElectionLifecycleManager#createFor->DispatcherRunnerLeaderElectionLifecycleManager的构造方法->StandaloneLeaderElectionService#start->DefaultDispatcherRunner#grantLeadership->DefaultDispatcherRunner#startNewDispatcherLeaderProcess

	private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
		stopDispatcherLeaderProcess();

		dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

		final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
		FutureUtils.assertNoException(
			previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
	}
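
The key detail in startNewDispatcherLeaderProcess is the thenRun call: the new leader process is created right away, but it is only started once the termination future of the previous process has completed. The sketch below reproduces that ordering with plain CompletableFuture. LeaderProcessSketch, its nested Process class and the events list are hypothetical, and the bookkeeping of the previous termination future is simplified compared with the real runner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of "start the new process only after the old one ended"; not Flink code.
class LeaderProcessSketch {

    static final class Process {
        final String name;
        // Completed by the process once it has fully shut down.
        final CompletableFuture<Void> terminated = new CompletableFuture<>();

        Process(String name) {
            this.name = name;
        }
    }

    final List<String> events = new ArrayList<>();
    private Process current;
    private CompletableFuture<Void> previousTermination = CompletableFuture.completedFuture(null);

    Process startNewProcess(String name) {
        // Step 1: ask the old process to stop and remember its termination future.
        if (current != null) {
            events.add("stop requested: " + current.name);
            previousTermination = current.terminated;
        }
        // Step 2: create the new process immediately.
        Process next = new Process(name);
        current = next;
        // Step 3: start it only after the old process has terminated.
        previousTermination.thenRun(() -> events.add("started: " + next.name));
        return next;
    }

    public static void main(String[] args) {
        LeaderProcessSketch runner = new LeaderProcessSketch();
        Process first = runner.startNewProcess("leader-1");
        runner.startNewProcess("leader-2");
        System.out.println("before termination: " + runner.events);
        first.terminated.complete(null); // the old process has finished shutting down
        System.out.println("after termination:  " + runner.events);
    }
}
```

This guarantees that two leader processes never run side by side, even though leadership can be granted again while the old process is still shutting down.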

5.AbstractDispatcherLeaderProcess#start->AbstractDispatcherLeaderProcess#startInternal->JobDispatcherLeaderProcess#onStart->DefaultDispatcherGatewayServiceFactory#create

	public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
			DispatcherId fencingToken,
			Collection<JobGraph> recoveredJobs,
			JobGraphWriter jobGraphWriter) {

		final Dispatcher dispatcher;
		try {
			dispatcher = dispatcherFactory.createDispatcher(
				rpcService,
				fencingToken,
				recoveredJobs,
				(dispatcherGateway, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(),
				PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter));
		} catch (Exception e) {
			throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
		}

		dispatcher.start();

		return DefaultDispatcherGatewayService.from(dispatcher);
	}

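Stepping into dispatcher.start(), we find that it simply calls rpcServer.start();

This starts the RPC service: a message is sent to the underlying AkkaRpcActor telling it to switch to the START state. We can therefore go straight to the Dispatcher's onStart method.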
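
The indirection described here (start() only sends a message, and the lifecycle hook runs later on the endpoint's own thread) can be sketched without Akka. In the sketch below a single-threaded executor plays the role of the actor's mailbox. EndpointSketch, its fields and the thread name endpoint-main-thread are all hypothetical, and the real AkkaRpcActor state handling is more involved than a single boolean.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of "start() posts a message, onStart() runs on the endpoint thread"; not Flink code.
class EndpointSketch {

    // Single daemon thread standing in for the actor's mailbox.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "endpoint-main-thread");
        thread.setDaemon(true);
        return thread;
    });

    private volatile boolean started = false;
    volatile String onStartThread;

    // Does not call onStart directly; it enqueues a START message instead.
    CompletableFuture<Void> start() {
        return CompletableFuture.runAsync(() -> {
            started = true; // switch to the started state
            onStart();      // lifecycle hook runs on the mailbox thread
        }, mailbox);
    }

    // Subclasses would put their startup logic here.
    protected void onStart() {
        onStartThread = Thread.currentThread().getName();
    }

    boolean isStarted() {
        return started;
    }

    void shutdown() {
        mailbox.shutdown();
    }

    public static void main(String[] args) {
        EndpointSketch endpoint = new EndpointSketch();
        endpoint.start().join(); // wait until the START message has been handled
        System.out.println("started=" + endpoint.isStarted() + ", onStart ran on " + endpoint.onStartThread);
        endpoint.shutdown();
    }
}
```

Because onStart runs on the endpoint's own thread, startup logic does not need extra synchronization against messages handled later on the same thread.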

6.Dispatcher#onStart

	public void onStart() throws Exception {
		try {
			startDispatcherServices();
		} catch (Throwable t) {
			final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), t);
			onFatalError(exception);
			throw exception;
		}

		startRecoveredJobs();
		this.dispatcherBootstrap = this.dispatcherBootstrapFactory.create(
				getSelfGateway(DispatcherGateway.class),
				this.getRpcService().getScheduledExecutor() ,
				this::onFatalError);
	}

①startDispatcherServices

Starts the Dispatcher's services.

②startRecoveredJobs

Starts processing the recovered jobs, i.e. the job graphs that were previously uploaded to HDFS.

This installment covered the creation and startup of the Dispatcher. The next few installments will cover the creation and startup of the JobMaster.

Overview

Flowchart of the source code covered in this installment: (figure)

Added: 2022-05-06 11:06:45  Updated: 2022-05-06 11:08:16
 