Hadoop源码分析(12)
1、 journalnode客户端
? 在文档(11)中分析了初始化editlog的方法。在初始化之前其会根据集 群的配置状态选择不同的方式来进行初始化。在HA状态下,其会调用一个 initJournals方法来进行初始化,这个方法会创建一个journalset对象,并创建 journalmanager对象传入journalset中,进行统一管理。
在文档(10)中分析了的在初始化完editlog后,接下来就要利用editlog获取 操作日志。在文档(10)中实现这个操作的是editlog的selectInputStreams方法, 这个方法的内容如下:
public Collection<EditLogInputStream> selectInputStreams(
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
boolean inProgressOk) throws IOException {
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
synchronized(journalSetLock) {
Preconditions.checkState(journalSet.isOpen(), "Cannot call " +
"selectInputStreams() on closed FSEditLog");
selectInputStreams(streams, fromTxId, inProgressOk);
}
try {
checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
} catch (IOException e) {
if (recovery != null) {
LOG.error(e);
} else {
closeAllStreams(streams);
throw e;
}
}
return streams;
}
? 这里的重点是第9行,这里调用了一个重载方法,其内容如下:
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk) throws IOException {
journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
}
? 这里很简单,就是调用了journalset的selectInputStreams方法。其内容如下:
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk) throws IOException {
final PriorityQueue<EditLogInputStream> allStreams =
new PriorityQueue<EditLogInputStream>(64,
EDIT_LOG_INPUT_STREAM_COMPARATOR);
for (JournalAndStream jas : journals) {
if (jas.isDisabled()) {
LOG.info("Skipping jas " + jas + " since it's disabled");
continue;
}
try {
jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
} catch (IOException ioe) {
LOG.warn("Unable to determine input streams from " + jas.getManager() +
". Skipping.", ioe);
}
}
chainAndMakeRedundantStreams(streams, allStreams, fromTxId);
}
? 首先是第7行这里会直接遍历journals。这个journals是 journalset的属性,其类型是list。在文档(11)中提到在初始化editlog时, 调用了一个initJournals方法。这个方法里会调用journalset的add方法 存储创建的journalmanager。这个add方法实际是将数据存储到journals中, 这个方法的内容如下:
void add(JournalManager j, boolean required, boolean shared) {
JournalAndStream jas = new JournalAndStream(j, required, shared);
journals.add(jas);
}
? 这里可以看见它对传入journalmanager继续封装成了一个 JournalAndStream类,然后将创建的jas添加到journals中。这里 JournalAndStream的构造方法如下:
public JournalAndStream(JournalManager manager, boolean required,
boolean shared) {
this.journal = manager;
this.required = required;
this.shared = shared;
}
? 这里的构造方法只是单纯的赋值。/p>
?然后继续看selectInputStreams方法,在for里拿到journals中的 jas后,会先判断其是否可用(第8行),然后调用jas的getManager方法,再然后 调用其返回值的selectInputStreams方法(第13行)。 getManager方法很简单,会直接返回创建时传入的manager对象。其内容如下:
JournalManager getManager() {
return journal;
}
? 参考之前的文档可知,这里的journal实际是QuorumJournalManager, 所以这里实际是调用的QuorumJournalManager的selectInputStreams方法。 这个方法的内容如下:
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk) throws IOException {
QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
loggers.getEditLogManifest(fromTxnId, inProgressOk);
Map<AsyncLogger, RemoteEditLogManifest> resps =
loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
"selectInputStreams");
LOG.debug("selectInputStream manifests:\n" +
Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
final PriorityQueue<EditLogInputStream> allStreams =
new PriorityQueue<EditLogInputStream>(64,
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
AsyncLogger logger = e.getKey();
RemoteEditLogManifest manifest = e.getValue();
for (RemoteEditLog remoteLog : manifest.getLogs()) {
URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
EditLogInputStream elis = EditLogFileInputStream.fromUrl(
connectionFactory, url, remoteLog.getStartTxId(),
remoteLog.getEndTxId(), remoteLog.isInProgress());
allStreams.add(elis);
}
}
JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId);
}
? 首先是第4行,这里会调用getEditLogManifest方法与journalnode 进行连接,获取journalnode存储的editlog列表。
? 然后是第6行,因为namenode和journalnode不在同一个jvm中, 甚至不在同一台机器中,所以namenode和journalnode是使用远程调用(RPC) 来进行通信的。因此,在这里需要调用waitForWriteQuorum方法来等远程调用 返回结果。这个方法很重要,如果在这里与journalnode通信失败,在HA模式下, namenode会自动退出。
? 然后是第16行使用for循环遍历所有的editlog,并通过URL的方法 获取到具体的editlog的输入流。注意这里是有两个for循环,这是因为editlog 是分段存储的,不同的分段通过txid来标识,而在journalnode中,每一个 journalnode都会存储一份editlog。所以这里需要用两个for循环每个 journalnode上的每个editlog分段。
? 最后再 调用JournalSet的chainAndMakeRedundantStreams检查输入流。
? 这里先分析getEditLogManifest方法,并详细解析hdfs的远程调用。 首先是getEditLogManifest方法的内容如下:
public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest(
long fromTxnId, boolean inProgressOk) {
Map<AsyncLogger,
ListenableFuture<RemoteEditLogManifest>> calls
= Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<RemoteEditLogManifest> future =
logger.getEditLogManifest(fromTxnId, inProgressOk);
calls.put(logger, future);
}
return QuorumCall.create(calls);
}
? 这里的方法也很简单。首先是第6行使用for循环遍历loggers对象, 这个对象在文档(11)中解析过,其中存储的是与journalnode对应的 IPCLoggerChannel对象。然后调用IPCLoggerChannel对象的 getEditLogManifest方法,并将其返回的Future对象放入map中。 最后再利用这个map创建一个QuorumCall。 IPCLoggerChannel的getEditLogManifest方法内容如下:
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
final long fromTxnId, final boolean inProgressOk) {
return parallelExecutor.submit(new Callable<RemoteEditLogManifest>() {
@Override
public RemoteEditLogManifest call() throws IOException {
GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
journalId, fromTxnId, inProgressOk);
constructHttpServerURI(ret);
return PBHelper.convert(ret.getManifest());
}
});
}
?这个方法实际就一行,使用parallelExecutor的submit方法提交了 一个匿名的callable方法。parallelExecutor是一个线程池,callable是 java中带返回值的线程接口。
? 然后再看callable的call方法,这里首先是第6行会首先调用一个 getProxy方法获取一个代理对象。然后再调用代理对象的getEditLogManifest方 法。这一步获取代理对象实际是在创建一个HDFS的远程调用客户端。
? 这一步若是执行成功后,便代表着远程调用执行成功。最后再将返回值封装便可。
|