2021SC@SDUSC
Hadoop源码分析(六)—— NameNode实现(2)
Hadoop源码分析(五)——NameNode实现(1)
3. INodeFile 文件
INodeFile 所在的包为 org.apache.hadoop.hdfs.server.namenode,该类与目录 fNode 相对应,代表 HDFS 中的文件。
3.1 成员变量
static final FsPermission UMASK = FsPermission.createlmmutable((short)0111);
protected Blockinfo blocks[] = null;
protected long header;
static final long HEADERMASK = OxffffL << BLOCKBITS;
static final short BLOCKBITS = 48;
3.2 成员方法
protected INodeFile() {
blocks = null;
header = 0;
}
用于初始化INodeFile的成员变量的构造方法,该构造方法是protected型的可以由INodeFile的子类 来实现。
INodeFile(Permissionstatus permissions, int nrBlocks, short replication, long modificationTime, long atime, long preferredBlockSize) {
this(permissions, new Blockinfo[nrBlocks], replication, modificationTime, atime, preferredBlockSize);
}
protected INodeFile(Permissionstatus permissions, Blockinfo[] blklist, short replication, long modificationTime, long atime, long preferredBlockSize){
super(permissionsz modificationTime, atime);
this.setRepIication(replication);
this.setPreferredBlockSize(preferredBlockSize);
blocks = blklist;
}
根据指定的操作权限/组成文件的Block列表/副本数/访问时间/修改时间/以及Block的大小来创建 INodeFile 对象。
public boolean isDirectory() {
return false;
}
该方法用于标识此INode代表的是文件。
void addBlock(Blockinfo newblock) {
if (this.blocks == null) {
this.blocks = new Blockinfo[1];
this.blocks[0] = newblock;
} else {
int size = this.blocks.length;
Blockinfo[] newlist = new Blockinfo[size + 1];
System.arraycopy(this.blocks, 0, newlist, 0, size);
newlist[size] = newblock;
this.blocks = newlist;
}
}
该方法用于将一个新的数据Block添加到FNodeFile中的blocks数组中。
void setBlock(int idx. Blockinfo blk) {
this.blocks[idx] = blk;
}
根据索引信息和Block信息来设置INodeFile中的Block。
Block getLastBlock() {
if (this.blocks == null || this.blocks.length == 0)
return null;
return this.blocks[this.blocks.length - 1];
}
取得文件中的最后一个Blocko由于目前HDFS只支持在文件的末尾执行append操作,所以需要获 得最后一个Block来执行append操作。
Block getPenultimateBlock() {
if (blocks == null || blocks.length <= 1) {
return null;
}
return blocks[blocks.length - 2];
}
获得文件的倒数第二个Block。
public short getReplication() {
return (short) ((header & HEADERMASK) >> BLOCKBITS);
}
public void setReplication(short replication) {
if(replication <= 0)
throw new IllegalArgumentException ("Unexpected value for the replication"");
header = ((long)replication << BLOCKBITS) | (header & ~HEADERMASK);
}
用于取得和设置组成文件的数据Block的副本数信息的get和set方法。
public long getPreferredBlockSize() {
return header & ~HEADERMASK;
}
public void setPreferredBlockSize(long preferredBlkSize){
if((preferredBlkSize < 0) || (preferredBlkSize > ~HEADERMASK ))
throw new IllegalArgumentException("Unexpected value for the block size");
header = (header & HEADERMASK) | (preferredBlkSize & ~HEADERMASK);
}
用于取得和设置组成文件的数据Block的大小信息的get和set方法。
int collectSubtreeBlocksAndClear(List<Block> v) {
parent = null;
for (Block blk : blocks) {
v.add(blk);
}
blocks = null;
return 1;
}
将该INodeFile 下的所有Block保存到某个列表中,例如当删除某个目录下的所有文件时,就需要获 得所有的Block,然后在BlockMap中执行Block的删除操作。
NodeFileUnderConstruction toINodeFileUnderConstruction(
String clientName, String clientMachine, DatanodeDescriptor clientNode) throws lOException ( if (isUnderConstruction()) {
return (INodeFileUnderConstruction)this;
}
return new INodeFileUnderConstruction(name,
getReplication(), modificationTime, getPreferredBlockSize(), blocks, getPermissionStatus(), clientName, clientMachine, clientNode);
}
用于将该FNodeFile对象转换成NodeFileUnderConstruction对象的方法。如果该INodeFile已经被创 建了,则直接将其转换为 NodeFileUnderConstruction 实例;否则,调用 NodeFileUnderConstruction 的构 造方法来创建一个NodeFileUnderConstruction实例对象。
4. FSDirectory 文件系统目录
FSDirectory 所在的包为 org.apache.hadoop.hdfs.server.namenode,该类保存了整个 HDFS 文件系统目 录的状态。它管理向磁盘写入的数据或者从磁盘加载的数据,并且会将目录中的数据所发生的改变记录 到日志中。同时,它保存了一个最新的filename->Block list的映射表,并且将该映射表写入到磁盘中。 当系统加载FSImage时,FSImage会在FSDirectory对象上重建文件目录的状态。
4.1 成员变量
final FSNamesystem namesystem;
final INodeDirectoryWithQuota rootDir;
FSImage fsImage;
private boolean ready = false;
private final int IsLimit;
private final NameCache<ByteArray> nameCache;
4.2 成员方法
FSDirectory(FSNamesystem ns, Configuration conf) {
this(new FSImage(), ns, conf);
fslmage.setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
FSImage.getCheckpointEditsDirs(conf, null));
}
FSDirectory(FSImage fslmage, FSNamesystem ns, Configuration conf) {
rootDir = new INodeDirectoryWithQuota(rNodeDirectory.ROOT_NAME,
ns.createFsOwnerPennissions(new FsPerrnission((short) 0755)),
Integer.MAX_VALUE, -1);
this.fsImage = fsImage;
namesystem = ns;
int configuredLimit = conf.getlnt(
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
this.lsLimit = configuredLimit>0 ?
configuredLimit : DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT;
int threshold = conf.getlnt(
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY,
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT);
NameNode.LOG.info("Caching file names occuring more than" + threshold+ " times ");
nameCache = new NameCache<ByteArray>(threshold);
}
在构造方法中主要完成对成员变量的初始化工作。其中创建的根目录的操作权限为755,即 drwxrw-rw-。 void loadFSImage(Collection < File > dataDirs,Collection< File > editsDirs,StartupOption startOpt)方法用于 从文件系统镜像中加载目录树结构,loadFSImage方法会在FSNamesystem的initialize方法中被调用,是 系统初始化重要的一步。主要的处理逻辑如下:
if (startOpt == StartupOption.FORMAT) {
fslmage.setStorageDirectories(dataDirs, editsDirs);
fsImage.format();
startOpt = Startupoption.REGULAR;
}
根据Hadoop的启动项来执行对应的加载操作。如果Hadoop的启动项为FORMAT格式化,则首先 调用fslmage的setStorageDirectories方法来设置fslmage镜像文件的存储目录,然后调用fslmage的format 方法来真正执行格式化操作,最后将启动项更改为REGULAR正常启动。
if (fslmage.recoverTransitionRead(dataDirs, editsDirs, startOpt)) {
fslmage.saveNamespace(true);
}
首先调用fslmage的recoverTransitionRead方法来根据Hadoop的启动项和fslmage的存储目录来恢 复之前的事务,然后调用fslmage的saveNamespace方法来持久化镜像文件的内容,同时创建一个空的 EditLog 文件。
FSEditLog editLog = fslmage.getEditLog();
获取fslmage所对应的EditLog编辑日志。
assert editLog != null : "editLog must be initialized";
if (!editLog.isOpen())
editLog.open();
打开EditLog文件。
fslmage.setCheckpointDirectories(null, null);
设置fslmage检查点的存储目录。
synchronized (this) {
this.ready = true;
this.nameCache.initialized();
this.notifyAll();
}
}
将FSDirectory的状态标记为就绪态,同时初始化用于保存文件名称的缓存区,并通知阻塞在该 FSDirectory对象上的全部其他线程。
void waitForReady() {
if (!ready) {
synchronized (this) {
while (!ready) {
try {
this.wait(5000);
} catch (InterruptedException ie) {
}}}
}
}
该方法用于等待FSDirectory对象准备就绪并能够被使用。
|