Questions
- A scroll query with a sort on _doc is much faster than one with no sort or with a sort on some other field.
- Every round of a scroll query still collects hits and computes scores; the results are not cached and re-served after the first round.
Source Code Analysis
Let's start with the Collectors Elasticsearch relies on. The main one of interest here is TopDocsCollector: once it has finished collecting documents it returns a TopDocs object, in which the collected hits are stored in order according to some rule; this object is the return value of the search.
Depending on the sorting rule, TopDocsCollector has three subclasses:
- TopFieldCollector, with two inner subclasses:
  - SimpleFieldCollector
  - PagingFieldCollector
- TopScoreDocCollector, with two inner subclasses:
  - SimpleTopScoreDocCollector
  - PagingTopScoreDocCollector
- DiversifiedTopDocsCollector
TopScoreDocCollector orders hits by score first; documents with equal scores are ordered by doc ID.
TopFieldCollector orders by the specified sort first, again breaking ties by doc ID.
Choosing between the two collectors is trivial: if searchContext.sort() is non-null, TopFieldCollector is used; otherwise TopScoreDocCollector. Note that sorting by _doc also counts as a non-null sort.
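For reference, here is a hedged, condensed sketch of that dispatch, loosely modeled on QueryPhase in Elasticsearch 5.2.2 (the method shape and the trackScores flag are simplifications, not verbatim source):

import java.io.IOException;
import org.apache.lucene.search.*;

// Simplified dispatch: a non-null sort, including sort:_doc, routes to
// TopFieldCollector; otherwise score-based collection is used.
static TopDocsCollector<?> chooseCollector(Sort sort, int numDocs, ScoreDoc after,
        boolean trackScores) throws IOException {
    if (sort != null) {
        // in this branch the paging cursor is passed as a FieldDoc
        return TopFieldCollector.create(sort, numDocs, (FieldDoc) after,
                true, trackScores, trackScores);
    }
    return TopScoreDocCollector.create(numDocs, after);
}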
TopScoreDocCollector
The functional difference between its two subclasses, SimpleTopScoreDocCollector and PagingTopScoreDocCollector, is that the latter serves paged requests. In code this amounts to one extra check against after:
if (score > after.score || (score == after.score && doc <= afterDoc)) {
    // hit was collected on a previous page
    return;
}
With TopScoreDocCollector, paged or not, every request scans all matching documents and computes their scores.
Whether SimpleTopScoreDocCollector or PagingTopScoreDocCollector is used depends on whether after is null:
public static TopScoreDocCollector create(int numHits, ScoreDoc after) {
    if (numHits <= 0) {
        throw new IllegalArgumentException("numHits must be > 0; please use TotalHitCountCollector if you just need the total hit count");
    }
    if (after == null) {
        return new SimpleTopScoreDocCollector(numHits);
    } else {
        return new PagingTopScoreDocCollector(numHits, after);
    }
}
For a scroll request, after = scrollContext.lastEmittedDoc, i.e. the highest ScoreDoc emitted on the previous page.
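To make this concrete, here is a hedged usage sketch against the plain Lucene 6.x API (searcher and query are assumed to exist, and the query is assumed to have at least 10 hits). Note that page 2 still scores every hit and merely drops those at or before the cursor:

import java.io.IOException;
import org.apache.lucene.search.*;

// Page through a relevance-sorted result set the way scroll effectively does:
// the 'after' cursor plays the role of scrollContext.lastEmittedDoc.
static TopDocs secondPage(IndexSearcher searcher, Query query) throws IOException {
    TopScoreDocCollector first = TopScoreDocCollector.create(10, null); // SimpleTopScoreDocCollector
    searcher.search(query, first);
    ScoreDoc after = first.topDocs().scoreDocs[9]; // last doc emitted on page 1

    TopScoreDocCollector second = TopScoreDocCollector.create(10, after); // PagingTopScoreDocCollector
    searcher.search(query, second); // still scans and scores every hit
    return second.topDocs();
}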
TopFieldCollector
TopFieldCollector likewise has two subclasses, SimpleFieldCollector and PagingFieldCollector, again one for unpaged and one for paged requests.
if (topCmp > 0 || (topCmp == 0 && doc <= afterDoc)) {
    // Already collected on a previous page
    return;
}
The paging decision follows the same pattern as in TopScoreDocCollector:
public static TopFieldCollector create(Sort sort, int numHits, FieldDoc after,
        boolean fillFields, boolean trackDocScores, boolean trackMaxScore) throws IOException {
    if (sort.fields.length == 0) {
        throw new IllegalArgumentException("Sort must contain at least one field");
    }
    if (numHits <= 0) {
        throw new IllegalArgumentException("numHits must be > 0; please use TotalHitCountCollector if you just need the total hit count");
    }
    FieldValueHitQueue<Entry> queue = FieldValueHitQueue.create(sort.fields, numHits);
    if (after == null) {
        return new SimpleFieldCollector(sort, queue, numHits, fillFields, trackDocScores, trackMaxScore);
    } else {
        if (after.fields == null) {
            throw new IllegalArgumentException("after.fields wasn't set; you must pass fillFields=true for the previous search");
        }
        if (after.fields.length != sort.getSort().length) {
            throw new IllegalArgumentException("after.fields has " + after.fields.length + " values but sort has " + sort.getSort().length);
        }
        return new PagingFieldCollector(sort, queue, after, numHits, fillFields, trackDocScores, trackMaxScore);
    }
}
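As an illustration, this is roughly what a later scroll page with sort on _doc ends up building. A sketch against the Lucene 6.4 API; 'after' stands in for scrollContext.lastEmittedDoc (already a FieldDoc in this branch), and the flag values are plausible rather than verbatim:

import java.io.IOException;
import org.apache.lucene.search.*;

// A _doc sort goes through TopFieldCollector too; with a non-null 'after'
// the factory above returns a PagingFieldCollector.
static TopFieldCollector docSortedPage(FieldDoc after, int size) throws IOException {
    Sort docSort = new Sort(SortField.FIELD_DOC);
    return TopFieldCollector.create(docSort, size, after,
            /*fillFields*/ true, /*trackDocScores*/ false, /*trackMaxScore*/ false);
}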
How does early termination work?
In Lucene 6.4.1, SimpleFieldCollector and PagingFieldCollector are no different from the TopScoreDocCollector variants above: none of them can terminate early, so all of them scan every matching document. Later Lucene versions add this capability: when the specified search sort matches the index sort, the collector can stop after gathering the top N by throwing a CollectionTerminatedException.
// Code from a later Lucene version
if (searchSortPartOfIndexSort == null) {
    final Sort indexSort = context.reader().getMetaData().getSort();
    searchSortPartOfIndexSort = canEarlyTerminate(sort, indexSort);
    if (searchSortPartOfIndexSort) {
        firstComparator.disableSkipping();
    }
}
...
boolean thresholdCheck(int doc) throws IOException {
    if (collectedAllCompetitiveHits || reverseMul * comparator.compareBottom(doc) <= 0) {
        // since docs are visited in doc Id order, if compare is 0, it means
        // this document is largest than anything else in the queue, and
        // therefore not competitive.
        if (searchSortPartOfIndexSort) {
            if (hitsThresholdChecker.isThresholdReached()) {
                totalHitsRelation = Relation.GREATER_THAN_OR_EQUAL_TO;
                throw new CollectionTerminatedException();
            } else {
                collectedAllCompetitiveHits = true;
            }
        } else if (totalHitsRelation == TotalHits.Relation.EQUAL_TO) {
            // we can start setting the min competitive score if the
            // threshold is reached for the first time here.
            updateMinCompetitiveScore(scorer);
        }
        return true;
    }
    return false;
}
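The check behind canEarlyTerminate is a simple prefix test. Condensed from later Lucene (its exact location has moved between versions, so treat this as a sketch):

import java.util.Arrays;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;

// Early termination is legal when the search sort is a prefix of the index
// sort: the index then already delivers documents in the requested order.
static boolean canEarlyTerminate(Sort searchSort, Sort indexSort) {
    if (indexSort == null) {
        return false;
    }
    SortField[] search = searchSort.getSort();
    SortField[] index = indexSort.getSort();
    return search.length <= index.length
            && Arrays.asList(index).subList(0, search.length).equals(Arrays.asList(search));
}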
From version 6.x onward, Elasticsearch also supports a custom write order: the index order no longer has to be _doc and can be a field value instead.
Back to our Elasticsearch 5.2.2, which ships Lucene 6.4.1: although this Lucene version provides no early termination, Elasticsearch implements the feature itself.
//QueryPhase
...
final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER;
if (terminateAfterSet) {
    final Collector child = collector;
    // throws Lucene.EarlyTerminationException when given count is reached
    collector = Lucene.wrapCountBasedEarlyTerminatingCollector(collector, searchContext.terminateAfter());
    if (doProfile) {
        collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TERMINATE_AFTER_COUNT,
                Collections.singletonList((InternalProfileCollector) child));
    }
}
...
When terminateAfterSet is true, Lucene.wrapCountBasedEarlyTerminatingCollector is invoked and returns an EarlyTerminatingCollector, which overrides collect. This collect method runs before TopFieldCollector$PagingFieldCollector$1.collect is entered.
//EarlyTerminatingCollector
...
@Override
public void collect(int doc) throws IOException {
    leafCollector.collect(doc);
    if (++count >= maxCountHits) {
        throw new EarlyTerminationException("early termination [CountBased]");
    }
}
...
The method is trivial: once the number of collected docs reaches maxCountHits (i.e. searchContext.terminateAfter()), it throws an EarlyTerminationException; QueryPhase catches it and the query ends.
//QueryPhase
...
try {
    if (collector != null) {
        if (doProfile) {
            searchContext.getProfilers().getCurrentQueryProfiler().setCollector((InternalProfileCollector) collector);
        }
        searcher.search(query, collector);
    }
} catch (TimeLimitingCollector.TimeExceededException e) {
    assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
    queryResult.searchTimedOut(true);
} catch (Lucene.EarlyTerminationException e) {
    assert terminateAfterSet : "EarlyTerminationException thrown even though terminateAfter wasn't set";
    queryResult.terminatedEarly(true);
} finally {
    searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
}
...
So our Elasticsearch 5.2.2 does support early termination. Now for the trigger condition: searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER. The key variable, searchContext.terminateAfter(), is set here:
//QueryPhase
...
if (searchContext.request().scroll() != null) {
    numDocs = Math.min(searchContext.size(), totalNumDocs);
    after = scrollContext.lastEmittedDoc;
    if (returnsDocsInOrder(query, searchContext.sort())) {
        if (scrollContext.totalHits == -1) {
            // first round
            assert scrollContext.lastEmittedDoc == null;
            // there is not much that we can optimize here since we want to collect all
            // documents in order to get the total number of hits
        } else {
            // now this gets interesting: since we sort in index-order, we can directly
            // skip to the desired doc and stop collecting after ${size} matches
            if (scrollContext.lastEmittedDoc != null) {
                BooleanQuery bq = new BooleanQuery.Builder()
                    .add(query, BooleanClause.Occur.MUST)
                    .add(new MinDocQuery(after.doc + 1), BooleanClause.Occur.FILTER)
                    .build();
                query = bq;
            }
            searchContext.terminateAfter(numDocs);
        }
    }
} else {
    after = searchContext.searchAfter();
}
...
From this code, three conditions must all hold for early termination to trigger:
- it must be a scroll request;
- returnsDocsInOrder(query, searchContext.sort()) must hold, i.e. the sort must be _doc;
- scrollContext.totalHits != -1, i.e. we are on the second round or later.
When all three hold, terminateAfter is set to numDocs:
final int totalNumDocs = searcher.getIndexReader().numDocs();
int numDocs = Math.min(searchContext.from() + searchContext.size(), totalNumDocs);
// for a scroll request:
numDocs = Math.min(searchContext.size(), totalNumDocs);
This confirms that from the second page onward, a scroll request sorted by _doc collects only size docs per round, a large performance win.
How does skipping work?
Since scroll cannot page backwards, re-collecting data that earlier pages already returned is pure waste. For scroll requests, Elasticsearch therefore wraps the query in a MinDocQuery that filters out everything already paged over, greatly reducing the hit count and avoiding the collection of useless docs; the deeper the paging, the bigger the difference.
after = scrollContext.lastEmittedDoc;
...
if (scrollContext.lastEmittedDoc != null) {
    BooleanQuery bq = new BooleanQuery.Builder()
        .add(query, BooleanClause.Occur.MUST)
        .add(new MinDocQuery(after.doc + 1), BooleanClause.Occur.FILTER)
        .build();
    query = bq;
}
Each round thus skips the data already returned and starts matching from lastEmittedDoc + 1. This is also precisely why scroll can only move forward, one page at a time, and can neither go back nor jump ahead.
Lucene collects matching documents in the scoreAll method:
static void scoreAll(LeafCollector collector, DocIdSetIterator iterator, TwoPhaseIterator twoPhase, Bits acceptDocs) throws IOException {
    if (twoPhase == null) {
        for (int doc = iterator.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = iterator.nextDoc()) {
            if (acceptDocs == null || acceptDocs.get(doc)) {
                collector.collect(doc);
            }
        }
    } else {
        // The scorer has an approximation, so run the approximation first, then check acceptDocs, then confirm
        final DocIdSetIterator approximation = twoPhase.approximation();
        for (int doc = approximation.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = approximation.nextDoc()) {
            if ((acceptDocs == null || acceptDocs.get(doc)) && twoPhase.matches()) {
                collector.collect(doc);
            }
        }
    }
}
When iterator.nextDoc() executes here, it is actually ConjunctionDISI.nextDoc(); that enters doNext, which intersects the postings lists, and this is where MinDocQuery's advance comes into play:
@Override
public int nextDoc() throws IOException {
    return doNext(lead1.nextDoc());
}
...
private int doNext(int doc) throws IOException {
    advanceHead: for (;;) {
        assert doc == lead1.docID();
        // find agreement between the two iterators with the lower costs
        // we special case them because they do not need the
        // 'other.docID() < doc' check that the 'others' iterators need
        final int next2 = lead2.advance(doc);
        if (next2 != doc) {
            doc = lead1.advance(next2);
            if (next2 != doc) {
                continue;
            }
        }
        // then find agreement with other iterators
        for (DocIdSetIterator other : others) {
            // other.doc may already be equal to doc if we "continued advanceHead"
            // on the previous iteration and the advance on the lead scorer exactly matched.
            if (other.docID() < doc) {
                final int next = other.advance(doc);
                if (next > doc) {
                    // iterator beyond the current doc - advance lead and continue to the new highest doc.
                    doc = lead1.advance(next);
                    continue advanceHead;
                }
            }
        }
        // success - all iterators are on the same doc
        return doc;
    }
}
MinDocQuery overrides createWeight and defines its own Scorer. In the iterator's advance method, when doc == -1 it jumps straight to segmentMinDoc, where segmentMinDoc = max(0, minDoc - docBase) and minDoc = lastEmittedDoc.doc + 1:
//MinDocQuery
@Override
public Weight createWeight(IndexSearcher searcher, boolean needsScores) throws IOException {
    return new ConstantScoreWeight(this) {
        @Override
        public Scorer scorer(LeafReaderContext context) throws IOException {
            final int maxDoc = context.reader().maxDoc();
            if (context.docBase + maxDoc <= minDoc) {
                return null;
            }
            final int segmentMinDoc = Math.max(0, minDoc - context.docBase);
            final DocIdSetIterator disi = new DocIdSetIterator() {
                int doc = -1;

                @Override
                public int docID() {
                    return doc;
                }

                @Override
                public int nextDoc() throws IOException {
                    return advance(doc + 1);
                }

                @Override
                public int advance(int target) throws IOException {
                    assert target > doc;
                    if (doc == -1) {
                        // skip directly to minDoc
                        doc = Math.max(target, segmentMinDoc);
                    } else {
                        doc = target;
                    }
                    if (doc >= maxDoc) {
                        doc = NO_MORE_DOCS;
                    }
                    return doc;
                }

                @Override
                public long cost() {
                    return maxDoc - segmentMinDoc;
                }
            };
            return new ConstantScoreScorer(this, score(), disi);
        }
    };
}
After the postings intersection, hits from earlier pages simply never match again. And once early termination fires, the remaining postings need not be intersected at all, which saves quite a bit more.
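A toy walk-through makes the jump concrete. The iterator below mirrors the DISI inside MinDocQuery with assumed numbers (segmentMinDoc = 500, i.e. lastEmittedDoc.doc = 499 with docBase = 0, and maxDoc = 10000); it is not the Elasticsearch class itself:

import java.io.IOException;
import org.apache.lucene.search.DocIdSetIterator;

public class MinDocJumpDemo {
    public static void main(String[] args) throws IOException {
        final int segmentMinDoc = 500; // lastEmittedDoc.doc + 1 - docBase (assumed)
        final int maxDoc = 10000;      // assumed segment size
        DocIdSetIterator disi = new DocIdSetIterator() {
            int doc = -1;
            @Override public int docID() { return doc; }
            @Override public int nextDoc() throws IOException { return advance(doc + 1); }
            @Override public int advance(int target) {
                // the very first advance jumps straight past everything already paged over
                doc = (doc == -1) ? Math.max(target, segmentMinDoc) : target;
                if (doc >= maxDoc) {
                    doc = NO_MORE_DOCS;
                }
                return doc;
            }
            @Override public long cost() { return maxDoc - segmentMinDoc; }
        };
        System.out.println(disi.nextDoc()); // prints 500: docs 0..499 are never offered
    }
}

ConjunctionDISI therefore only ever intersects the other postings against [500, maxDoc), so page-1 hits are never re-collected.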
The code above also shows that scroll and search_after actually share the same path: both page via an after variable. For scroll, after = scrollContext.lastEmittedDoc, a ScoreDoc; for search_after, after = searchContext.searchAfter(), a FieldDoc, which is itself a ScoreDoc extended with the sort fields' values. Either way, all matching documents must be collected before the sorted result is known; scroll simply optimizes the _doc-sorted case, which makes it very fast.
As for search_after, even with a sort on _doc it still collects every matching document: its cursor is dynamic, supplied anew on each call, so the MinDocQuery jump is not a good fit there. search_after can terminate early, though, and later Lucene versions support this. Starting with 6.x, Elasticsearch also supports index sorting, where the index order is specified at write time; a query whose sort equals the index sort then triggers early termination instead of collecting all hits. I have used this in practice, but Elasticsearch 5.2.2 does not support it yet.
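For completeness, this is how an index sort is declared at the Lucene level in those later versions, the mechanism underlying Elasticsearch 6.x index sorting. A minimal sketch: the field name "timestamp" is hypothetical, the sort field must be indexed with doc values, and a Lucene version with setIndexSort is assumed:

import java.io.IOException;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.store.Directory;

// Declare the on-disk order at write time; a search whose sort matches this
// (or a prefix of it) can then terminate early instead of scanning all hits.
static IndexWriter sortedWriter(Directory directory) throws IOException {
    IndexWriterConfig cfg = new IndexWriterConfig(new StandardAnalyzer());
    cfg.setIndexSort(new Sort(new SortField("timestamp", SortField.Type.LONG)));
    return new IndexWriter(directory, cfg);
}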
One could even imagine making lastEmittedDoc writable, passed in the way search_after's cursor is; scroll could then page just like search_after. The caveats are that scroll works on a snapshot, so it cannot see updated data, and keeping the SearchContext alive has its own cost. Purely on performance, though, with a _doc sort scroll beats search_after by a wide margin.
What exactly does scroll snapshot?
public class ScrollContext {
    public int totalHits = -1;      // total number of hits for this query
    public float maxScore;          // max score of the previous page
    public ScoreDoc lastEmittedDoc; // last (highest) doc emitted on the previous page
    public Scroll scroll;           // keepAlive: how long the context stays alive
}
ScrollContext itself stores only maxScore and lastEmittedDoc for paging; I found no other state stored there, and the code analysis above indeed uses mainly those two fields. But the context a scroll request keeps alive is not just the ScrollContext: it is the entire SearchContext, which holds far more. The key piece is the searcher, which carries IndexReader state such as leafContexts, the list of LeafReaderContexts read on every query execution. Because the IndexReader stays pinned inside the SearchContext, and an IndexReader by design never sees index updates once opened (unless a new reader is opened, or DirectoryReader.openIfChanged(oldReader) is used), a scroll query cannot observe index updates. search_after can, because each search_after query goes through a freshly opened reader.
Testing shows that even if a merge fires during a scroll, the merged-away segment files are not deleted immediately; the new segments are not visible either. This is what makes scroll blind to data updates, and it is what the scroll "snapshot" really is: a snapshot of the LeafReaderContexts.
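The reader side of this is plain Lucene reopen semantics; a minimal sketch using the standard API:

import java.io.IOException;
import org.apache.lucene.index.DirectoryReader;

// An open reader is a point-in-time view; it only sees new segments if you
// reopen. scroll pins the old reader inside its SearchContext, while
// search_after effectively goes through a fresh one each round.
static DirectoryReader refresh(DirectoryReader oldReader) throws IOException {
    DirectoryReader newReader = DirectoryReader.openIfChanged(oldReader);
    if (newReader == null) {
        return oldReader; // nothing changed since oldReader was opened
    }
    oldReader.close();    // release the pinned segment files
    return newReader;     // this reader sees the newly flushed/merged segments
}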
Conclusions
- With no sort specified, Lucene's own scoring applies via TopScoreDocCollector, and every query scans and collects all matching documents; this holds for search, scroll and search_after alike.
- With a sort on a field other than _doc, TopFieldCollector is used, and again every query scans and collects all matching documents, for search, scroll and search_after alike.
- With sort=_doc, TopFieldCollector is used, and a scroll query terminates early once the top N are collected, never collecting the remaining docs. A non-scroll query does not terminate early even with sort=_doc. This early termination is implemented by this version of Elasticsearch, not by Lucene.
- Elasticsearch 6.x introduced index presorting, so the index order is no longer fixed to _doc and can be customized. Lucene, apparently starting with 7.2.0, added the canEarlyTerminate check, which compares the search sort with the index sort to decide whether to stop early.
- When a scroll query specifies sort=_doc, a MinDocQuery is stacked on top of the query so that docs already returned on previous pages are skipped.
- scroll is built on a snapshot: the SearchContext is kept alive for a given time. What is snapshotted is the LeafReaderContexts, not the hit results; every scroll round still scans, intersects postings, collects and scores. And while the LeafReaderContexts stay open, the corresponding segment files are not deleted even if a merge fires.
Finally, a screenshot from the official documentation (not reproduced here).