对于YARN的介绍,可以参考之前的文章: 大数据理论与实践4 分布式资源管理系统YARN
RM高可用
Hadoop官方文档对于RM的高可用是这样描述的:ResourceManager (RM) 负责跟踪集群中的资源,并调度应用程序。在Hadoop2.4之前是存在单点故障的。高可用机制是通过Active/Standby ResourceManager对的形式添加冗余,来消除这种单点故障。 当RM发生故障需要进行故障转移时,有三种方式:
- 手动转换和故障转移
管理员应该首先将 Active-RM 转换为 Standby,然后将 Standby-RM 转换为 Active。可通过命令行来进行。 - 自动故障转移
RM 可以选择嵌入基于Zookeeper的ActiveStandbyElector来决定哪个 RM 应该是 Active。不需要像HDFS那样运行单独的ZKFC守护进程,因为嵌入在RM中的ActiveStandbyElector充当故障检测器和领导选举器,而不是单独的ZKFC守护进程。
下面源码阅读主要是针对RM故障转移的。 首先在ResourceManager里面中,serviceInit()方法中,会对当前的高可用进行判断: 如果yarn.resourcemanager.ha.enabled配置参数为true(默认false),则为启用RM高可用;
protected void serviceInit(Configuration conf) throws Exception {
...
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
if (this.rmContext.isHAEnabled()) {
HAUtil.verifyAndSetConfiguration(this.conf);
}
...
}
在HAUtil中,会对RM的高可用信息进行验证:
public static void verifyAndSetConfiguration(Configuration conf)
throws YarnRuntimeException {
verifyAndSetRMHAIdsList(conf);
verifyAndSetCurrentRMHAId(conf);
verifyLeaderElection(conf);
verifyAndSetAllServiceAddresses(conf);
}
如果开启了RM高可用,会获取到yarn.resourcemanager.ha.automatic-failover.enabled(默认true),将RM设置为启用自动故障转移。 如果yarn.resourcemanager.ha.automatic-failover.embedded为true(默认false),则会在ResourceManager初始化过程中,调用createEmbeddedElector()创建选举器。
protected EmbeddedElector createEmbeddedElector() throws IOException {
EmbeddedElector elector;
curatorEnabled =
conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
this.zkManager = createAndStartZKManager(conf);
elector = new CuratorBasedElectorService(this);
} else {
elector = new ActiveStandbyElectorBasedElectorService(this);
}
return elector;
}
在hadoop3.3.1中,配置yarn.resourcemanager.ha.automatic-failover.curator-leader-elector.enabled已经被弃用了,因此选举器就是ActiveStandbyElector,它是基于ZK选主实现的。
基于ZK的HA机制
在ActiveStandbyElector中,被选举成为Active的RM会调用AdminService的transitionToActive()函数,把RM状态转换为Active:
public void becomeActive() throws ServiceFailedException {
cancelDisconnectTimer();
try {
rm.getRMContext().getRMAdminService().transitionToActive(req);
} catch (Exception e) {
throw new ServiceFailedException("RM could not transition to Active", e);
}
}
AdminService是提供管理员服务的RPC请求(类似的,ClientRMService为普通用户提供服务)。在AdminService中:
public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
if (isRMActive()) {
return;
}
try {
refreshAdminAcls(false);
} catch (YarnException ex) {
throw new ServiceFailedException("Can not execute refreshAdminAcls", ex);
}
UserGroupInformation user = checkAccess("transitionToActive");
checkHaStateChange(reqInfo);
try {
refreshAll();
} catch (Exception e) {
rm.getRMContext()
.getDispatcher()
.getEventHandler()
.handle(
new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e, "failure to refresh configuration settings"));
throw new ServiceFailedException(
"Error on refreshAll during transition to Active", e);
}
try {
rm.transitionToActive();
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
"", "RM",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
}
RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
"RM");
}
要想将RM转换为Active,首先需要进行各种权限检查,并且让这个RM或得到最新的各种信息。 可以看到,在这个方法中调用了ResourceManager的transitionToActive()方法:
synchronized void transitionToActive() throws Exception {
if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) {
LOG.info("Already in active state");
return;
}
LOG.info("Transitioning to active state");
this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
try {
startActiveServices();
return null;
} catch (Exception e) {
reinitialize(true);
throw e;
}
}
});
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
LOG.info("Transitioned to active state");
}
调用了startActiveServices()函数,启动RMActiveServices
void startActiveServices() throws Exception {
if (activeServices != null) {
clusterTimeStamp = System.currentTimeMillis();
activeServices.start();
}
}
这里的start()(定义在org.apache.hadoop.service AbstractService.java)会调用RMActiveServices(ResourceManager内部类)serviceStart()函数来开启RM Active的服务。 这里涉及了RMStateStore,是实现ResourceManager状态存储的基类。它的作用稍后会进行分析,可以理解为它负责存储和读取RM的状态。
protected void serviceStart() throws Exception {
RMStateStore rmStore = rmContext.getStateStore();
rmStore.start();
if(recoveryEnabled) {
try {
LOG.info("Recovery started");
rmStore.checkVersion();
if (rmContext.isWorkPreservingRecoveryEnabled()) {
rmContext.setEpoch(rmStore.getAndIncrementEpoch());
}
RMState state = rmStore.loadState();
recover(state);
LOG.info("Recovery ended");
} catch (Exception e) {
LOG.error("Failed to load/recover state", e);
throw e;
}
} else {
if (HAUtil.isFederationEnabled(conf)) {
long epoch = conf.getLong(YarnConfiguration.RM_EPOCH,
YarnConfiguration.DEFAULT_RM_EPOCH);
rmContext.setEpoch(epoch);
LOG.info("Epoch set for Federation: " + epoch);
}
}
super.serviceStart();
}
它的核心是进行数据的同步,那么具体什么数据被同步了呢,是怎样被同步的呢?
RM状态信息(元数据)与高可用机制
在上面的代码中,可以看出,启动Active RM的时候,核心是获取到状态信息,然后恢复状态。
RMState state = rmStore.loadState();
recover(state);
LOG.info("Recovery ended");
这里的rmStore是RMStateStore虚基类的实例。 从官方文档可以知道,RMStateStore是实现ResourceManager状态存储的基类。负责异步通知和与对象的接口。真正的存储实现需要从中派生并实现阻塞存储和加载方法,以实际存储和加载状态。
在Hadoop3.3.1中,提供了五种RM状态信息的存储方式,分别是:
- MemoryRMStateStore:信息状态保存在内存中的实现类
- FileSystemRMStateStore:信息状态保存在HDFS文件系统中,如HDFS 或本地FS
- NullRMStateStore:所有函数都是空的,什么都不做,不保存应用状态信息。
- ZKRMStateStore:信息状态保存在Zookeeper中。
- LeveldbRMStateStore:基于 LevelDB 的状态存储实现
存储方式默认值为FileSystemRMStateStore。 在初始化RMActiveServiceContext(维护Active的RM状态信息的类)的过程中,StateStore会被置为NullRMStateStore();在ResourceManager初始化过程中,会读取配置信息yarn.resourcemanager.store.class,在RMStateStoreFactory类中通过反射机制,获取到指定的存储方式,并初始化。 注意:用户可以自由选择任何存储来设置 RM 重启,但必须使用基于 ZooKeeper 的状态存储来支持RM高可用。因为只有基于ZooKeeper的 state-store支持fencing机制,以避免出现裂脑情况。基于Hadoop文件系统存储的是不支持RM高可用高可用的。基于LevelDB的状态存储被认为比基于HDFS和ZooKeeper的状态存储更轻,每次状态更新的I/O操作更少,文件系统上的文件总数也少得多,但是也不支持高可用。
protected void serviceInit(Configuration configuration) throws Exception
{
RMStateStore rmStore = null;
if (recoveryEnabled) {
rmStore = RMStateStoreFactory.getStore(conf);
boolean isWorkPreservingRecoveryEnabled =
conf.getBoolean(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
rmContext
.setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
} else {
rmStore = new NullRMStateStore();
}
try {
rmStore.setResourceManager(rm);
rmStore.init(conf);
rmStore.setRMDispatcher(rmDispatcher);
} catch (Exception e) {
LOG.error("Failed to init state store", e);
throw e;
}
rmContext.setStateStore(rmStore);
}
那么接下来看通过各种方式获取和保存具体状态信息的。
基于ZK的RM状态信息获取(支持高可用)
在ZK中存储的状态信息如下:
The znode structure is as follows:
ROOT_DIR_PATH
|--- VERSION_INFO
|--- EPOCH_NODE
|--- RM_ZK_FENCING_LOCK
|--- RM_APP_ROOT
| |----- HIERARCHIES
| | |----- 1
| | | |----- (#ApplicationId barring last character)
| | | | |----- (#Last character of ApplicationId)
| | | | | |----- (#ApplicationAttemptIds)
| | | ....
| | |
| | |----- 2
| | | |----- (#ApplicationId barring last 2 characters)
| | | | |----- (#Last 2 characters of ApplicationId)
| | | | | |----- (#ApplicationAttemptIds)
| | | ....
| | |
| | |----- 3
| | | |----- (#ApplicationId barring last 3 characters)
| | | | |----- (#Last 3 characters of ApplicationId)
| | | | | |----- (#ApplicationAttemptIds)
| | | ....
| | |
| | |----- 4
| | | |----- (#ApplicationId barring last 4 characters)
| | | | |----- (#Last 4 characters of ApplicationId)
| | | | | |----- (#ApplicationAttemptIds)
| | | ....
| | |
| |----- (#ApplicationId1)
| | |----- (#ApplicationAttemptIds)
| |
| |----- (#ApplicationId2)
| | |----- (#ApplicationAttemptIds)
| ....
|
|--- RM_DT_SECRET_MANAGER_ROOT
|----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
|----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
| |----- 1
| | |----- (#TokenId barring last character)
| | | |----- (#Last character of TokenId)
| | ....
| |----- 2
| | |----- (#TokenId barring last 2 characters)
| | | |----- (#Last 2 characters of TokenId)
| | ....
| |----- 3
| | |----- (#TokenId barring last 3 characters)
| | | |----- (#Last 3 characters of TokenId)
| | ....
| |----- 4
| | |----- (#TokenId barring last 4 characters)
| | | |----- (#Last 4 characters of TokenId)
| | ....
| |----- Token_1
| |----- Token_2
| ....
|
|----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
| |----- Key_1
| |----- Key_2
....
|--- AMRMTOKEN_SECRET_MANAGER_ROOT
|----- currentMasterKey
|----- nextMasterKey
|-- RESERVATION_SYSTEM_ROOT
|------PLAN_1
| |------ RESERVATION_1
| |------ RESERVATION_2
| ....
|------PLAN_2
....
|-- PROXY_CA_ROOT
|----- caCert
|----- caPrivateKey
可以看到,主要存储了版本,Epoch信息,Application信息,安全相关(SECRET_MANAGER)的信息等。 通过之前的可以看出,通过loadState()方法来获取状态信息。 loadState()方法在ZKRMStateStore中实现如下:
@Override
public synchronized RMState loadState() throws Exception {
long start = clock.getTime();
RMState rmState = new RMState();
loadRMDTSecretManagerState(rmState);
loadRMAppState(rmState);
loadAMRMTokenSecretManagerState(rmState);
loadReservationSystemState(rmState);
loadProxyCAManagerState(rmState);
opDurations.addLoadStateCallDuration(clock.getTime() - start);
return rmState;
}
也就是说在loadState()的时候,把ZK里面存储的所有关于RM的信息全部或取下来,放在静态的rmState里面。
基于FileSystem的RM状态信息获取(不支持高可用)
和基于ZK的基本一致,loadState()函数内容完全一致,只是存储位置不同。 在FileSystemRMStateStore中存储了变量FileSystem,来管理当前的文件系统类型。
public class FileSystemRMStateStore extends RMStateStore {
protected FileSystem fs;
}
存储位置是存储在配置文件中yarn.resourcemanager.fs.state-store.uri指定的位置。默认值为${hadoop.tmp.dir}/yarn/system/rmstore,如果未提供文件系统名称,将使用*conf/core-site.xml中指定的fs.default.name。
基于内存的RM状态信息获取(不支持高可用)
获取到RMState(静态类)的信息。
public class MemoryRMStateStore extends RMStateStore {
RMState state = new RMState();
}
当需要存储信息时,会将信息直接写入这个静态类的对象。 在loadState()时,会返回一个copy
public synchronized RMState loadState() throws Exception {
RMState returnState = new RMState();
returnState.appState.putAll(state.appState);
returnState.rmSecretManagerState.getMasterKeyState()
.addAll(state.rmSecretManagerState.getMasterKeyState());
returnState.rmSecretManagerState.getTokenState().putAll(
state.rmSecretManagerState.getTokenState());
returnState.rmSecretManagerState.dtSequenceNumber =
state.rmSecretManagerState.dtSequenceNumber;
returnState.amrmTokenSecretManagerState =
state.amrmTokenSecretManagerState == null ? null
: AMRMTokenSecretManagerState
.newInstance(state.amrmTokenSecretManagerState);
if (state.proxyCAState.getCaCert() != null) {
byte[] caCertData = state.proxyCAState.getCaCert().getEncoded();
returnState.proxyCAState.setCaCert(caCertData);
}
if (state.proxyCAState.getCaPrivateKey() != null) {
byte[] caPrivateKeyData
= state.proxyCAState.getCaPrivateKey().getEncoded();
returnState.proxyCAState.setCaPrivateKey(caPrivateKeyData);
}
return returnState;
}
状态信息恢复
再通过上面两种方法来获取到RM状态之后,就可以进行状态恢复。调用recover(state)方法,来将状态同步到当前RM上。
public void recover(RMState state) throws Exception {
rmContext.getRMDelegationTokenSecretManager().recover(state);
rmContext.getAMRMTokenSecretManager().recover(state);
if (reservationSystem != null) {
reservationSystem.recover(state);
}
rmAppManager.recover(state);
rmContext.getProxyCAManager().recover(state);
setSchedulerRecoveryStartAndWaitTime(state, conf);
}
可以看出,恢复的也就是存储的那几块内容。
计算高可用
内容来自《 Hadoop 权威指南第四版 》
task失败
task 失败的第一种情况就是用户的 map task 或者 reduce task 代码在执行的之后抛出了运行时异常。如果发生了,就做以下操作:
- task JVM 会在退出之前报告失败给他的 AppMaster。
- 错误会立刻写进用户的日志中。
- AppMaster 将 task 标记为 fail。
- 释放 task 的容器,以留给其他的 task 使用。
task 失败的另一种情况就是 JVM 意外的退出,在这种情况下,操作如下:
- node manager 注意到处理已经退出,就报告给 AppMaster。
- AppMaster 就会将 task 标记为 failed。
AppMaster注意到task有一段时间没有发来执行状态的更新,就将其标记为 filed。JVM进程会在当前周期(超时周期一般为 10 分钟,通过 mapreduce.task.timeout 属性来设置,值的单位为毫秒)后自动的被kill掉。
如果AppMaster被告知task的尝试失败了,他将会重新调度来执行该task。 AppMaster会尽量不将task重新调度到先前失败的node manager之下。默认情况下当 task 失败了四次,就不再重新运行他。尝试次数是可以设置的,通过 mapreduce.map.maxattempt 属性设置 map 尝试次数。通过 mapreduce.reduce.maxattempt 设置 reduce 尝试次数。
job失败
在一些应用中,不希望因为少数的task失败而造成job中途夭折。这时,可以设置失败task的比例(mapreduce.map.failures.maxpercents 和 mapreduce.reduce.failures.maxpercents),失败的task在此比例内,就不会触发job的失败。当超过设置的比例,整个job失败后,RM会在其他NM上重启AM(默认2次)。
参考
Apache Hadoop YARN 官方文档 Apache Hadoop 3.3.1源码 YARN源码分析(三)-----ResourceManager HA之应用状态存储与恢复 yarn3.2源码分析之ResourceManager基于zk的HA机制 YARN - Task, Node manager, AppMaster, Resource manager 失败时所做的处理 RM状态存储与还原机制详解 YARN Federation的架构设计 深入理解Hadoop HA机制 醒一醒,讲到 ZooKeeper 的选举机制了 hadoop HA 详解
|