IT数码 购物 网址 头条 软件 日历 阅读 图书馆
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
   -> 大数据 -> 2021-12-13 hadoop3 编辑日志:FsEditLog -> 正文阅读

[大数据]2021-12-13 hadoop3 编辑日志:FsEditLog


1 概述

我们知道,hdfs中的操作和状态等数据都存在与元数据中,而元数据通过fsimage和edit log管理。

当我们进行第一次namenode格式化的时候,我们会创建fsimage和editlog文件,而如果不是第一次启动,就会加载对应目录下的fsimage和edit log完成namenode的启动,可参见FSNamesystem

FSImage 是 NameNode 中关于元数据的镜像,一般称为检查点的镜像;会在内存和磁盘中各保存一份,包含了整个HDFS文件系统的所有目录和文件的信息。 对于文件来说包括了数据块描述信息、修改时间、访问时间等。 对于目录来说包括修改时间、访问权限控制信息(目录所属用户,所在组)等。


由于创建一个新的fsimage需要耗费大量的网络资源,因此Hadoop官方在进行checkpoint操作(fsimage和edit log合并)时,使用了SecondlyNamenode


1)SecondaryNameNode 通知 NameNode 停止使用 EditLog,暂时将新的写操作存放到 文件;

2)SecondaryNameNode 通过 HTTP GET 请求,从 NameNode 中获取 FSImage 和 EditLog,将它们加载到自己的内存中;

3)SecondaryNameNode 合并 FSImage 和 EditLog,合并完成后生成新的 FSImage,命名为 fsimage.ckpt;

4)SecondaryNameNode 通过 HTTP POST 请求方式,将新的 fsimage.ckpt 发送给 NameNode;

5)NameNode 把 fsimage.ckpt 改为 fsimage(覆盖掉原来的),并删掉旧的 edits 文件,把 重命名为 edits,最后更新 fstime(即最后一个检查点的时间戳)。



2 FsEditLog源码介绍

2.1 TransactionId


TransactionId与客户端每次发起的RPC操作相关, 当客户端发起一次RPC请求对Namenode的命名空间修改后, Namenode就会在editlog中发起一个新的transaction用于记录这次操作, 每个transaction会用一个唯一的transactionId标识。


  • edits_start_transaction_id-end_transaction_id:edits文件就是我们描述的editlog文件,edits文件中存放的是客户端执行的所有更新命名空间的操作。 每个edits文件都包含了文件名中start trancsaction id - end transaction id之间的所有事务。比如 edits_0000000000331043694-0000000000331043707 , 这个文件记录了transaction id在331043694和331043707之间的所有事务(transaction)
  • edits_inprogress_start_transaction_id: 正在进行处理的editlog。 所有从start transaction id开始的新的修改操作都会记录在这个文件中, 直到HDFS重置(roll) 这个日志文件。 重置操作会将inprogress文件关闭, 并将inprogress文件改名为正常的editlog文件(如上一项所示) , 同时还会打开一个新的inprogress文件, 记录正在进行的事务。 例如当前文件夹中的edits_inprogress_0000000000331043708文件, 这个文件记录了所有transaction id大于331043708的新开始的事务, 我们将这个事务区间称为一个日志段落(segment) 。Namenode元数据文件夹中存在这个文件有两种可能: 要么是Active Namenode正在写入数据, 要么是前一个Namenode没有正确地关闭。
  • fsimage_end_transaction_id:fsimage文件是Hadoop文件系统元数据的一个永久性的检查点, 包含Hadoop文件系统中end transaction id前的完整的HDFS命名空间元数据镜像, 也就是HDFS所有目录和文件对应的INode的序列化信息。 以当前文件夹为例, fsimage_0000000000363262204就是fsimage_0000000000363216289与edits_0000000000363216290-0000000000363262204(可能不是一个文件)合并后的镜像文件, 保存了transaction id小于363216289的HDFS命名空间的元数据。 每个fsimage文件还有一个对应的md5文件, 用来确保fsimage文件的正确性, 以防止磁盘异常发生。
  • seen_txid: 这个文件中保存了上一个检查点(checkpoint) (合并edits和fsimage文件) 以及编辑日志重置(editlog roll) (持久化当前的inprogress文件并且创建一个新的inprogress文件) 时最新的事务id (transaction id) 。 要特别注意的是, 这个事务id并不是Namenode内存中最新的事务id, 因为seen_txid只在检查点操作以及编辑日志重置操作时更新。 这个文件的作用在于Namenode启动时, 可以利用这个文件判断是否有edits文件丢失。 例如, Namenode使用不同的目录保存fsimage以及edits文件, 如果保存edits的目录内容丢失, Namenode将会使用上一个检查点保存的fsimage启动, 那么上一个检查点之后的所有事务都会丢失。 为了防止发生这种状况, Namenode启动时会检查seen_txid并确保内存中加载的事务id至少超过seen_txid; 否则Namenode将终止启动操作。
   * TransactionId与客户端每次发起的RPC操作相关,
   * 当客户端发起一次RPC请求对Namenode的命名空间修改后,
   * Namenode就会在editlog中发起一个新的transaction用于记录这次操作,
   * 每个transaction会用一个唯一的transactionId标识。
private static class TransactionId {
    public long txid;

    TransactionId(long value) {
        this.txid = value;

2.2 FsEditLog变量

// 初始editlog状态
private State state = State.UNINITIALIZED;

private JournalSet journalSet = null;

EditLogOutputStream editLogStream = null;

// a monotonically increasing counter that represents transactionIds.
// All of the threads which update/increment txid are synchronized,
// so make txid volatile instead of AtomicLong.
// 一个单调递增的计数器,代表 transactionId
private volatile long txid = 0;

// stores the last synced transactionId.
private long synctxid = 0;

// the first txid of the log that's currently open for writing.
// If this value is N, we are currently writing to edits_inprogress_N
private volatile long curSegmentTxId = HdfsServerConstants.INVALID_TXID;

// the time of printing the statistics to the log file.
private long lastPrintTime;

// is a sync currently running?
private volatile boolean isSyncRunning;

// is an automatic sync scheduled?
private volatile boolean isAutoSyncScheduled = false;

// these are statistics counters.
private long numTransactions;        // number of transactions
private final AtomicLong numTransactionsBatchedInSync = new AtomicLong();
private long totalTimeTransactions;  // total time for all transactions
private NameNodeMetrics metrics;

// 负责管理 NameNode 使用的 StorageDirectories
private final NNStorage storage;
private final Configuration conf;

// edit目录
private final List<URI> editsDirs;

protected final OpInstanceCache cache = new OpInstanceCache();

   * The edit directories that are shared between primary and secondary.
private final List<URI> sharedEditsDirs;

   * Take this lock when adding journals to or closing the JournalSet. Allows
   * us to ensure that the JournalSet isn't closed or updated underneath us
   * in selectInputStreams().
private final Object journalSetLock = new Object();
// stores the most current transactionId of this thread.
private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>() {
    protected synchronized TransactionId initialValue() {
        return new TransactionId(Long.MAX_VALUE);

2.3 FsEditLog构造函数

FSEditLog 是通过newInstance方法进行构造的, 可以根据配置dfs.namenode.edits.asynclogging 生成不同的FSEditLog 实例, 默认是 FSEditLogAsync

static FSEditLog newInstance(Configuration conf, NNStorage storage,
                             List<URI> editsDirs) {
    // dfs.namenode.edits.asynclogging,默认为true
    boolean asyncEditLogging = conf.getBoolean(
        DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT);"Edit logging is async:" + asyncEditLogging);
    return asyncEditLogging
        ? new FSEditLogAsync(conf, storage, editsDirs)
        : new FSEditLog(conf, storage, editsDirs);

FSEditLogAsync(Configuration conf, NNStorage storage, List<URI> editsDirs) {
    super(conf, storage, editsDirs);
    // op instances cannot be shared due to queuing for background thread.
 * Constructor for FSEditLog. Underlying journals are constructed, but 
 * no streams are opened until open() is called.
 * 构建了底层日志,但在调用 open() 之前不会打开任何流
 * @param conf The namenode configuration
 * @param storage Storage object used by namenode
 * @param editsDirs List of journals to use
FSEditLog(Configuration conf, NNStorage storage, List<URI> editsDirs) {
    isSyncRunning = false;
    this.conf = conf; = storage;
    metrics = NameNode.getNameNodeMetrics();
    lastPrintTime = monotonicNow();

    // If this list is empty, an error will be thrown on first use
    // of the editlog, as no journals will exist
    this.editsDirs = Lists.newArrayList(editsDirs);

    // dfs.namenode.shared.edits.dir:HA 集群中多个 namenode 之间共享存储上的目录。
    // 该目录将由活动写入并由备用读取,以保持名称空间同步。它应该在非 HA 集群中留空。
    this.sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);

2.4 edit状态机


 * State machine for edit log.
 * In a non-HA setup:
 * The log starts in UNINITIALIZED state upon construction. Once it's
 * initialized, it is usually in IN_SEGMENT state, indicating that edits may
 * be written. In the middle of a roll, or while saving the namespace, it
 * briefly enters the BETWEEN_LOG_SEGMENTS state, indicating that the previous
 * segment has been closed, but the new one has not yet been opened.
 * In an HA setup:
 * The log starts in UNINITIALIZED state upon construction. Once it's
 * initialized, it sits in the OPEN_FOR_READING state the entire time that the
 * NN is in standby. Upon the NN transition to active, the log will be CLOSED,
 * and then move to being BETWEEN_LOG_SEGMENTS, much as if the NN had just
 * started up, and then will move to IN_SEGMENT so it can begin writing to the
 * log. The log states will then revert to behaving as they do in a non-HA
 * setup.
private enum State {
    UNINITIALIZED,  // 初始状态
    // 在非ha状态时,短暂存在于前一个segment关闭,下一个完成初始化之前
    // 在ha状态,standby nn切换为active nn的过程中
    // editlog处于可写状态
    // standby状态时即为此状态
    // standby 转换为active时,editLog文件会关闭

2.4.1 简单介绍几个方法

  • initJournalsForWrite:将FSEditLog从UNINITIALIZED状态转换为BETWEEN_LOG_SEGMENTS状态,其中调用initJournals()方法会根据传入的dirs 变量初始化journalSet,初始化完成后可以调用方法往其中写入editLog了
  • initSharedJournalsForRead:用在HA情况下。 调用这个方法会将FSEditLog从UNINITIALIZED状态转换为OPEN_FOR_READING状态。与initJournalsForWrite()方法相同, initSharedJournalsForRead()方法也调用了initJournals()方法执行初始化操作, 只不过editlog文件的存储位置不同, 在HA的情况下,editlog文件的存储目录为共享存储目录, 这个共享存储目录由Active Namenode和StandbyNamenode共享读取
  • openForWrite:在非HA机制下, 调用这个方法会完成BETWEEN_LOG_SEGMENTS状态到IN_SEGMENT状态的转换
  • endCurrentLogSegment:endCurrentLogSegment()会将当前正在写入的日志段落关闭, 它调用了journalSet.finalizeLogSegment()方法将curSegmentTxid -> lastTxId之间的操作持久化到磁盘上
  • close:close()方法用于关闭editlog文件的存储, 完成了IN_SEGMENT到CLOSED状态的改变。 close()会首先等待sync操作完成, 然后调用endCurrentLogSegment()方法, 将当前正在进行写操作的日志段落结束。 之后close()方法会关闭journalSet对象, 并将FSEditLog状态机转变为CLOSED状态。

2.5 重要方法

2.5.1 事务相关

  • beginTransaction开启一个事务
private long beginTransaction() {
    assert Thread.holdsLock(this);
    // get a new transactionId

    // record the transactionId when new data was written to the edits log
    // 更新transactionId中的txid
    TransactionId id = myTransactionId.get();
    id.txid = txid;
    return monotonicNow();
  • endTransaction
private void endTransaction(long start) {
    assert Thread.holdsLock(this);

    // update statistics
    long end = monotonicNow();
    totalTimeTransactions += (end-start);
    if (metrics != null) // Metrics is non-null only when used inside name node

2.5.2 logEdit


   * Write an operation to the edit log.
   * <p/>
   * Additionally, this will sync the edit log if required by the underlying
   * edit stream's automatic sync policy (e.g. when the buffer is full, or
   * if a time interval has elapsed).
void logEdit(final FSEditLogOp op) {
    boolean needsSync = false;
    synchronized (this) {
        assert isOpenForWrite() :
        "bad state: " + state;

        // wait if an automatic sync is scheduled
        // 根据是否开启了自动同步,如果开启了,等待同步被调度

        // check if it is time to schedule an automatic sync
        // 开启edit事务,并更新needSync变量的值
        // 最后会调用EditLogFileOutputStream#shouldForceSync方法更新
        needsSync = doEditTransaction(op);
        if (needsSync) {
            isAutoSyncScheduled = true;

    // Sync the log if an automatic sync is required.
    // 是否进行同步,后面在仔细分析logSync方法
    if (needsSync) {

synchronized boolean doEditTransaction(final FSEditLogOp op) {
    long start = beginTransaction();

    try {
    } catch (IOException ex) {
        // All journals failed, it is handled in logSync.
    } finally {
    return shouldForceSync();

logEdit()方法会调用beginTransaction()方法在editlog文件中开启一个新的transaction, 然后使用editlog输入流写入要被记录的操作, 接下来调用endTransaction()方法关闭这个transaction, 最后调用logSync()方法将写入的信息同步到磁盘上。

logEdit()方法调用beginTransaction()、 editLogStream.write()以及endTransaction()三个方法时使用了synchronized关键字进行同步操作, 这样就保证了多个线程调用FSEditLog.logEdit()方法向editlog文件中写数据时, editlog文件记录的内容不会相互影响。 同时, 也保证了这几个并发线程保存操作对应的transactionId(通过调用beginTransaction()方法获得) 是唯一并递增的。

logEdit()方法中调用logSync()方法执行刷新操作的语句并不在synchronized代码段中。 这是因为调用logSync()方法必然会触发写editlog文件的磁盘操作, 这是一个非常耗时的操作, 如果放入同步模块中会造成其他调用FSEditLog.logSync()线程的等待时间过长。 所以, HDFS设计者将需要进行同步操作的synchronized代码段放入logSync()方法中, 也就让输出日志记录和刷新缓冲区数据到磁盘这两个操作分离了。 同时, 利用EditLogOutputStream的两个缓冲区, 使得日志记录和刷新缓冲区数据这两个操作可以并发执行, 大大地提高了Namenode的吞吐量。

2.5.3 logDelete


logDelete()方法首先会构造一个DeleteOp对象, 这个DeleteOp类是FSEditLogOp类的子类, 用于记录删除操作的相关信息, 包括了ClientProtocol.delete()调用中所有参数携带的信息。构造DeleteOp对象后, logDelete()方法会调用logRpcIds()方法在DeleteOp对象中添加RPC调用相关信息, 之后logDelete()方法会调用logEdit()方法在editlog文件中记录这次删除操作。

   * Add delete file record to edit log
void logDelete(String src, long timestamp, boolean toLogRpcIds) {
    DeleteOp op = DeleteOp.getInstance(cache.get())
    logRpcIds(op, toLogRpcIds);

2.5.4 logSync

logEdit()方法通过调用beginTransaction()方法成功地获取一个transactionId之后, 就会通过输出流向editlog文件写数据以记录当前的操作, 但是写入的这些数据并没有直接保存在editlog文件中, 而是暂存在输出流的缓冲区中。 所以当logEdit()方法将一个完整的操作写入输出流后, 需要调用logSync()方法同步当前线程对editlog文件所做的修改。

对比这个transactionId和已经同步到editlog文件中的transactionId。 如果当前线程的transactionId大于editlog文件中的transactionId, 则表明editlog文件中记录的数据不是最新的, 同时如果当前没有别的线程执行同步操作, 则开始同步操作将输出流缓存中的数据写入editlog文件中。


  1. 判断当前操作是否已经同步到了editLog文件中,如果尚未同步,则标记isSyncRunning为true,且交换内存,做同步准备;
  1. 未同步,刷新数据到磁盘中,过程中耗时较久,因此不加锁,因为在上一段同步代码中已经将双buffer调换了位置, 不会有线程向同步缓存中插入新操作;
  2. 重置isSyncRunning标志位, 并且通知等待的线程, 这部分代码需要进行加锁
   * Sync all modifications done by this thread.
   * The internal concurrency design of this class is as follows:
   *   - Log items are written synchronized into an in-memory buffer,
   *     and each assigned a transaction ID.
   *   - When a thread (client) would like to sync all of its edits, logSync()
   *     uses a ThreadLocal transaction ID to determine what edit number must
   *     be synced to.
   *   - The isSyncRunning volatile boolean tracks whether a sync is currently
   *     under progress.
   * The data is double-buffered within each edit log implementation so that
   * in-memory writing can occur in parallel with the on-disk writing.
   * Each sync occurs in three steps:
   *   1. synchronized, it swaps the double buffer and sets the isSyncRunning
   *      flag.
   *   2. unsynchronized, it flushes the data to storage
   *   3. synchronized, it resets the flag and notifies anyone waiting on the
   *      sync.
   * The lack of synchronization on step 2 allows other threads to continue
   * to write into the memory buffer while the sync is in progress.
   * Because this step is unsynchronized, actions that need to avoid
   * concurrency with sync() should be synchronized and also call
   * waitForSyncToFinish() before assuming they are running alone.
public void logSync() {
    // Fetch the transactionId of this thread.

// mytxid是ThreadLocal变量myTransactionId中存储的当前线程需要同步的txid
protected void logSync(long mytxid) {
    long syncStart = 0;
    boolean sync = false;
    long editsBatchedInSync = 0;
    try {
        EditLogOutputStream logStream = null;
        synchronized (this) {
            try {

                // if somebody is already syncing, then wait
                // 当前txid大于editlog中已经同步的txid,并且有线程正在同步, 则等待
                while (mytxid > synctxid && isSyncRunning) {
                    try {
                    } catch (InterruptedException ie) {

                // If this transaction was already flushed, then nothing to do
                // 如果txid小于editlog中已经同步的txid, 则表明当前操作已经被同步到存储上, 不需要再次同步
                if (mytxid <= synctxid) {

                // now, this thread will do the sync.  track if other edits were
                // included in the sync - ie. batched.  if this is the only edit
                // synced then the batched count is 0
                // 开启同步,在此过程中将isSyncRunning置为true
                editsBatchedInSync = txid - synctxid - 1;
                syncStart = txid;
                isSyncRunning = true;
                sync = true;

                // swap buffers
                try {
                    if (journalSet.isEmpty()) {
                        throw new IOException("No journals available to flush");
                    // 双缓存交换内存
                } catch (IOException e) {
                    final String msg =
                        "Could not sync enough journals to persistent storage " +
                        "due to " + e.getMessage() + ". " +
                        "Unsynced transactions: " + (txid - synctxid);
                    LOG.error(msg, new Exception());
                    synchronized(journalSetLock) {
                        IOUtils.cleanupWithLogger(LOG, journalSet);
                    terminate(1, msg);
            } finally {
                // Prevent RuntimeException from blocking other log edit write
                // 防止其他log edit 写入阻塞, 引起的RuntimeException,唤醒其他等待的线程
            //editLogStream may become null,
            //so store a local variable for flush.
            logStream = editLogStream;

        // do the sync
        long start = monotonicNow();
        try {
            // 刷写数据,由于此过程耗时较久,因此将此操作移出同步代码块中
            if (logStream != null) {
        } catch (IOException ex) {
            synchronized (this) {
                final String msg =
                    "Could not sync enough journals to persistent storage. "
                    + "Unsynced transactions: " + (txid - synctxid);
                LOG.error(msg, new Exception());
                synchronized(journalSetLock) {
                    IOUtils.cleanupWithLogger(LOG, journalSet);
                terminate(1, msg);
        long elapsed = monotonicNow() - start;

        if (metrics != null) { // Metrics non-null only when used inside name node

    } finally {
        // Prevent RuntimeException from blocking other log edit sync 
        synchronized (this) {
            if (sync) {
                // 已同步txid赋值为开始sync操作的txid
                synctxid = syncStart;
                for (JournalManager jm : journalSet.getJournalManagers()) {
             * {@link FileJournalManager#lastReadableTxId} is only meaningful
             * for file-based journals. Therefore the interface is not added to
             * other types of {@link JournalManager}.
                    if (jm instanceof FileJournalManager) {
                isSyncRunning = false;

3 EditLogOutputStream

在FSEditLog#doEditTransaction中FSEditLog类会调用FSEditLog.editLogStream字段的write()方法在editlog文件中记录一个操作, 数据会先被写入到editlog文件输出流的缓存中, 然后FSEditLog类会调用editLogStream.flush()方法将缓存中的数据同步到磁盘上。

synchronized boolean doEditTransaction(final FSEditLogOp op) {
    long start = beginTransaction();

    try {
    } catch (IOException ex) {
        // All journals failed, it is handled in logSync.
    } finally {
    return shouldForceSync();


3.1 JournalSetOutputStream

JournalSetOutputStream类是EditLogOutputStream的子类, 在JournalSetOutputStream对象上调用的所有EditLogOutputStream接口方法都会被前转到FSEditLog.journalSet字段中保存的editlog文件在所有存储位置上的输出流对象(通过调用mapJournalsAndReportErrors()方法实现) 。

FSEditLog的editLogStream字段就是JournalSetOutputStream类型的(是在startLogSegment()方法中赋值的) , 通过调用JournalSetOutputStream对象提供的方法, FSEditLog可以将Namenode多个存储位置上的editlog文件输出流对外封装成一个输出流, 大大方便了调用。

JournalSetOutputStream类是通过mapJournalsAndReportErrors()方法, 将EditLogOutputStream接口上的write()调用前转到了FSEditLog中保存的所有存储路径上editlog文件对应的EditLogOutputStream输出流对象上的。 这个方法会遍历FSEditLog.journalSet.journals集合, 然后将write()请求前转到journals集合中保存的所有JournalAndStream对象上。 journalSet的journals字段是一个JournalAndStream对象的集合,JournalAndStream对象封装了一个JournalManager对象, 以及在这个JournalManager上打开的editlog文件的EditLogOutputStream对象。

journalSet.journals字段是在FSEditLog.startLogSegment()方法中赋值的 , 这个方法调用了journalSet.startLogSegment()方法在所有editlog文件的存储路径上构造输出流, 并将这些输出流保存在FSEditLog的journalSet.journals字段中。

public void write(final FSEditLogOp op)
    throws IOException {
    // 调用mapJournalsAndReportErrors方法
    mapJournalsAndReportErrors(new JournalClosure() {
        public void apply(JournalAndStream jas) throws IOException {
            if (jas.isActive()) {
                // 获取当前的JournalAndStream对应的EditLogFileOutputStream#write方法
                // 往editLog文件中写入数据
    }, "write op");

   * Apply the given operation across all of the journal managers, disabling
   * any for which the closure throws an IOException.
   * @param closure {@link JournalClosure} object encapsulating the operation.
   * @param status message used for logging errors (e.g. "opening journal")
   * @throws IOException If the operation fails on all the journals.
private void mapJournalsAndReportErrors(
    JournalClosure closure, String status) throws IOException{

    List<JournalAndStream> badJAS = Lists.newLinkedList();
    for (JournalAndStream jas : journals) {
        try {
            // 回调apply方法
        } catch (Throwable t) {
            if (jas.isRequired()) {
                final String msg = "Error: " + status + " failed for required journal ("
                    + jas + ")";
                LOG.error(msg, t);
                // If we fail on *any* of the required journals, then we must not
                // continue on any of the other journals. Abort them to ensure that
                // retry behavior doesn't allow them to keep going in any way.
                // the current policy is to shutdown the NN on errors to shared edits
                // dir. There are many code paths to shared edits failures - syncs,
                // roll of edits etc. All of them go through this common function 
                // where the isRequired() check is made. Applying exit policy here 
                // to catch all code paths.
                terminate(1, msg);
            } else {
                LOG.error("Error: " + status + " failed for (journal " + jas + ")", t);
    if (!NameNodeResourcePolicy.areResourcesAvailable(journals,
                                                      minimumRedundantJournals)) {
        String message = status + " failed for too many journals";
        LOG.error("Error: " + message);
        throw new IOException(message);

3.2 EditFileOutputStream

EditLogFileOutputStream是向本地文件系统中保存的editlog文件写数据的输出流, 向EditLogFileOutputStream写数据时, 数据首先被写入到输出流的缓冲区中, 当显式地调用flush()操作后, 数据才会从缓冲区同步到editlog文件中。

3.2.1 构造函数

   * Creates output buffers and file object.
   * @param conf
   *          Configuration object
   * @param name
   *          File name to store edit log
   * @param size
   *          Size of flush buffer
   * @throws IOException
public EditLogFileOutputStream(Configuration conf, File name, int size)
    throws IOException {
    shouldSyncWritesAndSkipFsync = conf.getBoolean(

    file = name;
    doubleBuf = new EditsDoubleBuffer(size);
    RandomAccessFile rp;
    if (shouldSyncWritesAndSkipFsync) {
        rp = new RandomAccessFile(name, "rws");
    } else {
        rp = new RandomAccessFile(name, "rw");
    fp = new FileOutputStream(rp.getFD()); // open for append
    fc = rp.getChannel();

3.2.2 变量

public static final int MIN_PREALLOCATION_LENGTH = 1024 * 1024;

// 输出流对应的editlog文件。
private File file;

// editlog文件对应的输出流。
private FileOutputStream fp; // file stream for storing edit logs

// editlog文件对应的输出流通道。
private FileChannel fc; // channel of the file stream for sync

// 一个具有两块缓存的缓冲区, 数据必须先写入缓存, 然后再由缓存同步到磁盘上。
private EditsDoubleBuffer doubleBuf;

//用来扩充editlog文件大小的数据块。 当要进行同步操作时,如果editlog文件不够大, 
// 则使用fill来扩充editlog,文件最小1M
static final ByteBuffer fill = ByteBuffer.allocateDirect(MIN_PREALLOCATION_LENGTH);

private boolean shouldSyncWritesAndSkipFsync = false;

private static boolean shouldSkipFsyncForTests = false;

// EditLogFileOutputStream有一个static的代码段, 将fill字段用
// FSEditLogOpCodes.OP_INVALID 字节填满。
// 在创建edit inprocess文件时,首先会用"-1"填充1M大小的文件空间,然后将写入的指针归0
static {
    for (int i = 0; i < fill.capacity(); i++) {

3.2.3 写数据


public void write(FSEditLogOp op) throws IOException {
    doubleBuf.writeOp(op, getCurrentLogVersion());

public void writeOp(FSEditLogOp op, int logVersion) throws IOException {
    bufCurrent.writeOp(op, logVersion);

public void writeOp(FSEditLogOp op, int logVersion) throws IOException {
    if (firstTxId == HdfsServerConstants.INVALID_TXID) {
        firstTxId = op.txid;
    } else {
        assert op.txid > firstTxId;
    writer.writeOp(op, logVersion);

3.2.4 EditsDoubleBuffer


private TxnBuffer bufCurrent; // current buffer for writing

private TxnBuffer bufReady; // buffer ready for flushing

//缓冲区的大小   默认 512K
private final int initBufferSize;

输出流要进行同步操作时, 首先要调用EditsDoubleBuffer.setReadyToFlush()方法交换两个缓冲区, 将正在写入的缓存改变为同步缓存, 然后才可以进行同步操作。

public void setReadyToFlush() {
    assert isFlushed() : "previous data not flushed yet";
    TxnBuffer tmp = bufReady;
    bufReady = bufCurrent;
    bufCurrent = tmp;
// 完成了setReadyToFlush()调用之后, 
// 输出流就可以调用flushTo()方法将同步缓存中的数据写入到文件中。
   * Writes the content of the "ready" buffer to the given output stream,
   * and resets it. Does not swap any buffers.
public void flushTo(OutputStream out) throws IOException {
    bufReady.writeTo(out); // write data to file
    bufReady.reset(); // erase all data in the buffer

  大数据 最新文章
亚马逊云科技:还在苦于ETL?Zero ETL的时代
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
专题五 Redis高并发场景
上一篇文章      下一篇文章      查看所有文章
加:2021-12-14 16:00:53  更:2021-12-14 16:03:31 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年3日历 -2025/3/6 11:41:04-

  网站联系: qq:121756557  IT数码