IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Elasticsearch源码(二):scroll原理 -> 正文阅读

[大数据]Elasticsearch源码(二):scroll原理

问题

  1. scroll查询指定_doc排序要比不指定sort或者指定某个字段sort要快很多。
  2. scroll查询每次依然要进行collect及计算打分,并非是第一次查询之后会缓存命中结果。

源码分析

首先看一下Elasticsearch的Collector,主要以下几个
image

这里主要介绍一下TopDocsCollector,TopDocsCollector类在收集完文档后,会返回一个TopDocs对象。TopDocs对象是收集后的文档信息按照某种规则有序的存放在TopDocs对象中,该对象是搜索结果的返回值。

根据不同排序(sorting)规则,TopDocsCollector派生出类图中的3个子类:

  • TopFieldCollector,有如下两个内部子类:

    • SimpleFieldCollector
    • PagingFieldCollector
  • TopScoreDocsCollector,有如下两个内部子类:

    • SimpleTopScoreDocCollector
    • PagingTopScoreDocCollector
  • DiversifiedTopDocsCollector

TopScoreDocCollector类的排序规则是先执行打分,分数相同的文档按文档号排序。

TopFieldCollector则是先按照指定的sort排序,值相同的则再按照文档号排序。

两个Collector的触发逻辑非常简单,就是判断searchContext.sort()是否为null。如果不为null,则走TopFieldCollector。指定_doc排序也属于不为null情况。

TopScoreDocsCollector

对于TopScoreDocsCollector的两个子类SimpleTopScoreDocCollector和PagingTopScoreDocCollector功能上的区别在于PagingTopScoreDocCollector是针对翻页请求的。代码上只是增加了一个对after的判断。

  if (score > after.score || (score == after.score && doc <= afterDoc)) {
    // hit was collected on a previous page
    return;
  }

对于使用TopScoreDocsCollector无论是否为翻页请求,每次请求都会扫描全部的命中文档并计算分值。

使用SimpleTopScoreDocCollector还是PagingTopScoreDocCollector是根据after是否为null决定的。

  public static TopScoreDocCollector create(int numHits, ScoreDoc after) {

    if (numHits <= 0) {
      throw new IllegalArgumentException("numHits must be > 0; please use TotalHitCountCollector if you just need the total hit count");
    }

    if (after == null) {
      return new SimpleTopScoreDocCollector(numHits);
    } else {
      return new PagingTopScoreDocCollector(numHits, after);
    }
  }

对于scroll请求,after = scrollContext.lastEmittedDoc;即上次翻页最大的ScoreDoc

TopFieldCollector

TopFieldCollectortor一样有两个子类,SimpleFieldCollector和PagingFieldCollector,区别也是一个针对分页一个不分页的情况。

  if (topCmp > 0 || (topCmp == 0 && doc <= afterDoc)) {
    // Already collected on a previous page
    return;
  }

是否翻页判断逻辑跟TopScoreDocsCollector也一样

  public static TopFieldCollector create(Sort sort, int numHits, FieldDoc after,
      boolean fillFields, boolean trackDocScores, boolean trackMaxScore)
      throws IOException {

    if (sort.fields.length == 0) {
      throw new IllegalArgumentException("Sort must contain at least one field");
    }

    if (numHits <= 0) {
      throw new IllegalArgumentException("numHits must be > 0; please use TotalHitCountCollector if you just need the total hit count");
    }

    FieldValueHitQueue<Entry> queue = FieldValueHitQueue.create(sort.fields, numHits);

    if (after == null) {
      return new SimpleFieldCollector(sort, queue, numHits, fillFields, trackDocScores, trackMaxScore);
    } else {
      if (after.fields == null) {
        throw new IllegalArgumentException("after.fields wasn't set; you must pass fillFields=true for the previous search");
      }

      if (after.fields.length != sort.getSort().length) {
        throw new IllegalArgumentException("after.fields has " + after.fields.length + " values but sort has " + sort.getSort().length);
      }

      return new PagingFieldCollector(sort, queue, after, numHits, fillFields, trackDocScores, trackMaxScore);
    }
  }

如何提前终止?

在lucene6.4.1版本中,无论是SimpleFieldCollector和PagingFieldCollector都和上面的TopScoreDocsCollector一样,都是不具备提前结束功能的,也就是说都会扫描完全部的命中文档。不过在更高版本的lucene中,具备了这个功能,判断依据是指定的search sort=index sort,如果一致,则在收集topN之后可以通过抛出CollectionTerminatedException异常的方式提前结束收集。

//该部分代码为lucene高版本代码

    if (searchSortPartOfIndexSort == null) {
      final Sort indexSort = context.reader().getMetaData().getSort();
      searchSortPartOfIndexSort = canEarlyTerminate(sort, indexSort);
      if (searchSortPartOfIndexSort) {
        firstComparator.disableSkipping();
      }
    }
      
    ...
    
    boolean thresholdCheck(int doc) throws IOException {
      if (collectedAllCompetitiveHits || reverseMul * comparator.compareBottom(doc) <= 0) {
        // since docs are visited in doc Id order, if compare is 0, it means
        // this document is largest than anything else in the queue, and
        // therefore not competitive.
        if (searchSortPartOfIndexSort) {
          if (hitsThresholdChecker.isThresholdReached()) {
            totalHitsRelation = Relation.GREATER_THAN_OR_EQUAL_TO;
            throw new CollectionTerminatedException();
          } else {
            collectedAllCompetitiveHits = true;
          }
        } else if (totalHitsRelation == TotalHits.Relation.EQUAL_TO) {
          // we can start setting the min competitive score if the
          // threshold is reached for the first time here.
          updateMinCompetitiveScore(scorer);
        }
        return true;
      }
      return false;
    }

Elasticsearch从6.x版本开始也支持了自定义写入的顺序,可以不是_doc而是某个字段值。

回到我们现有的Elasticsearch5.2.2对应Lucene6.4.1。虽然Lucene没有提供提前终止功能,但是Elasticsearch做了这个功能。

    //QueryPhase
    
    ...
    
    final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER;
    if (terminateAfterSet) {
        final Collector child = collector;
        // throws Lucene.EarlyTerminationException when given count is reached

        collector = Lucene.wrapCountBasedEarlyTerminatingCollector(collector, searchContext.terminateAfter());
        if (doProfile) {
            collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TERMINATE_AFTER_COUNT,
                    Collections.singletonList((InternalProfileCollector) child));
        }
    }
    
    ...

当terminateAfterSet=true时,进入Lucene.wrapCountBasedEarlyTerminatingCollector方法,该方法返回EarlyTerminatingCollector,该类重写了collect方法。在进入TopFieldCollector$PagingFieldCollector$1.collect方法之前会先调用这个collect方法。

    //EarlyTerminatingCollector
    ...
    
    @Override
    public void collect(int doc) throws IOException {
        leafCollector.collect(doc);
    
        if (++count >= maxCountHits) {
            throw new EarlyTerminationException("early termination [CountBased]");
        }
    }
    
    ...

方法很简单,判断当前搜集的count是否大于等于maxCountHits(即:searchContext.terminateAfter()),如果成立则抛出EarlyTerminationException,然后在QueryPhase捕获异常,查询结束。

    //QueryPhase
    
    ...
    
    try {
        if (collector != null) {
            if (doProfile) {
                searchContext.getProfilers().getCurrentQueryProfiler().setCollector((InternalProfileCollector) collector);
            }
            searcher.search(query, collector);
        }
    } catch (TimeLimitingCollector.TimeExceededException e) {
        assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
        queryResult.searchTimedOut(true);
    } catch (Lucene.EarlyTerminationException e) {
        assert terminateAfterSet : "EarlyTerminationException thrown even though terminateAfter wasn't set";
        queryResult.terminatedEarly(true);
    } finally {
        searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
    }
    
    ...
    

到此,我们可以确定目前我们使用的Elasticsearch5.2.2是具备提前终止功能的,下面看触发条件:searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER;

从代码我们看出触发条件主要涉及一个变量searchContext.terminateAfter(),这个变量是在这里被定义的

    //QueryPhase
    
    ...
    
    if (searchContext.request().scroll() != null) {
        numDocs = Math.min(searchContext.size(), totalNumDocs);
        after = scrollContext.lastEmittedDoc;

        if (returnsDocsInOrder(query, searchContext.sort())) {
            if (scrollContext.totalHits == -1) {
                // first round
                assert scrollContext.lastEmittedDoc == null;
                // there is not much that we can optimize here since we want to collect all
                // documents in order to get the total number of hits
            } else {
                // now this gets interesting: since we sort in index-order, we can directly
                // skip to the desired doc and stop collecting after ${size} matches
                if (scrollContext.lastEmittedDoc != null) {
                    BooleanQuery bq = new BooleanQuery.Builder()
                        .add(query, BooleanClause.Occur.MUST)
                        .add(new MinDocQuery(after.doc + 1), BooleanClause.Occur.FILTER)
                        .build();
                    query = bq;
                }
                searchContext.terminateAfter(numDocs);
            }
        }
    } else {
        after = searchContext.searchAfter();
    }
    
    ...
    

从代码中推断出触发终止的场景有三个必要条件:

  1. 必须是scroll请求
  2. 必须returnsDocsInOrder(query, searchContext.sort())成立,即必须指定sort为_doc
  3. scrollContext.totalHits != -1,即必须是翻到第二页往上的情况

以上全部成立后,设置terminateAfter为numDocs

    final int totalNumDocs = searcher.getIndexReader().numDocs();
    int numDocs = Math.min(searchContext.from() + searchContext.size(), totalNumDocs);
    
    //如果为scroll请求
    numDocs = Math.min(searchContext.size(), totalNumDocs);

至此,我们确认scroll请求指定_doc排序在从第二页开始,只会收集size个doc,性能上要好上很多。

如何跳跃?

对于scroll请求,由于scroll不支持向前翻页,所以每次查询对于已经查过的数据是没有必要收集的,Elasticsearch对于scroll请求,包装了一层MinDocQuery,用于过滤掉已经翻页过的数据,大大减少文档的命中数,避免收集无用doc,这对于越深度的翻页,性能差别越大。

    after = scrollContext.lastEmittedDoc;
    
    ...
    
    if (scrollContext.lastEmittedDoc != null) {
        BooleanQuery bq = new BooleanQuery.Builder()
            .add(query, BooleanClause.Occur.MUST)
            .add(new MinDocQuery(after.doc + 1), BooleanClause.Occur.FILTER)
            .build();
        query = bq;
    }

这样每次就可以跳过已经翻页过的数据,直接从lastEmittedDoc + 1开始匹配。其实这也是为什么scroll不能往回翻,只能往下翻且不能跳页的原因。

lucene收集命中文档在scoreAll方法中

    static void scoreAll(LeafCollector collector, DocIdSetIterator iterator, TwoPhaseIterator twoPhase, Bits acceptDocs) throws IOException {
      if (twoPhase == null) {
        for (int doc = iterator.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = iterator.nextDoc()) {
          if (acceptDocs == null || acceptDocs.get(doc)) {
            collector.collect(doc);
          }
        }
      } else {
        // The scorer has an approximation, so run the approximation first, then check acceptDocs, then confirm
        final DocIdSetIterator approximation = twoPhase.approximation();
        for (int doc = approximation.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = approximation.nextDoc()) {
          if ((acceptDocs == null || acceptDocs.get(doc)) && twoPhase.matches()) {
            collector.collect(doc);
          }
        }
      }
    }

当执行iterator.nextDoc()时,实际上执行的是ConjunctionDISI.nextDoc()方法,在这里进入doNext方法,开始合并倒排表逻辑,这里就会进入MinDocQuery的advance。

  @Override
  public int nextDoc() throws IOException {
    return doNext(lead1.nextDoc());
  }

...

  private int doNext(int doc) throws IOException {
    advanceHead: for(;;) {
      assert doc == lead1.docID();

      // find agreement between the two iterators with the lower costs
      // we special case them because they do not need the
      // 'other.docID() < doc' check that the 'others' iterators need
      final int next2 = lead2.advance(doc);
      if (next2 != doc) {
        doc = lead1.advance(next2);
        if (next2 != doc) {
          continue;
        }
      }

      // then find agreement with other iterators
      for (DocIdSetIterator other : others) {
        // other.doc may already be equal to doc if we "continued advanceHead"
        // on the previous iteration and the advance on the lead scorer exactly matched.
        if (other.docID() < doc) {
          final int next = other.advance(doc);

          if (next > doc) {
            // iterator beyond the current doc - advance lead and continue to the new highest doc.
            doc = lead1.advance(next);
            continue advanceHead;
          }
        }
      }

      // success - all iterators are on the same doc
      return doc;
    }
  }

MinDocQuery重写了createWeight方法,重新定义了Scorer,在advance方法中,当doc=-1时,直接将doc跳到segmentMinDoc(segmentMinDoc=lastEmittedDoc+1)

 //MinDocQuery
 
    @Override
    public Weight createWeight(IndexSearcher searcher, boolean needsScores) throws IOException {

        return new ConstantScoreWeight(this) {
            @Override
            public Scorer scorer(LeafReaderContext context) throws IOException {
                final int maxDoc = context.reader().maxDoc();
                if (context.docBase + maxDoc <= minDoc) {
                    return null;
                }
                final int segmentMinDoc = Math.max(0, minDoc - context.docBase);
                final DocIdSetIterator disi = new DocIdSetIterator() {

                    int doc = -1;

                    @Override
                    public int docID() {
                        return doc;
                    }

                    @Override
                    public int nextDoc() throws IOException {
                        return advance(doc + 1);
                    }

                    @Override
                    public int advance(int target) throws IOException {
                        assert target > doc;
                        if (doc == -1) {
                            // skip directly to minDoc
                            doc = Math.max(target, segmentMinDoc);
                        } else {
                            doc = target;
                        }

                        if (doc >= maxDoc) {
                            doc = NO_MORE_DOCS;
                        }
                        return doc;
                    }

                    @Override
                    public long cost() {
                        return maxDoc - segmentMinDoc;
                    }

                };
                return new ConstantScoreScorer(this, score(), disi);
            }
        };
    }

这样在合并倒排表之后,实际上就不会再命中上一页的内容了。并且从逻辑上看,如果触发了提前终止,后续倒排表也没必要合并了,性能提升了不少。

其实从上面代码就可以发现,scroll和search_after查询实际上走的逻辑是一样的,都是通过一个after变量来翻页,只不过scroll的after=scrollContext.lastEmittedDoc(ScoreDoc),search_after的after = searchContext.searchAfter(),是FieldDoc,其实也是ScoreDoc,只不过是包含了sort的fieldName信息。最终都会收集全部的命中文档之后才能得到排序结果。只不过scroll对于_doc排序做了优化,性能要非常好。

至于search_after即使指定_doc排序,一样要收集全部的命中文档,因为search_after是动态的,所以使用MinDocQuery跳跃就不是很合适了。但是search_after是可以提前终止的,lucene的后续版本中也是支持了这点。Elasticsearch6.x开始也是支持index sort,写入时指定索引的顺序,这样当查询的时候指定sort=idnex sort,就可以触发提前终止,不再收集全部命中的文档。这点我以前实际应用过了。不过Elasticsearch5.2.2还不支持。

其实我们不妨大胆想象一下,如果我们可以修改lastEmittedDoc这个值呢?像search_after一样传进来,这样就能像search_after一样翻页了,只不过由于是快照,无法获取更新数据,并且保持SearchContext本身也是一种消耗。但是从性能上考虑,如果是_doc排序,scroll要远优秀与search_after。

scroll快照了哪些信息

public class ScrollContext {

    public int totalHits = -1;           //查询命中总数
    public float maxScore;               //上一页最大分数
    public ScoreDoc lastEmittedDoc;      //上一页最大文档
    public Scroll scroll;                //keepAlive,存活时间
}

ScrollContext只会存储 maxScore和lastEmittedDoc信息用于翻页,并没有找到存储其他信息。从代码分析中也主要用到了lastEmittedDoc和maxScore。
但是scroll请求保存的上下文信息不仅仅是ScrollContext,而是SearchContext,至于SearchContext里面保存的信息就非常多了,最关键的就是searcher,searcher里包含IndexReader的信息,比如leafContexts,leafContexts就是LeafReaderContext。而每次执行查询,都是要读取LeafReaderContext的,由于IndexReader一直保持在SearchContext里,而IndexReader的特性是一旦创建了IndexReader,对于后续索引的更新,是感知不到的,除非是重新打开一个reader或者使用DirectoryReader.openIfChanged(oldreader)。所以这就是为什么scroll查询无法感知索引的更新,而search_after却可以感知索引更新,因为每次search_after查询都是重新打开一个reader。

经过测试,即使scroll过程中触发了merge,被merge的segment文件也不会立即被删除,当然,新的segment也不会被发现。这也就造成了scroll无法感知数据的更新,所以这才是scroll所谓的快照概念。快照的其实是LeafReaderContext。

结论

  1. 查询不指定sort,走lucene自己的打分逻辑,TopScoreDocsCollector,每次查询都是扫描收集全部命中文档,search、scroll和search_after均是如此。
  2. 查询指定sort(非_doc),会进入TopFieldCollector,每次查询都是扫描收集全部命中文档,search、scroll和search_after均是如此。
  3. 查询指定sort=_doc,会进入TopFieldCollector,当为scroll查询时,收集完topN之后会触发提前终止,不会再收集后续的doc。非scroll查询即使指定sort=_doc也不会触发提前终止。这里的提前终止由当前版本es代码实现,并非lucene实现。
  4. Elasticsearch6.x版本开始引入了索引预排序的概念,使得索引顺序不再单一为_doc,可以自定义索引顺序。并且Lucene应该是从7.2.0开始引入了canEarlyTerminate模块从而判断searchSort和indexSort是否一致来决定是否提前结束。
  5. 当scroll查询指定sort=_doc时,会在query基础之上加一个MinDocQuery,从而能够实现跳过上一页已经返回的doc。
  6. scroll基于快照实现,即保存SearchContext一定存活时间,实际上快照的是LeafReaderContext,并非检索命中的结果,每次scroll查询仍然要做扫描、合并倒排表,收集打分等操作。并且在LeafReaderContext保持打开的过程中,对应的segments文件不会因为触发merge而被删除。

最后放一张官网的截图
image

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-15 00:05:37  更:2022-04-15 00:10:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 12:40:34-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码