IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 2021年山东大学软件工程应用与实践项目——Hadoop源码分析(十一) -> 正文阅读

[大数据]2021年山东大学软件工程应用与实践项目——Hadoop源码分析(十一)

2021SC@SDUSC

Hadoop源码分析(十一)—— Datanode实现(3)

Hadoop源码分析(九)—— Datanode实现(1)
Hadoop源码分析(十)—— Datanode实现(2)

5. BlockReceiver 数据块接收器

BlockReceiver 所在的包为 org.apache.hadoop.hdfs.server.Datanode,该类是 Datanode 节点上的数据块接收器。BlockReceiver的源代码如下:

5.1 成员变量

private Block block;
//该变量代表待接收的Block。
protected boolean finalized;
//该变量用于标识接收过程是否结束。
private DatalnputStream in = null;
//该变量代表Block的数据读取流。
private DataChecksum checksum;
//该变量代表Block数据的校验器
private Outputstream out = null;
//该变量代表Block数据的写入流(写入Datanode的本地磁盘)
private DataOutputStream checksumOut = null;
//该变量代表Block数据校验和写入流(写入Datanode的本地磁盘)
private int bytesPerChecksum;
//该变量代表数据校验块大小。
private int checksumsize;
//该变量代表数据校验快对应的校验和大小。
private ByteBuffer buf;
//该变量代表packet数据缓存块。
private int bufRead;
//该变量代表缓存块中已读取的数据大小。
private int maxPacketReadLen;
//该变量代表数据包读取数据的最大长度。
protected long offsetlnBlock;
//该变量代表接受的packet在Block中的起始位置。
protected final String inAddr;
//该变量用于标识发送端(Datanode/Client)的ip地址。
protected final String myAddr;
//该变量用于标识当前接收端(Datanode)的ip地址。
private String mirrorAddr;
//该变量代表下一个接收端(Datanode)的ip地址。
private DataOutputStream mirrorOut;
//该变量标识向下一个接收端发送packet的写入流。
private Daemon responder = null;
//该变量代表packet的响应器。
private FSDataset.BlockWriteStreams streams;
//该变量代表Block写入流。
private boolean isRecovery = false;
//该变量用于标识是否执行恢复操作。
private String clientName;
//该变量代表发送数据的客户端。
Datanodelnfo srcDatanode = null;
//该变量代表数据源所在的数据结点。
private Checksum partialCrc = null;
//该变量代表校验和。

5.2 成员方法

BlockReceiver (Block block, DataInputStream in, String  inAddr, String myAddr, boolean isRecovery, String clientName, DatanodeInfo srcDatanode, Datanode Datanode) throws IOException {
try{
this.block = block;
this.in = in;
this.inAddr = inAddr;
this.myAddr = myAddr;
this.isRecovery = isRecovery;
this.clientName = clientName;
this.offsetlnBlock = 0;
this.srcDatanode = srcDatanode;
this.Datanode = Datanode;
this.checksum = DataChecksum.newDataChecksum(in) ; //从头部信息中创建数据校验器
this.bytesPerChecksum = checksum.getBytesPerChecksum();	//数据校验块的大小
this.checksumSize = checksum.getChecksumSize();	//数据校验块对应的校大小
//为即将接受的Block数据创建临时存储空间(创建Block的数据文件和校验和文件)
streams = Datanode.data.writeToBlock(block, isRecovery);
this.finalized = Datanode.data.isValidBlock(block);
if (streams != null) {
this.out = streams.dataOut;
this.checksumOut = new DataOutputStream(new BufferedOutputStream ( streams.checksumOut, SMALL_BUFFER_SIZE));
if (Datanode.blockscanner != null && isRecovery) { Datanode.blockscanner.deleteBlock(block);
}
}
} catch (BlockAlreadyExistsException bae) {
throw bae;
} catch(IOException ioe) {
IOUtils.closeStream(this);
cleanupBlock();
IOException cause = FSDataset.getCauselfDiskError(ioe);
if (cause != null) {	// possible disk error
ioe = cause;
Datanode.checkDiskError(ioe); // may throw an exception here
}
throw ioe;
}
}

通过构造方法完成BlockReceiver对象的初始化工作,这里的初始化是指为即将到来的Block数据申 请本地磁盘上的存储空间,并且根据接受的头部信息为该Block创建校验器,以及获取对应的校验配置信息。

private int readToBuf(int toRead) throws IOException {
if (toRead < 0) {
toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity()) - buf.limit();
}
//读取数据到buf中
int nRead = in.read(buf.array(), buf.limit(), toRead);
if (nRead < 0) {
throw new EOFException("while trying to read " + toread + " bytes");
}
bufRead = buf.limit() + nRead;
buf.limit(bufRead);
//返回本次读取数据的大小
return nRead;
}

该方法用于接收当前packet中的数据,并保存到缓存块中。

6. DataBlockScanner 数据块扫描器

DataBlockScanner 所在的包为 org.apachc.hadoop.hdfs.server.Datanode,该类主要用于保证 Datanode 上数据块的完整性。此外Hadoop还提供了校验和的方式来保证数据的完整性。在Datanode节点上开启一个DataBlockScanner后台线程,来定期验证存储在其上的所有块,这个是防止物理介质出现损减情况而造成的数据损坏。DataBlockScanner的源代码如下:

6.1 成员变量

private static final int MAX_SCAN_RATE = 8 * 1024 * 1024;
//该变量用于表示最大的扫描速度为8MB/So
private static final int MIN_SCAN_RATE = 1 * 1024 * 1024;
//该变量用于表示最小的扫描速度为IMB/s。
static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L;
//该变量用于表示默认扫描周期是3周,扫描周期可通过配置$dfs.Datanode.scan.period.hours}来设置。
static final String verificationlogFile = "dncp_block_verification.log";
//该变量用于表示保存扫描日志信息的文件名前缀。其中在DataBlockScanner工作的过程中共产生两个日志:当前日志,文件名后缀是.curr;前一个日志,文件名后缀是.prev。
static final int verficationLogLimit = 5;
//该变量用于表示每扫描5个数据块就进行一次日志记录操作。
FSDataset dataset;
//该变量代表数据块管理器。
TreeSet<BlockScanInfo> blockInfoSet;
//该变量代表数据块扫描信息集合,按照上一次扫描时间和数据块id升序排序,以便快速获取验证到 期的数据块。
HashMap<Block, BlockScanInfo> blockMap;
//该变量代表数据块和数据块扫描信息的映射,以便能够根据数据块快速获取对应的扫描信息。
long totalScans = 0;
//该变量代表扫描的总次数。
long totalverifications = 0;
//该变量代表进行数据块验证的总次数。
long totalScanErrors =0;
//该变量代表扫描出错的总次数。
long totalTransientErrors = 0;
//该变量代表扫描过程中出现的短暂错误的总次数。
long bytesLeft = 0;
//该变量代表一个扫描周期中还剩下需要扫描的数据量。
long totalBytesToScan = 0;
//该变量代表一个扫描周期中需要扫描的总数据量。
private LogFileHandler verificationLog;
//该变量代表数据块的扫描验证日志记录器。
BlockTransferThrottler throttler = null;
//该变量代表扫描时I/O速度控制器,需要根据totalBytesToScan和bytesLeft信息来衡量。
private static enum ScanType {
//当某个数据块被远程客户端访问时,所进行的扫描操作
REMOTE_READ,
//当进有数据块的完整,性验证时,所进行的扫描操作
VERIFTCATI ON_SCAN,
NONE,
}
该变量代表扫描操作的类型。

6.2 成员方法

private void init() {
//从“磁盘”上获取所有的数据块基本信息
Block arr[] = dataset.getBlockReport();
Collections.shuffle(Arrays.asList(arr));
blockInfoSet = new TreeSet<BlockScanInfo>();
blockMap = new HashMap<Block, BlockScanInfo>();
long scanTime = -1;
for (Block block : arr) {
//为每一个Block建立扫描验证信息 
BlockScanInfo info = new BlockScanInfo( block ); info.lastScanTime = scanTime--;
addBlocklnfo(info);
}
//寻找一个合适的扫描验证日志文件
File dir = null;
FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
for(FSDataset.FSVolume vol : volumes) {
if (LogFileHandler.isFilePresent(vol.getDir(),  verificationLogFile)) {
dir = vol.getDir();
break;
}
}
if (dir == null) {
dir = volumes[0].getDir();
}
try {
//创建一个日志记录器
verificationLog = new LogFileHandler(dir, verificationLogFile, 100);
} catch (IOException e) {
LOG.warn("Could not open verfication log. " + "Verification times are not stored.");
}
synchronized (this) {
//创建一个扫描速度控制器
throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE);
)
}

初始化方法主要完成的工作包括:为每一个Block创建对应的BlockScanlnfo对象,创建扫描日志记 录器,创建扫描速度控制器等。

private void updateBytesToScan(long len, long lastScanTime) { 
totalBytesToScan += len;
//新添加的Block需要在需要在此次中扫描验证
if ( lastScanTime < currentPeriodstart ) {
bytesLeft += len;
}
}

该方法用于更新此次扫描操作需要扫描的字节数。

private synchronized void addBlockInfo(BlockScanInfo info) {
boolean added = blockInfoSet.add(info);
blockMap.put(info.block, info);
if ( added ) {
LogFileHandler log = verificationLog;
if (log != null) {
log.setMaxNumLines(blockMap.size() * verficationLogLimit);
}
updateBytesToScan(info.block.getNumBytes(),  info.lastScanTime);
}
}
private synchronized void delBlocklnfo(BlockScanlnfo info) {
boolean exists = blockInfoSet.remove(info);
blockMap.remove(info.block);
if ( exists ) {
LogFileHandler log = verificationLog;
if (log != null) {
log.setMaxNumLines(blockMap.size() * verficationLogLimit);
}
updateBytesToScan(-info.block.getNumBytes(), info.lastScanTime);
}
}

上面的两个方法分别用于添加和删除数据块对应的扫描信息。

private boolean assignlnitialVerificationTimes() {
int numBlocks = 1;
synchronized (this) {
numBlocks = Math.max(blockMap.size(), 1);
}
//读取数据块的验证日志文件
LogFileHandler.Reader logReader = null;
try {
if (verificationLog != null) {
logReader = verificationLog.new Reader(false);
}
} catch (IOException e) {
LOG.warn("Could not read previous verification times :" + StringUtils.stringifyException(e)); }
if (verificationLog != null) { verificationLog.updateCurNumLines();
}
try {
//用日志信息来更新记录的Block上一次验证时间
while (logReader != null && logReader.hasNext()) {
if (!Datanode.shouldRun || Thread.interrupted()) { return false;
}
LogEntry entry = LogEntry.parseEntry(logReader.next());
if (entry != null) {
updateBlocklnfo(entry);
)
}
} finally {
IOUtils.closeStream(logReader);
}
//计算Blocks之间验证的间隔时间
long verifylnterval = (long) (Math.min(  scanPeriod/2.O/numBlocks, 10*60*1000 ));
long lastScanTime = System.currentTimeMillis() -  scanPeriod;
//初始化剩余Blocks的上一次验证时间
synchronized (this) {
if (blocklnfoSet.size() > 0 ) {
BlockScanlnfo info;
while ((info = blocklnfoSet.first()).lastScanTime < 0) {
delBlockInfo(info);
info.lastScanTime = lastScanTime;
lastScanTime += verifyInterval;
addBlocklnfo(info);
}
}
}
return true;

该方法用于为每一个Block分配上一次验证的时间。

private synchronized void adjustThrottler() {
//本次扫描验证还剰余的时间
long timeLeft = currentPeriodStart+scanPeriod - System.currentTimeMillis();
//根据本次验证扫描剩余的工作量和时间来计算速度
long bw = Math.max(bytesLeft*1000/timeLeft,  MIN_SCAN_RATE);
throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
}

该方法用于调整扫描速度。在一次Blocks扫描验证周期中,DataBlockScanner需要进行大量的磁盘 I/O,为了不影响Datanode节点上其他线程的工作资源,同时也为了自身工作的有效性,所以 DataBlockScanner采用了扫描验证速度控制器,并根据当前的工作量来控制当前数据块的验证速度。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-26 22:16:11  更:2021-12-26 22:16:42 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 3:46:13-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码