FSImage并行加载
关于NameNode FSImage并行加载,已有不少文章做过详细介绍,比如:https://blog.csdn.net/Androidlushangderen/article/details/100088073,这里不再赘述。
简单总结:开启该功能后,FSImage文件的FileSummary中会额外记录INODE_SUB和INODE_DIR_SUB两类子section的索引,把INODE和INODE_DIR两个section在逻辑上拆分成多段,从而可以并行加载。
并行加载FSImage可以显著缩短NameNode的重启时间,相关issue:https://issues.apache.org/jira/browse/HDFS-14617
OIV简介
Offline Image Viewer 是将 hdfs fsimage 文件的内容转储为人类可读格式并提供只读 WebHDFS API 的工具,以便允许对 Hadoop 集群的namespace进行离线分析和检查,该工具能够相对快速地处理非常大的image文件。官网地址:https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsImageViewer.html
如果集群开启了FSImage并行加载功能,由此会引出几个问题:
- oiv 会走这个并行加载逻辑吗? 能不能受益?
- oiv的解析产出的结果会有什么变化?
- 旧版本的oiv能解析并行格式的FSImage吗?
这里提前给出结论
- oiv不走并行加载逻辑,因此无法受益:它按排序后的顺序串行解析每个section,INODE_SUB、INODE_DIR_SUB等子section在switch中落入default分支,被直接跳过
- oiv的解析产出的结果没有变化:子section只是对INODE和INODE_DIR的逻辑拆分索引,oiv仍然完整读取这两个section本身
- hadoop 3.3.0以下版本的oiv无法解析并行格式的FSImage,需要升级到hadoop 3.3.0及以上版本,或backport HDFS-14617
相关代码
一、NameNode启动时加载FSImage的逻辑 org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.java
/**
 * Loads all sections of a protobuf-format FSImage into the namesystem.
 *
 * <p>The sections listed in the image's FileSummary are sorted by the
 * declaration order of {@link SectionName} and then read one at a time. When
 * parallel load is enabled and the summary carries INODE_SUB / INODE_DIR_SUB
 * entries, the INODE and INODE_DIR sections are loaded from those
 * sub-sections through an executor; otherwise they are read serially.
 *
 * @param raFile the image file; used to check the format and read the summary
 * @param fin stream over the same image; its channel is repositioned to each
 *     section's offset before that section is read
 * @throws IOException if the file format, the layout version (when
 *     {@code requireSameLayoutVersion} is set) or a section name is not
 *     recognized, or if reading a section fails
 */
private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
    throws IOException {
  if (!FSImageUtil.checkFileFormat(raFile)) {
    throw new IOException("Unrecognized file format");
  }
  FileSummary summary = FSImageUtil.loadSummary(raFile);
  if (requireSameLayoutVersion && summary.getLayoutVersion() !=
      HdfsServerConstants.NAMENODE_LAYOUT_VERSION) {
    throw new IOException("Image version " + summary.getLayoutVersion() +
        " is not equal to the software version " +
        HdfsServerConstants.NAMENODE_LAYOUT_VERSION);
  }
  FileChannel channel = fin.getChannel();
  FSImageFormatPBINode.Loader inodeLoader = new FSImageFormatPBINode.Loader(
      fsn, this);
  FSImageFormatPBSnapshot.Loader snapshotLoader =
      new FSImageFormatPBSnapshot.Loader(fsn, this);

  ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
      .getSectionsList());
  Collections.sort(sections, new Comparator<FileSummary.Section>() {
    @Override
    public int compare(FileSummary.Section s1, FileSummary.Section s2) {
      SectionName n1 = SectionName.fromString(s1.getName());
      SectionName n2 = SectionName.fromString(s2.getName());
      // Unrecognized names (null) sort first, so the loop below throws on
      // them before any real section is loaded. The (non-null, null) case
      // must return 1: returning -1 for both argument orders violated the
      // Comparator contract and made the sorted order unpredictable.
      if (n1 == null) {
        return n2 == null ? 0 : -1;
      } else if (n2 == null) {
        return 1;
      } else {
        return Integer.compare(n1.ordinal(), n2.ordinal());
      }
    }
  });

  StartupProgress prog = NameNode.getStartupProgress();
  Step currentStep = null;
  boolean loadInParallel = enableParallelSaveAndLoad(conf);
  ExecutorService executorService = null;
  // NOTE(review): presumably detaches the *_SUB index entries from
  // `sections`; in this method they are only consumed by the parallel
  // INODE / INODE_DIR loaders below.
  ArrayList<FileSummary.Section> subSections =
      getAndRemoveSubSections(sections);
  if (loadInParallel) {
    executorService = getParallelExecutorService();
  }
  try {
    for (FileSummary.Section s : sections) {
      channel.position(s.getOffset());
      InputStream in = new BufferedInputStream(new LimitInputStream(fin,
          s.getLength()));
      in = FSImageUtil.wrapInputStreamForCompression(conf,
          summary.getCodec(), in);
      String n = s.getName();
      SectionName sectionName = SectionName.fromString(n);
      if (sectionName == null) {
        throw new IOException("Unrecognized section " + n);
      }
      ArrayList<FileSummary.Section> stageSubSections;
      switch (sectionName) {
      case NS_INFO:
        loadNameSystemSection(in);
        break;
      case STRING_TABLE:
        loadStringTableSection(in);
        break;
      case INODE: {
        currentStep = new Step(StepType.INODES);
        prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
        stageSubSections = getSubSectionsOfName(
            subSections, SectionName.INODE_SUB);
        if (loadInParallel && (stageSubSections.size() > 0)) {
          inodeLoader.loadINodeSectionInParallel(executorService,
              stageSubSections, summary.getCodec(), prog, currentStep);
        } else {
          inodeLoader.loadINodeSection(in, prog, currentStep);
        }
      }
        break;
      case INODE_REFERENCE:
        snapshotLoader.loadINodeReferenceSection(in);
        break;
      case INODE_DIR:
        stageSubSections = getSubSectionsOfName(
            subSections, SectionName.INODE_DIR_SUB);
        if (loadInParallel && stageSubSections.size() > 0) {
          inodeLoader.loadINodeDirectorySectionInParallel(executorService,
              stageSubSections, summary.getCodec());
        } else {
          inodeLoader.loadINodeDirectorySection(in);
        }
        break;
      case FILES_UNDERCONSTRUCTION:
        inodeLoader.loadFilesUnderConstructionSection(in);
        break;
      case SNAPSHOT:
        snapshotLoader.loadSnapshotSection(in);
        break;
      case SNAPSHOT_DIFF:
        snapshotLoader.loadSnapshotDiffSection(in);
        break;
      case SECRET_MANAGER: {
        // Closes the INODES step opened in the INODE case.
        prog.endStep(Phase.LOADING_FSIMAGE, currentStep);
        Step step = new Step(StepType.DELEGATION_TOKENS);
        prog.beginStep(Phase.LOADING_FSIMAGE, step);
        loadSecretManagerSection(in, prog, step);
        prog.endStep(Phase.LOADING_FSIMAGE, step);
      }
        break;
      case CACHE_MANAGER: {
        Step step = new Step(StepType.CACHE_POOLS);
        prog.beginStep(Phase.LOADING_FSIMAGE, step);
        loadCacheManagerSection(in, prog, step);
        prog.endStep(Phase.LOADING_FSIMAGE, step);
      }
        break;
      case ERASURE_CODING: {
        // Braced like the other cases so `step` does not leak into the
        // rest of the switch scope.
        Step step = new Step(StepType.ERASURE_CODING_POLICIES);
        prog.beginStep(Phase.LOADING_FSIMAGE, step);
        loadErasureCodingSection(in);
        prog.endStep(Phase.LOADING_FSIMAGE, step);
      }
        break;
      default:
        LOG.warn("Unrecognized section {}", n);
        break;
      }
    }
  } finally {
    // Shut the executor down even if a section fails to load, so it is not
    // left running after the exception propagates.
    if (executorService != null) {
      executorService.shutdown();
    }
  }
}
二、OIV解析时加载FSImage的逻辑 这里以输出xml格式举例,其他输出格式逻辑都差不多 org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter.java
/**
 * Writes the whole FSImage as XML to {@code out}.
 *
 * <p>Emits a {@code <version>} header, then visits the sections listed in the
 * FileSummary in {@link SectionName} declaration order and dumps each one.
 * Recognized sections without a case in the switch below (for example the
 * INODE_SUB / INODE_DIR_SUB index entries of a parallel-format image) fall
 * into {@code default} and are skipped; their data is still covered by the
 * full INODE / INODE_DIR sections.
 *
 * @param file the FSImage file to read
 * @throws IOException if the file is not a recognized FSImage, contains an
 *     unrecognized section name, or cannot be read
 */
public void visit(RandomAccessFile file) throws IOException {
  if (!FSImageUtil.checkFileFormat(file)) {
    throw new IOException("Unrecognized FSImage");
  }
  FileSummary summary = FSImageUtil.loadSummary(file);
  try (FileInputStream fin = new FileInputStream(file.getFD())) {
    out.print("<?xml version=\"1.0\"?>\n<fsimage>");
    out.print("<version>");
    o("layoutVersion", summary.getLayoutVersion());
    o("onDiskVersion", summary.getOndiskVersion());
    o("oivRevision", VersionInfo.getRevision());
    out.print("</version>\n");
    ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
        .getSectionsList());
    Collections.sort(sections, new Comparator<FileSummary.Section>() {
      @Override
      public int compare(FileSummary.Section s1, FileSummary.Section s2) {
        SectionName n1 = SectionName.fromString(s1.getName());
        SectionName n2 = SectionName.fromString(s2.getName());
        // Unrecognized names (null) sort first, so the loop below throws on
        // them before any XML for real sections is written. The
        // (non-null, null) case must return 1: returning -1 for both
        // argument orders violated the Comparator contract.
        if (n1 == null) {
          return n2 == null ? 0 : -1;
        } else if (n2 == null) {
          return 1;
        } else {
          return Integer.compare(n1.ordinal(), n2.ordinal());
        }
      }
    });
    for (FileSummary.Section s : sections) {
      fin.getChannel().position(s.getOffset());
      InputStream is = FSImageUtil.wrapInputStreamForCompression(conf,
          summary.getCodec(), new BufferedInputStream(new LimitInputStream(
              fin, s.getLength())));
      SectionName sectionName = SectionName.fromString(s.getName());
      if (sectionName == null) {
        throw new IOException("Unrecognized section " + s.getName());
      }
      switch (sectionName) {
      case NS_INFO:
        dumpNameSection(is);
        break;
      case STRING_TABLE:
        loadStringTable(is);
        break;
      case ERASURE_CODING:
        dumpErasureCodingSection(is);
        break;
      case INODE:
        dumpINodeSection(is);
        break;
      case INODE_REFERENCE:
        dumpINodeReferenceSection(is);
        break;
      case INODE_DIR:
        dumpINodeDirectorySection(is);
        break;
      case FILES_UNDERCONSTRUCTION:
        dumpFileUnderConstructionSection(is);
        break;
      case SNAPSHOT:
        dumpSnapshotSection(is);
        break;
      case SNAPSHOT_DIFF:
        dumpSnapshotDiffSection(is);
        break;
      case SECRET_MANAGER:
        dumpSecretManagerSection(is);
        break;
      case CACHE_MANAGER:
        dumpCacheManagerSection(is);
        break;
      default:
        // e.g. *_SUB index entries: nothing to dump on their own.
        break;
      }
    }
    out.print("</fsimage>\n");
  }
}
三、为什么旧版本代码无法解析并行格式的FSImage
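在解析FSImage文件前,需要先按SectionName枚举的声明顺序(ordinal)对所有section排序,之后按这个顺序依次解析。新版本的FSImage中多了INODE_SUB、INODE_DIR_SUB等section,而旧版本的SectionName枚举里没有这些值,SectionName.fromString会返回null。下面的比较器对null的处理并不对称:n1为null时返回-1,n2为null时同样返回-1,违反了Comparator的约定,排序结果因此不可预期。此外,加载循环遇到无法识别的section(sectionName为null)时,会直接抛出"Unrecognized section"异常(见上文代码)。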
// Sort the sections listed in the FileSummary by SectionName declaration
// order (ordinal); the sections are then processed in that order.
ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
.getSectionsList());
Collections.sort(sections, new Comparator<FileSummary.Section>() {
@Override
public int compare(FileSummary.Section s1, FileSummary.Section s2) {
// fromString returns null for any name the running build's SectionName
// enum does not declare, e.g. INODE_SUB / INODE_DIR_SUB read by a build
// that predates HDFS-14617.
SectionName n1 = SectionName.fromString(s1.getName());
SectionName n2 = SectionName.fromString(s2.getName());
// NOTE: asymmetric null handling. compare(null, x) and compare(x, null)
// both return -1, which violates the Comparator contract
// (sgn(compare(a, b)) must equal -sgn(compare(b, a))). Where unknown
// sections end up after sorting is therefore unpredictable.
if (n1 == null) {
return n2 == null ? 0 : -1;
} else if (n2 == null) {
return -1;
} else {
return n1.ordinal() - n2.ordinal();
}
}
});
/**
 * Names of the sections that can appear in a protobuf-format FSImage.
 *
 * <p>Declaration order is significant: the image loader and the offline image
 * viewer sort sections by {@link #ordinal()}, so the order of the constants
 * below is the order in which sections are processed. INODE_SUB and
 * INODE_DIR_SUB index sub-ranges of INODE and INODE_DIR for parallel loading
 * (HDFS-14617).
 */
public enum SectionName {
  NS_INFO("NS_INFO"),
  STRING_TABLE("STRING_TABLE"),
  EXTENDED_ACL("EXTENDED_ACL"),
  ERASURE_CODING("ERASURE_CODING"),
  INODE("INODE"),
  INODE_SUB("INODE_SUB"),
  INODE_REFERENCE("INODE_REFERENCE"),
  INODE_REFERENCE_SUB("INODE_REFERENCE_SUB"),
  SNAPSHOT("SNAPSHOT"),
  INODE_DIR("INODE_DIR"),
  INODE_DIR_SUB("INODE_DIR_SUB"),
  FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"),
  SNAPSHOT_DIFF("SNAPSHOT_DIFF"),
  SNAPSHOT_DIFF_SUB("SNAPSHOT_DIFF_SUB"),
  SECRET_MANAGER("SECRET_MANAGER"),
  CACHE_MANAGER("CACHE_MANAGER");

  /** Cached copy of values(); values() returns a fresh array on every call. */
  private static final SectionName[] ALL = SectionName.values();

  /** The section name as stored in the image's FileSummary. */
  private final String label;

  SectionName(String label) {
    this.label = label;
  }

  /**
   * Finds the constant whose stored name equals {@code name} exactly.
   *
   * @param name the section name read from the image (may be null)
   * @return the matching constant, or {@code null} when nothing matches
   *     (including when {@code name} is null)
   */
  public static SectionName fromString(String name) {
    for (int i = 0; i < ALL.length; i++) {
      SectionName candidate = ALL[i];
      if (candidate.label.equals(name)) {
        return candidate;
      }
    }
    return null;
  }
}
|