2021SC@SDUSC
Hadoop源码分析(十一)—— Datanode实现(3)
Hadoop源码分析(九)—— Datanode实现(1) Hadoop源码分析(十)—— Datanode实现(2)
5. BlockReceiver 数据块接收器
BlockReceiver 所在的包为 org.apache.hadoop.hdfs.server.datanode,该类是 Datanode 节点上的数据块接收器。BlockReceiver 的源代码如下:
5.1 成员变量
private Block block;                          // the block currently being received
protected boolean finalized;                  // true if this block already exists on disk
private DataInputStream in = null;            // upstream input (client or previous datanode)
private DataChecksum checksum;                // checksum type read from the stream header
private OutputStream out = null;              // local block-data output stream
private DataOutputStream checksumOut = null;  // local checksum (.meta) output stream
private int bytesPerChecksum;                 // data bytes covered by one checksum
private int checksumSize;                     // size in bytes of a single checksum value
private ByteBuffer buf;                       // packet receive buffer
private int bufRead;                          // number of valid bytes currently in buf
private int maxPacketReadLen;                 // upper bound for a single packet read
protected long offsetInBlock;                 // current write offset within the block
protected final String inAddr;                // address of the upstream sender
protected final String myAddr;                // local address of this datanode
private String mirrorAddr;                    // downstream datanode in the write pipeline
private DataOutputStream mirrorOut;           // output to the downstream datanode
private Daemon responder = null;              // PacketResponder thread that acks upstream
private FSDataset.BlockWriteStreams streams;  // paired data/checksum output streams
private boolean isRecovery = false;           // true when re-writing an existing block
private String clientName;                    // non-empty when a client (not a datanode) writes
DatanodeInfo srcDatanode = null;              // source datanode for inter-datanode copies
private Checksum partialCrc = null;           // CRC of a partial chunk during recovery
5.2 成员方法
/**
 * Initializes the receiver for an incoming block: reads the checksum
 * configuration from the stream header, allocates on-disk streams for the
 * block's data and checksum files, and (on recovery) removes the block
 * from the scanner so it is re-verified after the rewrite.
 *
 * @throws IOException if the local streams cannot be created; disk errors
 *                     are unwrapped and reported to the datanode first
 */
BlockReceiver(Block block, DataInputStream in, String inAddr, String myAddr,
              boolean isRecovery, String clientName,
              DatanodeInfo srcDatanode, Datanode Datanode) throws IOException {
  try {
    this.block = block;
    this.in = in;
    this.inAddr = inAddr;
    this.myAddr = myAddr;
    this.isRecovery = isRecovery;
    this.clientName = clientName;
    this.offsetInBlock = 0;
    this.srcDatanode = srcDatanode;
    this.Datanode = Datanode;
    // The checksum type and chunk size are carried in the stream header.
    this.checksum = DataChecksum.newDataChecksum(in);
    this.bytesPerChecksum = checksum.getBytesPerChecksum();
    this.checksumSize = checksum.getChecksumSize();
    // Open local data/checksum streams; null streams mean nothing to write.
    streams = Datanode.data.writeToBlock(block, isRecovery);
    this.finalized = Datanode.data.isValidBlock(block);
    if (streams != null) {
      this.out = streams.dataOut;
      this.checksumOut = new DataOutputStream(new BufferedOutputStream(
          streams.checksumOut, SMALL_BUFFER_SIZE));
      // A recovered block must be re-scanned; drop any stale scan record.
      if (Datanode.blockscanner != null && isRecovery) {
        Datanode.blockscanner.deleteBlock(block);
      }
    }
  } catch (BlockAlreadyExistsException bae) {
    throw bae;
  } catch (IOException ioe) {
    // Release any streams opened so far and discard the partial block.
    IOUtils.closeStream(this);
    cleanupBlock();
    // Surface the underlying disk fault (if any) to the datanode.
    IOException cause = FSDataset.getCauseIfDiskError(ioe);
    if (cause != null) {
      ioe = cause;
      Datanode.checkDiskError(ioe);
    }
    throw ioe;
  }
}
通过构造方法完成BlockReceiver对象的初始化工作,这里的初始化是指为即将到来的Block数据申请本地磁盘上的存储空间,并且根据接收的头部信息为该Block创建校验器,以及获取对应的校验配置信息。
/**
 * Reads up to {@code toRead} bytes from the upstream stream into the packet
 * buffer, appending after the buffer's current limit. A negative
 * {@code toRead} means "fill the remaining space", bounded by
 * {@code maxPacketReadLen} when that is configured.
 *
 * @return the number of bytes actually read (&gt;= 0)
 * @throws EOFException if the stream ends before any byte is read
 */
private int readToBuf(int toRead) throws IOException {
  if (toRead < 0) {
    toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
             - buf.limit();
  }
  int nRead = in.read(buf.array(), buf.limit(), toRead);
  if (nRead < 0) {
    throw new EOFException("while trying to read " + toRead + " bytes");
  }
  // Advance the limit so the newly read bytes become part of the buffer view.
  bufRead = buf.limit() + nRead;
  buf.limit(bufRead);
  return nRead;
}
该方法用于接收当前packet中的数据,并保存到缓存块中。
6. DataBlockScanner 数据块扫描器
DataBlockScanner 所在的包为 org.apache.hadoop.hdfs.server.datanode,该类主要用于保证 Datanode 上数据块的完整性。此外Hadoop还提供了校验和的方式来保证数据的完整性。在Datanode节点上开启一个DataBlockScanner后台线程,来定期验证存储在其上的所有块,这是为了防止物理介质出现损坏而造成的数据损坏。DataBlockScanner的源代码如下:
6.1 成员变量
private static final int MAX_SCAN_RATE = 8 * 1024 * 1024;
private static final int MIN_SCAN_RATE = 1 * 1024 * 1024;
static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L;
static final String verificationlogFile = "dncp_block_verification.log";
static final int verficationLogLimit = 5;
FSDataset dataset;
TreeSet<BlockScanInfo> blockInfoSet;
HashMap<Block, BlockScanInfo> blockMap;
long totalScans = 0;
long totalverifications = 0;
long totalScanErrors =0;
long totalTransientErrors = 0;
long bytesLeft = 0;
long totalBytesToScan = 0;
private LogFileHandler verificationLog;
BlockTransferThrottler throttler = null;
private static enum ScanType {
REMOTE_READ,
VERIFTCATI ON_SCAN,
NONE,
}
该变量代表扫描操作的类型。
6.2 成员方法
private void init() {
Block arr[] = dataset.getBlockReport();
Collections.shuffle(Arrays.asList(arr));
blockInfoSet = new TreeSet<BlockScanInfo>();
blockMap = new HashMap<Block, BlockScanInfo>();
long scanTime = -1;
for (Block block : arr) {
BlockScanInfo info = new BlockScanInfo( block ); info.lastScanTime = scanTime--;
addBlocklnfo(info);
}
File dir = null;
FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
for(FSDataset.FSVolume vol : volumes) {
if (LogFileHandler.isFilePresent(vol.getDir(), verificationLogFile)) {
dir = vol.getDir();
break;
}
}
if (dir == null) {
dir = volumes[0].getDir();
}
try {
verificationLog = new LogFileHandler(dir, verificationLogFile, 100);
} catch (IOException e) {
LOG.warn("Could not open verfication log. " + "Verification times are not stored.");
}
synchronized (this) {
throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE);
)
}
初始化方法主要完成的工作包括:为每一个Block创建对应的BlockScanInfo对象,创建扫描日志记录器,创建扫描速度控制器等。
/**
 * Adjusts the byte counters when a block is added (positive {@code len})
 * or removed (negative {@code len}) from the scan schedule. Only blocks
 * not yet verified in the current period contribute to {@code bytesLeft}.
 */
private void updateBytesToScan(long len, long lastScanTime) {
  totalBytesToScan += len;
  // Blocks last scanned before this period started still need scanning.
  if (lastScanTime < currentPeriodStart) {
    bytesLeft += len;
  }
}
该方法用于更新此次扫描操作需要扫描的字节数。
/**
 * Registers a block's scan record in both the scan-ordered set and the
 * lookup map. On a genuinely new entry, the verification log's line cap
 * is grown and the pending-byte counters are updated.
 */
private synchronized void addBlockInfo(BlockScanInfo info) {
  boolean isNew = blockInfoSet.add(info);
  blockMap.put(info.block, info);
  if (!isNew) {
    return;
  }
  LogFileHandler log = verificationLog;
  if (log != null) {
    log.setMaxNumLines(blockMap.size() * verficationLogLimit);
  }
  updateBytesToScan(info.block.getNumBytes(), info.lastScanTime);
}
/**
 * Removes a block's scan record from the scan-ordered set and the lookup
 * map. When the record actually existed, the verification log's line cap
 * shrinks and the pending-byte counters are reduced accordingly.
 */
private synchronized void delBlockInfo(BlockScanInfo info) {
  boolean exists = blockInfoSet.remove(info);
  blockMap.remove(info.block);
  if (exists) {
    LogFileHandler log = verificationLog;
    if (log != null) {
      log.setMaxNumLines(blockMap.size() * verficationLogLimit);
    }
    // Negative length subtracts this block from the scan totals.
    updateBytesToScan(-info.block.getNumBytes(), info.lastScanTime);
  }
}
上面的两个方法分别用于添加和删除数据块对应的扫描信息。
private boolean assignlnitialVerificationTimes() {
int numBlocks = 1;
synchronized (this) {
numBlocks = Math.max(blockMap.size(), 1);
}
LogFileHandler.Reader logReader = null;
try {
if (verificationLog != null) {
logReader = verificationLog.new Reader(false);
}
} catch (IOException e) {
LOG.warn("Could not read previous verification times :" + StringUtils.stringifyException(e)); }
if (verificationLog != null) { verificationLog.updateCurNumLines();
}
try {
while (logReader != null && logReader.hasNext()) {
if (!Datanode.shouldRun || Thread.interrupted()) { return false;
}
LogEntry entry = LogEntry.parseEntry(logReader.next());
if (entry != null) {
updateBlocklnfo(entry);
)
}
} finally {
IOUtils.closeStream(logReader);
}
long verifylnterval = (long) (Math.min( scanPeriod/2.O/numBlocks, 10*60*1000 ));
long lastScanTime = System.currentTimeMillis() - scanPeriod;
synchronized (this) {
if (blocklnfoSet.size() > 0 ) {
BlockScanlnfo info;
while ((info = blocklnfoSet.first()).lastScanTime < 0) {
delBlockInfo(info);
info.lastScanTime = lastScanTime;
lastScanTime += verifyInterval;
addBlocklnfo(info);
}
}
}
return true;
该方法用于为每一个Block分配上一次验证的时间。
/**
 * Recomputes the scan bandwidth so the bytes still awaiting verification
 * finish within the remainder of the current scan period, clamped to the
 * [MIN_SCAN_RATE, MAX_SCAN_RATE] range.
 */
private synchronized void adjustThrottler() {
  long remainingMs = currentPeriodStart + scanPeriod - System.currentTimeMillis();
  long neededBps = bytesLeft * 1000 / remainingMs;
  long clamped = Math.min(Math.max(neededBps, MIN_SCAN_RATE), MAX_SCAN_RATE);
  throttler.setBandwidth(clamped);
}
该方法用于调整扫描速度。在一次Blocks扫描验证周期中,DataBlockScanner需要进行大量的磁盘 I/O,为了不影响Datanode节点上其他线程的工作资源,同时也为了自身工作的有效性,所以 DataBlockScanner采用了扫描验证速度控制器,并根据当前的工作量来控制当前数据块的验证速度。
|