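1 Overview
The NameNode periodically saves the file system namespace (the directory tree plus file and directory metadata) to an fsimage file so that the metadata survives a power loss or a process crash. Synchronizing the in-memory metadata to the fsimage file on every change would be very expensive and would slow the NameNode down, so the NameNode first records each namespace modification in an editlog file and merges the fsimage and editlog files periodically.
The FSImage class mainly provides the following functions:
- Saving the namespace: writes the namespace currently held in NameNode memory to an fsimage file.
- Loading the fsimage file: loads the namespace stored in an on-disk fsimage file into NameNode memory. This is the inverse of saving the namespace.
- Loading the editlog files: after the fsimage file is loaded, memory only holds the namespace as it was when that fsimage was saved. The NameNode still has to apply the later modifications, which are recorded in the editlog files, so the FSImage class also loads editlog files into NameNode memory.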
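The interplay between the two files can be sketched in a few lines of Java. This is only a toy model under stated assumptions: the namespace is a sorted map from path to a metadata string, the fsimage is a snapshot copy tagged with a transaction id, and the editlog is an in-memory list. The names `ToyNameNode`, `Edit`, `checkpoint` and `restart` are hypothetical and do not exist in HDFS.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of fsimage + editlog. All names here are hypothetical, not HDFS APIs. */
class ToyNameNode {
  /** One logged namespace modification: a put (value != null) or a delete (value == null). */
  static final class Edit {
    final long txid;
    final String path;
    final String value;

    Edit(long txid, String path, String value) {
      this.txid = txid;
      this.path = path;
      this.value = value;
    }
  }

  final Map<String, String> namespace = new TreeMap<>(); // in-memory namespace
  final List<Edit> editLog = new ArrayList<>();          // stands in for the editlog files
  Map<String, String> image = new TreeMap<>();           // stands in for the fsimage file
  long imageTxId = 0;                                    // last txid contained in the image
  long lastTxId = 0;                                     // last txid written to the edit log

  /** Every modification is logged first and then applied in memory. */
  void put(String path, String value) {
    Edit e = new Edit(++lastTxId, path, value);
    editLog.add(e);
    apply(namespace, e);
  }

  void delete(String path) {
    Edit e = new Edit(++lastTxId, path, null);
    editLog.add(e);
    apply(namespace, e);
  }

  static void apply(Map<String, String> ns, Edit e) {
    if (e.value == null) {
      ns.remove(e.path);
    } else {
      ns.put(e.path, e.value);
    }
  }

  /** Checkpoint: snapshot the namespace and drop the edits already covered by the image. */
  void checkpoint() {
    image = new TreeMap<>(namespace);
    imageTxId = lastTxId;
    editLog.removeIf(e -> e.txid <= imageTxId);
  }

  /** Restart: load the image, then replay only the edits newer than the image. */
  Map<String, String> restart() {
    Map<String, String> ns = new TreeMap<>(image);
    for (Edit e : editLog) {
      if (e.txid > imageTxId) {
        apply(ns, e);
      }
    }
    return ns;
  }
}
```

After a checkpoint the edit list shrinks, which is the reason for merging periodically: a restart only has to replay the edits written since the last image.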
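2 Source Code Walkthrough
2.1 Initialization
The FSImage is created when the NameNode starts. It is initialized inside the loadFromDisk method of FSNamesystem.
// FSNamesystem is initialized by loadNamesystem(conf), which is called from NameNode#initialize():
// Initialize the FSNamesystem from the edits and fsimage locations given in the configuration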
loadNamesystem(conf);
...
// This simply calls FSNamesystem.loadFromDisk, which initializes the FSNamesystem while loading the on-disk metadata (fsimage and edit log)
protected void loadNamesystem(Configuration conf) throws IOException {
this.namesystem = FSNamesystem.loadFromDisk(conf);
}
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
// Validate the configuration related to fsimage and edits
checkConfiguration(conf);
// Initialize the FSImage from the configured directories
FSImage fsImage = new FSImage(conf,
FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
}
Now look at the FSImage constructor:
/**
* Construct the FSImage. Set the default checkpoint directories.
*
* Setup storage and initialize the edit log.
*
* @param conf Configuration
* @param imageDirs Directories the image can be stored in.
* @param editsDirs Directories the editlog can be stored in.
* @throws IOException if directories are invalid.
*/
protected FSImage(Configuration conf,
Collection<URI> imageDirs,
List<URI> editsDirs)
throws IOException {
this.conf = conf;
// NNStorage manages the StorageDirectories used by the NameNode
storage = new NNStorage(conf, imageDirs, editsDirs);
// dfs.namenode.name.dir.restore (default false): when set to true, the NameNode tries to restore
// previously failed dfs.namenode.name.dir directories. The restore is attempted during a checkpoint.
if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
storage.setRestoreFailedStorage(true);
}
// Initialize the FSEditLog
this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
// Inspects the NameNode storage directories and enforces the retention policy on checkpoints and edit logs.
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
}
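2.2 Saving the Namespace
2.2.1 The saveFSImageInAllDirs flow
One of FSImage's jobs is to save the namespace to an fsimage file, which is done by saveFSImageInAllDirs. This method is called from the following places:
- SecondaryNameNode#doMerge, which merges the fsimage and the editlog;
- Checkpointer#doCheckpoint;
- FSImage itself, when performing format and upgrade.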
/**
* @see #saveFSImageInAllDirs(FSNamesystem, NameNodeFile, long, Canceler)
*/
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
throws IOException {
if (!addToCheckpointing(txid)) {
throw new IOException(("FS image is being downloaded from another NN"));
}
try {
saveFSImageInAllDirs(source, NameNodeFile.IMAGE, txid, null);
} finally {
removeFromCheckpointing(txid);
}
}
The other call path goes through FSImage#saveNamespace, which is the main path used when saving namespace data.
/**
* Save the contents of the FS image to a new image file in each of the
* current storage directories.
*/
public synchronized void saveNamespace(FSNamesystem source, NameNodeFile nnf,
Canceler canceler) throws IOException {
assert editLog != null : "editLog must be initialized";
LOG.info("Save namespace ...");
storage.attemptRestoreRemovedStorage();
boolean editLogWasOpen = editLog.isSegmentOpen();
if (editLogWasOpen) {
editLog.endCurrentLogSegment(true);
}
long imageTxId = getCorrectLastAppliedOrWrittenTxId();
if (!addToCheckpointing(imageTxId)) {
throw new IOException(
"FS image is being downloaded from another NN at txid " + imageTxId);
}
try {
try {
saveFSImageInAllDirs(source, nnf, imageTxId, canceler);
...
The private saveFSImageInAllDirs overload below does the actual work of storing the fsimage.
// The NameNode can be configured with multiple storage directories
private synchronized void saveFSImageInAllDirs(FSNamesystem source,
NameNodeFile nnf, long txid, Canceler canceler) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAVING_CHECKPOINT);
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
throw new IOException("No image directories available!");
}
if (canceler == null) {
canceler = new Canceler();
}
SaveNamespaceContext ctx = new SaveNamespaceContext(
source, txid, canceler);
try {
// Start one thread per storage directory
List<Thread> saveThreads = new ArrayList<Thread>();
// save images into current
// Iterate over the image storage directories
for (Iterator<StorageDirectory> it
= storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
StorageDirectory sd = it.next();
// The actual save is performed by FSImageSaver,
// an inner class of FSImage that implements Runnable.
// Its run() method calls saveFSImage() to write the fsimage file.
FSImageSaver saver = new FSImageSaver(ctx, sd, nnf);
Thread saveThread = new Thread(saver, saver.toString());
saveThreads.add(saveThread);
saveThread.start();
}
// Wait for all threads to finish
waitForThreads(saveThreads);
saveThreads.clear();
storage.reportErrorsOnDirectories(ctx.getErrorSDs());
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
throw new IOException(
"Failed to save in any storage directories while saving namespace.");
}
if (canceler.isCancelled()) {
deleteCancelledCheckpoint(txid);
ctx.checkCancelled(); // throws
assert false : "should have thrown above!";
}
// Rename the fsimage.ckpt file to fsimage
renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf, false);
// Since we now have a new checkpoint, we can clean up some
// old edit logs and checkpoints.
// Do not purge anything if we just wrote a corrupted FsImage.
// After a completed checkpoint, purge the old edit logs and the old checkpoints
if (!exitAfterSave.get()) {
purgeOldStorage(nnf);
archivalManager.purgeCheckpoints(NameNodeFile.IMAGE_NEW);
}
} finally {
// Notify any threads waiting on the checkpoint to be canceled
// that it is complete.
ctx.markComplete();
ctx = null;
}
prog.endPhase(Phase.SAVING_CHECKPOINT);
}
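To summarize: the NameNode may store the fsimage in several directories. When saving the namespace metadata it starts one thread per directory and waits for all of them to finish. To guard against failures part-way through the save, each thread first writes the namespace to an fsimage.ckpt file, and the file is renamed to fsimage only after all the data has been written. Once the checkpoint is confirmed complete, the old editlog files and old checkpoints are purged.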
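The pattern in this summary (one saver thread per directory, a temporary name during the write, a rename at the end) can be sketched as follows. This is a minimal illustration and not the HDFS implementation: the class `MultiDirSaveSketch`, the method `saveInAllDirs` and the file names built from `txid` are assumptions made for the example.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Sketch of "one saver thread per directory, write to .ckpt, then rename". Hypothetical names. */
class MultiDirSaveSketch {
  /** Saves data into every directory and returns the directories that failed. */
  static List<Path> saveInAllDirs(List<Path> dirs, byte[] data, long txid)
      throws IOException, InterruptedException {
    List<Path> failed = Collections.synchronizedList(new ArrayList<>());
    List<Thread> threads = new ArrayList<>();
    for (Path dir : dirs) {
      Thread t = new Thread(() -> {
        try {
          // Write under the temporary checkpoint name first.
          Files.write(dir.resolve("fsimage.ckpt_" + txid), data);
        } catch (IOException e) {
          failed.add(dir); // remember the bad directory, let the other savers continue
        }
      }, "saver for " + dir);
      threads.add(t);
      t.start();
    }
    for (Thread t : threads) {
      t.join(); // wait for all savers to finish
    }
    List<Path> good = new ArrayList<>(dirs);
    good.removeAll(failed);
    if (good.isEmpty()) {
      throw new IOException("Failed to save in any storage directory");
    }
    // Only now promote the temporary files to their final name.
    for (Path dir : good) {
      Files.move(dir.resolve("fsimage.ckpt_" + txid), dir.resolve("fsimage_" + txid),
          StandardCopyOption.REPLACE_EXISTING);
    }
    return failed;
  }
}
```

Because readers only ever look for the final name, a crash in the middle of the write leaves at most a stale temporary file behind and never a half-written image under the final name.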
2.2.2 FSImageSaver
The worker in the flow above is FSImageSaver. Once its thread is started, the run method is invoked and saves the namespace data:
/**
* FSImageSaver is being run in a separate thread when saving
* FSImage. There is one thread per each copy of the image.
*
* FSImageSaver assumes that it was launched from a thread that holds
* FSNamesystem lock and waits for the execution of FSImageSaver thread
* to finish.
* This way we are guaranteed that the namespace is not being updated
* while multiple instances of FSImageSaver are traversing it
* and writing it out.
*/
private class FSImageSaver implements Runnable {
private final SaveNamespaceContext context;
private final StorageDirectory sd;
private final NameNodeFile nnf;
public FSImageSaver(SaveNamespaceContext context, StorageDirectory sd,
NameNodeFile nnf) {
this.context = context;
this.sd = sd;
this.nnf = nnf;
}
@Override
public void run() {
// Deletes checkpoint file in every storage directory when shutdown.
Runnable cancelCheckpointFinalizer = () -> {
try {
deleteCancelledCheckpoint(context.getTxId());
LOG.info("FSImageSaver clean checkpoint: txid={} when meet " +
"shutdown.", context.getTxId());
} catch (IOException e) {
LOG.error("FSImageSaver cancel checkpoint threw an exception:", e);
}
};
ShutdownHookManager.get().addShutdownHook(cancelCheckpointFinalizer,
SHUTDOWN_HOOK_PRIORITY);
try {
saveFSImage(context, sd, nnf);
} catch (SaveNamespaceCancelledException snce) {
LOG.info("Cancelled image saving for " + sd.getRoot() +
": " + snce.getMessage());
// don't report an error on the storage dir!
} catch (Throwable t) {
LOG.error("Unable to save image for " + sd.getRoot(), t);
context.reportErrorOnStorageDirectory(sd);
try {
deleteCancelledCheckpoint(context.getTxId());
LOG.info("FSImageSaver clean checkpoint: txid={} when meet " +
"Throwable.", context.getTxId());
} catch (IOException e) {
LOG.error("FSImageSaver cancel checkpoint threw an exception:", e);
}
}
}
@Override
public String toString() {
return "FSImageSaver for " + sd.getRoot() +
" of type " + sd.getStorageDirType();
}
}
The run method above calls saveFSImage to store the namespace data.
2.2.3 saveFSImage
/**
* Save the contents of the FS image to the file.
*/
void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
NameNodeFile dstType) throws IOException {
// The checkpoint txid, i.e. the latest transaction recorded in the namespace being saved
long txid = context.getTxId();
// The temporary fsimage.ckpt file
File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
// IMAGE: the fsimage file, i.e. the final destination file
File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
// Serialize using protobuf
FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context,
conf);
FSImageCompression compression = FSImageCompression.createCompression(conf);
// Store the NameNode namespace data through the saver
long numErrors = saver.save(newFile, compression);
if (numErrors > 0) {
// The image is likely corrupted.
LOG.error("Detected " + numErrors + " errors while saving FsImage " +
dstFile);
exitAfterSave.set(true);
}
// Write the MD5 digest to the md5 file
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
storage.setMostRecentCheckpointInfo(txid, Time.now());
}
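The digest handling in saveFSImage can be illustrated with the JDK alone. The sketch below computes the MD5 while the data is being written, stores the hex digest in a sidecar file named after the final file, and later verifies it. `Md5SidecarSketch` and its methods are hypothetical, and the sidecar line layout (hex digest, a space, `*`, the file name) is an assumption of this example rather than a statement about `MD5FileUtils`.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Sketch of digest-while-writing plus an .md5 sidecar file. Hypothetical names. */
class Md5SidecarSketch {
  static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) {
      sb.append(String.format("%02x", b));
    }
    return sb.toString();
  }

  /** Writes data to tmpFile, hashing it on the fly, and stores the digest next to dstFile. */
  static String saveWithDigest(Path tmpFile, Path dstFile, byte[] data)
      throws IOException, NoSuchAlgorithmException {
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    try (OutputStream out = new DigestOutputStream(
        new BufferedOutputStream(Files.newOutputStream(tmpFile)), md5)) {
      out.write(data);
    }
    String hex = toHex(md5.digest());
    // The sidecar is named after the final file, even though the data still sits in tmpFile.
    Path md5File = dstFile.resolveSibling(dstFile.getFileName() + ".md5");
    String line = hex + " *" + dstFile.getFileName() + "\n"; // assumed line layout
    Files.write(md5File, line.getBytes(StandardCharsets.UTF_8));
    return hex;
  }

  /** Recomputes the MD5 of file and compares it with the digest recorded in md5File. */
  static boolean verify(Path file, Path md5File) throws IOException, NoSuchAlgorithmException {
    String recorded = new String(Files.readAllBytes(md5File), StandardCharsets.UTF_8);
    String expected = recorded.split(" ")[0];
    byte[] actual = MessageDigest.getInstance("MD5").digest(Files.readAllBytes(file));
    return expected.equals(toHex(actual));
  }
}
```

As in saveFSImage, the digest is recorded for the destination name before the temporary file is renamed, so the rename step is what makes the image and its digest line up.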
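2.2.4 FSImageFormatProtobuf.Saver
The step above uses FSImageFormatProtobuf.Saver to store the NameNode namespace: its save method is called, which in turn calls saveInternal to do the actual work.
Before that, a word on FSImageFormatProtobuf. It defines the fsimage file format using protobuf, and the file consists of four parts: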
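- MAGIC: the header of the fsimage file, which is the string "HDFSIMG1" in binary form. The MAGIC header marks the file as serialized with protobuf. When the FSImage class reads an fsimage file, it first checks whether the file contains the MAGIC header and, if so, deserializes the file using the protobuf format.
- SECTIONS: the fsimage file stores each kind of NameNode metadata in its own section, for example file system metadata in NameSystemSection, all INodes of the directory tree in INodeSection, and snapshot information in SnapshotSection. The second part of the file is the sequence of all these sections, each holding the attributes of the corresponding kind of metadata.
- FileSummary: records metadata about the fsimage file itself and about every section stored in it. Its ondiskVersion field records the version of the fsimage file (still 1 as of Hadoop 3.3.0), layoutVersion records the current HDFS layout version, codec records the compression codec of the fsimage file, and sections holds one entry for each section stored in the file. Each entry records the section's name, its length in the fsimage file, and its starting offset. When reading an fsimage file, the FSImage class reads the FileSummary before any section and then uses it to drive the deserialization of the rest of the file.
- FileSummaryLength: records the length of the FileSummary within the fsimage file. When reading an fsimage file, the FSImage class first reads FileSummaryLength to learn how long the FileSummary is, and then deserializes the FileSummary from the file based on that length.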
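The four-part layout described in this list can be sketched with plain JDK streams. The example below is an illustration only: it keeps the order MAGIC, sections, summary, 4-byte summary length, but the summary is encoded with `DataOutputStream` instead of protobuf, there is no compression, and `ImageLayoutSketch` with its `write` and `read` methods is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of a MAGIC | sections | summary | summaryLength file. Not the real protobuf encoding. */
class ImageLayoutSketch {
  static final byte[] MAGIC = "HDFSIMG1".getBytes(StandardCharsets.UTF_8);

  static void write(Path file, Map<String, byte[]> sections) throws IOException {
    ByteArrayOutputStream body = new ByteArrayOutputStream();
    ByteArrayOutputStream summaryBytes = new ByteArrayOutputStream();
    DataOutputStream summary = new DataOutputStream(summaryBytes);
    body.write(MAGIC);
    summary.writeInt(sections.size());
    for (Map.Entry<String, byte[]> e : sections.entrySet()) {
      summary.writeUTF(e.getKey());            // section name
      summary.writeLong(body.size());          // starting offset of the section
      summary.writeLong(e.getValue().length);  // length of the section
      body.write(e.getValue());
    }
    summary.flush();
    DataOutputStream out = new DataOutputStream(body);
    out.write(summaryBytes.toByteArray());     // the summary follows the last section
    out.writeInt(summaryBytes.size());         // trailing 4-byte big-endian summary length
    out.flush();
    Files.write(file, body.toByteArray());
  }

  static Map<String, byte[]> read(Path file) throws IOException {
    Map<String, byte[]> result = new LinkedHashMap<>();
    try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
      byte[] magic = new byte[MAGIC.length];
      raf.readFully(magic);
      if (!Arrays.equals(magic, MAGIC)) {
        throw new IOException("Unrecognized file format");
      }
      raf.seek(raf.length() - 4);              // 1. read the summary length from the tail
      int summaryLength = raf.readInt();
      raf.seek(raf.length() - 4 - summaryLength);
      byte[] summaryBytes = new byte[summaryLength];
      raf.readFully(summaryBytes);             // 2. read the summary itself
      DataInputStream summary = new DataInputStream(new ByteArrayInputStream(summaryBytes));
      int count = summary.readInt();
      for (int i = 0; i < count; i++) {        // 3. use the summary to locate each section
        String name = summary.readUTF();
        long offset = summary.readLong();
        long length = summary.readLong();
        byte[] data = new byte[(int) length];
        raf.seek(offset);
        raf.readFully(data);
        result.put(name, data);
      }
    }
    return result;
  }
}
```

Putting the summary at the end lets the writer stream the sections out without knowing their sizes in advance, while the fixed-size length at the very end gives the reader a known place to start.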
Now look at the FSImageFormatProtobuf.Saver#save() method:
/**
* @return number of non-fatal errors detected while writing the image.
* @throws IOException on fatal error.
*/
long save(File file, FSImageCompression compression) throws IOException {
enableSubSectionsIfRequired();
// Create the output stream
FileOutputStream fout = new FileOutputStream(file);
fileChannel = fout.getChannel();
try {
LOG.info("Saving image file {} using {}", file, compression);
long startTime = monotonicNow();
// Delegate to saveInternal to do the actual save
long numErrors = saveInternal(
fout, compression, file.getAbsolutePath());
LOG.info("Image file {} of size {} bytes saved in {} seconds {}.", file,
file.length(), (monotonicNow() - startTime) / 1000,
(numErrors > 0 ? (" with" + numErrors + " errors") : ""));
return numErrors;
} finally {
fout.close();
}
}
/**
* saveInternal() first builds the output stream over the underlying fsimage file and the FileSummary that describes the file,
* then records ondiskVersion, layoutVersion, codec and similar information in the FileSummary.
* @return number of non-fatal errors detected while writing the FsImage.
* @throws IOException on fatal error.
*/
private long saveInternal(FileOutputStream fout,
FSImageCompression compression, String filePath) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
MessageDigest digester = MD5Hash.getDigester();
// Get the layoutVersion
int layoutVersion =
context.getSourceNamesystem().getEffectiveLayoutVersion();
// Build the output stream
underlyingOutputStream = new DigestOutputStream(new BufferedOutputStream(
fout), digester);
// Write the magic header, whose content is "HDFSIMG1"
underlyingOutputStream.write(FSImageUtil.MAGIC_HEADER);
fileChannel = fout.getChannel();
// Build the FileSummary, the descriptive part of the fsimage file, which is also defined in protobuf
// It includes onDiskVersion and layoutVersion
FileSummary.Builder b = FileSummary.newBuilder()
.setOndiskVersion(FSImageUtil.FILE_VERSION)
.setLayoutVersion(
context.getSourceNamesystem().getEffectiveLayoutVersion());
// Set the codec on the FileSummary; it records the compression format
codec = compression.getImageCodec();
if (codec != null) {
b.setCodec(codec.getClass().getCanonicalName());
sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
} else {
sectionOutputStream = underlyingOutputStream;
}
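/**
* Next, saveInternal() writes the following to the fsimage file, in order:
* 1. namespace information,
* 2. inode information,
* 3. snapshot information,
* 4. security information,
* 5. cache information,
* 6. the StringTable.
*
* All of the above is written section by section; see fsimage.proto for the format of each section.
* While writing metadata section by section, saveInternal() also records
* each section's length and its starting offset within the fsimage file in the FileSummary.
*/
// Write the name system section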
saveNameSystemSection(b);
// Check for cancellation right after serializing the name system section.
// Some unit tests, such as TestSaveNamespace#testCancelSaveNameSpace
// depends on this behavior.
// Check whether the save has been cancelled
context.checkCancelled();
Step step;
// Erasure coding policies should be saved before inodes
// Save the erasure coding section only if the layout version supports erasure coding
if (NameNodeLayoutVersion.supports(
NameNodeLayoutVersion.Feature.ERASURE_CODING, layoutVersion)) {
step = new Step(StepType.ERASURE_CODING_POLICIES, filePath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
saveErasureCodingSection(b);
prog.endStep(Phase.SAVING_CHECKPOINT, step);
}
// Save the inodes of the namespace
step = new Step(StepType.INODES, filePath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
// Count number of non-fatal errors when saving inodes and snapshots.
long numErrors = saveInodes(b);
// Save the snapshot information
numErrors += saveSnapshots(b);
prog.endStep(Phase.SAVING_CHECKPOINT, step);
step = new Step(StepType.DELEGATION_TOKENS, filePath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
// Save the security information
saveSecretManagerSection(b);
prog.endStep(Phase.SAVING_CHECKPOINT, step);
// Save the cache information
step = new Step(StepType.CACHE_POOLS, filePath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
saveCacheManagerSection(b);
prog.endStep(Phase.SAVING_CHECKPOINT, step);
// Save the StringTable
saveStringTableSection(b);
// We use the underlyingOutputStream to write the header. Therefore flush
// the buffered stream (which is potentially compressed) first.
// Flush the section output stream
flushSectionOutputStream();
FileSummary summary = b.build();
// Write the FileSummary
saveFileSummary(underlyingOutputStream, summary);
underlyingOutputStream.close();
savedDigest = new MD5Hash(digester.digest());
return numErrors;
}
The saveInternal method thus writes the complete fsimage file.
The individual sections that appear in the code are not covered here. Finally, look at saveFileSummary:
private static void saveFileSummary(OutputStream out, FileSummary summary)
throws IOException {
summary.writeDelimitedTo(out);
int length = getOndiskTrunkSize(summary);
byte[] lengthBytes = new byte[4];
ByteBuffer.wrap(lengthBytes).asIntBuffer().put(length);
out.write(lengthBytes);
}
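Two details of saveFileSummary are easy to miss: a delimited protobuf message is a varint length prefix followed by the payload, and `ByteBuffer` encodes integers big-endian by default. The sketch below shows both with plain JDK code. It assumes that the recorded length covers the prefix plus the payload, which is what the name getOndiskTrunkSize suggests but is not shown in the excerpt. `SummaryTrailerSketch` and its methods are hypothetical.

```java
import java.nio.ByteBuffer;

/** Sketch of the size and trailer arithmetic around a delimited message. Hypothetical names. */
class SummaryTrailerSketch {
  /** Number of bytes a protobuf varint needs to encode an unsigned 32-bit value. */
  static int varintSize(int value) {
    int size = 1;
    while ((value & ~0x7F) != 0) {
      value >>>= 7; // each varint byte carries 7 payload bits
      size++;
    }
    return size;
  }

  /** Size of a length-delimited message: the varint length prefix plus the payload (assumed). */
  static int delimitedSize(int serializedSize) {
    return varintSize(serializedSize) + serializedSize;
  }

  /** Encodes the trailer as 4 bytes, using the same ByteBuffer idiom as saveFileSummary. */
  static byte[] encodeTrailer(int length) {
    byte[] bytes = new byte[4];
    ByteBuffer.wrap(bytes).asIntBuffer().put(length);
    return bytes;
  }

  /** Decodes the 4-byte big-endian trailer. */
  static int decodeTrailer(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt();
  }
}
```

With a 300-byte payload the prefix takes 2 bytes, so the delimited size is 302, and the trailer bytes are 0, 0, 1, 46 because 302 is 0x012E.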
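2.3 Loading the fsimage
When the NameNode starts, it first loads the namespace recorded in the fsimage file into memory and then applies the updates recorded in the editlog files one by one, merging them into the namespace. The load is triggered from FSNamesystem.loadFromDisk: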
long loadStart = monotonicNow();
try {
// Load the fsimage. Along the way a series of checks is made, e.g. whether a format was requested and whether a new fsimage needs to be saved
namesystem.loadFSImage(startOpt);
} catch (IOException ioe) {
LOG.warn("Encountered exception loading fsimage", ioe);
fsImage.close();
throw ioe;
}
2.3.1 FSNamesystem#loadFSImage
// Load the fsimage
private void loadFSImage(StartupOption startOpt) throws IOException {
final FSImage fsImage = getFSImage();
// format before starting up if requested
// If the startup option is FORMAT, format the fsimage first
if (startOpt == StartupOption.FORMAT) {
// reuse current id
fsImage.format(this, fsImage.getStorage().determineClusterId(), false);
startOpt = StartupOption.REGULAR;
}
boolean success = false;
writeLock();
try {
// We shouldn't be calling saveNamespace if we've come up in standby state.
MetaRecoveryContext recovery = startOpt.createRecoveryContext();
// Determined while loading the fsimage: true if the image needs to be saved again (for example, an fsimage storage directory was found unformatted)
final boolean staleImage
= fsImage.recoverTransitionRead(startOpt, this, recovery);
if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt)) {
rollingUpgradeInfo = null;
}
final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade();
LOG.info("Need to save fs image? " + needToSave
+ " (staleImage=" + staleImage + ", haEnabled=" + haEnabled
+ ", isRollingUpgrade=" + isRollingUpgrade() + ")");
if (needToSave) {
// Save the fsimage to disk
fsImage.saveNamespace(this);
} else {
// No need to save, so mark the phase done.
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAVING_CHECKPOINT);
prog.endPhase(Phase.SAVING_CHECKPOINT);
}
// This will start a new log segment and write to the seen_txid file, so
// we shouldn't do it when coming up in standby state
if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)
|| (haEnabled && startOpt == StartupOption.UPGRADEONLY)) {
fsImage.openEditLogForWrite(getEffectiveLayoutVersion());
}
success = true;
} finally {
if (!success) {
fsImage.close();
}
writeUnlock("loadFSImage", true);
}
imageLoadComplete();
}
2.3.2 recoverTransitionRead
/**
* Analyze storage directories.
* Recover from previous transitions if required.
* Perform fs state transition if necessary depending on the namespace info.
* Read storage info.
*
* @throws IOException
* @return true if the image needs to be saved or false otherwise
*/
boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
MetaRecoveryContext recovery)
throws IOException {
assert startOpt != StartupOption.FORMAT :
"NameNode formatting should be performed before reading the image";
// Get the image and edits directories
Collection<URI> imageDirs = storage.getImageDirectories();
Collection<URI> editsDirs = editLog.getEditURIs();
// none of the data dirs exist
if((imageDirs.size() == 0 || editsDirs.size() == 0)
&& startOpt != StartupOption.IMPORT)
throw new IOException(
"All specified directories are not accessible or do not exist.");
// 1. For each data directory calculate its state and
// check whether all is consistent before transitioning.
// Check the state of every fsimage data directory
Map<StorageDirectory, StorageState> dataDirStates =
new HashMap<StorageDirectory, StorageState>();
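// For each storage directory, recover from any unfinished transition (e.g. upgrade, rollback, checkpoint)
// and record the directory's storage state in the dataDirStates map.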
boolean isFormatted = recoverStorageDirs(startOpt, storage, dataDirStates);
if (LOG.isTraceEnabled()) {
LOG.trace("Data dir states:\n " +
Joiner.on("\n ").withKeyValueSeparator(": ")
.join(dataDirStates));
}
if (!isFormatted && startOpt != StartupOption.ROLLBACK
&& startOpt != StartupOption.IMPORT) {
throw new IOException("NameNode is not formatted.");
}
int layoutVersion = storage.getLayoutVersion();
if (startOpt == StartupOption.METADATAVERSION) {
System.out.println("HDFS Image Version: " + layoutVersion);
System.out.println("Software format version: " +
HdfsServerConstants.NAMENODE_LAYOUT_VERSION);
return false;
}
if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) {
NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
}
if (startOpt != StartupOption.UPGRADE
&& startOpt != StartupOption.UPGRADEONLY
&& !RollingUpgradeStartupOption.STARTED.matches(startOpt)
&& layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
&& layoutVersion != HdfsServerConstants.NAMENODE_LAYOUT_VERSION) {
throw new IOException(
"\nFile system image contains an old layout version "
+ storage.getLayoutVersion() + ".\nAn upgrade to version "
+ HdfsServerConstants.NAMENODE_LAYOUT_VERSION + " is required.\n"
+ "Please restart NameNode with the \""
+ RollingUpgradeStartupOption.STARTED.getOptionString()
+ "\" option if a rolling upgrade is already started;"
+ " or restart NameNode with the \""
+ StartupOption.UPGRADE.getName() + "\" option to start"
+ " a new upgrade.");
}
// Handle UPGRADE and UPGRADEONLY: set the clusterId and blockPoolId
storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);
// 2. Format unformatted dirs.
// format fsimage dirs
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
StorageState curState = dataDirStates.get(sd);
switch(curState) {
case NON_EXISTENT:
throw new IOException(StorageState.NON_EXISTENT +
" state cannot be here");
case NOT_FORMATTED:
// Create a dir structure, but not the VERSION file. The presence of
// VERSION is checked in the inspector's needToSave() method and
// saveNamespace is triggered if it is absent. This will bring
// the storage state uptodate along with a new VERSION file.
// If HA is enabled, NNs start up as standby so saveNamespace is not
// triggered.
LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
LOG.info("Formatting ...");
sd.clearDirectory(); // create empty current dir
// For non-HA, no further action is needed here, as saveNamespace will
// take care of the rest.
if (!target.isHaEnabled()) {
continue;
}
// If HA is enabled, save the dirs to create a version file later when
// a checkpoint image is saved.
if (newDirs == null) {
newDirs = new HashSet<StorageDirectory>();
}
newDirs.add(sd);
break;
default:
break;
}
}
// 3. Do transitions
// Dispatch on the startup option
switch(startOpt) {
case UPGRADE:
case UPGRADEONLY:
doUpgrade(target);
return false; // upgrade saved image already
case IMPORT:
doImportCheckpoint(target);
return false; // import checkpoint saved image already
case ROLLBACK:
throw new AssertionError("Rollback is now a standalone command, " +
"NameNode should not be starting with this option.");
case REGULAR:
default:
// just load the image
}
// Load the fsimage
return loadFSImage(target, startOpt, recovery);
}
2.3.3 FSImage#loadFSImage
/**
* Choose latest image from one of the directories,
* load it and merge with the edits.
*
* Saving and loading fsimage should never trigger symlink resolution.
* The paths that are persisted do not have *intermediate* symlinks
* because intermediate symlinks are resolved at the time files,
* directories, and symlinks are created. All paths accessed while
* loading or saving fsimage should therefore only see symlinks as
* the final path component, and the functions called below do not
* resolve symlinks that are the final path component.
*
* @return whether the image should be saved
* @throws IOException
*/
private boolean loadFSImage(FSNamesystem target, StartupOption startOpt,
MetaRecoveryContext recovery)
throws IOException {
final boolean rollingRollback
= RollingUpgradeStartupOption.ROLLBACK.matches(startOpt);
final EnumSet<NameNodeFile> nnfs;
if (rollingRollback) {
// if it is rollback of rolling upgrade, only load from the rollback image
nnfs = EnumSet.of(NameNodeFile.IMAGE_ROLLBACK);
} else {
// otherwise we can load from both IMAGE and IMAGE_ROLLBACK
nnfs = EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK);
}
final FSImageStorageInspector inspector = storage
.readAndInspectDirs(nnfs, startOpt);
isUpgradeFinalized = inspector.isUpgradeFinalized();
List<FSImageFile> imageFiles = inspector.getLatestImages();
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.LOADING_FSIMAGE);
File phaseFile = imageFiles.get(0).getFile();
prog.setFile(Phase.LOADING_FSIMAGE, phaseFile.getAbsolutePath());
prog.setSize(Phase.LOADING_FSIMAGE, phaseFile.length());
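// Whether the image must be saved again, as reported by the storage inspector (e.g. a storage directory is not formatted and has no VERSION file)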
boolean needToSave = inspector.needToSave();
Iterable<EditLogInputStream> editStreams = null;
initEditLog(startOpt);
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
// If we're open for write, we're either non-HA or we're the active NN, so
// we better be able to load all the edits. If we're the standby NN, it's
// OK to not be able to read all of edits right now.
// In the meanwhile, for HA upgrade, we will still write editlog thus need
// this toAtLeastTxId to be set to the max-seen txid
// For rollback in rolling upgrade, we need to set the toAtLeastTxId to
// the txid right before the upgrade marker.
long toAtLeastTxId = editLog.isOpenForWrite() ? inspector
.getMaxSeenTxId() : 0;
if (rollingRollback) {
// note that the first image in imageFiles is the special checkpoint
// for the rolling upgrade
toAtLeastTxId = imageFiles.get(0).getCheckpointTxId() + 2;
}
editStreams = editLog.selectInputStreams(
imageFiles.get(0).getCheckpointTxId() + 1,
toAtLeastTxId, recovery, false);
} else {
editStreams = FSImagePreTransactionalStorageInspector
.getEditLogStreams(storage);
}
int maxOpSize = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);
for (EditLogInputStream elis : editStreams) {
elis.setMaxOpSize(maxOpSize);
}
for (EditLogInputStream l : editStreams) {
LOG.debug("Planning to load edit log stream: " + l);
}
if (!editStreams.iterator().hasNext()) {
LOG.info("No edit log streams selected.");
}
FSImageFile imageFile = null;
// Try the candidate image files in order until one of them loads
for (int i = 0; i < imageFiles.size(); i++) {
try {
imageFile = imageFiles.get(i);
loadFSImageFile(target, recovery, imageFile, startOpt);
break;
} catch (IllegalReservedPathException ie) {
throw new IOException("Failed to load image from " + imageFile,
ie);
} catch (Exception e) {
LOG.error("Failed to load image from " + imageFile, e);
target.clear();
imageFile = null;
}
}
// Failed to load any images, error out
if (imageFile == null) {
FSEditLog.closeAllStreams(editStreams);
throw new IOException("Failed to load FSImage file, see error(s) " +
"above for more info.");
}
prog.endPhase(Phase.LOADING_FSIMAGE);
if (!rollingRollback) {
prog.beginPhase(Phase.LOADING_EDITS);
long txnsAdvanced = loadEdits(editStreams, target, Long.MAX_VALUE,
startOpt, recovery);
prog.endPhase(Phase.LOADING_EDITS);
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
txnsAdvanced);
} else {
// Trigger the rollback for rolling upgrade. Here lastAppliedTxId equals
// to the last txid in rollback fsimage.
rollingRollback(lastAppliedTxId + 1, imageFiles.get(0).getCheckpointTxId());
needToSave = false;
}
editLog.setNextTxId(lastAppliedTxId + 1);
return needToSave;
}
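The image-loading loop in this method is a fallback pattern: try each candidate in order, discard partial state after a failure, and give up only when every candidate has failed. A minimal generic sketch follows. `ImageFallbackSketch`, `ImageLoader` and `loadFirstUsable` are hypothetical names, and unlike the real method the sketch does not treat any exception type as immediately fatal.

```java
import java.io.IOException;
import java.util.List;

/** Sketch of "load the first usable image, reset state after each failure". Hypothetical names. */
class ImageFallbackSketch {
  /** A loader that may fail with any exception. */
  interface ImageLoader<T> {
    void load(T image) throws Exception;
  }

  /** Tries the candidates in order and returns the one that loaded successfully. */
  static <T> T loadFirstUsable(List<T> candidates, ImageLoader<T> loader, Runnable reset)
      throws IOException {
    for (T candidate : candidates) {
      try {
        loader.load(candidate);
        return candidate; // stop at the first image that loads
      } catch (Exception e) {
        reset.run(); // discard any partially loaded state before trying the next one
      }
    }
    throw new IOException("Failed to load any image");
  }
}
```

The reset step matters because a failed load may already have populated part of the in-memory state, which is why the original code calls target.clear() before moving on.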
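The code above iterates over the candidate fsimage files and loads the fsimage from the first one that loads successfully. The load goes through several nested loadFSImage overloads, so only the innermost one is shown here: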
/**
* Load in the filesystem image from file. It's a big list of
* filenames and blocks.
*/
private void loadFSImage(File curFile, MD5Hash expectedMd5,
FSNamesystem target, MetaRecoveryContext recovery,
boolean requireSameLayoutVersion) throws IOException {
// BlockPoolId is required when the FsImageLoader loads the rolling upgrade
// information. Make sure the ID is properly set.
target.setBlockPoolId(this.getBlockPoolID());
// Build a LoaderDelegator (a delegating loader), which is then used to load the fsimage
FSImageFormat.LoaderDelegator loader = FSImageFormat.newLoader(conf, target);
loader.load(curFile, requireSameLayoutVersion);
// Check that the image digest we loaded matches up with what
// we expected
MD5Hash readImageMd5 = loader.getLoadedImageMd5();
if (expectedMd5 != null &&
!expectedMd5.equals(readImageMd5)) {
throw new IOException("Image file " + curFile +
" is corrupt with MD5 checksum of " + readImageMd5 +
" but expecting " + expectedMd5);
}
long txId = loader.getLoadedImageTxId();
LOG.info("Loaded image for txid " + txId + " from " + curFile);
lastAppliedTxId = txId;
storage.setMostRecentCheckpointInfo(txId, curFile.lastModified());
}
// The loader.load call above ends up in LoaderDelegator#load:
public void load(File file, boolean requireSameLayoutVersion)
throws IOException {
Preconditions.checkState(impl == null, "Image already loaded!");
InputStream is = null;
try {
is = Files.newInputStream(file.toPath());
byte[] magic = new byte[FSImageUtil.MAGIC_HEADER.length];
IOUtils.readFully(is, magic, 0, magic.length);
if (Arrays.equals(magic, FSImageUtil.MAGIC_HEADER)) {
FSImageFormatProtobuf.Loader loader = new FSImageFormatProtobuf.Loader(
conf, fsn, requireSameLayoutVersion);
impl = loader;
// Load the fsimage file through its load method, which eventually calls loadInternal(raFile, fin).
// That method is close to the lowest level of fsimage loading.
loader.load(file);
} else {
Loader loader = new Loader(conf, fsn);
impl = loader;
loader.load(file);
}
} finally {
IOUtils.cleanupWithLogger(LOG, is);
}
}
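The format detection in this method boils down to comparing the first bytes of the file with the magic header. A small self-contained sketch is shown here. `MagicHeaderSketch`, its `Format` enum and `detect` are hypothetical, and only the header value "HDFSIMG1" is taken from the text above.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Sketch of choosing a loader from the first bytes of a file. Hypothetical names. */
class MagicHeaderSketch {
  static final byte[] MAGIC = "HDFSIMG1".getBytes(StandardCharsets.UTF_8);

  enum Format { PROTOBUF, LEGACY }

  /** Reads exactly MAGIC.length bytes; throws EOFException if the stream is shorter. */
  static Format detect(InputStream in) throws IOException {
    byte[] head = new byte[MAGIC.length];
    new DataInputStream(in).readFully(head);
    return Arrays.equals(head, MAGIC) ? Format.PROTOBUF : Format.LEGACY;
  }
}
```

Reading the full header with readFully rather than a single read call avoids a short read being mistaken for a mismatch.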
FSImageFormatProtobuf.Loader.loadInternal() performs the actual loading of the fsimage file. It opens a channel on the fsimage file and reads the FileSummary object, which records every section stored in the fsimage. loadInternal() then sorts the sections listed in the FileSummary, iterates over them, and calls the matching method to load each section from the fsimage file.
private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
throws IOException {
if (!FSImageUtil.checkFileFormat(raFile)) {
throw new IOException("Unrecognized file format");
}
// Load the FileSummary
FileSummary summary = FSImageUtil.loadSummary(raFile);
if (requireSameLayoutVersion && summary.getLayoutVersion() !=
HdfsServerConstants.NAMENODE_LAYOUT_VERSION) {
throw new IOException("Image version " + summary.getLayoutVersion() +
" is not equal to the software version " +
HdfsServerConstants.NAMENODE_LAYOUT_VERSION);
}
FileChannel channel = fin.getChannel();
// Loaders for inodes and snapshots
FSImageFormatPBINode.Loader inodeLoader = new FSImageFormatPBINode.Loader(
fsn, this);
FSImageFormatPBSnapshot.Loader snapshotLoader = new FSImageFormatPBSnapshot.Loader(
fsn, this);
// Sort the sections
ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
.getSectionsList());
Collections.sort(sections, new Comparator<FileSummary.Section>() {
@Override
public int compare(FileSummary.Section s1, FileSummary.Section s2) {
SectionName n1 = SectionName.fromString(s1.getName());
SectionName n2 = SectionName.fromString(s2.getName());
if (n1 == null) {
return n2 == null ? 0 : -1;
} else if (n2 == null) {
return -1;
} else {
return n1.ordinal() - n2.ordinal();
}
}
});
StartupProgress prog = NameNode.getStartupProgress();
/**
* beginStep() and the endStep() calls do not match the boundary of the
* sections. This is because that the current implementation only allows
* a particular step to be started for once.
*/
Step currentStep = null;
boolean loadInParallel = enableParallelSaveAndLoad(conf);
ExecutorService executorService = null;
ArrayList<FileSummary.Section> subSections =
getAndRemoveSubSections(sections);
if (loadInParallel) {
executorService = getParallelExecutorService();
}
// Iterate over the sections and call the matching method to load each one
for (FileSummary.Section s : sections) {
channel.position(s.getOffset());
InputStream in = new BufferedInputStream(new LimitInputStream(fin,
s.getLength()));
in = FSImageUtil.wrapInputStreamForCompression(conf,
summary.getCodec(), in);
String n = s.getName();
SectionName sectionName = SectionName.fromString(n);
if (sectionName == null) {
throw new IOException("Unrecognized section " + n);
}
ArrayList<FileSummary.Section> stageSubSections;
// Load the different kinds of sections in the fsimage file
switch (sectionName) {
case NS_INFO:
loadNameSystemSection(in);
break;
case STRING_TABLE:
loadStringTableSection(in);
break;
case INODE: {
currentStep = new Step(StepType.INODES);
prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
stageSubSections = getSubSectionsOfName(
subSections, SectionName.INODE_SUB);
if (loadInParallel && (stageSubSections.size() > 0)) {
inodeLoader.loadINodeSectionInParallel(executorService,
stageSubSections, summary.getCodec(), prog, currentStep);
} else {
inodeLoader.loadINodeSection(in, prog, currentStep);
}
}
break;
case INODE_REFERENCE:
snapshotLoader.loadINodeReferenceSection(in);
break;
case INODE_DIR:
stageSubSections = getSubSectionsOfName(
subSections, SectionName.INODE_DIR_SUB);
if (loadInParallel && stageSubSections.size() > 0) {
inodeLoader.loadINodeDirectorySectionInParallel(executorService,
stageSubSections, summary.getCodec());
} else {
inodeLoader.loadINodeDirectorySection(in);
}
break;
case FILES_UNDERCONSTRUCTION:
inodeLoader.loadFilesUnderConstructionSection(in);
break;
case SNAPSHOT:
snapshotLoader.loadSnapshotSection(in);
break;
case SNAPSHOT_DIFF:
snapshotLoader.loadSnapshotDiffSection(in);
break;
case SECRET_MANAGER: {
prog.endStep(Phase.LOADING_FSIMAGE, currentStep);
Step step = new Step(StepType.DELEGATION_TOKENS);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
loadSecretManagerSection(in, prog, step);
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
break;
case CACHE_MANAGER: {
Step step = new Step(StepType.CACHE_POOLS);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
loadCacheManagerSection(in, prog, step);
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
break;
case ERASURE_CODING:
Step step = new Step(StepType.ERASURE_CODING_POLICIES);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
loadErasureCodingSection(in);
prog.endStep(Phase.LOADING_FSIMAGE, step);
break;
default:
LOG.warn("Unrecognized section {}", n);
break;
}
}
if (executorService != null) {
executorService.shutdown();
}
}
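The sorting step in loadInternal orders sections by the declaration order of an enum rather than by their position in the file, so a section can be loaded earlier than it was written. The sketch below illustrates the idea with a reduced, hypothetical `SectionName` enum. Its constants and their order are assumptions made for the example. One deliberate difference from the excerpt: when only the second name is unknown the sketch returns 1, so that the comparator is symmetric.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of ordering sections by enum ordinal before loading them. Hypothetical names. */
class SectionOrderSketch {
  /** Reduced, assumed set of section names; the declaration order defines the load order. */
  enum SectionName {
    NS_INFO, STRING_TABLE, INODE, INODE_DIR, SNAPSHOT;

    /** Returns the matching constant, or null for an unknown name. */
    static SectionName fromString(String name) {
      for (SectionName n : values()) {
        if (n.name().equals(name)) {
          return n;
        }
      }
      return null;
    }
  }

  /** Returns the names in load order; unknown names sort first. */
  static List<String> loadOrder(List<String> namesInFileOrder) {
    List<String> sorted = new ArrayList<>(namesInFileOrder);
    sorted.sort((a, b) -> {
      SectionName n1 = SectionName.fromString(a);
      SectionName n2 = SectionName.fromString(b);
      if (n1 == null) {
        return n2 == null ? 0 : -1;
      }
      if (n2 == null) {
        return 1;
      }
      return Integer.compare(n1.ordinal(), n2.ordinal());
    });
    return sorted;
  }
}
```

In this example a string table that was written last is still loaded before the inodes, because its enum constant is declared earlier.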
2.4 Loading the Editlog
In FSImage#loadFSImage, once the fsimage has been loaded and no rolling-upgrade rollback is required, the edits are loaded:
if (!rollingRollback) {
prog.beginPhase(Phase.LOADING_EDITS);
long txnsAdvanced = loadEdits(editStreams, target, Long.MAX_VALUE,
startOpt, recovery);
prog.endPhase(Phase.LOADING_EDITS);
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
txnsAdvanced);
} else {
// Trigger the rollback for rolling upgrade. Here lastAppliedTxId equals
// to the last txid in rollback fsimage.
rollingRollback(lastAppliedTxId + 1, imageFiles.get(0).getCheckpointTxId());
needToSave = false;
}
Finally, look at loadEdits.
FSImage.loadEdits() builds an FSEditLogLoader object, then iterates over the input streams of the editlog files kept in the NameNode's storage directories and calls FSEditLogLoader.loadFSEdits() to load each of them.
public long loadEdits(Iterable<EditLogInputStream> editStreams,
FSNamesystem target, long maxTxnsToRead,
StartupOption startOpt, MetaRecoveryContext recovery)
throws IOException {
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
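// Remember the last txid applied to the namespace before loading, so the number of newly applied transactions can be computed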
long prevLastAppliedTxId = lastAppliedTxId;
long remainingReadTxns = maxTxnsToRead;
try {
// Build the FSEditLogLoader used to load the editlog files
FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);
// Load latest edits
// Iterate over the edit streams and load all edits
for (EditLogInputStream editIn : editStreams) {
LogAction logAction = loadEditLogHelper.record();
if (logAction.shouldLog()) {
String logSuppressed = "";
if (logAction.getCount() > 1) {
logSuppressed = "; suppressed logging for " +
(logAction.getCount() - 1) + " edit reads";
}
LOG.info("Reading " + editIn + " expecting start txid #" +
(lastAppliedTxId + 1) + logSuppressed);
}
try {
remainingReadTxns -= loader.loadFSEdits(editIn, lastAppliedTxId + 1,
remainingReadTxns, startOpt, recovery);
} finally {
// Update lastAppliedTxId even in case of error, since some ops may
// have been successfully applied before the error.
lastAppliedTxId = loader.getLastAppliedTxId();
}
// If we are in recovery mode, we may have skipped over some txids.
if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID
&& recovery != null) {
lastAppliedTxId = editIn.getLastTxId();
}
if (remainingReadTxns <= 0) {
break;
}
}
} finally {
FSEditLog.closeAllStreams(editStreams);
}
return lastAppliedTxId - prevLastAppliedTxId;
}
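The bookkeeping in loadEdits (a running last-applied txid, a budget of transactions to read, and a return value equal to how far the txid advanced) can be sketched without any HDFS types. In the example below each edit stream is simply an array of consecutive txids and applying a transaction only advances the counter. `EditReplaySketch` is hypothetical, and the gap check is an assumption of the sketch.

```java
import java.util.List;

/** Sketch of replaying edit streams with a txid cursor and a read budget. Hypothetical names. */
class EditReplaySketch {
  long lastAppliedTxId;

  EditReplaySketch(long imageTxId) {
    this.lastAppliedTxId = imageTxId; // start from the txid of the loaded image
  }

  /** Applies up to maxTxns transactions and returns how far the txid advanced. */
  long loadEdits(List<long[]> streams, long maxTxns) {
    long prevLastAppliedTxId = lastAppliedTxId;
    long remaining = maxTxns;
    for (long[] stream : streams) {
      for (long txid : stream) {
        if (remaining <= 0) {
          break;
        }
        if (txid <= lastAppliedTxId) {
          continue; // already reflected in the image
        }
        if (txid != lastAppliedTxId + 1) {
          throw new IllegalStateException(
              "Gap in edits: expected " + (lastAppliedTxId + 1) + " but got " + txid);
        }
        lastAppliedTxId = txid; // "apply" the transaction
        remaining--;
      }
      if (remaining <= 0) {
        break;
      }
    }
    return lastAppliedTxId - prevLastAppliedTxId;
  }
}
```

Starting from an image at txid 100 and two streams holding 101 to 103 and 104 to 105, an unlimited budget advances the txid by 5, while a budget of 4 stops at txid 104.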