IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> hadoop put流程代码 -> 正文阅读

[大数据]hadoop put流程代码

hadoop fs -put xxxx

解析命令

FsShell类的main方法进入
创建实例FsShell shell = newShellInstance();
ToolRunner.run(shell, argv)
进入FsShell的run方法

先init方法,主要是commandFactory = new CommandFactory(getConf())

Command instance = commandFactory.getInstance(cmd);
instance.run(argv)
根据命令判断用Command 的那个实现类。模板模式。
判断出来的是 CopyCommands.Put 类

Command 的run方法主要调用两个方法

 public int run(String...argv) {
    LinkedList<String> args = new LinkedList<String>(Arrays.asList(argv));
    try {
      if (isDeprecated()) {
        displayWarning(
            "DEPRECATED: Please use '"+ getReplacementCommand() + "' instead.");
      }
      //参数的预处理,在这里会把参数中的一些参数给剥离出来
      processOptions(args);
      processRawArguments(args);
    } catch (CommandInterruptException e) {
      displayError("Interrupted");
      return 130;
    } catch (IOException e) {
      displayError(e);
    }
    
    return (numErrors == 0) ? exitCode : exitCodeForError();
  }

processOptions是个抽象方法,为CopyCommands.Put 类中的方法

protected void processOptions(LinkedList<String> args) throws IOException {
      CommandFormat cf =
          new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
      cf.parse(args);
      setOverwrite(cf.getOpt("f"));
      setPreserve(cf.getOpt("p"));
      setLazyPersist(cf.getOpt("l"));
      setDirectWrite(cf.getOpt("d"));
      getRemoteDestination(args);
      // should have a -r option
      setRecursive(true);
    }

CopyCommands.Put 类重写了processRawArguments里面调用processArguments方法
经过几轮调用到CommandWithDestination类的processPath

  protected void processPath(PathData src, PathData dst) throws IOException {
    if (src.stat.isSymlink()) {
      // TODO: remove when FileContext is supported, this needs to either
      // copy the symlink or deref the symlink
      throw new PathOperationException(src.toString());        
    } else if (src.stat.isFile()) {
      copyFileToTarget(src, dst);
    } else if (src.stat.isDirectory() && !isRecursive()) {
      throw new PathIsDirectoryException(src.toString());
    }
  }

copyFileToTarget(src, dst)这个方法是上传文件的核心

namenode判断是否存在目录,创建目录,生成块的元数据信息

TargetFileSystem类的writeStreamToFile
进过一系列调create方法,中间经过DistributedFileSystem类,最终到DFSOutputStream的newStreamForCreate方法

stat = dfsClient.namenode.create(src, masked, 
dfsClient.clientName,
new EnumSetWritable<>(flag), 
createParent,
replication,
blockSize, 
SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);

调用NameNodeRpcServer类的create方法,使用RPC调用远程服务端的方法
核心方法为

    try {
      PermissionStatus perm = new PermissionStatus(getRemoteUser()
          .getShortUserName(), null, masked);
      status = namesystem.startFile(src, perm, clientName, clientMachine,
          flag.get(), createParent, replication, blockSize, supportedVersions,
          ecPolicyName, cacheEntry != null);
    } finally {
      RetryCache.setState(cacheEntry, status != null, status);
    }

进入FSNamesystem类的startFileInt方法
先后调用 FSDirWriteFileOp类的startFile,addFile,newINodeFile->(FSDirectory)fsd.addINode->addLastINode
FSDirWriteFileOp.startFile后 fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry)写fseditLog

创建输出流

回到DFSOutputStream的newStreamForCreate方法,create结束后,创建输出流

 if(stat.getErasureCodingPolicy() != null) {
        out = new DFSStripedOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes);
      } else {
        out = new DFSOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes, true);
      }
      out.start();

new DFSOutputStream中调用computePacketChunkSize方法

out.start方法,调用一个DataStreamer线程run。
开始里面有空的dataQueue在等待。

数据流发送到datanode

从create一层层出来,到TargetFileSystem类

  void writeStreamToFile(InputStream in, PathData target,
        boolean lazyPersist, boolean direct)
        throws IOException {
      FSDataOutputStream out = null;
      try {
        out = create(target, lazyPersist, direct);
        IOUtils.copyBytes(in, out, getConf(), true);
      } finally {
        IOUtils.closeStream(out); // just in case copyBytes didn't
      }
    }

IOUtils.copyBytes为写文件方法,写完后_copy也就重命名为正常的文件夹名。
->FSOutputSummer.write-> FSOutputSummer.flushBuffer()->FSOutputSummer.writeChecksumChunks->DFSOutputStream.writeChunk

  protected synchronized void writeChunk(byte[] b, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    writeChunkPrepare(len, ckoff, cklen);

    currentPacket.writeChecksum(checksum, ckoff, cklen);
    currentPacket.writeData(b, offset, len);
    currentPacket.incNumChunks();
    getStreamer().incBytesCurBlock(len);

    // If packet is full, enqueue it for transmission
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
        getStreamer().getBytesCurBlock() == blockSize) {
      enqueueCurrentPacketFull();
    }
  }

enqueueCurrentPacketFull()方法

  synchronized void enqueueCurrentPacketFull() throws IOException {
    LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
            + " appendChunk={}, {}", currentPacket, src, getStreamer()
            .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
        getStreamer());
    enqueueCurrentPacket();
    adjustChunkBoundary();
    endBlock();
  }

enqueueCurrentPacket调用了DataStreamer的waitAndQueuePacket方法。
如果队列不满调用queuePacket方法

  void queuePacket(DFSPacket packet) {
    synchronized (dataQueue) {
      if (packet == null) return;
      packet.addTraceParent(Tracer.getCurrentSpanId());
      dataQueue.addLast(packet);
      lastQueuedSeqno = packet.getSeqno();
      LOG.debug("Queued {}, {}", packet, this);
      dataQueue.notifyAll();
    }
  }

加到队列中,并通知
DataStreamer线程run这时就会唤醒,
调用dataQueue.getFirst()获取第一个数据包,续通过setPipeline(nextBlockOutputStream())创建管道。
nextBlockOutputStream获取块信息

1.调用 locateFollowingBlock方法->
DFSOutputStream.addBlock
里面开始和服务端通信dfsClient.namenode.addBlock
里面一直调用namesystem.getAdditionalBlock->FSDirWriteFileOp.chooseTargetForNewBlock->blockplacement.chooseTarget->BlockPlacementPolicy.chooseTarget->BlockPlacementPolicyDefault.chooseTarget->chooseTargetInOrder进行机架感知,选择机架
2.调用createBlockOutputStream方法

   OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
        InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
        IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
            unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
        unbufOut = saslStreams.out;
        unbufIn = saslStreams.in;
        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
            DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
        blockReplyStream = new DataInputStream(unbufIn);

        //
        // Xmit header info to datanode
        //

        BlockConstructionStage bcs = recoveryFlag ?
            stage.getRecoveryStage() : stage;

        // We cannot change the block length in 'block' as it counts the number
        // of bytes ack'ed.
        ExtendedBlock blockCopy = block.getCurrentBlock();
        blockCopy.setNumBytes(stat.getBlockSize());

        boolean[] targetPinnings = getPinnings(nodes);
        // send the request
        new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
            dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
            nodes.length, block.getNumBytes(), bytesSent, newGS,
            checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
            (targetPinnings != null && targetPinnings[0]), targetPinnings,
            nodeStorageIDs[0], nodeStorageIDs);

        // receive ack for connect
        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
            PBHelperClient.vintPrefixed(blockReplyStream));
        Status pipelineStatus = resp.getStatus();
        firstBadLink = resp.getFirstBadLink();

创建输入输出流,通过socket方式发送数据流
send(out, Op.WRITE_BLOCK, proto.build());
如果失败
dfsClient.namenode.abandonBlock

datanode接受数据流

主要逻辑在DataXceiverServer类中run方法
先创建一个线程
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this))
.start();
会选择做什么事,WRITE_BLOCK
op = readOp();
processOp(op)

case WRITE_BLOCK:
opWriteBlock(in);
break;
在opWriteBlock方法中,调用DataXceiver类的writeBlock方法
1.写第一个机器块

setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,peer.getRemoteAddressString(),peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,clientname, srcDataNode, datanode, requestedChecksum,cachingStrategy, allowLazyPersist, pinning, storageId));

getBlockReceiver方法
new BlockReceiver对象,
case PIPELINE_SETUP_CREATE:管道建立阶段
replicaHandler = datanode.data.createRbw(storageType, storageId,
block, allowLazyPersist);
写文件到磁盘
实现类为FsDatasetImpl的recoverRbw方法
2.targets.length > 0找下一个机器
继续写

写完后DataStreamer线程run,
1.获取返回应答 通过initDataStreaming()开启线程
response = new ResponseProcessor(nodes);
response.start();

ResponseProcessor的run方法
ackQueue.removeFirst();
packetSendTime.remove(seqno);
dataQueue.notifyAll();

dataQueue.removeFirst();
ackQueue.addLast(one);
packetSendTime.put(one.getSeqno(), Time.monotonicNow());
dataQueue.notifyAll();
3.最终写数据
(DFSPacket)ne.writeTo(blockStream);

block是最大的一个单位,它是最终存储于DataNode上的数据粒度,由dfs.block.size参数决定,默认是64M;注:这个参数由客户端配置决定;
packet是中等的一个单位,它是数据由DFSClient流向DataNode的粒度,以dfs.write.packet.size参数为参考值,默认是64K;注:这个参数为参考值,是指真正在进行数据传输时,会以它为基准进行调整,调整的原因是一个packet有特定的结构,调整的目标是这个packet的大小刚好包含结构中的所有成员,同时也保证写到DataNode后当前block的大小不超过设定值;
chunk是最小的一个单位,它是DFSClient到DataNode数据传输中进行数据校验的粒度,由io.bytes.per.checksum参数决定,默认是512B;注:事实上一个chunk还包含4B的校验值,因而chunk写入packet时是516B;数据与检验值的比值为128:1,所以对于一个128M的block会有一个1M的校验文件与之对应;

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-04 11:16:50  更:2021-08-04 11:17:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/25 20:22:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码