1、Flink主节点TaskManager启动分析:
????????TaskManager是Flink的worker节点,它负责Flink中本机slot资源的管理以及具体task的执行。 ????????TaskManager上的基本资源单位是slot,一个作业的task最终会部署在一个TM的slot上运行,TM会负责维护本地的slot资源列表,并来与FlinkMaster和JobManager通信。 ????????根据前面的启动分析:TaskManager的启动主类:TaskManagerRunner。
2、源码分析
代码执行的大致流程如下:
TaskManagerRunner.main() ?? ?runTaskManagerSecurely(args, ResourceID.generate()); ?? ??? ?# 加载配置 ?? ??? ?Configuration configuration = loadConfiguration(args); ?? ??? ?# 启动 TaskManager ?? ??? ?runTaskManagerSecurely(configuration, resourceID); ?? ??? ??? ?# 启动 TaskManager ?? ??? ??? ?runTaskManager(configuration, resourceID, pluginManager); ?? ??? ??? ??? ?# 构建 TaskManagerRunner 实例 ?? ??? ??? ??? ?taskManagerRunner = new TaskManagerRunner(...); ?? ??? ??? ??? ??? ?# 初始化一个线程池 ?? ??? ??? ??? ??? ?this.executor = Executors.newScheduledThreadPool(....) ?? ??? ??? ??? ??? ?# 获取高可用模式 ?? ??? ??? ??? ??? ?highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(...) ?? ??? ??? ??? ??? ?# 创建 RPC 服务 ?? ??? ??? ??? ??? ?rpcService = createRpcService(configuration, ?? ??? ??? ??? ??? ?highAvailabilityServices); ?? ??? ??? ??? ??? ?# 创建心跳服务 ?? ??? ??? ??? ??? ?heartbeatServices = HeartbeatServices.fromConfiguration(conf); ?? ??? ??? ??? ??? ?# 创建 BlobCacheService ?? ??? ??? ??? ??? ?blobCacheService = new BlobCacheService(....) ?? ??? ??? ??? ??? ?# 创建 TaskManager ?? ??? ??? ??? ??? ?taskManager = startTaskManager(.....) ?? ??? ??? ??? ??? ?# 初始化 TaskManagerServices ?? ??? ??? ??? ??? ?taskManagerServices = TaskManagerServices.fromConfiguration(...) ?? ??? ??? ??? ??? ?# 初始化 TaskEventDispatcher ?? ??? ??? ??? ??? ?taskEventDispatcher = new TaskEventDispatcher(); ?? ??? ??? ??? ??? ?# 初始化 IOManagerASync ?? ??? ??? ??? ??? ?ioManager = new IOManagerAsync(...) ?? ??? ??? ??? ??? ?# 初始化 NettyShuffleEnvironment ?? ??? ??? ??? ??? ?shuffleEnvironment = createShuffleEnvironment(...) ?? ??? ??? ??? ??? ?# 初始化 KVStageService ?? ??? ??? ??? ??? ?kvStateService = ?? ??? ??? ??? ??? ?KvStateService.fromConfiguration(...) ?? ??? ??? ??? ??? ?# 初始化 BroadCastVariableManager ?? ??? ??? ??? ??? ?broadcastVariableManager = new BroadcastVariableManager(); ?? ??? ??? ??? ??? ?# 初始化 TaskSlotTable ?? ??? ??? ??? ??? ?taskSlotTable = createTaskSlotTable(...) ?? ??? ??? ??? ??? ?# 初始化 DefaultJobTable ?? ??? ??? ??? ??? ?jobTable = DefaultJobTable.create(); ?? ??? ??? ??? ??? ?# 初始化 JobLeaderService ?? ??? ??? ??? ??? ?jobLeaderService = new DefaultJobLeaderService(....) ?? ??? ??? ??? ??? ?# 初始化 TaskStateManager ?? ??? ??? ??? ??? ?taskStateManager = new TaskExecutorLocalStateStoresManager() ?? ??? ??? ??? ??? ?# 初始化 LibraryCacheManager ?? ??? ??? ??? ??? ?libraryCacheManager = new BlobLibraryCacheManager() ?? ??? ??? ??? ??? ?# 返回 ?? ??? ??? ??? ??? ?return new TaskManagerServices(....) ?? ??? ??? ??? ?# 初始化一个 TaskExecutor ?? ??? ??? ??? ?return new TaskExecutor(.....) ?? ??? ??? ??? ??? ?# 初始化心跳管理器:jobManagerHeartbeatManager ?? ??? ??? ??? ??? ?this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices,resourceId); ?? ??? ??? ??? ??? ?# 初始化心跳管理器:resourceManagerHeartbeatManager ?? ??? ??? ??? ??? ?this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices,resourceId); ?? ??? ??? ??? ??? ?# 转到 TaskExecutor 的 onStart() 方法 ?? ??? ??? ??? ??? ?TaskExecutor.onStart(); ?? ??? ??? ??? ??? ??? ?startTaskExecutorServices(); ?? ??? ?# 启动 TaskManagerRunner ?? ??? ?taskManagerRunner.start();
?TaskManagerRunner的启动大致分为三类比较重要的:
- 一些基础服务
- TaskManagerService
- TaskExecutor
|