IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Hadoop源码分析(21) -> 正文阅读

[大数据]Hadoop源码分析(21)

Hadoop源码分析(21)

读取journalnode数据

? 在文档(20)中分析journalnode是如何执行远程调用的getEditLogManifest方法的。当journalnode执行完成后会将结果返回个客户端即namenode。

? 这里接着从最初调用getEditLogManifest的selectInputStreams方法内容如下:

public void selectInputStreams(Collection<EditLogInputStream> streams,
      long fromTxnId, boolean inProgressOk) throws IOException {

    QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
        loggers.getEditLogManifest(fromTxnId, inProgressOk);
    Map<AsyncLogger, RemoteEditLogManifest> resps =
        loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
            "selectInputStreams");

    LOG.debug("selectInputStream manifests:\n" +
        Joiner.on("\n").withKeyValueSeparator(": ").join(resps));

    final PriorityQueue<EditLogInputStream> allStreams = 
        new PriorityQueue<EditLogInputStream>(64,
            JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
    for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
      AsyncLogger logger = e.getKey();
      RemoteEditLogManifest manifest = e.getValue();

      for (RemoteEditLog remoteLog : manifest.getLogs()) {
        URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());

        EditLogInputStream elis = EditLogFileInputStream.fromUrl(
            connectionFactory, url, remoteLog.getStartTxId(),
            remoteLog.getEndTxId(), remoteLog.isInProgress());
        allStreams.add(elis);
      }
    }
    JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId);
  }

? 这个方法在文档(12)中分析过,第5行的getEditLogManifest方法即之前分析的远程调用方法,第6行和第7行的waitForWriteQuorum方法,是用来等journalnode的返回的方法。其内容如下:

<V> Map<AsyncLogger, V> waitForWriteQuorum(QuorumCall<AsyncLogger, V> q,
      int timeoutMs, String operationName) throws IOException {
    int majority = getMajoritySize();
    try {
      q.waitFor(
          loggers.size(), // either all respond 
          majority, // or we get a majority successes
          majority, // or we get a majority failures,
          timeoutMs, operationName);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted waiting " + timeoutMs + "ms for a " +
          "quorum of nodes to respond.");
    } catch (TimeoutException e) {
      throw new IOException("Timed out waiting " + timeoutMs + "ms for a " +
          "quorum of nodes to respond.");
    }

    if (q.countSuccesses() < majority) {
      q.rethrowException("Got too many exceptions to achieve quorum size " +
          getMajorityString());
    }

    return q.getResults();
  }

? 重点在第5行调用QuorumCall对象的waitFor方法等待返回,获取到返回后执行第24行的方法获取结果。

? waitFor方法的内容如下:

 public synchronized void waitFor(
      int minResponses, int minSuccesses, int maxExceptions,
      int millis, String operationName)
      throws InterruptedException, TimeoutException {
    long st = timer.monotonicNow();
    long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
    long et = st + millis;
    while (true) {
      restartQuorumStopWatch();
      checkAssertionErrors();
      if (minResponses > 0 && countResponses() >= minResponses) return;
      if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
      if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;
      long now = timer.monotonicNow();

      if (now > nextLogTime) {
        long waited = now - st;
        String msg = String.format(
            "Waited %s ms (timeout=%s ms) for a response for %s",
            waited, millis, operationName);
        if (!successes.isEmpty()) {
          msg += ". Succeeded so far: [" + Joiner.on(",").join(successes.keySet()) + "]";
        }
        if (!exceptions.isEmpty()) {
          msg += ". Exceptions so far: [" + getExceptionMapString() + "]";
        }
        if (successes.isEmpty() && exceptions.isEmpty()) {
          msg += ". No responses yet.";
        }
        if (waited > millis * WAIT_PROGRESS_WARN_THRESHOLD) {
          QuorumJournalManager.LOG.warn(msg);
        } else {
          QuorumJournalManager.LOG.info(msg);
        }
        nextLogTime = now + WAIT_PROGRESS_INTERVAL_MILLIS;
      }
      long rem = et - now;
      if (rem <= 0) {
        // Increase timeout if a full GC occurred after restarting stopWatch
        long timeoutIncrease = getQuorumTimeoutIncreaseMillis(0, millis);
        if (timeoutIncrease > 0) {
          et += timeoutIncrease;
        } else {
          throw new TimeoutException();
        }
      }
      restartQuorumStopWatch();
      rem = Math.min(rem, nextLogTime - now);
      rem = Math.max(rem, 1);
      wait(rem);
      // Increase timeout if a full GC occurred after restarting stopWatch
      long timeoutIncrease = getQuorumTimeoutIncreaseMillis(-rem, millis);
      if (timeoutIncrease > 0) {
        et += timeoutIncrease;
      }
    }
  }

? 这个房会用来检查journalnode的返回,判断其是否超时。注意这里会有countResponses、countSuccesses、countExceptions等方法,这个是因为它判断的并不是某一个journalnode的返回,而是所有journalnode节点的返回。

? 然后selectInputStreams方法之后的代码便是在解析这些返回值,在解析这些代码前,需要先解析清楚这些返回值的具体情况。

? 在上文提到了journalnode中存储日志的过程中采取了过半原则,即有过半的journalnode成功存储了这个日志,便认为这次存储成功了。反之,不过半则认为存储失败。

? 在理想情况下是所有journalnode都存储成,如图中情况A所示。但实际会因为某些journalnode网络故障、异常退出或其他原因使得该服务器存储失败,然后在一段时间后该journalnode可通过重启或其他手段恢复,继续存储日志。这时会出现如图中情况B所示的情景。

日志存储情况示意图

? 在selectInputStreams方法中,在接收到journalnode的返回值后,首先会创建一个allStreams对象,这个对象是PriorityQueue类的对象,这是一个优先队列,会对传入队列中的数据进行排序,排序的方式有传入的对象决定,这里创建的是:JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR。其内容如下:

static final public Comparator<EditLogInputStream>
    EDIT_LOG_INPUT_STREAM_COMPARATOR = new Comparator<EditLogInputStream>() {
      @Override
      public int compare(EditLogInputStream a, EditLogInputStream b) {
        return ComparisonChain.start().
          compare(a.getFirstTxId(), b.getFirstTxId()).
          compare(b.getLastTxId(), a.getLastTxId()).
          result();
      }
    };

? 这里可以看见这个comparator主要是按照log文件的txid来排序的。

? 然后是selectInputStreams方法的第16行到第28行,这里会用两个for循环遍历所有journalnode的所有日志文件,并将其添加到上文建立的队列中。从上文的分析中可以知道每台journalnode都存储了一份日志文件,所以这里的日志文件会重复。因此在真正使用前需要合并重复日志。这里是同过JournalSet的chainAndMakeRedundantStreams方法来合并的。其内容如下:

public static void chainAndMakeRedundantStreams(
      Collection<EditLogInputStream> outStreams,
      PriorityQueue<EditLogInputStream> allStreams, long fromTxId) {
    // We want to group together all the streams that start on the same start
    // transaction ID.  To do this, we maintain an accumulator (acc) of all
    // the streams we've seen at a given start transaction ID.  When we see a
    // higher start transaction ID, we select a stream from the accumulator and
    // clear it.  Then we begin accumulating streams with the new, higher start
    // transaction ID.
    LinkedList<EditLogInputStream> acc =
        new LinkedList<EditLogInputStream>();
    EditLogInputStream elis;
    while ((elis = allStreams.poll()) != null) {
      if (acc.isEmpty()) {
        acc.add(elis);
      } else {
        EditLogInputStream accFirst = acc.get(0);
        long accFirstTxId = accFirst.getFirstTxId();
        if (accFirstTxId == elis.getFirstTxId()) {
          // if we have a finalized log segment available at this txid,
          // we should throw out all in-progress segments at this txid
          if (elis.isInProgress()) {
            if (accFirst.isInProgress()) {
              acc.add(elis);
            }
          } else {
            if (accFirst.isInProgress()) {
              acc.clear();
            }
            acc.add(elis);
          }
        } else if (accFirstTxId < elis.getFirstTxId()) {
          // try to read from the local logs first since the throughput should
          // be higher
          Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR);
          outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
          acc.clear();
          acc.add(elis);
        } else if (accFirstTxId > elis.getFirstTxId()) {
          throw new RuntimeException("sorted set invariants violated!  " +
              "Got stream with first txid " + elis.getFirstTxId() +
              ", but the last firstTxId was " + accFirstTxId);
        }
      }
    }
    if (!acc.isEmpty()) {
      Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR);
      outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
      acc.clear();
    }
  }

? 首先是第10行,这里创建了一个acc对象,这是一个linkedlist,主要用来来存储相同的日志文件。然后是第13行定义了一个while循环从上述的队列中获取数据,然后是while循环内从第14行到第44行,这里是一个if else语句。首先是第14行当acc为空的时候,直接将拿到的日志文件添加到acc中。然后是else的部分,即acc中有内容的情况。首先是第17行先从acc中获取其中的第一条数据,然后第18行取出数据的txid,用这个txid与循环遍历的日志文件的txid进行比较,这里主要分为三种情况:txid相同,acc中的txid小于当前日志的txid,acc中的txid大于日志文件中的txid。

? 当两种txid相同的时候,会执行第23行到31行中的内容。这里主要是对可能出现的inProgress文件进行处理。在当前namenode的启动流程中不会有inprogress文件,但在别的流程中可能出现。这里如果没有inprogress文件的话,实际可以直接将当前日志添加到acc文件中。但这里对有inprogress文件的情况作出了处理,首先是当前文件是inprogress的情况(第22行的if语句),这时如果acc中文件也是inprogress文件,则将当前文件添加到acc中,否则不做任何处理。

? 在journalnode实际可能出现多个inprogress文件,而除了最新的inprogress文件其余的都没有用。出现多个inprogress文件与journalnode的故障有关:在journalnode出现故障退出服务的时候,这时会有一个inprogress文件在日志目录中,这代表了它故障前存储的日志,但一个journalnode故障并不会影响journalnode集群的工作,集群继续工作将当前的inprogress文件写成完整的日志文件,然后创建新的inprogress文件。这时如果故障的journalnode重新恢复后,它持有的inprogress文件与其他journalnode的inprogress文件不同,两者的数据不一致,这时journalnode的处理是去同步集群的inprogress文件,而对其原有的inprogress文件没有其他处理,所以在journalnode的目录下可能出现多个inprogress文件。

? 然后回到上述代码,第26行的else语句,代表了当前文件不是inprogress文件的情况,这时如果acc中的文件是inprogress文件,则代表添加进acc的文件是上述所说的不需要的inprogress文件,所以需要将acc中的数据清除,即第28行的clear语句。然后第29行将当前文件添加到acc中。

?然后是第32行,这里处理acc中的txid小于当前文件的txid的情况。上文解析了传入的队列是一个优先队列,队列中的数据都是按txid排序的。当前文件的txid大于acc存储的txid的时候队列中没有与acc中txid相同的文件了。所以这里首先会对acc中的数据进行排序(第35行),然后将acc中的数据封装成一个新的流,并添加到outStreams中(第36行),然后将acc清空,将当前的日志文件添加到acc中,开始对下一个txid的数据进行处理。

? 然后是第39行对acc中txid大于当前文件的txid的情况。这种情况理论上不会出现,若出现了对其的处理是抛出异常。

? 最后是第46行到第50行,对while结束后acc中存储的数据进行处理。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-06 11:06:45  更:2022-05-06 11:07:32 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 8:04:55-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码