本文主要介绍一下FSNamesystem,依赖代码为hadoop-3.3.0。
1 简介
FSNamesystem 是瞬态和持久namespace状态的容器,并在 NameNode 上完成所有记录工作。
主要作用如下:
1. 是 BlockManager、DatanodeManager、DelegationTokens、LeaseManager 等服务的容器。
2. 委托处理修改或检查namespace的 RPC 调用
3. 任何只涉及块的东西(例如块报告),它都委托给 BlockManager
4. 任何只涉及文件信息(例如权限、mkdirs)的操作,它都会委托给 FSDirectory
5. 任何跨越上述两个组件的东西都应该在这里协调。
6. 记录变动到FsEditLog
此变量管理的内容:
1. 有效文件名和blockList的映射;
2. 合法的block集
3. block和机器列表的映射
4. 机器和blockList的映射
5. 更新心跳机器的 LRU 缓存
2 FSNamesystem的初始化
FSNamesystem初始化是在NameNode#initialize()方法中的loadNamesystem(conf)完成:
...
// 根据配置中指定位置的edit和fsImage初始化FsNameSystem
loadNamesystem(conf);
...
// 这里其实是调用FSNamesystem中的loadFromDisk方法,在加载磁盘中edit日志的过程中完成了FSNamesystem的初始化
protected void loadNamesystem(Configuration conf) throws IOException {
this.namesystem = FSNamesystem.loadFromDisk(conf);
}
/**
* Instantiates an FSNamesystem loaded from the image and edits
* directories specified in the passed Configuration.
*
* @param conf the Configuration which specifies the storage directories
* from which to load
* @return an FSNamesystem which contains the loaded namespace
* @throws IOException if loading fails
*/
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
// 检查和fsImage以及edit相关的配置
checkConfiguration(conf);
// 根据指定目录初始化FSImage
FSImage fsImage = new FSImage(conf,
FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
// 初始化FSNamesystem
FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
StartupOption startOpt = NameNode.getStartupOption(conf);
if (startOpt == StartupOption.RECOVER) {
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
}
long loadStart = monotonicNow();
try {
// 加载fsImage,过程会做一系列的判断,如是否formatted,是否需要保存新的fsImage等
namesystem.loadFSImage(startOpt);
} catch (IOException ioe) {
LOG.warn("Encountered exception loading fsimage", ioe);
fsImage.close();
throw ioe;
}
long timeTakenToLoadFSImage = monotonicNow() - loadStart;
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
if (nnMetrics != null) {
nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
}
namesystem.getFSDirectory().createReservedStatuses(namesystem.getCTime());
return namesystem;
}
如上所示,loadNamesystem会在loadFromDisk中调用FSNamesystem初始化一个实例。在初始化中主要是一些变量的赋值操作:
/**
* Create an FSNamesystem associated with the specified image.
*
* Note that this does not load any data off of disk -- if you would
* like that behavior, use {@link #loadFromDisk(Configuration)}
*
* @param conf configuration
* @param fsImage The FSImage to associate with
* @param ignoreRetryCache Whether or not should ignore the retry cache setup
* step. For Secondary NN this should be set to true.
* @throws IOException on bad configuration
*/
FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
throws IOException {
provider = DFSUtil.createKeyProviderCryptoExtension(conf);
LOG.info("KeyProvider: " + provider);
// 根据配置启用审计日志:hdfs审计日志(Auditlog)记录了用户针对hdfs的所有操作,
// 详细信息包括操作成功与否、用户名称、客户机地址、操作命令、操作的目录等。
// 对于用户的每一个操作,namenode都会将这些信息以key-value对的形式组织成固定格式的一条日志,
// 然后记录到audit.log文件中。通过审计日志,我们可以实时查看hdfs的各种操作状况、
// 可以追踪各种误操作、可以做一些指标监控等等。
if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
LOG.info("Enabling async auditlog");
enableAsyncAuditLog();
}
// 创建对应的fsNamesystem锁
fsLock = new FSNamesystemLock(conf, detailedLockHoldTimeMetrics);
cond = fsLock.newWriteLockCondition();
cpLock = new ReentrantLock();
this.fsImage = fsImage;
try {
// namenode的资源是否合规(是否小于100Mb、nn的最小冗余卷数)检查间隔
resourceRecheckInterval = conf.getLong(
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
this.fsOwner = UserGroupInformation.getCurrentUser();
this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
DFS_PERMISSIONS_ENABLED_DEFAULT);
this.isStoragePolicyEnabled =
conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY,
DFS_STORAGE_POLICY_ENABLED_DEFAULT);
this.isStoragePolicySuperuserOnly =
conf.getBoolean(DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY,
DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT);
this.snapshotDiffReportLimit =
conf.getInt(DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT,
DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT);
LOG.info("fsOwner = " + fsOwner);
LOG.info("supergroup = " + supergroup);
LOG.info("isPermissionEnabled = " + isPermissionEnabled);
LOG.info("isStoragePolicyEnabled = " + isStoragePolicyEnabled);
// block allocation has to be persisted in HA using a shared edits directory
// so that the standby has up-to-date namespace information
nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
this.haEnabled = HAUtil.isHAEnabled(conf, nameserviceId);
// Sanity check the HA-related config.
if (nameserviceId != null) {
LOG.info("Determined nameservice ID: " + nameserviceId);
}
LOG.info("HA Enabled: " + haEnabled);
if (!haEnabled && HAUtil.usesSharedEditsDir(conf)) {
LOG.warn("Configured NNs:\n" + DFSUtil.nnAddressesAsString(conf));
throw new IOException("Invalid configuration: a shared edits dir " +
"must not be specified if HA is not enabled.");
}
// block manager needs the haEnabled initialized
// 初始化BlockManager
this.blockManager = new BlockManager(this, haEnabled, conf);
// 由于dn的静态数据是通过heartbeat进行返回到dnManager的,因此这里直接返回一个heartbeatManager
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
// Get the checksum type from config
String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY,
DFS_CHECKSUM_TYPE_DEFAULT);
DataChecksum.Type checksumType;
try {
checksumType = DataChecksum.Type.valueOf(checksumTypeStr);
} catch (IllegalArgumentException iae) {
throw new IOException("Invalid checksum type in "
+ DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr);
}
try {
digest = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IOException("Algorithm 'MD5' not found");
}
this.serverDefaults = new FsServerDefaults(
conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
(short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT),
conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
checksumType,
conf.getTrimmed(
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
""),
blockManager.getStoragePolicySuite().getDefaultPolicy().getId());
this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
this.minBlockSize = conf.getLongBytes(
DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT);
this.batchedListingLimit = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT,
DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT_DEFAULT);
Preconditions.checkArgument(
batchedListingLimit > 0,
DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT +
" must be greater than zero");
this.numCommittedAllowed = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY,
DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT);
this.maxCorruptFileBlocksReturn = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_MAX_CORRUPT_FILE_BLOCKS_RETURNED_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_CORRUPT_FILE_BLOCKS_RETURNED_DEFAULT);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
this.standbyShouldCheckpoint = conf.getBoolean(
DFS_HA_STANDBY_CHECKPOINTS_KEY, DFS_HA_STANDBY_CHECKPOINTS_DEFAULT);
// # edit autoroll threshold is a multiple of the checkpoint threshold
this.editLogRollerThreshold = (long)
(conf.getFloat(
DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD,
DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT) *
conf.getLong(
DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT));
this.editLogRollerInterval = conf.getInt(
DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS,
DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT);
this.lazyPersistFileScrubIntervalSec = conf.getInt(
DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT);
if (this.lazyPersistFileScrubIntervalSec < 0) {
throw new IllegalArgumentException(
DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC
+ " must be zero (for disable) or greater than zero.");
}
this.edekCacheLoaderDelay = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY,
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT);
this.edekCacheLoaderInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY,
DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT);
this.leaseRecheckIntervalMs = conf.getLong(
DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_KEY,
DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_DEFAULT);
this.maxLockHoldToReleaseLeaseMs = conf.getLong(
DFS_NAMENODE_MAX_LOCK_HOLD_TO_RELEASE_LEASE_MS_KEY,
DFS_NAMENODE_MAX_LOCK_HOLD_TO_RELEASE_LEASE_MS_DEFAULT);
// For testing purposes, allow the DT secret manager to be started regardless
// of whether security is enabled.
alwaysUseDelegationTokensForTests = conf.getBoolean(
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
// 创建hdfs的令牌管理器
this.dtSecretManager = createDelegationTokenSecretManager(conf);
// 初始化FSDirectory,一个管理namespace状态的数据结构,专门用于内存中的管理,
// 而如果用将状态落盘,则使用的是FSNamesystem
this.dir = new FSDirectory(this, conf);
// 快照管理器
this.snapshotManager = new SnapshotManager(conf, dir);
// 缓存管理器:它通过处理数据节点缓存报告来维护缓存块到数据节点的映射。
// 根据这些报告以及缓存指令的添加和删除,我们将安排缓存和取消缓存工作。
this.cacheManager = new CacheManager(this, conf, blockManager);
// Init ErasureCodingPolicyManager instance.
ErasureCodingPolicyManager.getInstance().init(conf);
this.topConf = new TopConf(conf);
// 初始化审计日志
this.auditLoggers = initAuditLoggers(conf);
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
auditLoggers.get(0) instanceof DefaultAuditLogger;
this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
Class<? extends INodeAttributeProvider> klass = conf.getClass(
DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY,
null, INodeAttributeProvider.class);
if (klass != null) {
inodeAttributeProvider = ReflectionUtils.newInstance(klass, conf);
LOG.info("Using INode attribute provider: " + klass.getName());
}
this.maxListOpenFilesResponses = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES,
DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES_DEFAULT
);
Preconditions.checkArgument(maxListOpenFilesResponses > 0,
DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES +
" must be a positive integer."
);
this.allowOwnerSetQuota = conf.getBoolean(
DFSConfigKeys.DFS_PERMISSIONS_ALLOW_OWNER_SET_QUOTA_KEY,
DFSConfigKeys.DFS_PERMISSIONS_ALLOW_OWNER_SET_QUOTA_DEFAULT);
this.blockDeletionIncrement = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_INCREMENT_KEY,
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_INCREMENT_DEFAULT);
Preconditions.checkArgument(blockDeletionIncrement > 0,
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_INCREMENT_KEY +
" must be a positive integer.");
} catch(IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
throw e;
} catch (RuntimeException re) {
LOG.error(getClass().getSimpleName() + " initialization failed.", re);
close();
throw re;
}
}
|