IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 2021-11-11 Hadoop3 FSNamesystem -> 正文阅读

[大数据]2021-11-11 Hadoop3 FSNamesystem

本文主要介绍一下FSNamesystem,依赖代码为hadoop-3.3.0。

1 简介

FSNamesystem 是瞬态和持久namespace状态的容器,并在 NameNode 上完成所有记录工作。

主要作用如下:

1. 是 BlockManager、DatanodeManager、DelegationTokens、LeaseManager 等服务的容器。

2. 委托处理修改或检查namespace的 RPC 调用

3. 任何只涉及块的东西(例如块报告),它都委托给 BlockManager

4. 任何只涉及文件信息(例如权限、mkdirs)的操作,它都会委托给 FSDirectory

5. 任何跨越上述两个组件的东西都应该在这里协调。

6. 记录变动到FsEditLog

此变量管理的内容:

1. 有效文件名和blockList的映射;

2. 合法的block集

3. block和机器列表的映射

4. 机器和blockList的映射

5. 更新心跳机器的 LRU 缓存

2 FSNamesystem的初始化

FSNamesystem初始化是在NameNode#initialize()方法中的loadNamesystem(conf)完成:

...
// 根据配置中指定位置的edit和fsImage初始化FsNameSystem
loadNamesystem(conf);
...
// 这里其实是调用FSNamesystem中的loadFromDisk方法,在加载磁盘中edit日志的过程中完成了FSNamesystem的初始化
protected void loadNamesystem(Configuration conf) throws IOException {
    this.namesystem = FSNamesystem.loadFromDisk(conf);
}
/**
   * Instantiates an FSNamesystem loaded from the image and edits
   * directories specified in the passed Configuration.
   *
   * @param conf the Configuration which specifies the storage directories
   *             from which to load
   * @return an FSNamesystem which contains the loaded namespace
   * @throws IOException if loading fails
   */
  static FSNamesystem loadFromDisk(Configuration conf) throws IOException {

    // 检查和fsImage以及edit相关的配置
    checkConfiguration(conf);
    // 根据指定目录初始化FSImage
    FSImage fsImage = new FSImage(conf,
        FSNamesystem.getNamespaceDirs(conf),
        FSNamesystem.getNamespaceEditsDirs(conf));
    // 初始化FSNamesystem
    FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
    StartupOption startOpt = NameNode.getStartupOption(conf);
    if (startOpt == StartupOption.RECOVER) {
      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
    }

    long loadStart = monotonicNow();
    try {
      // 加载fsImage,过程会做一系列的判断,如是否formatted,是否需要保存新的fsImage等
      namesystem.loadFSImage(startOpt);
    } catch (IOException ioe) {
      LOG.warn("Encountered exception loading fsimage", ioe);
      fsImage.close();
      throw ioe;
    }
    long timeTakenToLoadFSImage = monotonicNow() - loadStart;
    LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
    NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
    if (nnMetrics != null) {
      nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
    }
    namesystem.getFSDirectory().createReservedStatuses(namesystem.getCTime());
    return namesystem;
  }
如上所示,loadNamesystem会在loadFromDisk中调用FSNamesystem初始化一个实例。在初始化中主要是一些变量的赋值操作:

/**
 * Create an FSNamesystem associated with the specified image.
 * 
 * Note that this does not load any data off of disk -- if you would
 * like that behavior, use {@link #loadFromDisk(Configuration)}
 *
 * @param conf configuration
 * @param fsImage The FSImage to associate with
 * @param ignoreRetryCache Whether or not should ignore the retry cache setup
 *                         step. For Secondary NN this should be set to true.
 * @throws IOException on bad configuration
 */
FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
    throws IOException {
    provider = DFSUtil.createKeyProviderCryptoExtension(conf);
    LOG.info("KeyProvider: " + provider);
    // 根据配置启用审计日志:hdfs审计日志(Auditlog)记录了用户针对hdfs的所有操作,
    // 详细信息包括操作成功与否、用户名称、客户机地址、操作命令、操作的目录等。
    // 对于用户的每一个操作,namenode都会将这些信息以key-value对的形式组织成固定格式的一条日志,
    // 然后记录到audit.log文件中。通过审计日志,我们可以实时查看hdfs的各种操作状况、
    // 可以追踪各种误操作、可以做一些指标监控等等。
    if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
                        DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
        LOG.info("Enabling async auditlog");
        enableAsyncAuditLog();
    }
    // 创建对应的fsNamesystem锁
    fsLock = new FSNamesystemLock(conf, detailedLockHoldTimeMetrics);
    cond = fsLock.newWriteLockCondition();
    cpLock = new ReentrantLock();

    this.fsImage = fsImage;
    try {
        // namenode的资源是否合规(是否小于100Mb、nn的最小冗余卷数)检查间隔
        resourceRecheckInterval = conf.getLong(
            DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
            DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);

        this.fsOwner = UserGroupInformation.getCurrentUser();
        this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY, 
                                   DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
        this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
                                                   DFS_PERMISSIONS_ENABLED_DEFAULT);

        this.isStoragePolicyEnabled =
            conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY,
                            DFS_STORAGE_POLICY_ENABLED_DEFAULT);
        this.isStoragePolicySuperuserOnly =
            conf.getBoolean(DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY,
                            DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT);

        this.snapshotDiffReportLimit =
            conf.getInt(DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT,
                        DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT);

        LOG.info("fsOwner                = " + fsOwner);
        LOG.info("supergroup             = " + supergroup);
        LOG.info("isPermissionEnabled    = " + isPermissionEnabled);
        LOG.info("isStoragePolicyEnabled = " + isStoragePolicyEnabled);

        // block allocation has to be persisted in HA using a shared edits directory
        // so that the standby has up-to-date namespace information
        nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
        this.haEnabled = HAUtil.isHAEnabled(conf, nameserviceId);  

        // Sanity check the HA-related config.
        if (nameserviceId != null) {
            LOG.info("Determined nameservice ID: " + nameserviceId);
        }
        LOG.info("HA Enabled: " + haEnabled);
        if (!haEnabled && HAUtil.usesSharedEditsDir(conf)) {
            LOG.warn("Configured NNs:\n" + DFSUtil.nnAddressesAsString(conf));
            throw new IOException("Invalid configuration: a shared edits dir " +
                                  "must not be specified if HA is not enabled.");
        }

        // block manager needs the haEnabled initialized
        // 初始化BlockManager
        this.blockManager = new BlockManager(this, haEnabled, conf);
        // 由于dn的静态数据是通过heartbeat进行返回到dnManager的,因此这里直接返回一个heartbeatManager
        this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();

        // Get the checksum type from config
        String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY,
                                          DFS_CHECKSUM_TYPE_DEFAULT);
        DataChecksum.Type checksumType;
        try {
            checksumType = DataChecksum.Type.valueOf(checksumTypeStr);
        } catch (IllegalArgumentException iae) {
            throw new IOException("Invalid checksum type in "
                                  + DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr);
        }

        try {
            digest = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IOException("Algorithm 'MD5' not found");
        }

        this.serverDefaults = new FsServerDefaults(
            conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
            conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
            conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
            (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
            conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
            conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT),
            conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
            checksumType,
            conf.getTrimmed(
                CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
                ""),
            blockManager.getStoragePolicySuite().getDefaultPolicy().getId());

        this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, 
                                         DFS_NAMENODE_MAX_OBJECTS_DEFAULT);

        this.minBlockSize = conf.getLongBytes(
            DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY,
            DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
        this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY,
                                             DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT);
        this.batchedListingLimit = conf.getInt(
            DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT,
            DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT_DEFAULT);
        Preconditions.checkArgument(
            batchedListingLimit > 0,
            DFSConfigKeys.DFS_NAMENODE_BATCHED_LISTING_LIMIT +
            " must be greater than zero");
        this.numCommittedAllowed = conf.getInt(
            DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY,
            DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT);

        this.maxCorruptFileBlocksReturn = conf.getInt(
            DFSConfigKeys.DFS_NAMENODE_MAX_CORRUPT_FILE_BLOCKS_RETURNED_KEY,
            DFSConfigKeys.DFS_NAMENODE_MAX_CORRUPT_FILE_BLOCKS_RETURNED_DEFAULT);

        this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);

        this.standbyShouldCheckpoint = conf.getBoolean(
            DFS_HA_STANDBY_CHECKPOINTS_KEY, DFS_HA_STANDBY_CHECKPOINTS_DEFAULT);
        // # edit autoroll threshold is a multiple of the checkpoint threshold 
        this.editLogRollerThreshold = (long)
            (conf.getFloat(
                DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD,
                DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT) *
             conf.getLong(
                 DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
                 DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT));
        this.editLogRollerInterval = conf.getInt(
            DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS,
            DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT);

        this.lazyPersistFileScrubIntervalSec = conf.getInt(
            DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
            DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT);

        if (this.lazyPersistFileScrubIntervalSec < 0) {
            throw new IllegalArgumentException(
                DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC
                + " must be zero (for disable) or greater than zero.");
        }

        this.edekCacheLoaderDelay = conf.getInt(
            DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY,
            DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT);
        this.edekCacheLoaderInterval = conf.getInt(
            DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY,
            DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT);

        this.leaseRecheckIntervalMs = conf.getLong(
            DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_KEY,
            DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_DEFAULT);
        this.maxLockHoldToReleaseLeaseMs = conf.getLong(
            DFS_NAMENODE_MAX_LOCK_HOLD_TO_RELEASE_LEASE_MS_KEY,
            DFS_NAMENODE_MAX_LOCK_HOLD_TO_RELEASE_LEASE_MS_DEFAULT);

        // For testing purposes, allow the DT secret manager to be started regardless
        // of whether security is enabled.
        alwaysUseDelegationTokensForTests = conf.getBoolean(
            DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
            DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);

        // 创建hdfs的令牌管理器
        this.dtSecretManager = createDelegationTokenSecretManager(conf);
        // 初始化FSDirectory,一个管理namespace状态的数据结构,专门用于内存中的管理,
        // 而如果用将状态落盘,则使用的是FSNamesystem
        this.dir = new FSDirectory(this, conf);
        // 快照管理器
        this.snapshotManager = new SnapshotManager(conf, dir);
        // 缓存管理器:它通过处理数据节点缓存报告来维护缓存块到数据节点的映射。
        // 根据这些报告以及缓存指令的添加和删除,我们将安排缓存和取消缓存工作。
        this.cacheManager = new CacheManager(this, conf, blockManager);
        // Init ErasureCodingPolicyManager instance.
        ErasureCodingPolicyManager.getInstance().init(conf);
        this.topConf = new TopConf(conf);
        // 初始化审计日志
        this.auditLoggers = initAuditLoggers(conf);
        this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
            auditLoggers.get(0) instanceof DefaultAuditLogger;
        this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
        Class<? extends INodeAttributeProvider> klass = conf.getClass(
            DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY,
            null, INodeAttributeProvider.class);
        if (klass != null) {
            inodeAttributeProvider = ReflectionUtils.newInstance(klass, conf);
            LOG.info("Using INode attribute provider: " + klass.getName());
        }
        this.maxListOpenFilesResponses = conf.getInt(
            DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES,
            DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES_DEFAULT
        );
        Preconditions.checkArgument(maxListOpenFilesResponses > 0,
                                    DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES +
                                    " must be a positive integer."
                                   );
        this.allowOwnerSetQuota = conf.getBoolean(
            DFSConfigKeys.DFS_PERMISSIONS_ALLOW_OWNER_SET_QUOTA_KEY,
            DFSConfigKeys.DFS_PERMISSIONS_ALLOW_OWNER_SET_QUOTA_DEFAULT);
        this.blockDeletionIncrement = conf.getInt(
            DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_INCREMENT_KEY,
            DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_INCREMENT_DEFAULT);
        Preconditions.checkArgument(blockDeletionIncrement > 0,
                                    DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_INCREMENT_KEY +
                                    " must be a positive integer.");
    } catch(IOException e) {
        LOG.error(getClass().getSimpleName() + " initialization failed.", e);
        close();
        throw e;
    } catch (RuntimeException re) {
        LOG.error(getClass().getSimpleName() + " initialization failed.", re);
        close();
        throw re;
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-12 19:39:55  更:2021-11-12 19:41:12 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 0:29:01-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码