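Contents
1. Introduction
2. Building the TaskExecutor
2.1 Initializing the TaskManager's basic services
2.1.1 Initializing BlobCacheService
2.2 Constructing the TaskExecutor
2.2.1 TaskSlotTable in detail
2.2.2 Initializing the TaskExecutor
Summary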
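1. Introduction
In the previous chapters we analyzed the startup of Flink's master node (JobManager). Over the next few chapters we walk through the source code for the startup of Flink's worker node, the TaskManager. Many of its steps are the same as on the master node. The TaskManager has fewer components than the master, but its startup sequence is considerably more involved. In this chapter we start with the TaskManager's initialization.
2. Building the TaskExecutor
From the earlier chapter on the startup scripts (Flink 1.13 Source Code Analysis: Startup Script Analysis) we know that in standalone mode the worker node's entry class is org.apache.flink.runtime.taskexecutor.TaskManagerRunner, so we go straight to its main method: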
// --------------------------------------------------------------------------------------------
// Static entry point
// --------------------------------------------------------------------------------------------
public static void main(String[] args) throws Exception {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
if (maxOpenFileHandles != -1L) {
LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
} else {
LOG.info("Cannot determine the maximum number of open file descriptors");
}
// TODO Start the TaskManager process
runTaskManagerProcessSecurely(args);
}
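The first few lines of main log environment information, register the signal handler and the JVM shutdown safeguard, and report the open-file-descriptor limit. We go straight to runTaskManagerProcessSecurely: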
public static void runTaskManagerProcessSecurely(String[] args) {
Configuration configuration = null;
try {
// TODO Parse args and flink-conf.yaml to obtain the configuration
configuration = loadConfiguration(args);
} catch (FlinkParseException fpe) {
LOG.error("Could not load the configuration.", fpe);
System.exit(FAILURE_EXIT_CODE);
}
// TODO Start with the parsed configuration
runTaskManagerProcessSecurely(checkNotNull(configuration));
}
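This method does the familiar work of parsing the configuration from the command-line arguments and flink-conf.yaml, then passes the result to the overloaded runTaskManagerProcessSecurely(Configuration) method. Stepping in: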
public static void runTaskManagerProcessSecurely(Configuration configuration) {
FlinkSecurityManager.setFromConfiguration(configuration);
// TODO Start the plugin manager
final PluginManager pluginManager =
PluginUtils.createPluginManagerFromRootFolder(configuration);
FileSystem.initialize(configuration, pluginManager);
int exitCode;
Throwable throwable = null;
try {
SecurityUtils.install(new SecurityConfiguration(configuration));
exitCode =
SecurityUtils.getInstalledContext()
// TODO Start the TaskManager
.runSecured(() -> runTaskManager(configuration, pluginManager));
} catch (Throwable t) {
throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
exitCode = FAILURE_EXIT_CODE;
}
if (throwable != null) {
LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable);
} else {
LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode);
}
System.exit(exitCode);
}
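This method starts a plugin manager and then calls runTaskManager inside the installed security context. As the name suggests, we are getting close to the construction of the TaskManager. Stepping into runTaskManager: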
public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
throws Exception {
final TaskManagerRunner taskManagerRunner;
try {
// TODO Construct a TaskManagerRunner
taskManagerRunner =
new TaskManagerRunner(
configuration,
pluginManager,
// TODO This is where the TaskExecutor is actually created
TaskManagerRunner::createTaskExecutorService);
// TODO Start the TaskManagerRunner
taskManagerRunner.start();
} catch (Exception exception) {
throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
}
try {
return taskManagerRunner.getTerminationFuture().get().getExitCode();
} catch (Throwable t) {
throw new FlinkException(
"Unexpected failure during runtime of TaskManagerRunner.",
ExceptionUtils.stripExecutionException(t));
}
}
This method does two things:
1. It constructs a TaskManagerRunner.
2. It starts the TaskManagerRunner.
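The two steps can be illustrated with a minimal, self-contained sketch. None of the types below are Flink classes: SketchService, SketchServiceFactory and SketchRunner are hypothetical stand-ins for TaskExecutorService, TaskExecutorServiceFactory and TaskManagerRunner, and the only point is the shape of the pattern, namely that the factory is passed as a method reference, the service is built in the constructor, and start() merely starts what was already built.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TaskExecutorService: something that can be started.
interface SketchService {
    void start();
}

// Hypothetical stand-in for TaskExecutorServiceFactory.
@FunctionalInterface
interface SketchServiceFactory {
    SketchService create(String configuration, List<String> log);
}

// Hypothetical stand-in for TaskManagerRunner.
final class SketchRunner {
    private final SketchService service;

    // Step 1: all preparation happens in the constructor, including building the service.
    SketchRunner(String configuration, List<String> log, SketchServiceFactory factory) {
        log.add("runner constructed");
        this.service = factory.create(configuration, log);
    }

    // Step 2: starting the runner only starts the service that was already built.
    void start() {
        service.start();
    }

    // Plays the role of TaskManagerRunner::createTaskExecutorService in the listing above.
    static SketchService createService(String configuration, List<String> log) {
        log.add("service created for " + configuration);
        return () -> log.add("service started");
    }

    // Construct, then start, and return the recorded order of events.
    static List<String> run(String configuration) {
        List<String> log = new ArrayList<>();
        SketchRunner runner = new SketchRunner(configuration, log, SketchRunner::createService);
        runner.start();
        return log;
    }
}
```

Calling SketchRunner.run("conf") records the construction of the runner first, then the creation of the service, and only then its start.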
In fact, all of the preparation for starting the TaskManager is done inside this TaskManagerRunner. Next we look at its constructor:
2.1 Initializing the TaskManager's basic services
public TaskManagerRunner(
Configuration configuration,
PluginManager pluginManager,
TaskExecutorServiceFactory taskExecutorServiceFactory)
throws Exception {
this.configuration = checkNotNull(configuration);
timeout = AkkaUtils.getTimeoutAsTime(configuration);
// TODO Internal TaskManager thread pool that handles I/O for the components inside the worker node
// TODO The pool size equals the number of CPU cores on this node
this.executor =
java.util.concurrent.Executors.newScheduledThreadPool(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("taskmanager-future"));
// TODO High-availability services
highAvailabilityServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(
configuration,
executor,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
// TODO New in 1.12: JMX service that exposes monitoring information
JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
// TODO Start the RPC service; internally this is an Akka ActorSystem
rpcService = createRpcService(configuration, highAvailabilityServices);
// TODO Generate a ResourceID for this TaskManager
this.resourceId =
getTaskManagerResourceID(
configuration, rpcService.getAddress(), rpcService.getPort());
// TODO Initialize the heartbeat services, mainly the heartbeat interval and heartbeat timeout settings
HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
metricRegistry =
new MetricRegistryImpl(
MetricRegistryConfiguration.fromConfiguration(configuration),
ReporterSetup.fromConfiguration(configuration, pluginManager));
final RpcService metricQueryServiceRpcService =
MetricUtils.startRemoteMetricsRpcService(configuration, rpcService.getAddress());
metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId);
// TODO A BlobServer was already started when the master node started;
// TODO the worker node starts a BlobCacheService, which caches files locally
blobCacheService =
new BlobCacheService(
configuration, highAvailabilityServices.createBlobStore(), null);
final ExternalResourceInfoProvider externalResourceInfoProvider =
ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
configuration, pluginManager);
// TODO Create the TaskExecutorService, which wraps the TaskExecutor; the TaskExecutor itself is also built inside this call
taskExecutorService =
taskExecutorServiceFactory.createTaskExecutor(
this.configuration,
this.resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
blobCacheService,
false,
externalResourceInfoProvider,
this);
this.terminationFuture = new CompletableFuture<>();
this.shutdown = false;
handleUnexpectedTaskExecutorServiceTermination();
MemoryLogger.startIfConfigured(
LOG, configuration, terminationFuture.thenAccept(ignored -> {}));
}
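As on the JobManager side, the work done here is the construction and startup of basic services. In order, the constructor:
1. Initializes the TaskManager's internal thread pool, which handles I/O for the components inside the worker node. Its size equals the number of CPU cores on the node.
2. Builds the high-availability services.
3. Starts the JMX service, which exposes monitoring information.
4. Starts the RPC service, which internally is an Akka ActorSystem (see Flink 1.13 Source Code Analysis Prelude: The Akka Communication Model).
5. Generates a ResourceID for the TaskManager.
6. Initializes the heartbeat services, reading the heartbeat interval and the heartbeat timeout from the configuration.
7. Initializes the metric services.
8. Creates the BlobCacheService, which caches files.
9. Builds a TaskExecutorService, which wraps the TaskExecutor.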
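As a rough illustration of the first item, the following sketch builds a scheduled pool sized to the CPU core count using only the JDK. It is not Flink code: Runtime.getRuntime().availableProcessors() is used here in place of Hardware.getNumberCPUCores(), and namedFactory is a hypothetical substitute for Flink's ExecutorThreadFactory whose thread naming scheme is an assumption made for the sketch.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class IoPoolSketch {

    // Hypothetical substitute for ExecutorThreadFactory: daemon threads named "<prefix>-thread-N".
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-thread-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    // One core thread per CPU core, as in the TaskManagerRunner constructor.
    static ScheduledExecutorService createPool() {
        int cores = Runtime.getRuntime().availableProcessors();
        return Executors.newScheduledThreadPool(cores, namedFactory("taskmanager-future"));
    }

    // Submits one task and returns the name of the pool thread that ran it.
    static String runOnPool() {
        ScheduledExecutorService pool = createPool();
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

The same pool is handed to the high-availability services in the constructor above, which is why it is created before anything else.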
2.1.1 Initializing BlobCacheService
Of the basic services initialized above, we look at BlobCacheService first. Stepping into its constructor:
public BlobCacheService(
final Configuration blobClientConfig,
final BlobView blobView,
@Nullable final InetSocketAddress serverAddress)
throws IOException {
/*
TODO Initializes two file services:
1. the permanent BLOB cache
2. the transient BLOB cache
Each of them starts a timer task on construction
that deletes the expired resources of a job
*/
this(
// TODO Permanent cache
new PermanentBlobCache(blobClientConfig, blobView, serverAddress),
// TODO Transient cache
new TransientBlobCache(blobClientConfig, serverAddress));
}
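This constructor does two things:
1. It initializes a permanent BLOB cache.
2. It initializes a transient BLOB cache.
Each of the two caches starts a timer task when it is constructed; the task deletes the resources of jobs that have expired.
Taking the permanent BLOB cache as the example, we step into the PermanentBlobCache constructor: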
public PermanentBlobCache(
final Configuration blobClientConfig,
final BlobView blobView,
@Nullable final InetSocketAddress serverAddress)
throws IOException {
super(
blobClientConfig,
blobView,
LoggerFactory.getLogger(PermanentBlobCache.class),
serverAddress);
// Initializing the clean up task
this.cleanupTimer = new Timer(true);
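// TODO Cleanup interval: configured in seconds (1 hour by default) and converted to milliseconds
this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
// TODO Start the timer task; it runs once per cleanup interval (every hour by default)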
this.cleanupTimer.schedule(
new PermanentBlobCleanupTask(), cleanupInterval, cleanupInterval);
}
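The constructor first reads the cleanup interval, which is 1 hour by default, and then starts a timer task that runs PermanentBlobCleanupTask once per interval. Next we look at the run method of PermanentBlobCleanupTask: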
class PermanentBlobCleanupTask extends TimerTask {
/** Cleans up BLOBs which are not referenced anymore. */
@Override
public void run() {
// TODO Use the per-job reference counters to find the files each job references
synchronized (jobRefCounters) {
Iterator<Map.Entry<JobID, RefCount>> entryIter =
jobRefCounters.entrySet().iterator();
final long currentTimeMillis = System.currentTimeMillis();
// TODO Iterate over all entries
while (entryIter.hasNext()) {
Map.Entry<JobID, RefCount> entry = entryIter.next();
RefCount ref = entry.getValue();
// TODO Check whether the entry is unreferenced and expired
if (ref.references <= 0
&& ref.keepUntil > 0
&& currentTimeMillis >= ref.keepUntil) {
JobID jobId = entry.getKey();
final File localFile =
new File(
BlobUtils.getStorageLocationPath(
storageDir.getAbsolutePath(), jobId));
/*
* NOTE: normally it is not required to acquire the write lock to delete the job's
* storage directory since there should be no one accessing it with the ref
* counter being 0 - acquire it just in case, to always be on the safe side
*/
readWriteLock.writeLock().lock();
boolean success = false;
try {
// TODO Delete the job's resource directory
FileUtils.deleteDirectory(localFile);
success = true;
} catch (Throwable t) {
log.warn(
"Failed to locally delete job directory "
+ localFile.getAbsolutePath(),
t);
} finally {
readWriteLock.writeLock().unlock();
}
// let's only remove this directory from cleanup if the cleanup was
// successful
// (does not need the write lock)
if (success) {
entryIter.remove();
}
}
}
}
}
}
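The run method performs the following steps:
1. It uses the reference counters to obtain the resource files referenced by every job.
2. It iterates over them and checks whether each one has expired.
3. If an entry has expired, it deletes that job's resource directory.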
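The expiry check can be isolated into a small sketch without any file system access. The sweep condition is the one from the listing above (no references, a positive keepUntil, and the current time at or past keepUntil). Everything else is an assumption made for illustration: the class name, the String job ids, and the register/release methods, which show one plausible way keepUntil could be set when the last reference is released.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class RefCountSweepSketch {

    // Mirrors the two fields the cleanup task reads.
    static final class RefCount {
        int references;
        long keepUntil = -1; // -1 means no expiry has been scheduled
    }

    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long ttlMillis;

    RefCountSweepSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Assumption: registering a job adds a reference and cancels any pending expiry.
    synchronized void register(String jobId) {
        RefCount ref = jobRefCounters.computeIfAbsent(jobId, key -> new RefCount());
        ref.references++;
        ref.keepUntil = -1;
    }

    // Assumption: releasing the last reference schedules expiry one TTL from now.
    synchronized void release(String jobId, long nowMillis) {
        RefCount ref = jobRefCounters.get(jobId);
        if (ref != null && --ref.references <= 0) {
            ref.keepUntil = nowMillis + ttlMillis;
        }
    }

    // Same condition as PermanentBlobCleanupTask.run(); returns the job ids that were removed.
    synchronized List<String> sweep(long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, RefCount>> it = jobRefCounters.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, RefCount> entry = it.next();
            RefCount ref = entry.getValue();
            if (ref.references <= 0 && ref.keepUntil > 0 && nowMillis >= ref.keepUntil) {
                // The real task deletes the job's storage directory here, under the write lock.
                removed.add(entry.getKey());
                it.remove();
            }
        }
        return removed;
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; the real task reads System.currentTimeMillis() once per run.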
The transient BLOB cache does the same kind of work:
public TransientBlobCache(
final Configuration blobClientConfig, @Nullable final InetSocketAddress serverAddress)
throws IOException {
super(
blobClientConfig,
new VoidBlobStore(),
LoggerFactory.getLogger(TransientBlobCache.class),
serverAddress);
// Initializing the clean up task
this.cleanupTimer = new Timer(true);
// TODO 1 hour by default
this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
this.cleanupTimer.schedule(
// TODO Timer task
new TransientBlobCleanupTask(
blobExpiryTimes, readWriteLock.writeLock(), storageDir, log),
cleanupInterval,
cleanupInterval);
}
It first reads the cleanup interval, 1 hour by default, and then starts a timer task that cleans up once per interval.
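Both caches use the same scheduling idiom: a daemon java.util.Timer, an interval converted from seconds to milliseconds, and schedule(task, delay, period) with the delay equal to the period so that the first run happens one full interval after construction. The sketch below shows that idiom in isolation; the class and method names are hypothetical, and a CountDownLatch stands in for the actual cleanup work.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class CleanupTimerSketch {

    // The configured interval is in seconds; Timer expects milliseconds.
    static long toMillis(long intervalSeconds) {
        return intervalSeconds * 1000;
    }

    // Schedules a periodic task and reports whether it fired `runs` times within `waitMillis`.
    static boolean runPeriodically(long intervalMillis, int runs, long waitMillis) {
        CountDownLatch latch = new CountDownLatch(runs);
        Timer timer = new Timer(true); // daemon thread, so it never blocks JVM shutdown
        timer.schedule(
                new TimerTask() {
                    @Override
                    public void run() {
                        latch.countDown(); // stands in for the cleanup work
                    }
                },
                intervalMillis,  // initial delay: first run after one full interval
                intervalMillis); // period between subsequent runs
        try {
            return latch.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel();
        }
    }
}
```

With an interval of 3600 seconds, toMillis yields 3,600,000 ms, which is the value passed twice to schedule in the constructors above.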
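Next comes the important part: the initialization of the TaskExecutor.
2.2 Constructing the TaskExecutor
We step into taskExecutorServiceFactory.createTaskExecutor, which here resolves to the TaskManagerRunner::createTaskExecutorService method reference passed to the constructor: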
public static TaskExecutorService createTaskExecutorService(
Configuration configuration,
ResourceID resourceID,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
BlobCacheService blobCacheService,
boolean localCommunicationOnly,
ExternalResourceInfoProvider externalResourceInfoProvider,
FatalErrorHandler fatalErrorHandler)
throws Exception {
// TODO Create the TaskExecutor
final TaskExecutor taskExecutor =
startTaskManager(
configuration,
resourceID,
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
blobCacheService,
localCommunicationOnly,
externalResourceInfoProvider,
fatalErrorHandler);
/*
TODO Wrap the TaskExecutor:
the TaskExecutor is a member of TaskExecutorToServiceAdapter,
and the TaskExecutorToServiceAdapter is a member of TaskManagerRunner
*/
return TaskExecutorToServiceAdapter.createFor(taskExecutor);
}
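This is where the TaskExecutor is actually created and then wrapped. We look at the creation of the TaskExecutor first and step into the startTaskManager method.
Inside this method, more basic services are initialized.
It begins with the resource configuration, reading the hardware resource settings:
// TODO Initialize the resource configuration and read the hardware resource settings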
final TaskExecutorResourceSpec taskExecutorResourceSpec =
TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
Then it reads the configuration:
// TODO Obtain the configuration (args and flink-conf)
TaskManagerServicesConfiguration taskManagerServicesConfiguration =
TaskManagerServicesConfiguration.fromConfiguration(
configuration,
resourceID,
externalAddress,
localCommunicationOnly,
taskExecutorResourceSpec);
Here TaskManagerServices initializes a number of core services:
// TODO Initialize the core services
TaskManagerServices taskManagerServices =
TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
blobCacheService.getPermanentBlobService(),
taskManagerMetricGroup.f1,
ioExecutor,
fatalErrorHandler);
We step into the fromConfiguration method:
public static TaskManagerServices fromConfiguration(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
PermanentBlobService permanentBlobService,
MetricGroup taskManagerMetricGroup,
ExecutorService ioExecutor,
FatalErrorHandler fatalErrorHandler)
throws Exception {
// pre-start checks
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
// TODO Event dispatcher for task events
final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
// start the I/O manager, it will create some temp directories.
final IOManager ioManager =
new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
// TODO Shuffle-related work during job execution; covered in detail when we discuss job execution
final ShuffleEnvironment<?, ?> shuffleEnvironment =
createShuffleEnvironment(
taskManagerServicesConfiguration,
taskEventDispatcher,
taskManagerMetricGroup,
ioExecutor);
final int listeningDataPort = shuffleEnvironment.start();
// TODO State management service
final KvStateService kvStateService =
KvStateService.fromConfiguration(taskManagerServicesConfiguration);
kvStateService.start();
final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
new UnresolvedTaskManagerLocation(
taskManagerServicesConfiguration.getResourceID(),
taskManagerServicesConfiguration.getExternalAddress(),
// we expose the task manager location with the listening port
// iff the external data port is not explicitly defined
taskManagerServicesConfiguration.getExternalDataPort() > 0
? taskManagerServicesConfiguration.getExternalDataPort()
: listeningDataPort);
// TODO Broadcast variable management service
final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
// TODO The most important member of the TaskExecutor:
// TODO a table that holds the TaskSlots
final TaskSlotTable<Task> taskSlotTable =
createTaskSlotTable(
taskManagerServicesConfiguration.getNumberOfSlots(),
taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
taskManagerServicesConfiguration.getPageSize(),
ioExecutor);
final JobTable jobTable = DefaultJobTable.create();
// TODO Monitors the leader address of the master node
final JobLeaderService jobLeaderService =
new DefaultJobLeaderService(
unresolvedTaskManagerLocation,
taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());
// ... ...
return new TaskManagerServices(
unresolvedTaskManagerLocation,
taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
ioManager,
shuffleEnvironment,
kvStateService,
broadcastVariableManager,
taskSlotTable,
jobTable,
jobLeaderService,
taskStateManager,
taskEventDispatcher,
ioExecutor,
libraryCacheManager);
}
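This method initializes the event dispatcher, the IOManager, the ShuffleEnvironment, the state management service, the broadcast variable management service, the TaskSlotTable, the service that monitors the JobManager leader address, and more. Here we focus on the TaskSlotTable. The other core services are described in detail in the later chapters on job execution and slot allocation, so we skip them for now.
2.2.1 TaskSlotTable in detail
TaskSlotTable is a very important member of TaskExecutor: it is the component that actually carries out every slot-related operation on behalf of the TaskExecutor. ResourceManager has a similar component, the SlotManager, which is one of the two periodic services registered there (see Flink 1.13 Source Code Analysis: JobManager Startup, ResourceManager Startup).
When a JobMaster requests resources, the SlotManager inside ResourceManager performs the allocation. Once the allocation is decided, the SlotManager sends an RPC request to the TaskExecutor, and the TaskExecutor then reports back to the ResourceManager that the allocation is complete. Looking at the implementation class of TaskSlotTable, a few fields are particularly important: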
/** The list of all task slots. */
// TODO All slots
// TODO On startup the TaskManager reports its slots to the ResourceManager and wraps each slot as a TaskSlot
private final Map<Integer, TaskSlot<T>> taskSlots;
/** Mapping from allocation id to task slot. */
// TODO All allocated slots; maintains the mapping between allocation ID and TaskSlot
private final Map<AllocationID, TaskSlot<T>> allocatedSlots;
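taskSlots holds all of the slots on the current node. When the TaskManager on this node starts, it reports its slots to the ResourceManager and wraps each slot as a TaskSlot.
allocatedSlots holds the slots that have already been allocated and maintains the mapping between allocation ID and TaskSlot.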
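The relationship between the two maps can be sketched as follows. This is a deliberately simplified model and not the TaskSlotTable implementation: the class and its methods are hypothetical, java.util.UUID stands in for AllocationID, and the choice to create every slot up front follows the description above rather than the actual Flink code. What it shows is that both maps point at the same slot objects, one keyed by slot index and one keyed by allocation id.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

final class SlotTableSketch {

    static final class Slot {
        final int index;
        UUID allocationId; // null while the slot is free

        Slot(int index) {
            this.index = index;
        }
    }

    // All slots on this node, keyed by slot index.
    private final Map<Integer, Slot> taskSlots = new HashMap<>();
    // Only the allocated slots, keyed by allocation id (UUID stands in for AllocationID).
    private final Map<UUID, Slot> allocatedSlots = new HashMap<>();

    SlotTableSketch(int numberOfSlots) {
        for (int i = 0; i < numberOfSlots; i++) {
            taskSlots.put(i, new Slot(i));
        }
    }

    // Binds a free slot to an allocation id; fails for unknown or busy slots and reused ids.
    boolean allocate(int index, UUID allocationId) {
        Slot slot = taskSlots.get(index);
        if (slot == null || slot.allocationId != null || allocatedSlots.containsKey(allocationId)) {
            return false;
        }
        slot.allocationId = allocationId;
        allocatedSlots.put(allocationId, slot);
        return true;
    }

    // Releases the slot bound to the allocation id, if any.
    boolean free(UUID allocationId) {
        Slot slot = allocatedSlots.remove(allocationId);
        if (slot == null) {
            return false;
        }
        slot.allocationId = null;
        return true;
    }

    int allocatedCount() {
        return allocatedSlots.size();
    }
}
```

Keeping a second map keyed by allocation id means a request that only carries the allocation id can find its slot without scanning every slot on the node.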
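2.2.2 Initializing the TaskExecutor
We return to TaskManagerRunner.startTaskManager and look at its last step, the construction of the TaskExecutor. Stepping into the TaskExecutor constructor, the first thing to notice is that TaskExecutor extends RpcEndpoint. That tells us that once the TaskExecutor has been initialized, its onStart method will be called (see Flink 1.13 Source Code Analysis Prelude: The Akka Communication Model). At this point we are still inside initialization, so we keep reading: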
public TaskExecutor(
RpcService rpcService,
TaskManagerConfiguration taskManagerConfiguration,
HighAvailabilityServices haServices,
TaskManagerServices taskExecutorServices,
ExternalResourceInfoProvider externalResourceInfoProvider,
HeartbeatServices heartbeatServices,
TaskManagerMetricGroup taskManagerMetricGroup,
@Nullable String metricQueryServiceAddress,
BlobCacheService blobCacheService,
FatalErrorHandler fatalErrorHandler,
TaskExecutorPartitionTracker partitionTracker) {
// TaskExecutor is a subclass of RpcEndpoint; this call invokes the RpcEndpoint constructor
super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
checkArgument(
taskManagerConfiguration.getNumberSlots() > 0,
"The number of slots has to be larger than 0.");
this.taskManagerConfiguration = checkNotNull(taskManagerConfiguration);
this.taskExecutorServices = checkNotNull(taskExecutorServices);
this.haServices = checkNotNull(haServices);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.partitionTracker = partitionTracker;
this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
this.blobCacheService = checkNotNull(blobCacheService);
this.metricQueryServiceAddress = metricQueryServiceAddress;
this.externalResourceInfoProvider = checkNotNull(externalResourceInfoProvider);
this.libraryCacheManager = taskExecutorServices.getLibraryCacheManager();
this.taskSlotTable = taskExecutorServices.getTaskSlotTable();
this.jobTable = taskExecutorServices.getJobTable();
this.jobLeaderService = taskExecutorServices.getJobLeaderService();
this.unresolvedTaskManagerLocation =
taskExecutorServices.getUnresolvedTaskManagerLocation();
this.localStateStoresManager = taskExecutorServices.getTaskManagerStateStore();
this.shuffleEnvironment = taskExecutorServices.getShuffleEnvironment();
this.kvStateService = taskExecutorServices.getKvStateService();
this.ioExecutor = taskExecutorServices.getIOExecutor();
this.resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
this.hardwareDescription =
HardwareDescription.extractFromSystem(taskExecutorServices.getManagedMemorySize());
this.memoryConfiguration =
TaskExecutorMemoryConfiguration.create(taskManagerConfiguration.getConfiguration());
this.resourceManagerAddress = null;
this.resourceManagerConnection = null;
this.currentRegistrationTimeoutId = null;
final ResourceID resourceId =
taskExecutorServices.getUnresolvedTaskManagerLocation().getResourceID();
// TODO Initialize two heartbeat managers
// TODO Heartbeat between the TaskExecutor and the JobMaster
this.jobManagerHeartbeatManager =
createJobManagerHeartbeatManager(heartbeatServices, resourceId);
// TODO Heartbeat between the TaskExecutor and the ResourceManager
this.resourceManagerHeartbeatManager =
createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
ExecutorThreadFactory sampleThreadFactory =
new ExecutorThreadFactory.Builder()
.setPoolName("flink-thread-info-sampler")
.build();
ScheduledExecutorService sampleExecutor =
Executors.newSingleThreadScheduledExecutor(sampleThreadFactory);
this.threadInfoSampleService = new ThreadInfoSampleService(sampleExecutor);
}
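The first half of the constructor assigns fields. The second half initializes two heartbeat managers:
1. The manager for the heartbeat between the TaskExecutor and the JobMaster.
2. The manager for the heartbeat between the TaskExecutor and the ResourceManager.
Each of these heartbeat managers is a HeartbeatManagerImpl. Recall that the heartbeat manager initialized in ResourceManager is a HeartbeatManagerSenderImpl, which, as the name suggests, is a heartbeat request sender. As covered in the ResourceManager chapter, HeartbeatManagerSenderImpl runs a periodic task that every 10 seconds iterates over all registered heartbeat targets and sends a heartbeat request to each of them (see Flink 1.13 Source Code Analysis: JobManager Startup, ResourceManager Startup).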
public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log) {
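/*
TODO
The heartbeat manager on the master node is HeartbeatManagerSenderImpl, the heartbeat request sender (client).
HeartbeatManagerSenderImpl builds a periodic task internally
that sends a heartbeat request to every heartbeat target every 10 seconds.
The worker node (this one) uses HeartbeatManagerImpl, the heartbeat request handler (server).
*/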
return new HeartbeatManagerImpl<>(
heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log);
}
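The split between the two roles can be modelled with a small sketch. It is a simplification and does not reproduce Flink's heartbeat classes: Sender and Receiver are hypothetical stand-ins for the HeartbeatManagerSenderImpl and HeartbeatManagerImpl roles, the periodic task is replaced by an explicit requestAll call, and time is passed in as a parameter so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HeartbeatSketch {

    // Passive side (the worker node's role): only reacts to incoming requests.
    static final class Receiver {
        final String id;
        long lastRequestMillis = -1;

        Receiver(String id) {
            this.id = id;
        }

        // Answers a heartbeat request by reporting back to the sender.
        void requestHeartbeat(Sender from, long nowMillis) {
            lastRequestMillis = nowMillis;
            from.receiveHeartbeat(id, nowMillis);
        }
    }

    // Active side (the master node's role): polls every registered target.
    static final class Sender {
        private final long timeoutMillis;
        private final Map<String, Receiver> targets = new HashMap<>();
        private final Map<String, Long> lastSeen = new HashMap<>();

        Sender(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        void monitor(Receiver target, long nowMillis) {
            targets.put(target.id, target);
            lastSeen.put(target.id, nowMillis);
        }

        // In the article this loop runs as a periodic task; here it is invoked by hand.
        void requestAll(long nowMillis) {
            for (Receiver target : targets.values()) {
                target.requestHeartbeat(this, nowMillis);
            }
        }

        void receiveHeartbeat(String targetId, long nowMillis) {
            lastSeen.put(targetId, nowMillis);
        }

        // Targets whose last answer is older than the timeout.
        List<String> timedOut(long nowMillis) {
            List<String> result = new ArrayList<>();
            for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
                if (nowMillis - entry.getValue() > timeoutMillis) {
                    result.add(entry.getKey());
                }
            }
            return result;
        }
    }
}
```

In this model a target that stops answering requests is detected by the sender once the timeout has elapsed since its last answer.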
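With that, the initialization of the TaskExecutor is complete.
Summary
To recap the TaskExecutor initialization flow:
1. A TaskManagerRunner is constructed to carry out the preparation for starting the TaskManager. Once the preparation is done, the TaskManagerRunner's start method is called to start it.
2. Inside TaskManagerRunner, a TaskManagerServices object is initialized to set up the basic services the TaskExecutor needs.
3. Inside TaskManagerServices, basic services are initialized first, such as the TaskEventDispatcher, the IO manager, the ShuffleEnvironment, the state manager and the TaskSlotTable.
4. After the basic services are in place, the TaskExecutor itself is initialized. It creates two heartbeat managers, which maintain the heartbeats with the JobMaster and the ResourceManager respectively. Because TaskExecutor extends RpcEndpoint, it has the lifecycle method onStart.
5. The TaskExecutor initialization is complete.
In the next chapter we look at the startup of the initialized TaskExecutor.
Next chapter: Flink 1.13 Source Code Analysis: TaskManager Startup Overview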