这篇文章是翻译过来的,原文在此,需要科学上网。
当查询页很深或者查询的数据量很大时,深查询就会出现。es 的自我保护机制允许的一次最大查询量是 10000 条数据。在请求中加入trackTotalHits(true) 可以解除10000条的上限。
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().trackTotalHits(true);
三种批量查询
from size
这种实现方式有点类似于 MySQL 中的 limit。性能差,实现简单,适用于少量数据,但优点是可以随机跳转页面。
package com.example.es.test;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class ESTest_from_size {
public static final Logger logger = LoggerFactory.getLogger(ESTest_searchAfter.class);
public static void main(String[] args) throws Exception{
long startTime = System.currentTimeMillis();
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http")));
SearchRequest searchRequest = new SearchRequest("audit2");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.from(0);
sourceBuilder.size(1000);
sourceBuilder.sort(SortBuilders.fieldSort("operationtime").order(SortOrder.DESC));
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] hits = searchResponse.getHits().getHits();
List<Map<String, Object>> result = new ArrayList<>();
if (hits != null && hits.length > 0) {
for (SearchHit hit : hits) {
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
result.add(sourceAsMap);
}
}
logger.info("The number of data queried is:{}", result.size());
esClient.close();
logger.info("Running time: " + (System.currentTimeMillis() - startTime) + "ms");
}
}
}
scroll
高效的滚动查询,第一次查询会在内存中保存一个历史快照和光标(scroll_id)来记录当前查询的终止位置。下次查询会从光标记录的位置往后继续查询。这种方式性能好,但不是实时的,一般用于海量数据导出或者重建索引。但是 scroll_id 有过期时间,两次查询之间如果 scroll_id 过期了,第二次查询会抛出"找不到 scroll_id"的异常。假如场景是读一批数据、处理,再读再处理,而处理过程恰好很花费时间且不确定,那很可能会遇到 scroll_id 过期的问题。
package com.example.es.test;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Demonstrates scroll pagination against the {@code audit2} index.
 * Scroll keeps a point-in-time snapshot server-side and pages through it via a
 * scroll id; suited for bulk export / reindex, not real-time user paging.
 */
public class ESTest_Scroll {
    public static final Logger logger = LoggerFactory.getLogger(ESTest_Scroll.class);

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        RestHighLevelClient esClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http"))
        );
        try {
            SearchRequest searchRequest = new SearchRequest("audit2");
            searchRequest.scroll(TimeValue.timeValueMinutes(1L)); // snapshot keep-alive
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.size(1000); // hits per scroll batch
            searchSourceBuilder.sort(SortBuilders.fieldSort("operationtime").order(SortOrder.DESC));
            searchRequest.source(searchSourceBuilder);

            SearchResponse searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT);
            String scrollId = searchResponse.getScrollId();
            List<Map<String, Object>> result = new ArrayList<>();
            for (SearchHit hit : searchResponse.getHits().getHits()) {
                result.add(hit.getSourceAsMap());
            }

            while (true) {
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                scrollRequest.scroll(TimeValue.timeValueMinutes(1L)); // renew keep-alive
                SearchResponse scrollResp = esClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                // FIX: always carry forward the most recent scroll id — Elasticsearch may
                // return a new id on each scroll request; the original reused the first one,
                // which can break follow-up scrolls and the final clearScroll.
                scrollId = scrollResp.getScrollId();
                SearchHit[] hits = scrollResp.getHits().getHits();
                if (hits == null || hits.length == 0) {
                    break; // snapshot exhausted
                }
                for (SearchHit hit : hits) {
                    result.add(hit.getSourceAsMap());
                }
            }

            // Release the server-side scroll context as soon as we are done.
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(scrollId);
            ClearScrollResponse clearScrollResponse = esClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
            boolean succeeded = clearScrollResponse.isSucceeded();
            logger.info("delete scrollId: {}", succeeded);
            logger.info("Total number of queries:{}", result.size());
        } finally {
            // FIX: close the client even if a request throws.
            esClient.close();
        }
        logger.info("Running time: " + (System.currentTimeMillis() - startTime) + "ms");
    }
}
search after
顾名思义,从指定的某个数据后面开始读。这种方式不能随机跳转分页,只能一页一页地读取数据,而且必须用一个唯一且不重复的属性对待查数据进行排序。这种方式的优点是批量查询但不依赖于 scroll_id,所以后续处理可以不考虑耗费时间的问题。
package com.example.es.test;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Demonstrates search_after pagination against the {@code audit2} index.
 * Pages forward from the sort values of the last hit of the previous page:
 * no scroll context to expire, but no random page jumps either.
 */
public class ESTest_searchAfter {
    public static final Logger logger = LoggerFactory.getLogger(ESTest_searchAfter.class);

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        RestHighLevelClient esClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http"))
        );
        try {
            SearchRequest searchRequest = new SearchRequest("audit2");
            // trackTotalHits(true) lifts the 10,000-hit total cap.
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().trackTotalHits(true);
            sourceBuilder.size(1000); // page size
            // NOTE(review): search_after requires a deterministic sort key; if
            // "operationtime" is not unique, add a tiebreaker sort (e.g. "_id")
            // or duplicates/omissions can occur across page boundaries — confirm.
            sourceBuilder.sort(SortBuilders.fieldSort("operationtime").order(SortOrder.DESC));
            searchRequest.source(sourceBuilder);

            List<Map<String, Object>> result = new ArrayList<>();
            SearchResponse searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT);
            SearchHit[] hits = searchResponse.getHits().getHits();
            // FIX: the original do-while iterated the FIRST page's hits on every
            // pass (the array was never reassigned), so it collected page 1
            // repeatedly instead of each successive page.
            while (hits != null && hits.length > 0) {
                for (SearchHit hit : hits) {
                    result.add(hit.getSourceAsMap());
                }
                // Continue after the sort values of the last hit of this page.
                sourceBuilder.searchAfter(hits[hits.length - 1].getSortValues());
                searchRequest.source(sourceBuilder);
                searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT);
                hits = searchResponse.getHits().getHits();
            }
            logger.info("The number of data queried is:{}", result.size());
        } finally {
            // FIX: close the client even if a request throws.
            esClient.close();
        }
        logger.info("Running time: " + (System.currentTimeMillis() - startTime) + "ms");
    }
}
|