IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink源码系列(ResourceManager启动、JobMaster启动)-第八期 -> 正文阅读

[大数据]Flink源码系列(ResourceManager启动、JobMaster启动)-第八期

上一期指路

上一期

上上一期和上一期主要讲Flink内部的组件rm、jm、Dispatcher介绍,但其实rm和jm的启动还没有分析,只是在上上一期分析了rm的创建以及上一期分析的jm的创建,所以这一期主要是一个善后的内容,主要用于补充rm和jm的真正启动。

从之前的我们分析到的这个函数,即DefaultDispatcherResourceManagerComponentFactory中的create开始

1.DefaultDispatcherResourceManagerComponentFactory#create

resourceManager.start();

rpcServer.start()

rpc服务启动。即发消息通知底层的 AkkaRpcActor 切换为 START 状态,那么接下来就会执行ResourceManager中的onStart方法

2.ResourceManager#onStart->ResourceManager#startResourceManagerServices

	private void startResourceManagerServices() throws Exception {
		try {
			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();

			initialize();

			leaderElectionService.start(this);
			jobLeaderIdService.start(new JobLeaderIdActionsImpl());

			registerTaskExecutorMetrics();
		} catch (Exception e) {
			handleStartResourceManagerServicesException(e);
		}
	}

3.ActiveResourceManager#initialize->AbstractResourceManagerDriver#initialize->YarnResourceManagerDriver#initializeInternal

	protected void initializeInternal() throws Exception {
		final YarnContainerEventHandler yarnContainerEventHandler = new YarnContainerEventHandler();
		try {
			resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient(
				yarnHeartbeatIntervalMillis,
				yarnContainerEventHandler);
			resourceManagerClient.init(yarnConfig);
			resourceManagerClient.start();

			final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
			getContainersFromPreviousAttempts(registerApplicationMasterResponse);
			taskExecutorProcessSpecContainerResourcePriorityAdapter =
				new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
					registerApplicationMasterResponse.getMaximumResourceCapability(),
					ExternalResourceUtils.getExternalResources(flinkConfig, YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
		} catch (Exception e) {
			throw new ResourceManagerException("Could not start resource manager client.", e);
		}

		nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
		nodeManagerClient.init(yarnConfig);
		nodeManagerClient.start();
	}

?

4.StandaloneLeaderElectionService#start->ResourceManager#grantLeadership->ResourceManager#tryAcceptLeadership->ResourceManager#startServicesOnLeadership

	private void startServicesOnLeadership() {
		startHeartbeatServices();

		slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());

		onLeadership();
	}

rm启动总览

这一部分涉及的源码总览如下:

那么关于jm的启动,我们需要从上一期的如下开始:

1.JobManagerRunnerImpl#start->StandaloneLeaderElectionService#start->JobManagerRunnerImpl#grantLeadership->JobManagerRunnerImpl#verifyJobSchedulingStatusAndStartJobManager->JobManagerRunnerImpl#startJobMaster->JobMaster#start

	public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
		// make sure we receive RPC and async calls
		start();

		return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
	}

2.JobMaster#startJobExecution

	private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {

		validateRunsInMainThread();

		checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

		if (Objects.equals(getFencingToken(), newJobMasterId)) {
			log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

			return Acknowledge.get();
		}

		setNewFencingToken(newJobMasterId);

		startJobMasterServices();

		log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);

		resetAndStartScheduler();

		return Acknowledge.get();
	}

?

?3.JobMaster#startJobMasterServices

	private void startJobMasterServices() throws Exception {
		startHeartbeatServices();

		// start the slot pool make sure the slot pool now accepts messages for this leader
		slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());

		//TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
		// try to reconnect to previously known leader
		reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

		// job is ready to go, try to establish connection with resource manager
		//   - activate leader retrieval for the resource manager
		//   - on notification of the leader, the connection will be established and
		//     the slot pool will start requesting slots
		resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
	}

?jm启动总览

这一部分涉及到的源码流程图如下:

下一期讲YarnTaskExecutorRunner的启动,我们下期见!

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-09 12:46:25  更:2022-05-09 12:48:41 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 6:34:46-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码