2021SC@SDUSC
Hadoop源码分析(四)——DFSClient HDFS客户端(2)
上篇博客分析了DFSClient内部类,本篇,我们来分析DFSOutputStream中的成员变量和成员方法。
DFSClient HDFSk客户端源码1
3.成员变量
private Socket s;
boolean closed = false;
private String src;
private DataOutputStream blockstream;
private DatalnputStream blockReplyStream;
private Block block;
final private long blocksize;
private DataChecksum checksum;
private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
private Packet currentpacket = null;
private int maxPackets = 80;
private DataStreamer streamer = new DataStreamer();
private ResponseProcessor response = null;
private long currentSeqno = 0;
private long lastQueuedSeqno = -1;
private long lastAckedSeqno = -1;
private long bytesCurBlock - 0;
private int packetsize = 0;
private int chunksPerPacket = 0;
private Datanodelnfo[] nodes = null;
private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
private boolean persistBlocks = false;
private int recoveryErrorCount = 0;
private int maxRecoveryErrorCount = 5;
private volatile boolean appendChunk = false;
private long initialFileSize = 0;
private short blockReplication;
4.构造方法
DFSOutputStream 中包含三个重载的构造方法。
private DFSOutputStream(String src, long blocksize, Progressable progress, int bytesPerChecksum, short replication) throws lOException {
super(new CRC32(), bytesPerChecksum, 4);
this.src = src;
this.blocksize = blocksize;
this<blockReplication = replication;
this.progress = progress;
if (progress != null) {
LOG.debug ("Set non-null progress callback on DFSOutputStream " + src);
}
if ( bytesPerChecksum < 1 || blocksize % bytesPerChecksum != 0) {
throw new lOException(Hio.bytes.per.checksum(" + bytesPerChecksum + ") and blocksize(" + blocksize +
”) do not match. " + "blocksize should be a " + "multiple of io.bytes.per.checksum");
}
checksum = DataChecksum.newDataChecksrun(DataChecksum.CHECKSUM_CRC32, bytesPerChecksum);
}
私有的构造方法,如果指定的bytesPerChecksum小于1或者blockSize不能整除bytesPerChecksum, 那么则会抛出相应的异常。而且在方法中完成了对DataChecksum校验和对象的初始化工作。
DFSOutputStream(String src, FsPerndssion masked, boolean overwrite,
boolean createParent, short replication, long blocksize, Progressable progress,
int buffersize, int bytesPerChecksum) throws IOException {
this(src, blockSize, progress, bytesPerChecksum, replication);
computePacketChunkSize(writePacketSize, bytesPerChecksum);
try {
namenode.create(
src, masked, clientName, overwrite, createParent, replication, blocksize);
} catch(RemoteException re) (
throw re.unwrapRemoteException(AccessControlException.class, FileMreadyExistsException.class,FileNotFoundException.class,NSQuotaExceededException.class,
DSQuotaExceededException.class);}
streamer.start();
}
该构造方法会被DFSClient的create方法来调用。首先会调用上面的私有构造方法,然后调用 computePacketChunkSize方法来计算岀Packet和Chunk的大小,之后调用ClientProtocol的create方法来 在NameNode的命名空间中创建一个用于接收DFSOutputStream中的数据的文件,最后启动用于向 DataNode写入数据的streamer线程。
另一个DFSOutputStream构造方法会被DFSClient的append方法来调用,它的处理逻辑如下:
this(src, stat.getBlockSize()r progress, bytesPerChecksum, stat.getReplication());
首先调用私有的DFSOutputStream构造方法。
initialFileSize = stat.getLen();
取得要进行写入操作的文件的原始大小。
if (lastBlock != null)
文件中的最后一个数据块必须被数据填满,如果文件的最后一?个数据块不为空,那么将数据写入到 最后一个数据块中。
block = lastBlock.getBlock();
取得最后一个数据Block对象。
accessToken = lastBlock.getBlockToken();
取得最后一个数据Block的访问令牌。
long usedlnLastBlock = stat.getLen() % blocksize;
计算出最后一个数据Block已经被使用的空间的大小。
int freelnLastBlock = (int)(blocksize - usedlnLastBlock);
计算出最后一个数据Block还未被使用的空间的大小。
int usedlnCksum = (int)(stat.getLen() % bytesPerChecksum);
计算出文件中校验和的个数。
int freelnCksum = bytesPerChecksum. - usedlnCksum;
计算出最后一个crc chunk数据块中空闲的空间的大小。
if (freelnLastBlock > blocksize) {
throw new IOException ("The last block for file" +src + " is full.");
}
如果最后一个数据Block还未被使用的空间的大小大于整个数据Block的大小,那么会抛出对应的 异常。
bytesCurBlock = lastBlock.getBlockSize();
取得最后一个数据Block的大小。
if (usedlnCksum > 0 && freelnCksim > 0) { computePacketChunkSize(0, freelnCksum);
resetChecksumChunk(freelnCksum);
this.appendChunk = true;
} else {
computePacketChunkSize(Math.min(writePacketSize, freelnLastBlock),bytesPerChecksum);
}
如果usedlnCksum和freelnCksum都大于0即表明最后一个Chunk还有空闲的空间,那么需要使用 将要发送的Packet来填充还有空闲空间的Chunk。
nodes = lastBlock.getLocations();
取得最后一个数据Block所在的DataNode集合。
errorlndex = -1;
用于标识写入数据出错的DataNode的索引。
if (nodes.length < 1) {
throw new IOException ("Unable to retrieve blocks loGations" +
"for append to last block" + block +" of file " + src);
}
如果取得的DataNode的集合小于1,则会抛出对应的异常。
while (processDatanodeError(true, true)) {
try {
Thread.sleep(1000);
} catch (InterruptedException e){
}}
循环的尝试建立起用于写入数据的DataNode管道。
streamer.start();
启动用于向DataNode写入数据的streamer线程。
else (
computePacketChunkSize(writePacketSize, bytesPerChecksum);
streamer.start();
}
如果文件中的最后一个数据Block已经没有空闲的空间了,那么直接调用computePacketChunkSize 方法来计算出Packet和Chunk的大小,并启动streamer线程。 computePacketChunkSize方法的处理逻辑如下:
int chunksize = csize + checksum.getChecksuinSize ();
chunk的大小为原生数据的大小加上与数据所对应的校验和的大小。
int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunksize, 1);
packetsize = n + chunkSize*chunksPerPacket;
packet的大小为packet的头部信息的大小加上packet中所有chunk的总大小。 private boolean createBlockOutputStream(Datanodelnfo[] nodes, String client, boolean recoveryFlag)方法 用于建立起到DataNode管道中的第一个DataNode的Socket连接。其中nodes参数代表所有接收数据的 DataNode集合,client是客户端名称,recoveryFlag用于标识是否为错误重新恢复建立的连接。主要的处 理逻辑如下:
InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
s = socketFactory.createSocket();
与远程DataNode管道中的第一个DataNode建立起Socket连接。
timeoutvalue = 3000 * nodes.length + socketTimeout;
NetUtils.connect(s, target, timeoutvalue);
s.setSoTimeout(timeoutValue);
s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
设置Socket的超时时间和发送缓存区的大小。
DataOutputStream out = new DataOutputStream (new BufferedOutputStream (NetUtils.getOutputStream(s, writeTimeout), DataNode.SMALL_BUFFER_SIZE));
创建用于向远程的DataNode写入数据的输出流。
blockReplyStream = new DataInputStream(NetUtils.getlnputStream(s));
创建用于接收DataNode发送过来的响应信息的输入流。
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol.OP_WRITE_BLOCK );
out.writeLong( block.getBlockld());
out.writeLong( block.getGenerationStamp());
out.writelnt( nodes.length );
out.writeBoolean( recoveryFlag );
Text.writeString( out, client );
out.writeBoolean(false);
out.writelnt( nodes.length - 1 );
for (int i = 1; i < nodes.length; i++) {
nodes[i].write(out);
}
accessToken.write(out);
checksum.writeHeader( out );
out.flush();
|