IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Hadoop源码分析(12) -> 正文阅读

[大数据]Hadoop源码分析(12)

Hadoop源码分析(12)

1、 journalnode客户端

? 在文档(11)中分析了初始化editlog的方法。在初始化之前其会根据集 群的配置状态选择不同的方式来进行初始化。在HA状态下,其会调用一个 initJournals方法来进行初始化,这个方法会创建一个journalset对象,并创建 journalmanager对象传入journalset中,进行统一管理。

在文档(10)中分析了的在初始化完editlog后,接下来就要利用editlog获取 操作日志。在文档(10)中实现这个操作的是editlog的selectInputStreams方法, 这个方法的内容如下:

 public Collection<EditLogInputStream> selectInputStreams(
      long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
      boolean inProgressOk) throws IOException {

    List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
    synchronized(journalSetLock) {
      Preconditions.checkState(journalSet.isOpen(), "Cannot call " +
          "selectInputStreams() on closed FSEditLog");
      selectInputStreams(streams, fromTxId, inProgressOk);
    }

    try {
      checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
    } catch (IOException e) {
      if (recovery != null) {
        // If recovery mode is enabled, continue loading even if we know we
        // can't load up to toAtLeastTxId.
        LOG.error(e);
      } else {
        closeAllStreams(streams);
        throw e;
      }
    }
    return streams;
  }

? 这里的重点是第9行,这里调用了一个重载方法,其内容如下:

  @Override
  public void selectInputStreams(Collection<EditLogInputStream> streams,
      long fromTxId, boolean inProgressOk) throws IOException {
    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
  }

? 这里很简单,就是调用了journalset的selectInputStreams方法。其内容如下:

  @Override
  public void selectInputStreams(Collection<EditLogInputStream> streams,
      long fromTxId, boolean inProgressOk) throws IOException {
    final PriorityQueue<EditLogInputStream> allStreams = 
        new PriorityQueue<EditLogInputStream>(64,
            EDIT_LOG_INPUT_STREAM_COMPARATOR);
    for (JournalAndStream jas : journals) {
      if (jas.isDisabled()) {
        LOG.info("Skipping jas " + jas + " since it's disabled");
        continue;
      }
      try {
        jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
      } catch (IOException ioe) {
        LOG.warn("Unable to determine input streams from " + jas.getManager() +
            ". Skipping.", ioe);
      }
    }
    chainAndMakeRedundantStreams(streams, allStreams, fromTxId);
  }

? 首先是第7行这里会直接遍历journals。这个journals是 journalset的属性,其类型是list。在文档(11)中提到在初始化editlog时, 调用了一个initJournals方法。这个方法里会调用journalset的add方法 存储创建的journalmanager。这个add方法实际是将数据存储到journals中, 这个方法的内容如下:

  void add(JournalManager j, boolean required, boolean shared) {
    JournalAndStream jas = new JournalAndStream(j, required, shared);
    journals.add(jas);
  }

? 这里可以看见它对传入journalmanager继续封装成了一个 JournalAndStream类,然后将创建的jas添加到journals中。这里 JournalAndStream的构造方法如下:

    public JournalAndStream(JournalManager manager, boolean required,
        boolean shared) {
      this.journal = manager;
      this.required = required;
      this.shared = shared;
    }

? 这里的构造方法只是单纯的赋值。/p>

?然后继续看selectInputStreams方法,在for里拿到journals中的 jas后,会先判断其是否可用(第8行),然后调用jas的getManager方法,再然后 调用其返回值的selectInputStreams方法(第13行)。 getManager方法很简单,会直接返回创建时传入的manager对象。其内容如下:

JournalManager getManager() {
      return journal;
    }

? 参考之前的文档可知,这里的journal实际是QuorumJournalManager, 所以这里实际是调用的QuorumJournalManager的selectInputStreams方法。 这个方法的内容如下:

public void selectInputStreams(Collection<EditLogInputStream> streams,
      long fromTxnId, boolean inProgressOk) throws IOException {

    QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
        loggers.getEditLogManifest(fromTxnId, inProgressOk);
    Map<AsyncLogger, RemoteEditLogManifest> resps =
        loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
            "selectInputStreams");

    LOG.debug("selectInputStream manifests:\n" +
        Joiner.on("\n").withKeyValueSeparator(": ").join(resps));

    final PriorityQueue<EditLogInputStream> allStreams = 
        new PriorityQueue<EditLogInputStream>(64,
            JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
    for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
      AsyncLogger logger = e.getKey();
      RemoteEditLogManifest manifest = e.getValue();

      for (RemoteEditLog remoteLog : manifest.getLogs()) {
        URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());

        EditLogInputStream elis = EditLogFileInputStream.fromUrl(
            connectionFactory, url, remoteLog.getStartTxId(),
            remoteLog.getEndTxId(), remoteLog.isInProgress());
        allStreams.add(elis);
      }
    }
    JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId);
  }

? 首先是第4行,这里会调用getEditLogManifest方法与journalnode 进行连接,获取journalnode存储的editlog列表。

? 然后是第6行,因为namenode和journalnode不在同一个jvm中, 甚至不在同一台机器中,所以namenode和journalnode是使用远程调用(RPC) 来进行通信的。因此,在这里需要调用waitForWriteQuorum方法来等远程调用 返回结果。这个方法很重要,如果在这里与journalnode通信失败,在HA模式下, namenode会自动退出。

? 然后是第16行使用for循环遍历所有的editlog,并通过URL的方法 获取到具体的editlog的输入流。注意这里是有两个for循环,这是因为editlog 是分段存储的,不同的分段通过txid来标识,而在journalnode中,每一个 journalnode都会存储一份editlog。所以这里需要用两个for循环每个 journalnode上的每个editlog分段。

? 最后再 调用JournalSet的chainAndMakeRedundantStreams检查输入流。

? 这里先分析getEditLogManifest方法,并详细解析hdfs的远程调用。 首先是getEditLogManifest方法的内容如下:

 public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest(
      long fromTxnId, boolean inProgressOk) {
    Map<AsyncLogger,
        ListenableFuture<RemoteEditLogManifest>> calls
        = Maps.newHashMap();
    for (AsyncLogger logger : loggers) {
      ListenableFuture<RemoteEditLogManifest> future =
          logger.getEditLogManifest(fromTxnId, inProgressOk);
      calls.put(logger, future);
    }
    return QuorumCall.create(calls);
  }

? 这里的方法也很简单。首先是第6行使用for循环遍历loggers对象, 这个对象在文档(11)中解析过,其中存储的是与journalnode对应的 IPCLoggerChannel对象。然后调用IPCLoggerChannel对象的 getEditLogManifest方法,并将其返回的Future对象放入map中。 最后再利用这个map创建一个QuorumCall。 IPCLoggerChannel的getEditLogManifest方法内容如下:

public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
      final long fromTxnId, final boolean inProgressOk) {
    return parallelExecutor.submit(new Callable<RemoteEditLogManifest>() {
      @Override
      public RemoteEditLogManifest call() throws IOException {
        GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
            journalId, fromTxnId, inProgressOk);
        // Update the http port, since we need this to build URLs to any of the
        // returned logs.
        constructHttpServerURI(ret);
        return PBHelper.convert(ret.getManifest());
      }
    });
  }

?这个方法实际就一行,使用parallelExecutor的submit方法提交了 一个匿名的callable方法。parallelExecutor是一个线程池,callable是 java中带返回值的线程接口。

? 然后再看callable的call方法,这里首先是第6行会首先调用一个 getProxy方法获取一个代理对象。然后再调用代理对象的getEditLogManifest方 法。这一步获取代理对象实际是在创建一个HDFS的远程调用客户端。

? 这一步若是执行成功后,便代表着远程调用执行成功。最后再将返回值封装便可。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-26 11:47:11  更:2022-04-26 11:49:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 10:54:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码