Link to the previous installment
Picking up from the previous installment, which covered the startup of the JobMaster: during that startup the SlotPool was started as well. Next, the SlotPool locates the ResourceManager (RM) and requests slots from it.
1. StandaloneLeaderRetrievalService#start -> JobMaster's inner class ResourceManagerLeaderListener#notifyLeaderAddress -> JobMaster#notifyOfNewResourceManagerLeader
private void notifyOfNewResourceManagerLeader(final String newResourceManagerAddress, final ResourceManagerId resourceManagerId) {
    resourceManagerAddress = createResourceManagerAddress(newResourceManagerAddress, resourceManagerId);

    reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
}
① createResourceManagerAddress
Resolves the address of the current ResourceManager leader.
② reconnectToResourceManager
Reconnects the JobMaster to the ResourceManager. A minimal sketch of the leader-retrieval pattern behind this step follows.
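The sketch below is a minimal, self-contained illustration of the leader-retrieval listener pattern (the "Sketch" types are illustrative stand-ins, not Flink's exact interfaces). In standalone (non-HA) mode the leader address is fixed, so the retrieval service can notify its listener immediately on start, which is exactly what triggers notifyOfNewResourceManagerLeader above.

import java.util.UUID;

interface LeaderRetrievalListenerSketch {
    // Flink's real LeaderRetrievalListener also receives a leader session id
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

class StandaloneLeaderRetrievalServiceSketch {
    private final String leaderAddress;
    private final UUID leaderSessionId;

    StandaloneLeaderRetrievalServiceSketch(String leaderAddress, UUID leaderSessionId) {
        this.leaderAddress = leaderAddress;
        this.leaderSessionId = leaderSessionId;
    }

    // in standalone mode the leader never changes, so start() can push the
    // fixed address to the listener right away, exactly once
    void start(LeaderRetrievalListenerSketch listener) {
        listener.notifyLeaderAddress(leaderAddress, leaderSessionId);
    }
}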
2. JobMaster#reconnectToResourceManager
private void reconnectToResourceManager(Exception cause) {
    closeResourceManagerConnection(cause);
    tryConnectToResourceManager();
}
① closeResourceManagerConnection
Closes any existing connection first, so that we never hold duplicate connections.
② tryConnectToResourceManager
Attempts to connect to the ResourceManager.
3. JobMaster#tryConnectToResourceManager -> JobMaster#connectToResourceManager
private void connectToResourceManager() {
    assert(resourceManagerAddress != null);
    assert(resourceManagerConnection == null);
    assert(establishedResourceManagerConnection == null);

    log.info("Connecting to ResourceManager {}", resourceManagerAddress);

    resourceManagerConnection = new ResourceManagerConnection(
        log,
        jobGraph.getJobID(),
        resourceId,
        getAddress(),
        getFencingToken(),
        resourceManagerAddress.getAddress(),
        resourceManagerAddress.getResourceManagerId(),
        scheduledExecutorService);

    resourceManagerConnection.start();
}
① new ResourceManagerConnection
Creates the connection to the ResourceManager.
② resourceManagerConnection.start
Starts it.
4. RegisteredRpcConnection#start
public void start() {
    checkState(!closed, "The RPC connection is already closed");
    checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");

    final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();

    if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
        newRegistration.startRegistration();
    } else {
        // concurrent start operation
        newRegistration.cancel();
    }
}
① createNewRegistration
Creates a registration through which the JobMaster registers itself with the ResourceManager.
② startRegistration
Starts that registration.
5. RetryingRegistration#startRegistration
public void startRegistration() {
    if (canceled) {
        // we already got canceled
        return;
    }

    try {
        // trigger resolution of the target address to a callable gateway
        final CompletableFuture<G> rpcGatewayFuture;

        if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
            rpcGatewayFuture = (CompletableFuture<G>) rpcService.connect(
                targetAddress,
                fencingToken,
                targetType.asSubclass(FencedRpcGateway.class));
        } else {
            rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
        }

        // upon success, start the registration attempts
        CompletableFuture<Void> rpcGatewayAcceptFuture = rpcGatewayFuture.thenAcceptAsync(
            (G rpcGateway) -> {
                log.info("Resolved {} address, beginning registration", targetName);
                register(rpcGateway, 1, retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());
            },
            rpcService.getExecutor());

        // upon failure, retry, unless this is cancelled
        rpcGatewayAcceptFuture.whenCompleteAsync(
            (Void v, Throwable failure) -> {
                if (failure != null && !canceled) {
                    final Throwable strippedFailure = ExceptionUtils.stripCompletionException(failure);
                    if (log.isDebugEnabled()) {
                        log.debug(
                            "Could not resolve {} address {}, retrying in {} ms.",
                            targetName,
                            targetAddress,
                            retryingRegistrationConfiguration.getErrorDelayMillis(),
                            strippedFailure);
                    } else {
                        log.info(
                            "Could not resolve {} address {}, retrying in {} ms: {}",
                            targetName,
                            targetAddress,
                            retryingRegistrationConfiguration.getErrorDelayMillis(),
                            strippedFailure.getMessage());
                    }

                    startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());
                }
            },
            rpcService.getExecutor());
    }
    catch (Throwable t) {
        completionFuture.completeExceptionally(t);
        cancel();
    }
}
① rpcService.connect
Resolves the target address into a callable gateway.
② register
Once resolution succeeds, the registration attempts begin. Since registration happens over RPC, after it succeeds we can go straight to JobMaster#onRegistrationSuccess. A simplified sketch of the retry loop inside register is shown below.
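Because register itself is not listed above, here is a minimal, self-contained sketch of its retry loop (RetryingRegistrationSketch and invokeRegistration are illustrative stand-ins, not Flink's code): attempt the remote registration, and on failure schedule another attempt after a delay, unless cancelled.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RetryingRegistrationSketch {

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private volatile boolean canceled = false;

    void register(int attempt, long retryDelayMillis) {
        if (canceled) {
            return;
        }

        // stand-in for the actual RPC call, e.g. registering the JobMaster
        // at the ResourceManager through its gateway
        CompletableFuture<String> registrationFuture = invokeRegistration();

        registrationFuture.whenComplete((response, failure) -> {
            if (failure == null) {
                // Flink would now hand the response to the owner's onRegistrationSuccess
                System.out.println("attempt " + attempt + " succeeded: " + response);
            } else if (!canceled) {
                // failed or timed out: schedule the next attempt after a delay
                executor.schedule(() -> register(attempt + 1, retryDelayMillis), retryDelayMillis, TimeUnit.MILLISECONDS);
            }
        });
    }

    private CompletableFuture<String> invokeRegistration() {
        return CompletableFuture.completedFuture("JobMasterRegistrationSuccess");
    }
}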
6. JobMaster#onRegistrationSuccess -> JobMaster#establishResourceManagerConnection
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
    final ResourceManagerId resourceManagerId = success.getResourceManagerId();

    // verify the response with current connection
    if (resourceManagerConnection != null
            && Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {

        log.info("JobManager successfully registered at ResourceManager, leader id: {}.", resourceManagerId);

        final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();

        final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();

        establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
            resourceManagerGateway,
            resourceManagerResourceId);

        slotPool.connectToResourceManager(resourceManagerGateway);

        resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() {
            @Override
            public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                resourceManagerGateway.heartbeatFromJobManager(resourceID);
            }

            @Override
            public void requestHeartbeat(ResourceID resourceID, Void payload) {
                // request heartbeat will never be called on the job manager side
            }
        });
    } else {
        log.debug("Ignoring resource manager connection to {} because it's duplicated or outdated.", resourceManagerId);
    }
}
7. The chain of calls from here is as follows:

SlotPoolImpl#connectToResourceManager
-> SlotPoolImpl#requestSlotFromResourceManager
-> ResourceManager#requestSlot
-> SlotManagerImpl#registerSlotRequest
-> SlotManagerImpl#internalRequestSlot
-> SlotManagerImpl#fulfillPendingSlotRequestWithPendingTaskManagerSlot
-> SlotManagerImpl#allocateResource
-> ResourceManager's inner class ResourceActionsImpl#allocateResource
-> ActiveResourceManager#startNewWorker
-> ActiveResourceManager#requestNewWorker
-> YarnResourceManagerDriver#requestResource

PS: the chain passes through the SlotManager, which shows that Flink's ResourceManager has received the JobMaster's slot request; the SlotManager inside Flink's own RM then turns to YARN's ResourceManager for the actual resources. A minimal sketch of the first hop follows; the code of the final hop, YarnResourceManagerDriver#requestResource, is shown after it.
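The sketch below uses assumed shapes and names, not Flink's exact signatures: SlotPoolImpl#connectToResourceManager stores the gateway and immediately forwards every slot request that accumulated while no ResourceManager was known.

import java.util.ArrayList;
import java.util.List;

interface ResourceManagerGatewaySketch {
    // stand-in for the RPC slot request (the real method also carries the
    // JobMasterId, a SlotRequest object and a timeout)
    void requestSlot(String slotRequest);
}

class SlotPoolSketch {
    private final List<String> waitingForResourceManager = new ArrayList<>();
    private ResourceManagerGatewaySketch resourceManagerGateway;

    void requestSlot(String slotRequest) {
        if (resourceManagerGateway == null) {
            // no RM known yet: remember the request
            waitingForResourceManager.add(slotRequest);
        } else {
            requestSlotFromResourceManager(slotRequest);
        }
    }

    void connectToResourceManager(ResourceManagerGatewaySketch gateway) {
        this.resourceManagerGateway = gateway;
        // forward everything that accumulated while the RM was unknown
        for (String pending : waitingForResourceManager) {
            requestSlotFromResourceManager(pending);
        }
        waitingForResourceManager.clear();
    }

    private void requestSlotFromResourceManager(String slotRequest) {
        resourceManagerGateway.requestSlot(slotRequest);
    }
}

The real SlotPoolImpl additionally keys pending requests by SlotRequestId and handles timeouts; the sketch keeps only the store-then-forward behavior.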
public CompletableFuture<YarnWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
    checkInitialized();

    final CompletableFuture<YarnWorkerNode> requestResourceFuture = new CompletableFuture<>();

    final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.PriorityAndResource> priorityAndResourceOpt =
        taskExecutorProcessSpecContainerResourcePriorityAdapter.getPriorityAndResource(taskExecutorProcessSpec);

    if (!priorityAndResourceOpt.isPresent()) {
        requestResourceFuture.completeExceptionally(
            new ResourceManagerException(
                String.format("Could not compute the container Resource from the given TaskExecutorProcessSpec %s. " +
                        "This usually indicates the requested resource is larger than Yarn's max container resource limit.",
                    taskExecutorProcessSpec)));
    } else {
        final Priority priority = priorityAndResourceOpt.get().getPriority();
        final Resource resource = priorityAndResourceOpt.get().getResource();
        resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority));

        // make sure we transmit the request fast and receive fast news of granted allocations
        resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);

        requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec, ignore -> new LinkedList<>()).add(requestResourceFuture);

        log.info("Requesting new TaskExecutor container with resource {}, priority {}.", taskExecutorProcessSpec, priority);
    }

    return requestResourceFuture;
}
① resourceManagerClient.addContainerRequest
Adds a container resource request.
PS: this client is the one that talks to YARN's ResourceManager; in short, the SlotManager is requesting resources from YARN's RM. A sketch of how such a request is built follows this list.
② resourceManagerClient.setHeartbeatInterval
Sets the heartbeat interval, so the request goes out quickly and news of granted allocations comes back quickly.
③ Logs that a new TaskExecutor container has been requested.
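As mentioned in ①, here is a hedged sketch of how such a container request can be built with the YARN client API. Flink's getContainerRequest wraps something along these lines, but treat the exact arguments as an assumption:

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;

class ContainerRequestSketch {
    // nodes and racks are left null: the TaskExecutor container may be placed
    // on any node. The Priority is what later lets the driver map an allocated
    // container back to the TaskExecutorProcessSpec it was requested for.
    static AMRMClient.ContainerRequest of(Resource resource, Priority priority) {
        return new AMRMClient.ContainerRequest(resource, null, null, priority);
    }
}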
Once TaskExecutor containers have been allocated, YarnResourceManagerDriver#onContainersAllocated is eventually invoked, since a request like this is generally paired with a callback. It is called when YARN's ResourceManager responds to a heartbeat with allocated containers; a sketch of the callback interface follows.
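The handler below is a hedged sketch of that callback side, implementing Hadoop's AMRMClientAsync.CallbackHandler interface. Flink's actual handler is an inner class of YarnResourceManagerDriver; what it does with the allocated containers is exactly what steps 8 and 9 below walk through.

import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

class AllocationCallbackSketch implements AMRMClientAsync.CallbackHandler {

    @Override
    public void onContainersAllocated(List<Container> containers) {
        // YARN granted containers in response to a heartbeat: group them by
        // Priority, match them to pending requests, and launch a TaskExecutor
        // in each one
    }

    @Override
    public void onContainersCompleted(List<ContainerStatus> statuses) {
        // containers that exited; used to detect lost workers
    }

    @Override
    public void onShutdownRequest() {}

    @Override
    public void onNodesUpdated(List<NodeReport> updatedNodes) {}

    @Override
    public float getProgress() {
        return 0;
    }

    @Override
    public void onError(Throwable e) {}
}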
8. YarnResourceManagerDriver#onContainersAllocated -> YarnResourceManagerDriver#onContainersOfPriorityAllocated -> YarnResourceManagerDriver#startTaskExecutorInContainerAsync
private void startTaskExecutorInContainerAsync(
        Container container,
        TaskExecutorProcessSpec taskExecutorProcessSpec,
        ResourceID resourceId,
        CompletableFuture<YarnWorkerNode> requestResourceFuture) {
    final CompletableFuture<ContainerLaunchContext> containerLaunchContextFuture =
        FutureUtils.supplyAsync(() -> createTaskExecutorLaunchContext(
            resourceId, container.getNodeId().getHost(), taskExecutorProcessSpec), getIoExecutor());

    FutureUtils.assertNoException(
        containerLaunchContextFuture.handleAsync((context, exception) -> {
            if (exception == null) {
                nodeManagerClient.startContainerAsync(container, context);
                requestResourceFuture.complete(new YarnWorkerNode(container, resourceId));
            } else {
                requestResourceFuture.completeExceptionally(exception);
            }
            return null;
        }, getMainThreadExecutor()));
}
① createTaskExecutorLaunchContext
Builds the ContainerLaunchContext for the TaskExecutor (expanded in step 9 below).
② nodeManagerClient.startContainerAsync
Asks the YARN NodeManager to launch the container asynchronously; on success, requestResourceFuture completes with the new YarnWorkerNode.
9. YarnResourceManagerDriver#createTaskExecutorLaunchContext -> Utils#createTaskExecutorContext
static ContainerLaunchContext createTaskExecutorContext(
        org.apache.flink.configuration.Configuration flinkConfig,
        YarnConfiguration yarnConfig,
        YarnResourceManagerDriverConfiguration configuration,
        ContaineredTaskManagerParameters tmParams,
        String taskManagerDynamicProperties,
        String workingDirectory,
        Class<?> taskManagerMainClass,
        Logger log) throws Exception {

    // get and validate all relevant variables
    String remoteFlinkJarPath = checkNotNull(configuration.getFlinkDistJar(), "Environment variable %s not set", YarnConfigKeys.FLINK_DIST_JAR);

    String shipListString = checkNotNull(configuration.getClientShipFiles(), "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);

    final String remoteKeytabPath = configuration.getRemoteKeytabPath();
    final String localKeytabPath = configuration.getLocalKeytabPath();
    final String keytabPrincipal = configuration.getKeytabPrinciple();
    final String remoteYarnConfPath = configuration.getYarnSiteXMLPath();
    final String remoteKrb5Path = configuration.getKrb5Path();

    if (log.isDebugEnabled()) {
        log.debug("TM:remote keytab path obtained {}", remoteKeytabPath);
        log.debug("TM:local keytab path obtained {}", localKeytabPath);
        log.debug("TM:keytab principal obtained {}", keytabPrincipal);
        log.debug("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
        log.debug("TM:remote krb5 path obtained {}", remoteKrb5Path);
    }

    String classPathString = checkNotNull(configuration.getFlinkClasspath(), "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH);

    // register keytab
    LocalResource keytabResource = null;
    if (remoteKeytabPath != null) {
        log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
        Path keytabPath = new Path(remoteKeytabPath);
        FileSystem fs = keytabPath.getFileSystem(yarnConfig);
        keytabResource = registerLocalResource(fs, keytabPath, LocalResourceType.FILE);
    }

    // To support Yarn Secure Integration Test Scenario
    LocalResource yarnConfResource = null;
    if (remoteYarnConfPath != null) {
        log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
        Path yarnConfPath = new Path(remoteYarnConfPath);
        FileSystem fs = yarnConfPath.getFileSystem(yarnConfig);
        yarnConfResource = registerLocalResource(fs, yarnConfPath, LocalResourceType.FILE);
    }

    // register krb5.conf
    LocalResource krb5ConfResource = null;
    boolean hasKrb5 = false;
    if (remoteKrb5Path != null) {
        log.info("Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
        Path krb5ConfPath = new Path(remoteKrb5Path);
        FileSystem fs = krb5ConfPath.getFileSystem(yarnConfig);
        krb5ConfResource = registerLocalResource(fs, krb5ConfPath, LocalResourceType.FILE);
        hasKrb5 = true;
    }

    Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();

    // register Flink Jar with remote HDFS
    final YarnLocalResourceDescriptor flinkDistLocalResourceDesc =
        YarnLocalResourceDescriptor.fromString(remoteFlinkJarPath);
    taskManagerLocalResources.put(
        flinkDistLocalResourceDesc.getResourceKey(),
        flinkDistLocalResourceDesc.toLocalResource());

    // To support Yarn Secure Integration Test Scenario
    if (yarnConfResource != null) {
        taskManagerLocalResources.put(YARN_SITE_FILE_NAME, yarnConfResource);
    }
    if (krb5ConfResource != null) {
        taskManagerLocalResources.put(KRB5_FILE_NAME, krb5ConfResource);
    }
    if (keytabResource != null) {
        taskManagerLocalResources.put(localKeytabPath, keytabResource);
    }

    // prepare additional files to be shipped
    decodeYarnLocalResourceDescriptorListFromString(shipListString).forEach(
        resourceDesc -> taskManagerLocalResources.put(resourceDesc.getResourceKey(), resourceDesc.toLocalResource()));

    // now that all resources are prepared, we can create the launch context
    log.info("Creating container launch context for TaskManagers");

    boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
    boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();

    String launchCommand = BootstrapTools.getTaskManagerShellCommand(
        flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
        hasLogback, hasLog4j, hasKrb5, taskManagerMainClass, taskManagerDynamicProperties);

    if (log.isDebugEnabled()) {
        log.debug("Starting TaskManagers with command: " + launchCommand);
    } else {
        log.info("Starting TaskManagers");
    }

    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
    ctx.setCommands(Collections.singletonList(launchCommand));
    ctx.setLocalResources(taskManagerLocalResources);

    Map<String, String> containerEnv = new HashMap<>();
    containerEnv.putAll(tmParams.taskManagerEnv());

    // add YARN classpath, etc to the container environment
    containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
    setupYarnClassPath(yarnConfig, containerEnv);

    containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());

    if (remoteKeytabPath != null && localKeytabPath != null && keytabPrincipal != null) {
        containerEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remoteKeytabPath);
        containerEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localKeytabPath);
        containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, keytabPrincipal);
    } else if (localKeytabPath != null && keytabPrincipal != null) {
        containerEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localKeytabPath);
        containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, keytabPrincipal);
    }

    ctx.setEnvironment(containerEnv);

    // For TaskManager YARN container context, read the tokens from the jobmanager yarn container local file.
    // NOTE: must read the tokens from the local file, not from the UGI context, because if UGI is login
    // using Kerberos keytabs, there is no HDFS delegation token in the UGI context.
    final String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);

    if (fileLocation != null) {
        log.debug("Adding security tokens to TaskExecutor's container launch context.");

        try (DataOutputBuffer dob = new DataOutputBuffer()) {
            Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), HadoopUtils.getHadoopConfiguration(flinkConfig));

            // Filter out AMRMToken before setting the tokens to the TaskManager container context.
            Credentials taskManagerCred = new Credentials();
            Collection<Token<? extends TokenIdentifier>> userTokens = cred.getAllTokens();
            for (Token<? extends TokenIdentifier> token : userTokens) {
                if (!token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
                    final Text id = new Text(token.getIdentifier());
                    taskManagerCred.addToken(id, token);
                }
            }

            taskManagerCred.writeTokenStorageToStream(dob);
            ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
            ctx.setTokens(securityTokens);
        } catch (Throwable t) {
            log.error("Failed to add Hadoop's security tokens.", t);
        }
    } else {
        log.info("Could not set security tokens because Hadoop's token file location is unknown.");
    }

    return ctx;
}
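One helper worth unpacking: registerLocalResource is called several times above to turn remote files into YARN LocalResource entries that the NodeManager localizes into the container's working directory. The sketch below is a hedged, simplified approximation of what such a helper typically does (assumed to resemble Utils#registerLocalResource; the real method also handles visibility details):

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.Records;

class LocalResourceSketch {
    static LocalResource register(FileSystem fs, Path remotePath, LocalResourceType type) throws IOException {
        FileStatus status = fs.getFileStatus(remotePath);
        LocalResource resource = Records.newRecord(LocalResource.class);
        // URL.fromPath exists on Hadoop 2.8+; older versions use
        // ConverterUtils.getYarnUrlFromPath instead
        resource.setResource(URL.fromPath(fs.makeQualified(remotePath)));
        // size and timestamp let the NodeManager verify the file it localizes
        resource.setSize(status.getLen());
        resource.setTimestamp(status.getModificationTime());
        resource.setType(type);
        resource.setVisibility(LocalResourceVisibility.APPLICATION);
        return resource;
    }
}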
10. YarnTaskExecutorRunner#main
public static void main(String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    runTaskManagerSecurely(args);
}
We have finally reached the launch of YarnTaskExecutorRunner; the rest will be analyzed in the next installment.

Overview

The source-code call-flow diagram for this installment: [figure]

See you next time!