hadoop fs -put xxxx
Parsing the command
The entry point is FsShell's main method: it creates an instance with FsShell shell = newShellInstance(); and ToolRunner.run(shell, argv) then drives FsShell's run method.
run first calls init(), whose main work is commandFactory = new CommandFactory(getConf()) plus registering the shell commands with the factory.
Then Command instance = commandFactory.getInstance(cmd); instance.run(argv): the factory picks the Command implementation from the command name (the template method pattern). For -put the resolved class is CopyCommands.Put.
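The lookup itself is just a name-to-constructor registry. A minimal standalone sketch of the idea (hypothetical classes, not Hadoop's real CommandFactory, which registers command classes reflectively):

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Minimal standalone sketch of the factory lookup (hypothetical, not
// Hadoop's CommandFactory): command names map to Command constructors.
class MiniCommandFactory {
  interface Command { int run(String... argv); }

  private final Map<String, Supplier<Command>> registry = new HashMap<>();

  void register(String name, Supplier<Command> ctor) {
    registry.put(name, ctor);   // e.g. register("-put", MyPut::new)
  }

  Command getInstance(String cmd) {
    Supplier<Command> ctor = registry.get(cmd);
    return ctor == null ? null : ctor.get();
  }
}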
Command's run method mainly calls two methods, processOptions and processRawArguments:
public int run(String...argv) {
  LinkedList<String> args = new LinkedList<String>(Arrays.asList(argv));
  try {
    if (isDeprecated()) {
      displayWarning(
          "DEPRECATED: Please use '"+ getReplacementCommand() + "' instead.");
    }
    processOptions(args);
    processRawArguments(args);
  } catch (CommandInterruptException e) {
    displayError("Interrupted");
    return 130;
  } catch (IOException e) {
    displayError(e);
  }
  return (numErrors == 0) ? exitCode : exitCodeForError();
}
processOptions is an abstract method on Command; for -put the implementation is in CopyCommands.Put:
protected void processOptions(LinkedList<String> args) throws IOException {
  CommandFormat cf =
      new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
  cf.parse(args);
  setOverwrite(cf.getOpt("f"));
  setPreserve(cf.getOpt("p"));
  setLazyPersist(cf.getOpt("l"));
  setDirectWrite(cf.getOpt("d"));
  getRemoteDestination(args);
  setRecursive(true);
}
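To see what CommandFormat.parse does to the argument list, here is a small usage sketch (the argument values are made up; it assumes Hadoop's org.apache.hadoop.fs.shell.CommandFormat on the classpath):

import java.util.Arrays;
import java.util.LinkedList;
import org.apache.hadoop.fs.shell.CommandFormat;

// Hypothetical demo: how "-f -p local.txt /user/foo/" is parsed.
public class CommandFormatDemo {
  public static void main(String[] argv) {
    LinkedList<String> args =
        new LinkedList<>(Arrays.asList("-f", "-p", "local.txt", "/user/foo/"));
    CommandFormat cf = new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
    cf.parse(args);                      // strips recognized flags from args
    System.out.println(cf.getOpt("f"));  // true: overwrite was requested
    System.out.println(cf.getOpt("l"));  // false: no lazy persist
    System.out.println(args);            // [local.txt, /user/foo/] remain
  }
}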
Next, processRawArguments calls processArguments (which CopyCommands.Put overrides); after several more levels of calls this reaches CommandWithDestination's processPath:
protected void processPath(PathData src, PathData dst) throws IOException {
  if (src.stat.isSymlink()) {
    throw new PathOperationException(src.toString());
  } else if (src.stat.isFile()) {
    copyFileToTarget(src, dst);
  } else if (src.stat.isDirectory() && !isRecursive()) {
    throw new PathIsDirectoryException(src.toString());
  }
}
copyFileToTarget(src, dst) is the core of the upload.
NameNode: checking for the target directory, creating it, and generating the block metadata
writeStreamToFile in the TargetFileSystem class (an inner class of CommandWithDestination) goes through a chain of create calls, passing through DistributedFileSystem and finally reaching DFSOutputStream's newStreamForCreate method:
stat = dfsClient.namenode.create(src, masked,
    dfsClient.clientName,
    new EnumSetWritable<>(flag),
    createParent,
    replication,
    blockSize,
    SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
This calls NameNodeRpcServer's create method on the remote server via RPC; its core is:
try {
  PermissionStatus perm = new PermissionStatus(getRemoteUser()
      .getShortUserName(), null, masked);
  status = namesystem.startFile(src, perm, clientName, clientMachine,
      flag.get(), createParent, replication, blockSize, supportedVersions,
      ecPolicyName, cacheEntry != null);
} finally {
  RetryCache.setState(cacheEntry, status != null, status);
}
This lands in FSNamesystem's startFileInt method, which calls FSDirWriteFileOp.startFile; that goes through addFile and newINodeFile, then (FSDirectory) fsd.addINode -> addLastINode to attach the new INode to the namespace tree. After FSDirWriteFileOp.startFile returns, fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry) writes the operation to the edit log.
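In outline, the NameNode-side create path checks the target, adds the INode, and appends an OP_ADD record to the edit log. A toy self-contained model of that order of operations (hypothetical, not the real FSNamesystem/FSDirWriteFileOp code, which also handles leases, permissions, and the retry cache):

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the NameNode-side create path (hypothetical, not Hadoop code):
// an in-memory namespace plus an append-only "edit log" shows the order of
// operations: check the target, add the INode, log OP_ADD.
class ToyNameNode {
  private final Map<String, Boolean> namespace = new HashMap<>(); // path -> isDirectory
  private final List<String> editLog = new ArrayList<>();

  void create(String src, boolean overwrite) throws IOException {
    Boolean isDir = namespace.get(src);
    if (isDir != null && isDir) {
      throw new IOException(src + " already exists as a directory");
    }
    if (isDir != null && !overwrite) {
      throw new IOException(src + " already exists (no -f)");
    }
    namespace.put(src, false);        // addLastINode analogue
    editLog.add("OP_ADD " + src);     // logOpenFile analogue
  }

  public static void main(String[] args) throws IOException {
    ToyNameNode nn = new ToyNameNode();
    nn.create("/user/foo/local.txt", false);
    System.out.println(nn.editLog);   // [OP_ADD /user/foo/local.txt]
  }
}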
Creating the output stream
Back in DFSOutputStream's newStreamForCreate: once the create RPC returns, the output stream is constructed:
if(stat.getErasureCodingPolicy() != null) {
  out = new DFSStripedOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes);
} else {
  out = new DFSOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes, true);
}
out.start();
The DFSOutputStream constructor calls computePacketChunkSize to work out how many chunks fit in one packet. out.start() then starts the DataStreamer thread's run loop; at this point the dataQueue is still empty, so the thread just waits on it.
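That waiting is the classic wait/notify pattern on the queue's monitor. A minimal self-contained sketch of the shape (hypothetical, not DataStreamer's actual code):

import java.util.LinkedList;

// Minimal sketch of the DataStreamer-style consumer loop (hypothetical):
// block on the empty queue's monitor until a producer calls notifyAll().
class QueueConsumer implements Runnable {
  private final LinkedList<String> dataQueue = new LinkedList<>();

  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      String packet;
      synchronized (dataQueue) {
        while (dataQueue.isEmpty()) {
          try {
            dataQueue.wait();            // sleep until queuePacket() notifies
          } catch (InterruptedException e) {
            return;
          }
        }
        packet = dataQueue.getFirst();   // take the head, like dataQueue.getFirst()
        dataQueue.removeFirst();
      }
      System.out.println("sending " + packet);
    }
  }

  void queuePacket(String packet) {
    synchronized (dataQueue) {
      dataQueue.addLast(packet);
      dataQueue.notifyAll();             // wake the waiting consumer
    }
  }

  public static void main(String[] args) throws InterruptedException {
    QueueConsumer c = new QueueConsumer();
    Thread t = new Thread(c);
    t.start();
    c.queuePacket("packet-1");
    Thread.sleep(100);
    t.interrupt();
  }
}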
Sending the data stream to the DataNodes
Unwinding from the create call chain, we are back in the TargetFileSystem class:
void writeStreamToFile(InputStream in, PathData target,
    boolean lazyPersist, boolean direct)
    throws IOException {
  FSDataOutputStream out = null;
  try {
    out = create(target, lazyPersist, direct);
    IOUtils.copyBytes(in, out, getConf(), true);
  } finally {
    IOUtils.closeStream(out);
  }
}
IOUtils.copyBytes does the actual writing; once it completes, the temporary file (the destination name plus a ._COPYING_ suffix) is renamed to the final file name. The write path is FSOutputSummer.write -> FSOutputSummer.flushBuffer() -> FSOutputSummer.writeChecksumChunks -> DFSOutputStream.writeChunk:
protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  writeChunkPrepare(len, ckoff, cklen);

  currentPacket.writeChecksum(checksum, ckoff, cklen);
  currentPacket.writeData(b, offset, len);
  currentPacket.incNumChunks();
  getStreamer().incBytesCurBlock(len);

  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
      getStreamer().getBytesCurBlock() == blockSize) {
    enqueueCurrentPacketFull();
  }
}
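writeChecksumChunks is where the per-chunk checksum comes from: every bytesPerChecksum (default 512) bytes of data get a 4-byte CRC that writeChunk copies into the current packet ahead of the data. A small self-contained illustration of that chunking (using java.util.zip.CRC32 for simplicity; HDFS actually defaults to CRC32C via its DataChecksum class):

import java.util.zip.CRC32;

// Illustration of per-chunk checksumming (not Hadoop's DataChecksum):
// split a buffer into 512-byte chunks and compute a 4-byte CRC for each.
public class ChunkChecksumDemo {
  static final int BYTES_PER_CHECKSUM = 512;

  public static void main(String[] args) {
    byte[] data = new byte[1300];   // pretend this came from the local file
    for (int off = 0; off < data.length; off += BYTES_PER_CHECKSUM) {
      int len = Math.min(BYTES_PER_CHECKSUM, data.length - off);
      CRC32 crc = new CRC32();
      crc.update(data, off, len);
      // writeChunk writes the 4 checksum bytes into the packet, then the data
      System.out.printf("chunk@%d len=%d crc=%08x%n", off, len, crc.getValue());
    }
  }
}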
The enqueueCurrentPacketFull() method:
synchronized void enqueueCurrentPacketFull() throws IOException {
  LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
      + " appendChunk={}, {}", currentPacket, src, getStreamer()
      .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
      getStreamer());
  enqueueCurrentPacket();
  adjustChunkBoundary();
  endBlock();
}
enqueueCurrentPacket calls DataStreamer's waitAndQueuePacket; when the queue is not full, that calls queuePacket:
void queuePacket(DFSPacket packet) {
  synchronized (dataQueue) {
    if (packet == null) return;
    packet.addTraceParent(Tracer.getCurrentSpanId());
    dataQueue.addLast(packet);
    lastQueuedSeqno = packet.getSeqno();
    LOG.debug("Queued {}, {}", packet, this);
    dataQueue.notifyAll();
  }
}
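The wait half of waitAndQueuePacket is the backpressure side of the same monitor: while the data and ack queues together are at the in-flight limit, the writer blocks until the streamer drains them. A hedged sketch of that shape (a hypothetical simplification, not the real method body):

import java.util.LinkedList;

// Hypothetical simplification of the waitAndQueuePacket idea (not the real
// method body): block the writer while the number of in-flight packets is
// at its limit, then enqueue and notify the streamer.
class BackpressureSketch {
  private final LinkedList<Object> dataQueue = new LinkedList<>();
  private final LinkedList<Object> ackQueue = new LinkedList<>();
  private final int writeMaxPackets = 80;  // analogue of the client's max-packets-in-flight limit

  void waitAndQueuePacket(Object packet) throws InterruptedException {
    synchronized (dataQueue) {
      // dataQueue + ackQueue together bound the packets in flight
      while (dataQueue.size() + ackQueue.size() >= writeMaxPackets) {
        dataQueue.wait();                  // woken when acks free up space
      }
      dataQueue.addLast(packet);
      dataQueue.notifyAll();               // wake the DataStreamer consumer
    }
  }
}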
The packet is now in the queue, and dataQueue.notifyAll() wakes the waiting DataStreamer run loop, which takes the head packet via dataQueue.getFirst() and builds the write pipeline via setPipeline(nextBlockOutputStream()). nextBlockOutputStream does two things:
1. It calls locateFollowingBlock -> DFSOutputStream.addBlock, which talks to the server through dfsClient.namenode.addBlock; on the NameNode side this runs namesystem.getAdditionalBlock -> FSDirWriteFileOp.chooseTargetForNewBlock -> blockplacement.chooseTarget -> BlockPlacementPolicy.chooseTarget -> BlockPlacementPolicyDefault.chooseTarget -> chooseTargetInOrder, which performs the rack-aware selection of target DataNodes (a toy sketch of the default order follows below).
2. It calls createBlockOutputStream, whose core is shown after the sketch:
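The default placement order is: first replica on the writer's node, second replica on a different rack, third replica on the second replica's rack but a different node. A toy self-contained sketch of that order (hypothetical, not BlockPlacementPolicyDefault itself):

import java.util.ArrayList;
import java.util.List;

// Toy model of the default rack-aware placement order (hypothetical, not
// BlockPlacementPolicyDefault): replica 1 on the writer's node, replica 2
// on a different rack, replica 3 on replica 2's rack but a different node.
public class PlacementSketch {
  record Node(String name, String rack) {}

  static List<Node> chooseTargets(Node writer, List<Node> cluster) {
    List<Node> chosen = new ArrayList<>();
    chosen.add(writer);                              // replica 1: writer-local
    for (Node n : cluster) {                         // replica 2: remote rack
      if (!n.rack().equals(writer.rack())) { chosen.add(n); break; }
    }
    Node second = chosen.get(1);
    for (Node n : cluster) {                         // replica 3: second's rack,
      if (n.rack().equals(second.rack()) && !n.equals(second)) { // another node
        chosen.add(n);
        break;
      }
    }
    return chosen;
  }

  public static void main(String[] args) {
    List<Node> cluster = List.of(
        new Node("dn1", "/rack1"), new Node("dn2", "/rack1"),
        new Node("dn3", "/rack2"), new Node("dn4", "/rack2"));
    // picks dn1, then dn3 (remote rack), then dn4 (same rack as dn3)
    System.out.println(chooseTargets(cluster.get(0), cluster));
  }
}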
OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
    unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
    DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
blockReplyStream = new DataInputStream(unbufIn);

BlockConstructionStage bcs = recoveryFlag ?
    stage.getRecoveryStage() : stage;

ExtendedBlock blockCopy = block.getCurrentBlock();
blockCopy.setNumBytes(stat.getBlockSize());
boolean[] targetPinnings = getPinnings(nodes);

new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
    dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
    nodes.length, block.getNumBytes(), bytesSent, newGS,
    checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
    (targetPinnings != null && targetPinnings[0]), targetPinnings,
    nodeStorageIDs[0], nodeStorageIDs);

BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
    PBHelperClient.vintPrefixed(blockReplyStream));
Status pipelineStatus = resp.getStatus();
firstBadLink = resp.getFirstBadLink();
These are the streams to the first DataNode; the block-write request itself goes over the socket as send(out, Op.WRITE_BLOCK, proto.build()). If pipeline setup fails, the client calls dfsClient.namenode.abandonBlock and retries.
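On the wire that request is a small frame: the data transfer protocol version, a one-byte opcode, and a varint-length-prefixed protobuf message; the DFSPacket payloads follow separately. A hedged sketch of the framing (the constants are assumptions based on the protocol described here; the real client serializes an OpWriteBlockProto where we fake the payload bytes):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hedged sketch of the data-transfer request framing: a protocol version
// (short), an opcode byte, then a varint-length-prefixed payload.
public class FramingSketch {
  static final short DATA_TRANSFER_VERSION = 28;  // wire version (assumption)
  static final byte OP_WRITE_BLOCK = 80;          // Op.WRITE_BLOCK code (assumption)

  static void send(DataOutputStream out, byte opcode, byte[] payload) throws IOException {
    out.writeShort(DATA_TRANSFER_VERSION);
    out.writeByte(opcode);
    writeVarInt(out, payload.length);             // protobuf-style length prefix
    out.write(payload);
    out.flush();
  }

  static void writeVarInt(DataOutputStream out, int v) throws IOException {
    while ((v & ~0x7F) != 0) { out.writeByte((v & 0x7F) | 0x80); v >>>= 7; }
    out.writeByte(v);
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    send(new DataOutputStream(buf), OP_WRITE_BLOCK, new byte[]{1, 2, 3});
    // 7 bytes = 2 (version) + 1 (opcode) + 1 (length) + 3 (payload)
    System.out.println(buf.size() + " bytes framed");
  }
}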
The DataNode receiving the data stream
The main logic is in DataXceiverServer's run method: for each incoming connection it starts a worker thread, new Daemon(datanode.threadGroup, DataXceiver.create(peer, datanode, this)).start(). The DataXceiver then reads the opcode to decide what to do: op = readOp(); processOp(op);
case WRITE_BLOCK: opWriteBlock(in); break; opWriteBlock calls DataXceiver's writeBlock method, which: 1. receives the block on the first machine:
setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
    peer.getRemoteAddressString(), peer.getLocalAddressString(),
    stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
    clientname, srcDataNode, datanode, requestedChecksum,
    cachingStrategy, allowLazyPersist, pinning, storageId));
getBlockReceiver constructs a new BlockReceiver; in the PIPELINE_SETUP_CREATE case (pipeline establishment) it calls replicaHandler = datanode.data.createRbw(storageType, storageId, block, allowLazyPersist) to create the replica-being-written on disk, with the implementation in FsDatasetImpl. 2. If targets.length > 0, writeBlock connects to the next machine in the pipeline and keeps writing, so each node stores locally and mirrors downstream (see the sketch below).
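That receive-and-forward shape can be sketched as follows (a toy model with in-memory disks, not BlockReceiver itself, which forwards packets and acks asynchronously):

import java.util.ArrayList;
import java.util.List;

// Toy model of pipeline mirroring (hypothetical, not BlockReceiver):
// each node writes the packet locally, then forwards it downstream;
// the ack travels back in the opposite direction.
public class PipelineSketch {
  private final String name;
  private final PipelineSketch mirror;   // next DataNode, or null at the tail
  private final List<byte[]> disk = new ArrayList<>();

  PipelineSketch(String name, PipelineSketch mirror) {
    this.name = name;
    this.mirror = mirror;
  }

  boolean receivePacket(byte[] packet) {
    disk.add(packet);                    // local write (createRbw analogue)
    boolean downstreamOk = mirror == null || mirror.receivePacket(packet);
    System.out.println(name + " wrote " + packet.length + " bytes, ack=" + downstreamOk);
    return downstreamOk;                 // the ack flows back upstream
  }

  public static void main(String[] args) {
    PipelineSketch dn3 = new PipelineSketch("dn3", null);
    PipelineSketch dn2 = new PipelineSketch("dn2", dn3);
    PipelineSketch dn1 = new PipelineSketch("dn1", dn2);
    dn1.receivePacket(new byte[]{1, 2, 3});  // the client writes to the pipeline head
  }
}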
After the pipeline is up, the DataStreamer run loop:
1. Arranges for acks: initDataStreaming() starts a ResponseProcessor thread: response = new ResponseProcessor(nodes); response.start(); In ResponseProcessor's run method, each arriving ack releases its packet: ackQueue.removeFirst(); packetSendTime.remove(seqno); dataQueue.notifyAll();
2. Moves each packet from the data queue to the ack queue before sending it: dataQueue.removeFirst(); ackQueue.addLast(one); packetSendTime.put(one.getSeqno(), Time.monotonicNow()); dataQueue.notifyAll();
3. Finally writes the data: one.writeTo(blockStream); where one is the current DFSPacket.
Three units of granularity are involved:
- block: the largest unit, the granularity at which data is ultimately stored on DataNodes, set by dfs.blocksize (dfs.block.size in older releases); the default is 128 MB in current versions (64 MB in early ones). Note: this setting comes from the client's configuration.
- packet: the middle unit, the granularity at which data flows from the DFSClient to the DataNode, with dfs.write.packet.size (default 64 KB) as a reference value. It is only a reference: during transmission the actual size is adjusted so that a packet's structure exactly fits all of its members, and so that the current block does not exceed the configured block size once the packet is written.
- chunk: the smallest unit, the granularity at which data is checksummed between DFSClient and DataNode, set by io.bytes.per.checksum (dfs.bytes-per-checksum in newer releases), default 512 bytes. Note: each chunk also carries a 4-byte checksum, so a chunk occupies 516 bytes in a packet. The data-to-checksum ratio is 128:1, so a 128 MB block has a 1 MB checksum file alongside it.
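A quick arithmetic check of how these defaults relate:

// Quick arithmetic with the default sizes discussed above.
public class GranularityMath {
  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;  // dfs.blocksize default, 128 MB
    int packetSize = 64 * 1024;           // dfs.write.packet.size reference, 64 KB
    int chunkData = 512;                  // data bytes per checksum
    int chunkOnWire = chunkData + 4;      // plus the 4-byte CRC: 516 B

    System.out.println("chunks per packet ~ " + packetSize / chunkOnWire);  // ~127
    System.out.println("packets per block ~ " + blockSize / packetSize);    // 2048
    // checksum overhead is 4 B per 512 B of data, a 128:1 ratio
    System.out.println("checksum bytes per block = " + blockSize / 128);    // 1 MB
  }
}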