一、背景
管理後臺有個導出需求,一個Event的點擊日志,總數可能有個2-3W,但是客戶到出我們是
From -To 這之間的記錄數,超過1W 的數據倒不出來,縂條數顯示有2萬多,客戶需要到處隨意條數。這個大家都懂是ES的最大返回條數控制了;
網上查詢了方法有Scroll可以實現; 貌似Search After也可以實現
而我們剛好使用的 client是? RestHighLevelClient 有一些參考網頁是別的Clent已經過時了,此處就不做展示了【反正就是关联上你的Es的cluster等配置信息即可】
再完成了单元测试的改造之后发现Scroll并不支持from\size的翻页参数设置,此处需要注意
SearchRequest 【请求设置】 和 SearchResponse.[拿到结果]
SearchRequest 中的 SearchSourceBuilder 参数要去掉from入参,而builer原来分页的size变成了scroll滚动查询时的每个分片返回的条数
public SearchSourceBuilder getScrollSearchSourceBuilder() {
// searchSourceBuilder.from(start);
// searchSourceBuilder.size(pagesize);
searchSourceBuilder.size(5000);
searchSourceBuilder.fetchSource(includes, excludes);
if(this.distinctField!=null){
searchSourceBuilder.collapse(new CollapseBuilder(this.distinctField));
}
if (highlightBuilder.fields().size() > 0) {
highlightBuilder.preTags(preTags);
highlightBuilder.postTags(postTags);
searchSourceBuilder.highlighter(highlightBuilder);
}
return searchSourceBuilder.query(boolQueryBuilder);
}
具体滚动查询的实现接口
EsQuery中也可以防止Scroll对象,这里我是写到代码里了
public <T> EsResult<List<T>> createScrollSearch(EsIndexConstant index, EsQuery esQuery, Class<T> clazz) {
SearchResponse result;
SearchRequest search = new SearchRequest(index.name());
search.source(esQuery.getScrollSearchSourceBuilder());
//是否有路由
if(StrUtil.isNotBlank(esQuery.getRouting())){
search.routing("routing",esQuery.getRouting());
}
//滚动游标设置
Scroll scroll = new Scroll(new TimeValue(600000));
search.scroll(scroll);
log.debug("Elasticsearch Exec:{}",search.source());
int i = 0;
while (true) {
try {
result = rhlClient.search(search,RequestOptions.DEFAULT);
break;
} catch (IOException e) {
i++;
log.error("elasticsearch createSearch,retry Count:{}, error :{}",i, e);
if(i>retry){
//超过重试次数,则抛出此异常
throw ExceptionConstant.queryDataException(ExceptionCodeEnum.esQueryIoException, e);
}
}
}
if (!result.status().equals(RestStatus.OK)){
log.error("elasticsearch search error");
return new EsResult(false);
}
String scrollId = result.getScrollId();
//返回集合
List resultList = new ArrayList<>();
//放入总条数
SearchHits hits = result.getHits();
//获取总数
Long total = hits.getTotalHits().value;
TotalHits.Relation esCount = hits.getTotalHits().relation;
SearchHit[] hitsHits = hits.getHits();
try {
while(ArrayUtil.isNotEmpty(hitsHits)){
if (clazz.equals(Map.class) || clazz.equals(Dict.class)) {
for (SearchHit hitsHit : hitsHits) {
resultList.add(hitsHit.getSourceAsMap());
}
} else {
for (SearchHit hitsHit : hitsHits) {
T entity = JSON.parseObject(hitsHit.getSourceAsString(), clazz, FastjsonUtil.parserConfig);
resultList.add(entity);
}
}
//再次发送请求,并使用上次搜索结果的ScrollId
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
searchScrollRequest.scroll(scroll);
SearchResponse searchScrollResponse = rhlClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
scrollId = searchScrollResponse.getScrollId();
hitsHits = searchScrollResponse.getHits().getHits();
}
//及时清除es快照,释放资源
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
rhlClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
}catch (IOException e){
log.error("elasticsearch scroll search error");
}
return new EsResult(resultList,true).total(total);
}
在fromSize 不能返回的事实不能实现 我才明白Scroll只能根据筛选条件滚动返回所有结果。
然后你需要哪些数据可以在ResultList中做相关逻辑处理。
还有没有别的弊端 暂时不知,还需进一步总结。 后续用一个searchAfter的接口实现 看看区别
由于我的这个索引数据一周变更一次,只会往里面写入数据,且在管理后台使用所以对实时性的要求不是那么高……
|