Reference: Elasticsearch Search Scroll API (scroll query) - 简书
In Elasticsearch, traditional pagination uses the from + size pattern, where from is the offset of the first result to return. With Spring Data's PageRequest the page number starts at 0 and from is derived from it (page * size). By default, an exception is thrown once from + size exceeds 10,000, that is, once the total number of results requested so far exceeds 10,000.
The following loop simulates continuous paged queries:
public void search() {
    // current page number
    int page = 0;
    // total number of documents fetched so far
    long total = 0;
    while (true) {
        NativeSearchQuery nativeSearchQuery = new NativeSearchQueryBuilder()
                // set pagination: page number and page size
                .withPageable(PageRequest.of(page, 1000))
                .withSort(new FieldSortBuilder("commentCount").order(SortOrder.DESC))
                .build();
        SearchHits<Book> searchHits = elasticsearchRestTemplate.search(nativeSearchQuery, Book.class);
        if (!searchHits.hasSearchHits()) {
            break;
        }
        for (SearchHit<Book> searchHit : searchHits.getSearchHits()) {
            Book book = searchHit.getContent();
        }
        page++;
        System.out.println(page);
        System.out.println(total += searchHits.getSearchHits().size());
    }
}
When page reaches 10, the following exception is thrown:
Caused by: ElasticsearchException[Elasticsearch exception [type=illegal_argument_exception, reason=Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.]]
From the exception message we can see that Elasticsearch offers two ways to deal with this limit:
1. max_result_window
- Raise the Elasticsearch index-level setting index.max_result_window to a value large enough for your query range (the default is 10,000). The corresponding RESTful API call is:
PUT book/_settings
{
  "index": {
    "max_result_window": 1000000
  }
}
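If you would rather apply the same setting from Java, the following is a minimal sketch using the RestHighLevelClient (assuming an injected client field named client; exception handling omitted, and shown only for completeness, since the next paragraph argues against relying on this setting):
// UpdateSettingsRequest, Settings, AcknowledgedResponse and RequestOptions come from the org.elasticsearch client packages
UpdateSettingsRequest request = new UpdateSettingsRequest("book");
request.settings(Settings.builder().put("index.max_result_window", 1000000));
AcknowledgedResponse response = client.indices().putSettings(request, RequestOptions.DEFAULT);
// acknowledged == true means the index-level setting was applied
System.out.println(response.isAcknowledged());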
Although raising index.max_result_window does lift the result-window limit, it is not a recommended approach. Once the data volume reaches millions or tens of millions of documents, from + size queries become slower and slower, each query takes longer and longer, the user experience suffers badly, and CPU and memory consumption is also considerable.
2. scroll api
If you need to fetch a large amount of data, consider the Search Scroll API, which is a much more efficient approach.
If you use the Java client directly, you can refer to the official API documentation: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.9/java-rest-high-search-scroll.html
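For reference, the following is a minimal sketch of that raw-client scroll flow (assuming an injected RestHighLevelClient named client and the same book index; the field name is illustrative only):
SearchRequest searchRequest = new SearchRequest("book");
searchRequest.scroll(TimeValue.timeValueMinutes(1L));
searchRequest.source(new SearchSourceBuilder()
        .size(1000)
        .sort("commentCount", SortOrder.DESC));
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
String scrollId = response.getScrollId();
// note: these are org.elasticsearch.search.SearchHit, not Spring Data's SearchHit
org.elasticsearch.search.SearchHit[] hits = response.getHits().getHits();
while (hits != null && hits.length > 0) {
    // process the current batch of hits here
    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
    scrollRequest.scroll(TimeValue.timeValueMinutes(1L));
    response = client.scroll(scrollRequest, RequestOptions.DEFAULT);
    scrollId = response.getScrollId();
    hits = response.getHits().getHits();
}
// release the scroll context when done
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);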
In this article we stick with the Spring Boot integration; the core usage is very similar. The following again simulates continuous paged queries, this time with scroll:
public void scrollSearch() {
    NativeSearchQuery nativeSearchQuery = new NativeSearchQueryBuilder()
            .withSort(new FieldSortBuilder("commentCount").order(SortOrder.DESC))
            .build();
    // batch size per scroll request
    nativeSearchQuery.setMaxResults(1000);
    // how long the scroll context (and its scrollId) stays valid, in milliseconds
    long scrollTimeInMillis = 60 * 1000;
    // total number of documents fetched so far
    long total = 0;
    // first query
    SearchScrollHits<Book> searchScrollHits = elasticsearchRestTemplate.searchScrollStart(scrollTimeInMillis, nativeSearchQuery, Book.class, IndexCoordinates.of("book"));
    String scrollId = searchScrollHits.getScrollId();
    while (searchScrollHits.hasSearchHits()) {
        System.out.println(total += searchScrollHits.getSearchHits().size());
        for (SearchHit<Book> searchHit : searchScrollHits.getSearchHits()) {
            Book book = searchHit.getContent();
        }
        // subsequent queries
        searchScrollHits = elasticsearchRestTemplate.searchScrollContinue(scrollId, scrollTimeInMillis, Book.class, IndexCoordinates.of("book"));
        scrollId = searchScrollHits.getScrollId();
    }
    List<String> scrollIds = new ArrayList<>();
    scrollIds.add(scrollId);
    // clear the scroll context
    elasticsearchRestTemplate.searchScrollClear(scrollIds);
}
A few points to note:
- setMaxResults(1000) sets the batch size for each scroll request. This method exists in the version I use with Elasticsearch 7.9; if an older version does not have it, you can use PageRequest.of(0, 1000) instead (the page number must be 0), as sketched after this list.
- The first query uses searchScrollStart(), and every subsequent query uses searchScrollContinue(); each result carries a scrollId.
- Every query except the first must carry the scrollId. You can think of it as a cursor that drives the pagination, playing the same role as the page number in the from + size pattern.
- scrollTimeInMillis is how long the scrollId in a result stays valid, in milliseconds; set it according to your situation.
- After the query finishes, call searchScrollClear() to clear the scroll context.
- With from + size pagination you can request any reasonable page number and jump straight to that page; with the scroll api you cannot jump, because every query after the first depends on the scrollId returned by the previous one. Keep this in mind.
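A minimal sketch of that older-version alternative (assuming a Spring Data Elasticsearch release whose NativeSearchQuery has no setMaxResults()):
NativeSearchQuery query = new NativeSearchQueryBuilder()
        .withSort(new FieldSortBuilder("commentCount").order(SortOrder.DESC))
        // for scroll, only the size (1000) is used as the batch size; the page number must stay 0
        .withPageable(PageRequest.of(0, 1000))
        .build();
SearchScrollHits<Book> hits = elasticsearchRestTemplate.searchScrollStart(60 * 1000, query, Book.class, IndexCoordinates.of("book"));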
The original code above may issue one extra empty query at the end, so here is a slightly modified version:
void searchScroll() {
    NativeSearchQuery query = new NativeSearchQuery(QueryBuilders.matchAllQuery());
    // batch size per scroll request
    query.setMaxResults(1);
    query.addSort(Sort.by(Sort.Direction.DESC, "age"));
    long scrollTimeInMillis = 5_000;
    long currentTotal = 0;
    int pageNo = 1;
    List<String> scrollIdList = new ArrayList<>();
    // scroll involves three methods: searchScrollStart (first query),
    // searchScrollContinue (second through last query), searchScrollClear (after the query finishes)
    // the first query uses searchScrollStart
    SearchScrollHits<People> searchScrollHits = this.elasticsearchRestTemplate.searchScrollStart(scrollTimeInMillis, query, People.class, IndexCoordinates.of("people_index"));
    String scrollId = searchScrollHits.getScrollId();
    scrollIdList.add(scrollId);
    System.out.println("scrollId:" + scrollId);
    long totalHits = searchScrollHits.getTotalHits();
    currentTotal = searchScrollHits.getSearchHits().size();
    System.out.println("totalHits:" + totalHits);
    List<People> list = searchScrollHits.get().map(SearchHit::getContent).collect(Collectors.toList());
    System.out.println("============pageNo:===========" + pageNo);
    for (People people : list) {
        System.out.println(people);
    }
    // keep scrolling only while there are hits left, so the final batch is never an empty query
    while (currentTotal < totalHits) {
        SearchScrollHits<People> searchScrollHitsContinue = elasticsearchRestTemplate.searchScrollContinue(scrollId, scrollTimeInMillis, People.class, IndexCoordinates.of("people_index"));
        scrollId = searchScrollHitsContinue.getScrollId();
        scrollIdList.add(scrollId);
        pageNo++;
        if (searchScrollHitsContinue.hasSearchHits()) {
            currentTotal += searchScrollHitsContinue.getSearchHits().size();
            List<People> peopleList = searchScrollHitsContinue.get().map(SearchHit::getContent).collect(Collectors.toList());
            System.out.println("============pageNo:===========" + pageNo);
            for (People people : peopleList) {
                System.out.println(people);
            }
        } else {
            System.out.println("============pageNo not hasSearchHits===========");
            break;
        }
    }
    System.out.println(scrollIdList);
    elasticsearchRestTemplate.searchScrollClear(scrollIdList);
}