Hadoop源码分析(21)
读取journalnode数据
? 在文档(20)中分析journalnode是如何执行远程调用的getEditLogManifest方法的。当journalnode执行完成后会将结果返回个客户端即namenode。
? 这里接着从最初调用getEditLogManifest的selectInputStreams方法内容如下:
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk) throws IOException {
QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
loggers.getEditLogManifest(fromTxnId, inProgressOk);
Map<AsyncLogger, RemoteEditLogManifest> resps =
loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
"selectInputStreams");
LOG.debug("selectInputStream manifests:\n" +
Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
final PriorityQueue<EditLogInputStream> allStreams =
new PriorityQueue<EditLogInputStream>(64,
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
AsyncLogger logger = e.getKey();
RemoteEditLogManifest manifest = e.getValue();
for (RemoteEditLog remoteLog : manifest.getLogs()) {
URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
EditLogInputStream elis = EditLogFileInputStream.fromUrl(
connectionFactory, url, remoteLog.getStartTxId(),
remoteLog.getEndTxId(), remoteLog.isInProgress());
allStreams.add(elis);
}
}
JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId);
}
? 这个方法在文档(12)中分析过,第5行的getEditLogManifest方法即之前分析的远程调用方法,第6行和第7行的waitForWriteQuorum方法,是用来等journalnode的返回的方法。其内容如下:
<V> Map<AsyncLogger, V> waitForWriteQuorum(QuorumCall<AsyncLogger, V> q,
int timeoutMs, String operationName) throws IOException {
int majority = getMajoritySize();
try {
q.waitFor(
loggers.size(),
majority,
majority,
timeoutMs, operationName);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted waiting " + timeoutMs + "ms for a " +
"quorum of nodes to respond.");
} catch (TimeoutException e) {
throw new IOException("Timed out waiting " + timeoutMs + "ms for a " +
"quorum of nodes to respond.");
}
if (q.countSuccesses() < majority) {
q.rethrowException("Got too many exceptions to achieve quorum size " +
getMajorityString());
}
return q.getResults();
}
? 重点在第5行调用QuorumCall对象的waitFor方法等待返回,获取到返回后执行第24行的方法获取结果。
? waitFor方法的内容如下:
public synchronized void waitFor(
int minResponses, int minSuccesses, int maxExceptions,
int millis, String operationName)
throws InterruptedException, TimeoutException {
long st = timer.monotonicNow();
long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
long et = st + millis;
while (true) {
restartQuorumStopWatch();
checkAssertionErrors();
if (minResponses > 0 && countResponses() >= minResponses) return;
if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;
long now = timer.monotonicNow();
if (now > nextLogTime) {
long waited = now - st;
String msg = String.format(
"Waited %s ms (timeout=%s ms) for a response for %s",
waited, millis, operationName);
if (!successes.isEmpty()) {
msg += ". Succeeded so far: [" + Joiner.on(",").join(successes.keySet()) + "]";
}
if (!exceptions.isEmpty()) {
msg += ". Exceptions so far: [" + getExceptionMapString() + "]";
}
if (successes.isEmpty() && exceptions.isEmpty()) {
msg += ". No responses yet.";
}
if (waited > millis * WAIT_PROGRESS_WARN_THRESHOLD) {
QuorumJournalManager.LOG.warn(msg);
} else {
QuorumJournalManager.LOG.info(msg);
}
nextLogTime = now + WAIT_PROGRESS_INTERVAL_MILLIS;
}
long rem = et - now;
if (rem <= 0) {
long timeoutIncrease = getQuorumTimeoutIncreaseMillis(0, millis);
if (timeoutIncrease > 0) {
et += timeoutIncrease;
} else {
throw new TimeoutException();
}
}
restartQuorumStopWatch();
rem = Math.min(rem, nextLogTime - now);
rem = Math.max(rem, 1);
wait(rem);
long timeoutIncrease = getQuorumTimeoutIncreaseMillis(-rem, millis);
if (timeoutIncrease > 0) {
et += timeoutIncrease;
}
}
}
? 这个房会用来检查journalnode的返回,判断其是否超时。注意这里会有countResponses、countSuccesses、countExceptions等方法,这个是因为它判断的并不是某一个journalnode的返回,而是所有journalnode节点的返回。
? 然后selectInputStreams方法之后的代码便是在解析这些返回值,在解析这些代码前,需要先解析清楚这些返回值的具体情况。
? 在上文提到了journalnode中存储日志的过程中采取了过半原则,即有过半的journalnode成功存储了这个日志,便认为这次存储成功了。反之,不过半则认为存储失败。
? 在理想情况下是所有journalnode都存储成,如图中情况A所示。但实际会因为某些journalnode网络故障、异常退出或其他原因使得该服务器存储失败,然后在一段时间后该journalnode可通过重启或其他手段恢复,继续存储日志。这时会出现如图中情况B所示的情景。
? 在selectInputStreams方法中,在接收到journalnode的返回值后,首先会创建一个allStreams对象,这个对象是PriorityQueue类的对象,这是一个优先队列,会对传入队列中的数据进行排序,排序的方式有传入的对象决定,这里创建的是:JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR。其内容如下:
static final public Comparator<EditLogInputStream>
EDIT_LOG_INPUT_STREAM_COMPARATOR = new Comparator<EditLogInputStream>() {
@Override
public int compare(EditLogInputStream a, EditLogInputStream b) {
return ComparisonChain.start().
compare(a.getFirstTxId(), b.getFirstTxId()).
compare(b.getLastTxId(), a.getLastTxId()).
result();
}
};
? 这里可以看见这个comparator主要是按照log文件的txid来排序的。
? 然后是selectInputStreams方法的第16行到第28行,这里会用两个for循环遍历所有journalnode的所有日志文件,并将其添加到上文建立的队列中。从上文的分析中可以知道每台journalnode都存储了一份日志文件,所以这里的日志文件会重复。因此在真正使用前需要合并重复日志。这里是同过JournalSet的chainAndMakeRedundantStreams方法来合并的。其内容如下:
public static void chainAndMakeRedundantStreams(
Collection<EditLogInputStream> outStreams,
PriorityQueue<EditLogInputStream> allStreams, long fromTxId) {
LinkedList<EditLogInputStream> acc =
new LinkedList<EditLogInputStream>();
EditLogInputStream elis;
while ((elis = allStreams.poll()) != null) {
if (acc.isEmpty()) {
acc.add(elis);
} else {
EditLogInputStream accFirst = acc.get(0);
long accFirstTxId = accFirst.getFirstTxId();
if (accFirstTxId == elis.getFirstTxId()) {
if (elis.isInProgress()) {
if (accFirst.isInProgress()) {
acc.add(elis);
}
} else {
if (accFirst.isInProgress()) {
acc.clear();
}
acc.add(elis);
}
} else if (accFirstTxId < elis.getFirstTxId()) {
Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR);
outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear();
acc.add(elis);
} else if (accFirstTxId > elis.getFirstTxId()) {
throw new RuntimeException("sorted set invariants violated! " +
"Got stream with first txid " + elis.getFirstTxId() +
", but the last firstTxId was " + accFirstTxId);
}
}
}
if (!acc.isEmpty()) {
Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR);
outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear();
}
}
? 首先是第10行,这里创建了一个acc对象,这是一个linkedlist,主要用来来存储相同的日志文件。然后是第13行定义了一个while循环从上述的队列中获取数据,然后是while循环内从第14行到第44行,这里是一个if else语句。首先是第14行当acc为空的时候,直接将拿到的日志文件添加到acc中。然后是else的部分,即acc中有内容的情况。首先是第17行先从acc中获取其中的第一条数据,然后第18行取出数据的txid,用这个txid与循环遍历的日志文件的txid进行比较,这里主要分为三种情况:txid相同,acc中的txid小于当前日志的txid,acc中的txid大于日志文件中的txid。
? 当两种txid相同的时候,会执行第23行到31行中的内容。这里主要是对可能出现的inProgress文件进行处理。在当前namenode的启动流程中不会有inprogress文件,但在别的流程中可能出现。这里如果没有inprogress文件的话,实际可以直接将当前日志添加到acc文件中。但这里对有inprogress文件的情况作出了处理,首先是当前文件是inprogress的情况(第22行的if语句),这时如果acc中文件也是inprogress文件,则将当前文件添加到acc中,否则不做任何处理。
? 在journalnode实际可能出现多个inprogress文件,而除了最新的inprogress文件其余的都没有用。出现多个inprogress文件与journalnode的故障有关:在journalnode出现故障退出服务的时候,这时会有一个inprogress文件在日志目录中,这代表了它故障前存储的日志,但一个journalnode故障并不会影响journalnode集群的工作,集群继续工作将当前的inprogress文件写成完整的日志文件,然后创建新的inprogress文件。这时如果故障的journalnode重新恢复后,它持有的inprogress文件与其他journalnode的inprogress文件不同,两者的数据不一致,这时journalnode的处理是去同步集群的inprogress文件,而对其原有的inprogress文件没有其他处理,所以在journalnode的目录下可能出现多个inprogress文件。
? 然后回到上述代码,第26行的else语句,代表了当前文件不是inprogress文件的情况,这时如果acc中的文件是inprogress文件,则代表添加进acc的文件是上述所说的不需要的inprogress文件,所以需要将acc中的数据清除,即第28行的clear语句。然后第29行将当前文件添加到acc中。
?然后是第32行,这里处理acc中的txid小于当前文件的txid的情况。上文解析了传入的队列是一个优先队列,队列中的数据都是按txid排序的。当前文件的txid大于acc存储的txid的时候队列中没有与acc中txid相同的文件了。所以这里首先会对acc中的数据进行排序(第35行),然后将acc中的数据封装成一个新的流,并添加到outStreams中(第36行),然后将acc清空,将当前的日志文件添加到acc中,开始对下一个txid的数据进行处理。
? 然后是第39行对acc中txid大于当前文件的txid的情况。这种情况理论上不会出现,若出现了对其的处理是抛出异常。
? 最后是第46行到第50行,对while结束后acc中存储的数据进行处理。
|