TaskManagerRunner Startup: Source Code Analysis
TaskManager
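The TaskManager is Flink's worker node. It manages the slot resources on the cluster's worker machines and executes the actual tasks.

The basic resource unit on a TaskManager is the slot. A job's tasks are eventually deployed into, and run in, the slots of a TaskManager. The TaskManager maintains the list of its local slot resources and communicates with the Flink Master node and with the job's master, the JobMaster.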
```java
public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {
    // ...
}
```
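As the class's Javadoc states, this class is the TaskManager's entry point in both YARN mode and standalone mode. It builds the related components: networking, I/O management, memory management, the RPC service, HA services, and so on.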
Entry point
Starting this class simply calls its main method:
```java
public static void main(String[] args) throws Exception {
    // log version, JVM and environment information at startup
    EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
    // log incoming UNIX signals (TERM, HUP, INT)
    SignalHandler.register(LOG);
    // shutdown hook that protects against a JVM hanging during shutdown
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();

    if (maxOpenFileHandles != -1L) {
        LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
    } else {
        LOG.info("Cannot determine the maximum number of open file descriptors");
    }

    runTaskManagerSecurely(args);
}
```
Step into runTaskManagerSecurely(args):
```java
public static void runTaskManagerSecurely(String[] args) {
    try {
        // parse the command-line arguments and load the Flink configuration
        Configuration configuration = loadConfiguration(args);
        runTaskManagerSecurely(configuration);
    } catch (Throwable t) {
        final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("TaskManager initialization failed.", strippedThrowable);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }
}
```
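runTaskManagerSecurely(String[]) strips UndeclaredThrowableException before logging the failure. The JVM produces that wrapper when a checked exception escapes through code that does not declare it, for example through a java.lang.reflect.Proxy. Code that runs work on a caller's behalf, such as a security context, can produce it too.

The sketch below is Flink-free and rests on one assumption: "stripping" means unwrapping for as long as the throwable has the given type and still has a cause. StripExceptionSketch and its methods are hypothetical names. They are not Flink's ExceptionUtils.

```java
import java.io.IOException;
import java.lang.reflect.Proxy;

class StripExceptionSketch {
    // Unwraps consecutive layers of typeToStrip; stops when no cause remains.
    static Throwable stripException(Throwable t, Class<? extends Throwable> typeToStrip) {
        while (typeToStrip.isAssignableFrom(t.getClass()) && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    // Runnable.run() declares no checked exceptions, so when the proxy's handler
    // throws an IOException the proxy rethrows it wrapped in UndeclaredThrowableException.
    static Throwable failThroughProxy() {
        Runnable r = (Runnable) Proxy.newProxyInstance(
                StripExceptionSketch.class.getClassLoader(),
                new Class<?>[] {Runnable.class},
                (proxy, method, args) -> { throw new IOException("startup failed"); });
        try {
            r.run();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }
}
```

Without stripping, the log would show only the generic wrapper instead of the IOException that actually caused the startup failure.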
Then step into runTaskManagerSecurely(configuration):
```java
public static void runTaskManagerSecurely(Configuration configuration) throws Exception {
    replaceGracefulExitWithHaltIfConfigured(configuration);
    // load plugins (file systems, metric reporters, ...) from the plugins root folder
    final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
    FileSystem.initialize(configuration, pluginManager);

    // install the security modules, then run the TaskManager inside the security context
    SecurityUtils.install(new SecurityConfiguration(configuration));

    SecurityUtils.getInstalledContext().runSecured(() -> {
        runTaskManager(configuration, pluginManager);
        return null;
    });
}
```
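The `return null;` inside the lambda exists because runSecured takes a java.util.concurrent.Callable. A Callable must return a value, while runTaskManager returns void.

Here is a minimal sketch. It assumes a security-context interface shaped like `<T> T runSecured(Callable<T>)`. SecurityContextSketch and NoOpSecurityContextSketch are hypothetical names. A real context would run the work under an authenticated identity (for example Kerberos) instead of calling it directly.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical interface with the same shape as a security context's runSecured.
interface SecurityContextSketch {
    <T> T runSecured(Callable<T> securedCallable) throws Exception;
}

// Simplest possible context: no login, just run the callable on the current thread.
class NoOpSecurityContextSketch implements SecurityContextSketch {
    @Override
    public <T> T runSecured(Callable<T> securedCallable) throws Exception {
        return securedCallable.call();
    }

    // Mirrors runTaskManagerSecurely: a void action adapted to a Callable via "return null".
    static void runVoidAction(SecurityContextSketch context, List<String> log) throws Exception {
        context.runSecured(() -> {
            log.add("runTaskManager called");
            return null;
        });
    }
}
```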
Next comes runTaskManager(configuration, pluginManager):
```java
public static void runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
    // createTaskExecutorService is passed as a method reference (a factory); the constructor invokes it
    final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);

    taskManagerRunner.start();
}
```
This brings us to the core part, the factory method TaskManagerRunner::createTaskExecutorService:
```java
public static TaskExecutorService createTaskExecutorService(
        Configuration configuration,
        ResourceID resourceID,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        BlobCacheService blobCacheService,
        boolean localCommunicationOnly,
        ExternalResourceInfoProvider externalResourceInfoProvider,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    final TaskExecutor taskExecutor = startTaskManager(
        configuration,
        resourceID,
        rpcService,
        highAvailabilityServices,
        heartbeatServices,
        metricRegistry,
        blobCacheService,
        localCommunicationOnly,
        externalResourceInfoProvider,
        fatalErrorHandler);

    // wrap the TaskExecutor so the runner can treat it as a TaskExecutorService
    return TaskExecutorToServiceAdapter.createFor(taskExecutor);
}
```
The most important call here is startTaskManager. The return statement only wraps the resulting TaskExecutor into a TaskExecutorService, so let's step into startTaskManager:
```java
public static TaskExecutor startTaskManager(
        Configuration configuration,
        ResourceID resourceID,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        BlobCacheService blobCacheService,
        boolean localCommunicationOnly,
        ExternalResourceInfoProvider externalResourceInfoProvider,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    String externalAddress = rpcService.getAddress();

    // derive the TaskExecutor's resources (CPU, memory) from the configuration
    final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);

    TaskManagerServicesConfiguration taskManagerServicesConfiguration =
        TaskManagerServicesConfiguration.fromConfiguration(
            configuration,
            resourceID,
            externalAddress,
            localCommunicationOnly,
            taskExecutorResourceSpec);

    Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
        metricRegistry,
        externalAddress,
        resourceID,
        taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());

    // thread pool for I/O work; its threads are named after "flink-taskexecutor-io"
    final ExecutorService ioExecutor = Executors.newFixedThreadPool(
        taskManagerServicesConfiguration.getNumIoThreads(),
        new ExecutorThreadFactory("flink-taskexecutor-io"));

    // create the TaskManager's services (shuffle/network, slot table, I/O manager, ...)
    TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
        taskManagerServicesConfiguration,
        blobCacheService.getPermanentBlobService(),
        taskManagerMetricGroup.f1,
        ioExecutor,
        fatalErrorHandler);

    MetricUtils.instantiateFlinkMemoryMetricGroup(
        taskManagerMetricGroup.f1,
        taskManagerServices.getTaskSlotTable(),
        taskManagerServices::getManagedMemorySize);

    TaskManagerConfiguration taskManagerConfiguration =
        TaskManagerConfiguration.fromConfiguration(configuration, taskExecutorResourceSpec, externalAddress);

    String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();

    // the TaskExecutor is the RpcEndpoint that actually runs tasks
    return new TaskExecutor(
        rpcService,
        taskManagerConfiguration,
        highAvailabilityServices,
        taskManagerServices,
        externalResourceInfoProvider,
        heartbeatServices,
        taskManagerMetricGroup.f0,
        metricQueryServiceAddress,
        blobCacheService,
        fatalErrorHandler,
        new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
        createBackPressureSampleService(configuration, rpcService.getScheduledExecutor()));
}
```
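startTaskManager builds ioExecutor from Executors.newFixedThreadPool and an ExecutorThreadFactory named "flink-taskexecutor-io". Giving the pool a name makes its threads easy to identify in thread dumps.

The sketch below shows the same idea with a plain java.util.concurrent.ThreadFactory. NamedThreadFactorySketch is a hypothetical name. The "-thread-N" suffix and the daemon flag are assumptions made for illustration; ExecutorThreadFactory may behave differently.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a named thread factory: every thread gets the pool
// name plus a running number, so it can be recognized in thread dumps.
class NamedThreadFactorySketch implements ThreadFactory {
    private final String namePrefix;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    NamedThreadFactorySketch(String poolName) {
        this.namePrefix = poolName + "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
        t.setDaemon(true); // assumption: idle I/O threads should not keep the JVM alive
        return t;
    }

    // Same shape as the ioExecutor creation in startTaskManager.
    static ExecutorService newIoExecutor(int numIoThreads) {
        return Executors.newFixedThreadPool(numIoThreads, new NamedThreadFactorySketch("flink-taskexecutor-io"));
    }
}
```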
There are two more important calls here: TaskManagerServices.fromConfiguration and new TaskExecutor. First, TaskManagerServices.fromConfiguration:
```java
public static TaskManagerServices fromConfiguration(
        TaskManagerServicesConfiguration taskManagerServicesConfiguration,
        PermanentBlobService permanentBlobService,
        MetricGroup taskManagerMetricGroup,
        ExecutorService ioExecutor,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    // verify that the configured temp directories are usable
    checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

    final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

    // asynchronous I/O manager working on the temp directories
    final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

    // network stack for data exchange between tasks
    final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
        taskManagerServicesConfiguration,
        taskEventDispatcher,
        taskManagerMetricGroup,
        ioExecutor);
    // start() returns the port the data server is listening on
    final int listeningDataPort = shuffleEnvironment.start();

    final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
    kvStateService.start();

    // advertise the configured external data port if set, otherwise the listening port
    final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new UnresolvedTaskManagerLocation(
        taskManagerServicesConfiguration.getResourceID(),
        taskManagerServicesConfiguration.getExternalAddress(),
        taskManagerServicesConfiguration.getExternalDataPort() > 0 ?
            taskManagerServicesConfiguration.getExternalDataPort() :
            listeningDataPort);

    final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

    // the table of slots this TaskManager offers
    final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
        taskManagerServicesConfiguration.getNumberOfSlots(),
        taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
        taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
        taskManagerServicesConfiguration.getPageSize(),
        ioExecutor);

    final JobTable jobTable = DefaultJobTable.create();

    final JobLeaderService jobLeaderService = new DefaultJobLeaderService(unresolvedTaskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());

    // one local-state directory per configured local-recovery root
    final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

    final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

    for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
        stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
    }

    final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
        taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
        stateRootDirectoryFiles,
        ioExecutor);

    final boolean failOnJvmMetaspaceOomError =
        taskManagerServicesConfiguration.getConfiguration().getBoolean(CoreOptions.FAIL_ON_USER_CLASS_LOADING_METASPACE_OOM);
    final boolean checkClassLoaderLeak =
        taskManagerServicesConfiguration.getConfiguration().getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER);

    // class loading for user code (job jars are fetched through the blob service)
    final LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
        permanentBlobService,
        BlobLibraryCacheManager.defaultClassLoaderFactory(
            taskManagerServicesConfiguration.getClassLoaderResolveOrder(),
            taskManagerServicesConfiguration.getAlwaysParentFirstLoaderPatterns(),
            failOnJvmMetaspaceOomError ? fatalErrorHandler : null,
            checkClassLoaderLeak));

    return new TaskManagerServices(
        unresolvedTaskManagerLocation,
        taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
        ioManager,
        shuffleEnvironment,
        kvStateService,
        broadcastVariableManager,
        taskSlotTable,
        jobTable,
        jobLeaderService,
        taskStateManager,
        taskEventDispatcher,
        ioExecutor,
        libraryCacheManager);
}
```
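Two small decisions in fromConfiguration are easy to miss:

- The data port advertised in UnresolvedTaskManagerLocation is the configured external data port if that is positive. Otherwise it is the port the shuffle environment actually bound, which shuffleEnvironment.start() returns.
- Each local-recovery root directory gets a fixed subdirectory appended.

Below is a minimal sketch of both. The method names are hypothetical. The subdirectory name "localState" is an assumed stand-in for the value of LOCAL_STATE_SUB_DIRECTORY_ROOT.

```java
import java.io.File;

class TaskManagerServicesSketch {
    // Assumed value; stands in for LOCAL_STATE_SUB_DIRECTORY_ROOT.
    static final String LOCAL_STATE_SUB_DIRECTORY_ROOT = "localState";

    // Same rule as the ternary in fromConfiguration: a positive configured port wins,
    // otherwise the port the data server is actually listening on is advertised.
    static int advertisedDataPort(int configuredExternalPort, int listeningDataPort) {
        return configuredExternalPort > 0 ? configuredExternalPort : listeningDataPort;
    }

    // Same loop as in fromConfiguration: one local-state directory per configured root.
    static File[] localStateDirectories(String[] stateRootDirectoryStrings) {
        File[] files = new File[stateRootDirectoryStrings.length];
        for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
            files[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
        }
        return files;
    }
}
```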
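This code is very important:

(1) ShuffleEnvironment wraps the Netty server and the Netty client; its implementation is NettyShuffleEnvironment. All data exchanged between Flink tasks travels over Netty channels, and between any two TaskManagers there is only one Netty server/Netty client (C/S) pair. Suppose a TaskManager with two slots, slot1 and slot2, talks to two slots, slot3 and slot4, on another TaskManager. All of them reuse that single Netty connection to transfer data.

(2) KvStateService: every TaskExecutor has one KvStateService. Tasks use it to register their state (for queryable state), and it wraps a KvStateServer.

(3) BroadcastVariableManager is the component that manages broadcast variables. A broadcast variable, such as a piece of configuration, arrives from a source that broadcasts it. After receiving it, the TaskManager keeps it in its own memory, and this component is what does that.

(4) JobLeaderService is the service that retrieves job leaders. The leader-election mechanism covered earlier writes the leader's information to ZooKeeper (when ZooKeeper HA is used), so this service only has to read it from ZooKeeper.

(5) Finally, new TaskManagerServices is returned. Its constructor contains little logic; it only assigns fields.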
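These components are not analyzed further here. It is enough to know that createTaskExecutorService, through TaskManagerServices.fromConfiguration, is what creates and starts them.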
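Point (1) says that all slots share one connection between two TaskManagers. The core of that reuse idea is a connection cache keyed by the remote TaskManager's address.

The sketch below contains no Netty. Connection is a hypothetical placeholder, and the counter stands in for opening a TCP channel. Flink's real network stack has more to it (for example, exactly how a connection's identity is defined), so treat this only as an illustration of the reuse idea.

```java
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: at most one client connection per remote TaskManager,
// shared by every local task/slot that talks to that TaskManager.
class ConnectionReuseSketch {
    static final class Connection {
        final InetSocketAddress remote;

        Connection(InetSocketAddress remote) {
            this.remote = remote;
        }
    }

    private final ConcurrentMap<InetSocketAddress, Connection> connections = new ConcurrentHashMap<>();
    private final AtomicInteger opened = new AtomicInteger();

    // Returns the existing connection to this TaskManager, or opens exactly one.
    Connection connectionFor(InetSocketAddress remoteTaskManager) {
        return connections.computeIfAbsent(remoteTaskManager, addr -> {
            opened.incrementAndGet(); // stands in for a real TCP connect
            return new Connection(addr);
        });
    }

    int openedConnections() {
        return opened.get();
    }
}
```

With this design, slot1 and slot2 both asking for the TaskManager that hosts slot3 and slot4 get the same Connection instance, and only a different remote address opens a second one.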
new TaskManagerRunner
TaskManagerRunner::createTaskExecutorService is only passed in as a method reference. It is actually invoked inside the TaskManagerRunner constructor:
```java
public TaskManagerRunner(
        Configuration configuration,
        PluginManager pluginManager,
        TaskExecutorServiceFactory taskExecutorServiceFactory) throws Exception {
    this.configuration = checkNotNull(configuration);

    timeout = AkkaUtils.getTimeoutAsTime(configuration);

    // scheduled pool with one thread per CPU core, used for futures
    this.executor = java.util.concurrent.Executors.newScheduledThreadPool(
        Hardware.getNumberCPUCores(),
        new ExecutorThreadFactory("taskmanager-future"));

    // HA services (leader retrieval, blob store, ...)
    highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
        configuration,
        executor,
        HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

    JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

    // RPC service; the TaskManager's ResourceID is derived from its address and port
    rpcService = createRpcService(configuration, highAvailabilityServices);

    this.resourceId = getTaskManagerResourceID(configuration, rpcService.getAddress(), rpcService.getPort());

    HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

    // metrics: registry, reporters and the metric query service
    metricRegistry = new MetricRegistryImpl(
        MetricRegistryConfiguration.fromConfiguration(configuration),
        ReporterSetup.fromConfiguration(configuration, pluginManager));

    final RpcService metricQueryServiceRpcService = MetricUtils.startRemoteMetricsRpcService(configuration, rpcService.getAddress());
    metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId);

    // local cache for blobs such as job jars
    blobCacheService = new BlobCacheService(
        configuration, highAvailabilityServices.createBlobStore(), null
    );

    final ExternalResourceInfoProvider externalResourceInfoProvider =
        ExternalResourceUtils.createStaticExternalResourceInfoProvider(
            ExternalResourceUtils.getExternalResourceAmountMap(configuration),
            ExternalResourceUtils.externalResourceDriversFromConfig(configuration, pluginManager));

    // invoke the factory (TaskManagerRunner::createTaskExecutorService) with the services built above
    taskExecutorService = taskExecutorServiceFactory.createTaskExecutor(
        this.configuration,
        this.resourceId,
        rpcService,
        highAvailabilityServices,
        heartbeatServices,
        metricRegistry,
        blobCacheService,
        false,
        externalResourceInfoProvider,
        this);

    this.terminationFuture = new CompletableFuture<>();
    this.shutdown = false;
    handleUnexpectedTaskExecutorServiceTermination();

    MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
}
```
The final call, taskExecutorServiceFactory.createTaskExecutor, actually invokes the TaskManagerRunner::createTaskExecutorService method analyzed above. That method was passed in as the factory in runTaskManager:
```java
final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);
```
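Passing TaskManagerRunner::createTaskExecutorService into the constructor is plain factory injection. The runner first builds the shared services (HA, RPC, heartbeat, metrics, blob cache). Only then does it ask the factory to build the TaskExecutorService from them. The same structure also lets a caller plug in a different factory.

The minimal, Flink-free sketch below makes that order visible. Every type and name in it is hypothetical, and the address string is made up. The lambda returned by the factory plays the role that TaskExecutorToServiceAdapter plays in the real code.

```java
import java.util.List;

// Hypothetical sketch of the factory injection used by TaskManagerRunner.
interface ServiceSketch {
    void start();
}

@FunctionalInterface
interface ServiceFactorySketch {
    // Receives what the runner built (here only an address string and a log).
    ServiceSketch create(String rpcAddress, List<String> log);
}

class RunnerSketch {
    private final ServiceSketch service;

    RunnerSketch(ServiceFactorySketch factory, List<String> log) {
        // 1. the constructor creates the shared services first ...
        String rpcAddress = "akka://flink/user/taskmanager_0"; // made-up address
        log.add("shared services created");
        // 2. ... and only then calls the factory that was passed in as a method reference
        this.service = factory.create(rpcAddress, log);
    }

    void start() {
        service.start();
    }

    // Plays the role of createTaskExecutorService: build the worker and wrap it as a service.
    static ServiceSketch createService(String rpcAddress, List<String> log) {
        log.add("factory invoked with " + rpcAddress);
        return () -> log.add("service started");
    }
}
```

Usage: `new RunnerSketch(RunnerSketch::createService, log).start()` records the creation of the shared services, then the factory call, then the start, in that order.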
Once taskManagerRunner has been created, its start method is called:
```java
taskManagerRunner.start();
```
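This start method passes through several layers of calls:

taskExecutorService.start() -> taskExecutor.start() -> rpcServer.start() -> rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender())

The last step is the RPC server's start() (AkkaInvocationHandler in the Akka-based RPC implementation):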
```java
@Override
public void start() {
    rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
}
```
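Here, rpcEndpoint is the Akka ActorRef behind the endpoint, so START is delivered as an asynchronous message via tell. Following the conventions of Flink's RPC framework, the actor handles this message by calling TaskExecutor's onStart method: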
```java
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
    // ...

    @Override
    public void onStart() throws Exception {
        try {
            // start RM leader retrieval, the slot table, the job leader service and the file cache
            startTaskExecutorServices();
        } catch (Throwable t) {
            final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), t);
            onFatalError(exception);
            throw exception;
        }

        // fail if registration at the ResourceManager does not succeed in time
        startRegistrationTimeout();
    }
}
```
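Why send START as a message instead of calling onStart() directly? Because that way an RpcEndpoint's lifecycle callbacks and its RPCs all run on the endpoint's single main thread, in mailbox order.

The sketch below models this with a queue instead of an Akka actor. EndpointSketch and TaskExecutorSketch are hypothetical names. processMailbox() stands in for the actor's message loop, and the recorded events simply echo the two calls in the onStart method shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, Akka-free sketch: tell() only enqueues a control message; the
// endpoint's main thread later processes it and calls onStart().
abstract class EndpointSketch {
    enum ControlMessage { START, STOP }

    private final BlockingQueue<ControlMessage> mailbox = new LinkedBlockingQueue<>();
    private boolean started;

    // Counterpart of rpcEndpoint.tell(ControlMessages.START, ...): returns immediately.
    void tell(ControlMessage message) {
        mailbox.add(message);
    }

    // Drains the mailbox; in a real actor system this runs on the actor's thread.
    void processMailbox() {
        ControlMessage message;
        while ((message = mailbox.poll()) != null) {
            if (message == ControlMessage.START && !started) {
                started = true;
                onStart();
            } else if (message == ControlMessage.STOP && started) {
                started = false;
                onStop();
            }
        }
    }

    protected abstract void onStart();

    protected void onStop() {}
}

class TaskExecutorSketch extends EndpointSketch {
    final List<String> events = new ArrayList<>();

    @Override
    protected void onStart() {
        events.add("startTaskExecutorServices");
        events.add("startRegistrationTimeout");
    }
}
```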
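onStart does two things. It starts the TaskExecutor's services, and it starts the registration timeout, which defaults to five minutes. Now look at startTaskExecutorServices():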
```java
private void startTaskExecutorServices() throws Exception {
    try {
        // watch for the ResourceManager leader and (re)connect to it
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

        // the slot table manages this TaskManager's slots
        taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());

        // watch for JobManager (JobMaster) leaders
        jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

        fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
    } catch (Exception e) {
        handleStartTaskExecutorServicesException(e);
    }
}
```
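First, it watches the ResourceManager's address and reconnects if the RM leader changes. The TaskExecutor needs this connection to register with the RM, send heartbeats, report its resources, and so on.

Second, the TaskManager manages the slots it holds; inside the TaskManager, these slots are managed by taskSlotTable.

Third, it starts jobLeaderService, which watches the JobManager leaders.

Finally, it starts a file cache service (FileCache).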
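onStart() also calls startRegistrationTimeout(), and registering with the ResourceManager (the first service above) is what has to beat that timeout. Once registration succeeds, the timeout is disarmed. If it never succeeds, the timeout escalates to a fatal error.

Below is a minimal sketch of that mechanism. It assumes one token (a UUID) per armed timeout, so that a stale scheduled check cannot fire after a newer timeout has been started. All names are hypothetical.

```java
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of a registration timeout guarded by a per-attempt token.
class RegistrationTimeoutSketch {
    private final Consumer<String> fatalErrorHandler;
    private UUID currentTimeoutId; // null means no timeout is armed

    RegistrationTimeoutSketch(Consumer<String> fatalErrorHandler) {
        this.fatalErrorHandler = fatalErrorHandler;
    }

    // Arms a new timeout; any previously armed token becomes stale.
    synchronized UUID start() {
        currentTimeoutId = UUID.randomUUID();
        return currentTimeoutId;
    }

    // Called once registration at the ResourceManager succeeds.
    synchronized void registrationSucceeded() {
        currentTimeoutId = null;
    }

    // Called when the scheduled check fires; only the latest armed token counts.
    synchronized void onTimeout(UUID timeoutId) {
        if (timeoutId.equals(currentTimeoutId)) {
            fatalErrorHandler.accept("Could not register at the ResourceManager in time");
        }
    }

    // Arms a timeout and schedules its check, e.g. with a five-minute delay.
    void startWithScheduler(ScheduledExecutorService scheduler, long delay, TimeUnit unit) {
        UUID id = start();
        scheduler.schedule(() -> onTimeout(id), delay, unit);
    }
}
```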
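At this point the TaskManager has finished starting. This article only outlines how the TaskManager's main components are started; the details of each component deserve separate analysis later. To sum up: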
Summary
Starting from the main method, the startup flow is:
main(args)
-> runTaskManagerSecurely(args)
-> runTaskManagerSecurely(configuration)
-> runTaskManager(configuration, pluginManager)
-> taskManagerRunner = new TaskManagerRunner(configuration, pluginManager, TaskManagerRunner::createTaskExecutorService), which:
  starts highAvailabilityServices
  creates rpcService
  creates heartbeatServices
  creates blobCacheService
  creates taskExecutorService through TaskManagerRunner::createTaskExecutorService -> startTaskManager, which initializes a TaskManagerServices (starting a set of services) and creates the TaskExecutor. The TaskExecutor is an RpcEndpoint, so starting it ends in its onStart method.
-> taskManagerRunner.start() -> taskExecutorService.start() -> taskExecutor.start() -> TaskExecutor.onStart()
With that, the TaskManager is fully started!