基于源码hadoop-3.3.0
1 概述
我们知道,hdfs中的操作和状态等数据都存在与元数据中,而元数据通过fsimage和edit log管理。
当我们进行第一次namenode格式化的时候,我们会创建fsimage和editlog文件,而如果不是第一次启动,就会加载对应目录下的fsimage和edit log完成namenode的启动,可参见FSNamesystem。
FSImage 是 NameNode 中关于元数据的镜像,一般称为检查点的镜像;会在内存和磁盘中各保存一份,包含了整个HDFS文件系统的所有目录和文件的信息。 对于文件来说包括了数据块描述信息、修改时间、访问时间等。 对于目录来说包括修改时间、访问权限控制信息(目录所属用户,所在组)等。
editlog主要是在NameNode已经启动情况下对HDFS进行的各种更新操作进行记录,HDFS客户端执行所有的写操作都会被记录到editlog中。
由于创建一个新的fsimage需要耗费大量的网络资源,因此Hadoop官方在进行checkpoint操作(fsimage和edit log合并)时,使用了SecondlyNamenode
主要操作步骤如下:
1)SecondaryNameNode 通知 NameNode 停止使用 EditLog,暂时将新的写操作存放到 edits.new 文件;
2)SecondaryNameNode 通过 HTTP GET 请求,从 NameNode 中获取 FSImage 和 EditLog,将它们加载到自己的内存中;
3)SecondaryNameNode 合并 FSImage 和 EditLog,合并完成后生成新的 FSImage,命名为 fsimage.ckpt;
4)SecondaryNameNode 通过 HTTP POST 请求方式,将新的 fsimage.ckpt 发送给 NameNode;
5)NameNode 把 fsimage.ckpt 改为 fsimage(覆盖掉原来的),并删掉旧的 edits 文件,把 edits.new 重命名为 edits,最后更新 fstime(即最后一个检查点的时间戳)。
另外,我们知道,namenode的元数据是以两种状态存储的,一种就是刚才说的fsimage落盘存储,保证了他的可靠性,另一种就是直接存储在namenode的内存中。考虑到hdfs的元数据可靠性保障,我们会定时对内存元数据刷盘。但是当我们集群规模不小时,每次刷盘的元数据量就会很大,为了保证刷盘时的性能,Hadoop采用了双缓存机制。
即我们将会开辟两份一模一样的内存空间,一个为bufCurrent,产生的数据会直接写入到这个bufCurrent,而另一个叫bufReady,在bufCurrent数据写入(其实这里可能不止一条数据)后,两片内存就会exchange(交换)。然后之前的bufCurrent就负责往磁盘上写入数据,之前的bufReady就继续接收客户端写入的数据。其实就是将向磁盘写数据的任务交给了后台去做。
2 FsEditLog源码介绍
2.1 TransactionId
在具体了解FsEditLog之前,我们需要先了解Transaction机制。
TransactionId与客户端每次发起的RPC操作相关, 当客户端发起一次RPC请求对Namenode的命名空间修改后, Namenode就会在editlog中发起一个新的transaction用于记录这次操作, 每个transaction会用一个唯一的transactionId标识。
下面主要介绍下各个文件:
- edits_start_transaction_id-end_transaction_id:edits文件就是我们描述的editlog文件,edits文件中存放的是客户端执行的所有更新命名空间的操作。 每个edits文件都包含了文件名中start trancsaction id - end transaction id之间的所有事务。比如 edits_0000000000331043694-0000000000331043707 , 这个文件记录了transaction id在331043694和331043707之间的所有事务(transaction)
- edits_inprogress_start_transaction_id: 正在进行处理的editlog。 所有从start transaction id开始的新的修改操作都会记录在这个文件中, 直到HDFS重置(roll) 这个日志文件。 重置操作会将inprogress文件关闭, 并将inprogress文件改名为正常的editlog文件(如上一项所示) , 同时还会打开一个新的inprogress文件, 记录正在进行的事务。 例如当前文件夹中的edits_inprogress_0000000000331043708文件, 这个文件记录了所有transaction id大于331043708的新开始的事务, 我们将这个事务区间称为一个日志段落(segment) 。Namenode元数据文件夹中存在这个文件有两种可能: 要么是Active Namenode正在写入数据, 要么是前一个Namenode没有正确地关闭。
- fsimage_end_transaction_id:fsimage文件是Hadoop文件系统元数据的一个永久性的检查点, 包含Hadoop文件系统中end transaction id前的完整的HDFS命名空间元数据镜像, 也就是HDFS所有目录和文件对应的INode的序列化信息。 以当前文件夹为例, fsimage_0000000000363262204就是fsimage_0000000000363216289与edits_0000000000363216290-0000000000363262204(可能不是一个文件)合并后的镜像文件, 保存了transaction id小于363216289的HDFS命名空间的元数据。 每个fsimage文件还有一个对应的md5文件, 用来确保fsimage文件的正确性, 以防止磁盘异常发生。
- seen_txid: 这个文件中保存了上一个检查点(checkpoint) (合并edits和fsimage文件) 以及编辑日志重置(editlog roll) (持久化当前的inprogress文件并且创建一个新的inprogress文件) 时最新的事务id (transaction id) 。 要特别注意的是, 这个事务id并不是Namenode内存中最新的事务id, 因为seen_txid只在检查点操作以及编辑日志重置操作时更新。 这个文件的作用在于Namenode启动时, 可以利用这个文件判断是否有edits文件丢失。 例如, Namenode使用不同的目录保存fsimage以及edits文件, 如果保存edits的目录内容丢失, Namenode将会使用上一个检查点保存的fsimage启动, 那么上一个检查点之后的所有事务都会丢失。 为了防止发生这种状况, Namenode启动时会检查seen_txid并确保内存中加载的事务id至少超过seen_txid; 否则Namenode将终止启动操作。
/**
* TransactionId与客户端每次发起的RPC操作相关,
* 当客户端发起一次RPC请求对Namenode的命名空间修改后,
* Namenode就会在editlog中发起一个新的transaction用于记录这次操作,
* 每个transaction会用一个唯一的transactionId标识。
*
*/
private static class TransactionId {
public long txid;
TransactionId(long value) {
this.txid = value;
}
}
2.2 FsEditLog变量
// 初始editlog状态
private State state = State.UNINITIALIZED;
//initialize,管理一些列JournalManager
private JournalSet journalSet = null;
@VisibleForTesting
EditLogOutputStream editLogStream = null;
// a monotonically increasing counter that represents transactionIds.
// All of the threads which update/increment txid are synchronized,
// so make txid volatile instead of AtomicLong.
// 一个单调递增的计数器,代表 transactionId
private volatile long txid = 0;
// stores the last synced transactionId.
private long synctxid = 0;
// the first txid of the log that's currently open for writing.
// If this value is N, we are currently writing to edits_inprogress_N
private volatile long curSegmentTxId = HdfsServerConstants.INVALID_TXID;
// the time of printing the statistics to the log file.
private long lastPrintTime;
// is a sync currently running?
private volatile boolean isSyncRunning;
// is an automatic sync scheduled?
private volatile boolean isAutoSyncScheduled = false;
// these are statistics counters.
private long numTransactions; // number of transactions
private final AtomicLong numTransactionsBatchedInSync = new AtomicLong();
private long totalTimeTransactions; // total time for all transactions
private NameNodeMetrics metrics;
// 负责管理 NameNode 使用的 StorageDirectories
private final NNStorage storage;
private final Configuration conf;
// edit目录
private final List<URI> editsDirs;
protected final OpInstanceCache cache = new OpInstanceCache();
/**
* The edit directories that are shared between primary and secondary.
*/
private final List<URI> sharedEditsDirs;
/**
* Take this lock when adding journals to or closing the JournalSet. Allows
* us to ensure that the JournalSet isn't closed or updated underneath us
* in selectInputStreams().
*/
private final Object journalSetLock = new Object();
// stores the most current transactionId of this thread.
private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>() {
@Override
protected synchronized TransactionId initialValue() {
return new TransactionId(Long.MAX_VALUE);
}
};
2.3 FsEditLog构造函数
FSEditLog 是通过newInstance方法进行构造的, 可以根据配置dfs.namenode.edits.asynclogging 生成不同的FSEditLog 实例, 默认是 FSEditLogAsync
static FSEditLog newInstance(Configuration conf, NNStorage storage,
List<URI> editsDirs) {
// dfs.namenode.edits.asynclogging,默认为true
boolean asyncEditLogging = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT);
LOG.info("Edit logging is async:" + asyncEditLogging);
return asyncEditLogging
? new FSEditLogAsync(conf, storage, editsDirs)
: new FSEditLog(conf, storage, editsDirs);
}
FSEditLogAsync(Configuration conf, NNStorage storage, List<URI> editsDirs) {
super(conf, storage, editsDirs);
// op instances cannot be shared due to queuing for background thread.
cache.disableCache();
}
/**
* Constructor for FSEditLog. Underlying journals are constructed, but
* no streams are opened until open() is called.
* 构建了底层日志,但在调用 open() 之前不会打开任何流
*
* @param conf The namenode configuration
* @param storage Storage object used by namenode
* @param editsDirs List of journals to use
*/
FSEditLog(Configuration conf, NNStorage storage, List<URI> editsDirs) {
isSyncRunning = false;
this.conf = conf;
this.storage = storage;
metrics = NameNode.getNameNodeMetrics();
lastPrintTime = monotonicNow();
// If this list is empty, an error will be thrown on first use
// of the editlog, as no journals will exist
this.editsDirs = Lists.newArrayList(editsDirs);
// dfs.namenode.shared.edits.dir:HA 集群中多个 namenode 之间共享存储上的目录。
// 该目录将由活动写入并由备用读取,以保持名称空间同步。它应该在非 HA 集群中留空。
this.sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);
}
2.4 edit状态机
在Hadoop中,FsEditLog是一个有限状态机。关于此状态机的状态转换,在注释中解释的非常清楚。
/**
* State machine for edit log.
*
* In a non-HA setup:
*
* The log starts in UNINITIALIZED state upon construction. Once it's
* initialized, it is usually in IN_SEGMENT state, indicating that edits may
* be written. In the middle of a roll, or while saving the namespace, it
* briefly enters the BETWEEN_LOG_SEGMENTS state, indicating that the previous
* segment has been closed, but the new one has not yet been opened.
*
* In an HA setup:
*
* The log starts in UNINITIALIZED state upon construction. Once it's
* initialized, it sits in the OPEN_FOR_READING state the entire time that the
* NN is in standby. Upon the NN transition to active, the log will be CLOSED,
* and then move to being BETWEEN_LOG_SEGMENTS, much as if the NN had just
* started up, and then will move to IN_SEGMENT so it can begin writing to the
* log. The log states will then revert to behaving as they do in a non-HA
* setup.
*/
private enum State {
UNINITIALIZED, // 初始状态
// 在非ha状态时,短暂存在于前一个segment关闭,下一个完成初始化之前
// 在ha状态,standby nn切换为active nn的过程中
BETWEEN_LOG_SEGMENTS,
// editlog处于可写状态
IN_SEGMENT,
// standby状态时即为此状态
OPEN_FOR_READING,
// standby 转换为active时,editLog文件会关闭
CLOSED;
}
2.4.1 简单介绍几个方法
- initJournalsForWrite:将FSEditLog从UNINITIALIZED状态转换为BETWEEN_LOG_SEGMENTS状态,其中调用initJournals()方法会根据传入的dirs 变量初始化journalSet,初始化完成后可以调用方法往其中写入editLog了
- initSharedJournalsForRead:用在HA情况下。 调用这个方法会将FSEditLog从UNINITIALIZED状态转换为OPEN_FOR_READING状态。与initJournalsForWrite()方法相同, initSharedJournalsForRead()方法也调用了initJournals()方法执行初始化操作, 只不过editlog文件的存储位置不同, 在HA的情况下,editlog文件的存储目录为共享存储目录, 这个共享存储目录由Active Namenode和StandbyNamenode共享读取
- openForWrite:在非HA机制下, 调用这个方法会完成BETWEEN_LOG_SEGMENTS状态到IN_SEGMENT状态的转换
- endCurrentLogSegment:endCurrentLogSegment()会将当前正在写入的日志段落关闭, 它调用了journalSet.finalizeLogSegment()方法将curSegmentTxid -> lastTxId之间的操作持久化到磁盘上
- close:close()方法用于关闭editlog文件的存储, 完成了IN_SEGMENT到CLOSED状态的改变。 close()会首先等待sync操作完成, 然后调用endCurrentLogSegment()方法, 将当前正在进行写操作的日志段落结束。 之后close()方法会关闭journalSet对象, 并将FSEditLog状态机转变为CLOSED状态。
2.5 重要方法
2.5.1 事务相关
private long beginTransaction() {
assert Thread.holdsLock(this);
// get a new transactionId
txid++;
//
// record the transactionId when new data was written to the edits log
//
// 更新transactionId中的txid
TransactionId id = myTransactionId.get();
id.txid = txid;
return monotonicNow();
}
private void endTransaction(long start) {
assert Thread.holdsLock(this);
// update statistics
long end = monotonicNow();
numTransactions++;
totalTimeTransactions += (end-start);
if (metrics != null) // Metrics is non-null only when used inside name node
metrics.addTransaction(end-start);
}
2.5.2 logEdit
会传入一个FSEditLogOp对象来标识当前需要被记录的操作类型以及操作的信息。
/**
* Write an operation to the edit log.
* <p/>
* Additionally, this will sync the edit log if required by the underlying
* edit stream's automatic sync policy (e.g. when the buffer is full, or
* if a time interval has elapsed).
*/
void logEdit(final FSEditLogOp op) {
boolean needsSync = false;
synchronized (this) {
assert isOpenForWrite() :
"bad state: " + state;
// wait if an automatic sync is scheduled
// 根据是否开启了自动同步,如果开启了,等待同步被调度
waitIfAutoSyncScheduled();
// check if it is time to schedule an automatic sync
// 开启edit事务,并更新needSync变量的值
// 最后会调用EditLogFileOutputStream#shouldForceSync方法更新
needsSync = doEditTransaction(op);
if (needsSync) {
isAutoSyncScheduled = true;
}
}
// Sync the log if an automatic sync is required.
// 是否进行同步,后面在仔细分析logSync方法
if (needsSync) {
logSync();
}
}
synchronized boolean doEditTransaction(final FSEditLogOp op) {
long start = beginTransaction();
op.setTransactionId(txid);
try {
editLogStream.write(op);
} catch (IOException ex) {
// All journals failed, it is handled in logSync.
} finally {
op.reset();
}
endTransaction(start);
return shouldForceSync();
}
logEdit()方法会调用beginTransaction()方法在editlog文件中开启一个新的transaction, 然后使用editlog输入流写入要被记录的操作, 接下来调用endTransaction()方法关闭这个transaction, 最后调用logSync()方法将写入的信息同步到磁盘上。
logEdit()方法调用beginTransaction()、 editLogStream.write()以及endTransaction()三个方法时使用了synchronized关键字进行同步操作, 这样就保证了多个线程调用FSEditLog.logEdit()方法向editlog文件中写数据时, editlog文件记录的内容不会相互影响。 同时, 也保证了这几个并发线程保存操作对应的transactionId(通过调用beginTransaction()方法获得) 是唯一并递增的。
logEdit()方法中调用logSync()方法执行刷新操作的语句并不在synchronized代码段中。 这是因为调用logSync()方法必然会触发写editlog文件的磁盘操作, 这是一个非常耗时的操作, 如果放入同步模块中会造成其他调用FSEditLog.logSync()线程的等待时间过长。 所以, HDFS设计者将需要进行同步操作的synchronized代码段放入logSync()方法中, 也就让输出日志记录和刷新缓冲区数据到磁盘这两个操作分离了。 同时, 利用EditLogOutputStream的两个缓冲区, 使得日志记录和刷新缓冲区数据这两个操作可以并发执行, 大大地提高了Namenode的吞吐量。
2.5.3 logDelete
logDelete()方法用于在editlog文件中记录删除HDFS文件的操作。
logDelete()方法首先会构造一个DeleteOp对象, 这个DeleteOp类是FSEditLogOp类的子类, 用于记录删除操作的相关信息, 包括了ClientProtocol.delete()调用中所有参数携带的信息。构造DeleteOp对象后, logDelete()方法会调用logRpcIds()方法在DeleteOp对象中添加RPC调用相关信息, 之后logDelete()方法会调用logEdit()方法在editlog文件中记录这次删除操作。
/**
* Add delete file record to edit log
*/
void logDelete(String src, long timestamp, boolean toLogRpcIds) {
DeleteOp op = DeleteOp.getInstance(cache.get())
.setPath(src)
.setTimestamp(timestamp);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
2.5.4 logSync
logEdit()方法通过调用beginTransaction()方法成功地获取一个transactionId之后, 就会通过输出流向editlog文件写数据以记录当前的操作, 但是写入的这些数据并没有直接保存在editlog文件中, 而是暂存在输出流的缓冲区中。 所以当logEdit()方法将一个完整的操作写入输出流后, 需要调用logSync()方法同步当前线程对editlog文件所做的修改。
对比这个transactionId和已经同步到editlog文件中的transactionId。 如果当前线程的transactionId大于editlog文件中的transactionId, 则表明editlog文件中记录的数据不是最新的, 同时如果当前没有别的线程执行同步操作, 则开始同步操作将输出流缓存中的数据写入editlog文件中。
每一个sync操作主要通过下面三个步骤完成:
- 判断当前操作是否已经同步到了editLog文件中,如果尚未同步,则标记isSyncRunning为true,且交换内存,做同步准备;
- 未同步,刷新数据到磁盘中,过程中耗时较久,因此不加锁,因为在上一段同步代码中已经将双buffer调换了位置, 不会有线程向同步缓存中插入新操作;
- 重置isSyncRunning标志位, 并且通知等待的线程, 这部分代码需要进行加锁
/**
* Sync all modifications done by this thread.
*
* The internal concurrency design of this class is as follows:
* - Log items are written synchronized into an in-memory buffer,
* and each assigned a transaction ID.
* - When a thread (client) would like to sync all of its edits, logSync()
* uses a ThreadLocal transaction ID to determine what edit number must
* be synced to.
* - The isSyncRunning volatile boolean tracks whether a sync is currently
* under progress.
*
* The data is double-buffered within each edit log implementation so that
* in-memory writing can occur in parallel with the on-disk writing.
*
* Each sync occurs in three steps:
* 1. synchronized, it swaps the double buffer and sets the isSyncRunning
* flag.
* 2. unsynchronized, it flushes the data to storage
* 3. synchronized, it resets the flag and notifies anyone waiting on the
* sync.
*
* The lack of synchronization on step 2 allows other threads to continue
* to write into the memory buffer while the sync is in progress.
* Because this step is unsynchronized, actions that need to avoid
* concurrency with sync() should be synchronized and also call
* waitForSyncToFinish() before assuming they are running alone.
*/
public void logSync() {
// Fetch the transactionId of this thread.
logSync(myTransactionId.get().txid);
}
// mytxid是ThreadLocal变量myTransactionId中存储的当前线程需要同步的txid
protected void logSync(long mytxid) {
long syncStart = 0;
boolean sync = false;
long editsBatchedInSync = 0;
try {
EditLogOutputStream logStream = null;
synchronized (this) {
try {
printStatistics(false);
// if somebody is already syncing, then wait
// 当前txid大于editlog中已经同步的txid,并且有线程正在同步, 则等待
while (mytxid > synctxid && isSyncRunning) {
try {
wait(1000);
} catch (InterruptedException ie) {
}
}
//
// If this transaction was already flushed, then nothing to do
// 如果txid小于editlog中已经同步的txid, 则表明当前操作已经被同步到存储上, 不需要再次同步
if (mytxid <= synctxid) {
return;
}
// now, this thread will do the sync. track if other edits were
// included in the sync - ie. batched. if this is the only edit
// synced then the batched count is 0
// 开启同步,在此过程中将isSyncRunning置为true
editsBatchedInSync = txid - synctxid - 1;
syncStart = txid;
isSyncRunning = true;
sync = true;
// swap buffers
try {
if (journalSet.isEmpty()) {
throw new IOException("No journals available to flush");
}
// 双缓存交换内存
editLogStream.setReadyToFlush();
} catch (IOException e) {
final String msg =
"Could not sync enough journals to persistent storage " +
"due to " + e.getMessage() + ". " +
"Unsynced transactions: " + (txid - synctxid);
LOG.error(msg, new Exception());
synchronized(journalSetLock) {
IOUtils.cleanupWithLogger(LOG, journalSet);
}
terminate(1, msg);
}
} finally {
// Prevent RuntimeException from blocking other log edit write
// 防止其他log edit 写入阻塞, 引起的RuntimeException,唤醒其他等待的线程
doneWithAutoSyncScheduling();
}
//editLogStream may become null,
//so store a local variable for flush.
logStream = editLogStream;
}
// do the sync
long start = monotonicNow();
try {
// 刷写数据,由于此过程耗时较久,因此将此操作移出同步代码块中
if (logStream != null) {
logStream.flush();
}
} catch (IOException ex) {
synchronized (this) {
final String msg =
"Could not sync enough journals to persistent storage. "
+ "Unsynced transactions: " + (txid - synctxid);
LOG.error(msg, new Exception());
synchronized(journalSetLock) {
IOUtils.cleanupWithLogger(LOG, journalSet);
}
terminate(1, msg);
}
}
long elapsed = monotonicNow() - start;
if (metrics != null) { // Metrics non-null only when used inside name node
metrics.addSync(elapsed);
metrics.incrTransactionsBatchedInSync(editsBatchedInSync);
numTransactionsBatchedInSync.addAndGet(editsBatchedInSync);
}
} finally {
// Prevent RuntimeException from blocking other log edit sync
synchronized (this) {
if (sync) {
// 已同步txid赋值为开始sync操作的txid
synctxid = syncStart;
for (JournalManager jm : journalSet.getJournalManagers()) {
/**
* {@link FileJournalManager#lastReadableTxId} is only meaningful
* for file-based journals. Therefore the interface is not added to
* other types of {@link JournalManager}.
*/
if (jm instanceof FileJournalManager) {
((FileJournalManager)jm).setLastReadableTxId(syncStart);
}
}
isSyncRunning = false;
}
this.notifyAll();
}
}
}
3 EditLogOutputStream
在FSEditLog#doEditTransaction中FSEditLog类会调用FSEditLog.editLogStream字段的write()方法在editlog文件中记录一个操作, 数据会先被写入到editlog文件输出流的缓存中, 然后FSEditLog类会调用editLogStream.flush()方法将缓存中的数据同步到磁盘上。
synchronized boolean doEditTransaction(final FSEditLogOp op) {
long start = beginTransaction();
op.setTransactionId(txid);
try {
editLogStream.write(op);
} catch (IOException ex) {
// All journals failed, it is handled in logSync.
} finally {
op.reset();
}
endTransaction(start);
return shouldForceSync();
}
editLogStream字段是EditLogOutputStream类型的
3.1 JournalSetOutputStream
JournalSetOutputStream类是EditLogOutputStream的子类, 在JournalSetOutputStream对象上调用的所有EditLogOutputStream接口方法都会被前转到FSEditLog.journalSet字段中保存的editlog文件在所有存储位置上的输出流对象(通过调用mapJournalsAndReportErrors()方法实现) 。
FSEditLog的editLogStream字段就是JournalSetOutputStream类型的(是在startLogSegment()方法中赋值的) , 通过调用JournalSetOutputStream对象提供的方法, FSEditLog可以将Namenode多个存储位置上的editlog文件输出流对外封装成一个输出流, 大大方便了调用。
JournalSetOutputStream类是通过mapJournalsAndReportErrors()方法, 将EditLogOutputStream接口上的write()调用前转到了FSEditLog中保存的所有存储路径上editlog文件对应的EditLogOutputStream输出流对象上的。 这个方法会遍历FSEditLog.journalSet.journals集合, 然后将write()请求前转到journals集合中保存的所有JournalAndStream对象上。 journalSet的journals字段是一个JournalAndStream对象的集合,JournalAndStream对象封装了一个JournalManager对象, 以及在这个JournalManager上打开的editlog文件的EditLogOutputStream对象。
journalSet.journals字段是在FSEditLog.startLogSegment()方法中赋值的 , 这个方法调用了journalSet.startLogSegment()方法在所有editlog文件的存储路径上构造输出流, 并将这些输出流保存在FSEditLog的journalSet.journals字段中。
@Override
public void write(final FSEditLogOp op)
throws IOException {
// 调用mapJournalsAndReportErrors方法
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
// 获取当前的JournalAndStream对应的EditLogFileOutputStream#write方法
// 往editLog文件中写入数据
jas.getCurrentStream().write(op);
}
}
}, "write op");
}
/**
* Apply the given operation across all of the journal managers, disabling
* any for which the closure throws an IOException.
* @param closure {@link JournalClosure} object encapsulating the operation.
* @param status message used for logging errors (e.g. "opening journal")
* @throws IOException If the operation fails on all the journals.
*/
private void mapJournalsAndReportErrors(
JournalClosure closure, String status) throws IOException{
List<JournalAndStream> badJAS = Lists.newLinkedList();
for (JournalAndStream jas : journals) {
try {
// 回调apply方法
closure.apply(jas);
} catch (Throwable t) {
if (jas.isRequired()) {
final String msg = "Error: " + status + " failed for required journal ("
+ jas + ")";
LOG.error(msg, t);
// If we fail on *any* of the required journals, then we must not
// continue on any of the other journals. Abort them to ensure that
// retry behavior doesn't allow them to keep going in any way.
abortAllJournals();
// the current policy is to shutdown the NN on errors to shared edits
// dir. There are many code paths to shared edits failures - syncs,
// roll of edits etc. All of them go through this common function
// where the isRequired() check is made. Applying exit policy here
// to catch all code paths.
terminate(1, msg);
} else {
LOG.error("Error: " + status + " failed for (journal " + jas + ")", t);
badJAS.add(jas);
}
}
}
disableAndReportErrorOnJournals(badJAS);
if (!NameNodeResourcePolicy.areResourcesAvailable(journals,
minimumRedundantJournals)) {
String message = status + " failed for too many journals";
LOG.error("Error: " + message);
throw new IOException(message);
}
}
3.2 EditFileOutputStream
EditLogFileOutputStream是向本地文件系统中保存的editlog文件写数据的输出流, 向EditLogFileOutputStream写数据时, 数据首先被写入到输出流的缓冲区中, 当显式地调用flush()操作后, 数据才会从缓冲区同步到editlog文件中。
3.2.1 构造函数
/**
* Creates output buffers and file object.
*
* @param conf
* Configuration object
* @param name
* File name to store edit log
* @param size
* Size of flush buffer
* @throws IOException
*/
public EditLogFileOutputStream(Configuration conf, File name, int size)
throws IOException {
super();
shouldSyncWritesAndSkipFsync = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH,
DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT);
file = name;
doubleBuf = new EditsDoubleBuffer(size);
RandomAccessFile rp;
if (shouldSyncWritesAndSkipFsync) {
rp = new RandomAccessFile(name, "rws");
} else {
rp = new RandomAccessFile(name, "rw");
}
fp = new FileOutputStream(rp.getFD()); // open for append
fc = rp.getChannel();
fc.position(fc.size());
}
3.2.2 变量
public static final int MIN_PREALLOCATION_LENGTH = 1024 * 1024;
// 输出流对应的editlog文件。
private File file;
// editlog文件对应的输出流。
private FileOutputStream fp; // file stream for storing edit logs
// editlog文件对应的输出流通道。
private FileChannel fc; // channel of the file stream for sync
// 一个具有两块缓存的缓冲区, 数据必须先写入缓存, 然后再由缓存同步到磁盘上。
private EditsDoubleBuffer doubleBuf;
//用来扩充editlog文件大小的数据块。 当要进行同步操作时,如果editlog文件不够大,
// 则使用fill来扩充editlog,文件最小1M
static final ByteBuffer fill = ByteBuffer.allocateDirect(MIN_PREALLOCATION_LENGTH);
private boolean shouldSyncWritesAndSkipFsync = false;
private static boolean shouldSkipFsyncForTests = false;
// EditLogFileOutputStream有一个static的代码段, 将fill字段用
// FSEditLogOpCodes.OP_INVALID 字节填满。
// 在创建edit inprocess文件时,首先会用"-1"填充1M大小的文件空间,然后将写入的指针归0
static {
fill.position(0);
for (int i = 0; i < fill.capacity(); i++) {
fill.put(FSEditLogOpCodes.OP_INVALID.getOpCode());
}
}
3.2.3 写数据
首先调用write方法:
@Override
public void write(FSEditLogOp op) throws IOException {
doubleBuf.writeOp(op, getCurrentLogVersion());
}
public void writeOp(FSEditLogOp op, int logVersion) throws IOException {
bufCurrent.writeOp(op, logVersion);
}
public void writeOp(FSEditLogOp op, int logVersion) throws IOException {
if (firstTxId == HdfsServerConstants.INVALID_TXID) {
firstTxId = op.txid;
} else {
assert op.txid > firstTxId;
}
writer.writeOp(op, logVersion);
numTxns++;
}
3.2.4 EditsDoubleBuffer
EditsDoubleBuffer类就是editlog写入数据中提及到的双缓存机制的主要实现,包含两块内存,数据先写入其中一块内存,而另一块可能在进行同步操作,因此此设计能够保证在同步时不会由于写入而影响或由于同步而无法写入:
//正在写入的缓冲区
private TxnBuffer bufCurrent; // current buffer for writing
//准备好同步的缓冲区
private TxnBuffer bufReady; // buffer ready for flushing
//缓冲区的大小 默认 512K
private final int initBufferSize;
输出流要进行同步操作时, 首先要调用EditsDoubleBuffer.setReadyToFlush()方法交换两个缓冲区, 将正在写入的缓存改变为同步缓存, 然后才可以进行同步操作。
public void setReadyToFlush() {
assert isFlushed() : "previous data not flushed yet";
TxnBuffer tmp = bufReady;
bufReady = bufCurrent;
bufCurrent = tmp;
}
// 完成了setReadyToFlush()调用之后,
// 输出流就可以调用flushTo()方法将同步缓存中的数据写入到文件中。
/**
* Writes the content of the "ready" buffer to the given output stream,
* and resets it. Does not swap any buffers.
*/
public void flushTo(OutputStream out) throws IOException {
bufReady.writeTo(out); // write data to file
bufReady.reset(); // erase all data in the buffer
}
|