IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 2021年山东大学软件工程应用与实践项目——Hadoop源码分析(四) -> 正文阅读

[大数据]2021年山东大学软件工程应用与实践项目——Hadoop源码分析(四)

2021SC@SDUSC

Hadoop源码分析(四)——DFSClient HDFS客户端(2)


上篇博客分析了DFSClient内部类,本篇,我们来分析DFSOutputStream中的成员变量和成员方法。
DFSClient HDFSk客户端源码1

3.成员变量

private Socket s;
//与目标DataNode所建立起来的Socket连接。
boolean closed = false;
//该输出流是否关闭的标识。
private String src;
//在DataNode上创建的用于保存DFSOutputStream中的数据的文件的路径字符串。
private DataOutputStream blockstream;
//用于将数据写入到DataNode的socket输出流。
private DatalnputStream blockReplyStream;
//用于接收从DataNode返回的确认信息的socket输入流。
private Block block;
//当前正在被写入的数据块对象。
final private long blocksize;
//数据块的大小。
private DataChecksum checksum;
//数据的校验和。
private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
//数据队列,用于保存等待发送给DataNode的数据包Packet。
private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
//确认队列,用于保存还没有被DataNode确认接收的数据包Packeto
private Packet currentpacket = null;
//当前正在被写入的数据包。
private int maxPackets = 80;
//dataQueue和ackQueue队列中一共允许存在的数据包Packet的最多个数。
private DataStreamer streamer = new DataStreamer();
//streamer线程,该线程不停地从dataQueue中取出数据包,然后发送给DataNode管道中的第一个DataNode。
private ResponseProcessor response = null;
//response线程,该线程用于接收从DataNode返回的反馈信息。
private long currentSeqno = 0;
//当前正在被DataStreamer发送的Packet在整个数据Block中的序列号。
private long lastQueuedSeqno = -1;
//dataQueue队列中最后一个Packet的序列号。
private long lastAckedSeqno = -1;
//ackQueue队列中最后一个Packet的序列号。
private long bytesCurBlock - 0;
//写入到当前Block中的字节数。
private int packetsize = 0;
//被发送的Packet的大小,其中头信息的大小也包含在内。
private int chunksPerPacket = 0;
//每个Packet中chunk的数量。
private Datanodelnfo[] nodes = null;
//保存当前正在被写入的Block的DadaNode集合。
private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
//用于保存返回错误的响应信息的DataNode的集合。
private boolean persistBlocks = false;
// 被写入的Block是否被记录到NameNode的标识。
private int recoveryErrorCount = 0;
private int maxRecoveryErrorCount = 5;
//最多可以对写入错误进行恢复执行的次数。
private volatile boolean appendChunk = false;
//该变量用于标识是否向已经存在的数据Block中添加Chunk。
private long initialFileSize = 0;
//当文件被打开时的文件的大小。
private short blockReplication;
//数据Block的副本数。

4.构造方法

DFSOutputStream 中包含三个重载的构造方法。

private DFSOutputStream(String src, long blocksize, Progressable progress, int bytesPerChecksum, short replication) throws lOException { 
super(new CRC32(), bytesPerChecksum, 4);
this.src = src;
this.blocksize = blocksize;
this<blockReplication = replication;
this.progress = progress;
if (progress != null) {
LOG.debug ("Set non-null progress callback on DFSOutputStream " + src);
}
if ( bytesPerChecksum < 1 || blocksize % bytesPerChecksum != 0) {
throw new lOException(Hio.bytes.per.checksum(" + bytesPerChecksum + ") and blocksize(" + blocksize +) do not match. " + "blocksize should be a " + "multiple of io.bytes.per.checksum");
}
checksum = DataChecksum.newDataChecksrun(DataChecksum.CHECKSUM_CRC32, bytesPerChecksum);
}

私有的构造方法,如果指定的bytesPerChecksum小于1或者blockSize不能整除bytesPerChecksum, 那么则会抛出相应的异常。而且在方法中完成了对DataChecksum校验和对象的初始化工作。

DFSOutputStream(String src, FsPerndssion masked, boolean overwrite,
boolean createParent, short replication, long blocksize, Progressable progress, 
int buffersize, int bytesPerChecksum) throws IOException {
this(src, blockSize, progress, bytesPerChecksum, replication);
computePacketChunkSize(writePacketSize, bytesPerChecksum);
try {
namenode.create(
src, masked, clientName, overwrite, createParent, replication, blocksize);
} catch(RemoteException re) (
throw re.unwrapRemoteException(AccessControlException.class, FileMreadyExistsException.class,FileNotFoundException.class,NSQuotaExceededException.class,
DSQuotaExceededException.class);}
streamer.start();
}

该构造方法会被DFSClient的create方法来调用。首先会调用上面的私有构造方法,然后调用 computePacketChunkSize方法来计算岀Packet和Chunk的大小,之后调用ClientProtocol的create方法来 在NameNode的命名空间中创建一个用于接收DFSOutputStream中的数据的文件,最后启动用于向 DataNode写入数据的streamer线程。

另一个DFSOutputStream构造方法会被DFSClient的append方法来调用,它的处理逻辑如下:

this(src, stat.getBlockSize()r progress, bytesPerChecksum, stat.getReplication());

首先调用私有的DFSOutputStream构造方法。

initialFileSize = stat.getLen();

取得要进行写入操作的文件的原始大小。

if (lastBlock != null)

文件中的最后一个数据块必须被数据填满,如果文件的最后一?个数据块不为空,那么将数据写入到 最后一个数据块中。

block = lastBlock.getBlock();

取得最后一个数据Block对象。

accessToken = lastBlock.getBlockToken();

取得最后一个数据Block的访问令牌。

long usedlnLastBlock = stat.getLen() % blocksize;

计算出最后一个数据Block已经被使用的空间的大小。

int freelnLastBlock = (int)(blocksize - usedlnLastBlock);

计算出最后一个数据Block还未被使用的空间的大小。

int usedlnCksum = (int)(stat.getLen() % bytesPerChecksum);

计算出文件中校验和的个数。

int freelnCksum = bytesPerChecksum. - usedlnCksum;

计算出最后一个crc chunk数据块中空闲的空间的大小。

if (freelnLastBlock > blocksize) {
throw new IOException ("The last block for file" +src + " is full.");
}

如果最后一个数据Block还未被使用的空间的大小大于整个数据Block的大小,那么会抛出对应的 异常。

bytesCurBlock = lastBlock.getBlockSize();

取得最后一个数据Block的大小。

if (usedlnCksum > 0 && freelnCksim > 0) { computePacketChunkSize(0, freelnCksum);
resetChecksumChunk(freelnCksum);
this.appendChunk = true;
} else {
computePacketChunkSize(Math.min(writePacketSize, freelnLastBlock),bytesPerChecksum);
}

如果usedlnCksum和freelnCksum都大于0即表明最后一个Chunk还有空闲的空间,那么需要使用 将要发送的Packet来填充还有空闲空间的Chunk。

nodes = lastBlock.getLocations();

取得最后一个数据Block所在的DataNode集合。

errorlndex = -1;
用于标识写入数据出错的DataNode的索引。
if (nodes.length < 1) {
throw new IOException ("Unable to retrieve blocks loGations" +
"for append to last block" + block +" of file " + src);
}

如果取得的DataNode的集合小于1,则会抛出对应的异常。

while (processDatanodeError(true, true)) {
try {
Thread.sleep(1000);
} catch (InterruptedException e){
}}

循环的尝试建立起用于写入数据的DataNode管道。

streamer.start();

启动用于向DataNode写入数据的streamer线程。

else (
computePacketChunkSize(writePacketSize, bytesPerChecksum);
streamer.start();
}

如果文件中的最后一个数据Block已经没有空闲的空间了,那么直接调用computePacketChunkSize 方法来计算出Packet和Chunk的大小,并启动streamer线程。
computePacketChunkSize方法的处理逻辑如下:

int chunksize = csize + checksum.getChecksuinSize ();

chunk的大小为原生数据的大小加上与数据所对应的校验和的大小。

int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunksize, 1);
packetsize = n + chunkSize*chunksPerPacket;

packet的大小为packet的头部信息的大小加上packet中所有chunk的总大小。
private boolean createBlockOutputStream(Datanodelnfo[] nodes, String client, boolean recoveryFlag)方法 用于建立起到DataNode管道中的第一个DataNode的Socket连接。其中nodes参数代表所有接收数据的 DataNode集合,client是客户端名称,recoveryFlag用于标识是否为错误重新恢复建立的连接。主要的处 理逻辑如下:

InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
s = socketFactory.createSocket();

与远程DataNode管道中的第一个DataNode建立起Socket连接。

timeoutvalue = 3000 * nodes.length + socketTimeout;
NetUtils.connect(s, target, timeoutvalue);
s.setSoTimeout(timeoutValue);
s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);

设置Socket的超时时间和发送缓存区的大小。

DataOutputStream out = new DataOutputStream (new BufferedOutputStream (NetUtils.getOutputStream(s, writeTimeout), DataNode.SMALL_BUFFER_SIZE));

创建用于向远程的DataNode写入数据的输出流。

blockReplyStream = new DataInputStream(NetUtils.getlnputStream(s));

创建用于接收DataNode发送过来的响应信息的输入流。

out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol.OP_WRITE_BLOCK );
out.writeLong( block.getBlockld());
out.writeLong( block.getGenerationStamp());
out.writelnt( nodes.length );
out.writeBoolean( recoveryFlag );
Text.writeString( out, client );
out.writeBoolean(false);
out.writelnt( nodes.length - 1 );
for (int i = 1; i < nodes.length; i++) {
nodes[i].write(out);
}
accessToken.write(out);
checksum.writeHeader( out );
out.flush();
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-07 12:05:45  更:2021-12-07 12:07:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 14:02:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码