Aggregation Concepts
An aggregation is roughly the equivalent of GROUP BY in a relational database, but it is considerably more powerful.
Aggregations fall into three main categories.
Bucketing (bucket aggregations)
- Date Histogram Aggregation: groups documents by date interval; for example, with an interval of one week, documents are automatically bucketed week by week
- Histogram Aggregation: groups documents by numeric interval, analogous to the date histogram
- Terms Aggregation: groups documents by term; documents whose term values match exactly fall into the same bucket
- Range Aggregation: groups numeric or date values by range; you specify the start and end of each range and documents are bucketed segment by segment
- Missing Aggregation: counts documents that are missing a given field, including documents where the field value is null
- Filter Aggregation: aggregates over the result set that remains after a filter condition has been applied
Each bucket is associated with a key and a document criterion, so a bucket aggregation returns a list of buckets, i.e. the sets of documents that satisfy each criterion. In other words, the data is grouped by some rule: running a terms aggregation over the values 1, 2, 3, 1, 3 produces a bucket for 1, a bucket for 2, and a bucket for 3.
This shows how flexible Elasticsearch's grouping is: MySQL's GROUP BY can only achieve something like the terms aggregation, whereas Elasticsearch can also group by interval and by range.
Pipeline (pipeline aggregations): aggregate the output (or derived metrics) of other aggregations a second time. For example, once a terms aggregation has produced the buckets 1, 2 and 3, we can aggregate other fields inside each bucket, i.e. aggregate the results again, which is essentially multi-field grouping in a relational database. (Multi-field grouping can reportedly be done with a script aggregation after adding some configuration, but that did not work when I tried it; if anyone has a better solution, please share it. I use the nested style, i.e. aggregations inside aggregations, or I concatenate the grouping fields while cleaning the data.) A minimal sketch of the nested style follows.
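A hedged sketch of this "aggregation inside aggregation" style, using the same Spring Data Elasticsearch API as the helper class further down. Strictly speaking this is a sub-aggregation; a true pipeline aggregation (bucket_sort) appears in the secondary-aggregation section at the end of this post. The field names productGroupId and warehouseId and the entity class DespatchEs are taken from other examples in this post and are only placeholders for your own mapping.

import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;

// Group by productGroupId, then group every bucket again by warehouseId:
// the Elasticsearch counterpart of GROUP BY productGroupId, warehouseId.
public void nestedGroupBy(ElasticsearchRestTemplate template) {
    TermsAggregationBuilder outer = AggregationBuilders.terms("byProductGroup").field("productGroupId");
    TermsAggregationBuilder inner = AggregationBuilders.terms("byWarehouse").field("warehouseId");
    outer.subAggregation(inner); // second-level grouping inside every first-level bucket

    NativeSearchQuery query = new NativeSearchQueryBuilder().addAggregation(outer).build();
    SearchHits<DespatchEs> hits = template.search(query, DespatchEs.class); // DespatchEs: assumed mapped entity
    Aggregations aggregations = hits.getAggregations();
    Terms byProductGroup = (Terms) aggregations.asMap().get("byProductGroup");
    for (Terms.Bucket groupBucket : byProductGroup.getBuckets()) {
        Terms byWarehouse = groupBucket.getAggregations().get("byWarehouse");
        for (Terms.Bucket warehouseBucket : byWarehouse.getBuckets()) {
            System.out.println(groupBucket.getKeyAsString() + " / "
                    + warehouseBucket.getKeyAsString() + " -> " + warehouseBucket.getDocCount());
        }
    }
}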
Metric (metric aggregations) are analogous to statistical functions such as COUNT(), SUM(), and MAX(); a minimal sketch follows the list below
- Avg Aggregation: average value
- Max Aggregation: maximum value
- Min Aggregation: minimum value
- Percentiles Aggregation: percentiles
- Stats Aggregation: returns avg, max, min, sum, and count in one response
- Sum Aggregation: sum
- Top hits Aggregation: the top N matching documents per bucket
- Value Count Aggregation: count of values
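As a hedged sketch (assuming the same Spring Data Elasticsearch setup as the helper class later in this post, and that isPrePackage is a numeric field on the assumed DespatchEs entity), a stats aggregation returns several of these metrics in a single request:

import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.ParsedStats;
import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;

// One request returns count, min, max, avg and sum for the numeric field.
public void statsExample(ElasticsearchRestTemplate template) {
    StatsAggregationBuilder stats = AggregationBuilders.stats("prePackageStats").field("isPrePackage");
    NativeSearchQuery query = new NativeSearchQueryBuilder().addAggregation(stats).build();
    SearchHits<DespatchEs> hits = template.search(query, DespatchEs.class); // DespatchEs: assumed mapped entity
    ParsedStats parsed = (ParsedStats) hits.getAggregations().asMap().get("prePackageStats");
    System.out.println("count=" + parsed.getCount() + ", min=" + parsed.getMin()
            + ", max=" + parsed.getMax() + ", avg=" + parsed.getAvg() + ", sum=" + parsed.getSum());
}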
Terms Aggregation
GET pre_package/_search
{
"aggs": {
"Group": {
"terms": {
"field": "productGroupId",
"size": 10
}
}
}
}
This searches the pre_package index and groups its documents by productGroupId; the aggregation is named Group, and size 10 limits the response to the top 10 buckets.
Range Aggregation
GET pre_package/_search
{
"aggs": {
"Group": {
"range": {
"field": "isPrePackage",
"ranges": [
{
"to": 1
},
{
"from": 1,
"to": 2
},
{
"from": 2
}
]
}
}
}
}
This splits documents into three buckets by the isPrePackage field: less than 1, from 1 (inclusive) up to 2 (exclusive), and 2 or greater.
Date Range Aggregation
GET pre_package/_search
{
"aggs": {
"Group": {
"date_range": {
"field": "update_date",
"ranges": [
{
"to": "2020-05-01 00:00:00"
},
{
"from": "2020-05-02 00:00:00",
"to": "2020-08-01 00:00:00"
},
{
"from": "2020-08-02 00:00:00"
}
]
}
}
}
}
An aggregation bucketed by date ranges on the update_date field.
Filter Aggregation
GET pre_package/_search
{
"aggs": {
"flight_Miles": {
"filter": {
"term": {
"cargoOwnerName": "联合利华测试"
}
}
}
}
}
This aggregates over the result set that remains after the filter condition is applied (here, the number of documents whose cargoOwnerName matches the given term).
Missing Aggregation
GET pre_package/_search
{
"aggs": {
"without_age": {
"missing": {
"field": "orderTypeName"
}
}
}
}
This counts documents in which the orderTypeName field is missing, including documents where the value is null.
Histogram Aggregation
GET pre_package/_search
{
"aggs": {
"test": {
"histogram": {
"field": "id",
"interval": 100
}
}
}
}
A histogram aggregation buckets documents by a fixed numeric interval (here, intervals of 100 on the id field).
ElasticsearchRestTemplate helper class
package com.xxl.job.executor.utils;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.xxl.job.executor.es.EsSortPage;
import com.xxl.job.executor.es.QueryCondition;
import com.xxl.job.executor.es.TermQueryReq;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedAvg;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class ElasticsearchRestEsUtils {
@Autowired
private ElasticsearchRestTemplate elasticsearchTemplate;
public boolean createIndexOps(Class cla) {
IndexOperations ops = elasticsearchTemplate.indexOps(cla);
if (!ops.exists()) {
ops.create();
ops.refresh();
ops.putMapping(ops.createMapping());
}
return true;
}
public void deleteIndex(Class cla) {
boolean delete = elasticsearchTemplate.indexOps(cla).delete();
System.out.println("delete = " + delete);
}
public List<?> findByPageable(EsSortPage esSortPage) {
List<Object> relist = new ArrayList<>();
Sort sort = null;
if (esSortPage.getSort() == 1) {
sort = Sort.by(Sort.Direction.DESC, esSortPage.getSortField());
} else {
sort = Sort.by(Sort.Direction.ASC, esSortPage.getSortField());
}
int currentPage = esSortPage.getCurrentPage();
int pageSize = esSortPage.getPageSize();
PageRequest pageRequest = PageRequest.of(currentPage, pageSize, sort);
ElasticsearchRepository<?, Long> dao = esSortPage.getDao();
Page<?> daoAll = dao.findAll(pageRequest);
List<?> content = daoAll.getContent();
for (Object o : content) {
relist.add(o);
}
return relist;
}
public List<?> findByTermQuery(TermQueryReq termQueryReq) {
List<Object> relist = new ArrayList<>();
FieldSortBuilder balance = null;
if (termQueryReq.getSort() == 1) {
balance = new FieldSortBuilder(termQueryReq.getSortField()).order(SortOrder.DESC);
} else {
balance = new FieldSortBuilder(termQueryReq.getSortField()).order(SortOrder.ASC);
}
int currentPage = termQueryReq.getCurrentPage();
int pageSize = termQueryReq.getPageSize();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
List<QueryCondition> mapList = termQueryReq.getMapList();
for (QueryCondition queryCondition : mapList) {
if (queryCondition.getFiltration() == 1) {
RangeQueryBuilder date = QueryBuilders.rangeQuery(queryCondition.getKey()).gte(queryCondition.getValue()).lte(queryCondition.getEndValue());
boolQueryBuilder.filter(date);
} else {
if (queryCondition.getExtractive() == 1) {
if (queryCondition.getValue() != null) {
boolQueryBuilder.must(QueryBuilders.matchQuery(queryCondition.getKey(), queryCondition.getValue()).minimumShouldMatch(termQueryReq.getScore()));
}
} else {
if (queryCondition.getValue() instanceof Integer[]) {
Integer[] pulldown = (Integer[]) queryCondition.getValue();
for (Integer integer : pulldown) {
boolQueryBuilder.should(QueryBuilders.matchQuery(queryCondition.getKey(), integer));
}
}
}
}
}
Pageable pageable = PageRequest.of(currentPage, pageSize);
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(boolQueryBuilder)
.withPageable(pageable)
.withSort(balance)
.build();
List<Object> resList = new ArrayList<>();
SearchHits<?> search = elasticsearchTemplate.search(query, termQueryReq.getCls());
for (SearchHit<?> searchHit : search) {
resList.add(searchHit.getContent());
}
return resList;
}
public List<Map<String, Object>> findPolymerization(TermQueryReq termQueryReq) {
NativeSearchQuery query = new NativeSearchQueryBuilder()
.addAggregation(AggregationBuilders.terms("count").field(termQueryReq.getAggregate() + ".keyword"))
.build();
SearchHits<?> searchHits = elasticsearchTemplate.search(query, termQueryReq.getCls());
Aggregations aggregations = searchHits.getAggregations();
Terms terms = (Terms) aggregations.asMap().get("count");
List<Map<String, Object>> mapList = new ArrayList<>();
for (Terms.Bucket bucket : terms.getBuckets()) {
Map<String, Object> map = new HashMap<>();
String keyAsString = bucket.getKeyAsString();
long docCount = bucket.getDocCount();
map.put("keyAsString", keyAsString);
map.put("docCount", docCount);
mapList.add(map);
}
return mapList;
}
public List<Map<String, Object>> findNest(TermQueryReq termQueryReq) {
TermsAggregationBuilder stateAgg = AggregationBuilders.terms("count").field(termQueryReq.getAggregate() + ".keyword");
AvgAggregationBuilder balanceAgg = AggregationBuilders.avg("avg_" + termQueryReq.getNes()).field(termQueryReq.getNes());
stateAgg.subAggregation(balanceAgg);
if (termQueryReq.getSort() == 1) {
stateAgg.order(BucketOrder.aggregation("avg_" + termQueryReq.getNes(), true));
} else {
stateAgg.order(BucketOrder.aggregation("avg_" + termQueryReq.getNes(), false));
}
NativeSearchQuery build = new NativeSearchQueryBuilder()
.addAggregation(stateAgg)
.build();
SearchHits<?> searchHits = elasticsearchTemplate.search(build, termQueryReq.getCls());
Aggregations aggregations = searchHits.getAggregations();
Terms terms = (Terms) aggregations.asMap().get("count");
List<Map<String, Object>> mapList = new ArrayList<>();
for (Terms.Bucket bucket : terms.getBuckets()) {
Map<String, Object> map = new HashMap<>();
ParsedAvg avg = bucket.getAggregations().get("avg_" + termQueryReq.getNes());
map.put("state", bucket.getKeyAsString());
map.put("count", bucket.getDocCount());
map.put("avg", avg.getValueAsString());
mapList.add(map);
}
return mapList;
}
public List<?> findFiltration(TermQueryReq termQueryReq) {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
RangeQueryBuilder balance = QueryBuilders.rangeQuery("balance").gte(20000).lte(30000);
boolQueryBuilder.filter(balance);
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(boolQueryBuilder)
.build();
List<Object> resList = new ArrayList<>();
SearchHits<?> search = elasticsearchTemplate.search(query, termQueryReq.getCls());
for (SearchHit<?> searchHit : search) {
resList.add(searchHit.getContent());
}
return resList;
}
public boolean indexExist(String indexName) {
if (StringUtils.isBlank(indexName)) {
return false;
}
IndexCoordinates indexCoordinates = IndexCoordinates.of(indexName);
return elasticsearchTemplate.indexOps(indexCoordinates).exists();
}
public void indexDelete(String index) {
elasticsearchTemplate.indexOps(IndexCoordinates.of(index)).delete();
}
public boolean indexAddAlias(String indexName, String aliasName) {
if (StringUtils.isBlank(indexName) || StringUtils.isBlank(aliasName)) {
return false;
}
IndexCoordinates indexCoordinates = IndexCoordinates.of(indexName);
if (elasticsearchTemplate.indexOps(indexCoordinates).exists()) {
AliasQuery query = new AliasQuery(aliasName);
boolean bool = elasticsearchTemplate.indexOps(indexCoordinates).addAlias(query);
return bool;
}
return false;
}
public boolean indexRemoveAlias(String indexName, String aliasName) {
if (StringUtils.isBlank(indexName) || StringUtils.isBlank(aliasName)) {
return false;
}
IndexCoordinates indexCoordinates = IndexCoordinates.of(indexName);
if (elasticsearchTemplate.indexOps(indexCoordinates).exists()) {
AliasQuery query = new AliasQuery(aliasName);
boolean bool = elasticsearchTemplate.indexOps(indexCoordinates).removeAlias(query);
return bool;
}
return false;
}
public <T> void save(T t) {
elasticsearchTemplate.save(t);
}
public void bulkIndex(List<IndexQuery> queries, String index) {
IndexCoordinates indexCoordinates = IndexCoordinates.of(index);
elasticsearchTemplate.bulkIndex(queries, indexCoordinates);
}
public void bulkCls(List<IndexQuery> queries, Class cls) {
createIndexOps(cls);
elasticsearchTemplate.bulkIndex(queries, cls);
}
public void delete(Class c, String fieldName, Object val, String index) {
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(fieldName, val);
NativeSearchQuery nativeSearchQuery = new NativeSearchQuery(termQueryBuilder);
elasticsearchTemplate.delete(nativeSearchQuery, c, IndexCoordinates.of(index));
}
public void deleteById(Object id, String index) {
if (null != id && StringUtils.isNotBlank(index)) {
elasticsearchTemplate.delete(id.toString(), IndexCoordinates.of(index));
}
}
public <T> void update(T t, Integer id, String index) {
Object data = elasticsearchTemplate.get(id.toString(), t.getClass(), IndexCoordinates.of(index));
if (data != null) {
UpdateQuery build = UpdateQuery.builder(id.toString()).withDocument(Document.parse(JSON.toJSONString(t))).build();
elasticsearchTemplate.update(build, IndexCoordinates.of(index));
} else {
elasticsearchTemplate.save(t);
}
}
public IndexQuery assembleDataEs(Object obj) {
IndexQuery indexQuery = new IndexQuery();
try {
Field field = getDeclareField(obj, "id");
field.setAccessible(true);
indexQuery.setId(field.get(obj).toString());
} catch (IllegalAccessException e) {
e.printStackTrace();
}
indexQuery.setSource(JSON.toJSONString(obj));
return indexQuery;
}
public static Field getDeclareField(Object o, String fieldName) {
// Walk up the class hierarchy until the field is found; return null if it does not exist anywhere.
for (Class<?> clazz = o.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
try {
return clazz.getDeclaredField(fieldName);
} catch (NoSuchFieldException e) {
// not declared on this class, keep looking in the superclass
}
}
return null;
}
}
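A hedged usage sketch of the helper above. It assumes the helper is injected as a Spring bean and that TermQueryReq exposes setters matching the getters the helper calls (setAggregate, setNes, setSort, setCls are assumptions, not shown in this post); cargoOwnerName, isPrePackage and DespatchEs come from other examples in this post.

import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.xxl.job.executor.es.TermQueryReq;
import com.xxl.job.executor.utils.ElasticsearchRestEsUtils;

@Component
public class EsUtilsUsageDemo { // hypothetical demo bean

    @Autowired
    private ElasticsearchRestEsUtils esUtils;

    // Print the average isPrePackage value per cargoOwnerName bucket via findNest.
    public void printAvgPerOwner() {
        TermQueryReq req = new TermQueryReq();
        req.setAggregate("cargoOwnerName"); // assumed setter; findNest appends ".keyword" to this field
        req.setNes("isPrePackage");         // assumed setter; numeric field averaged inside each bucket
        req.setSort(1);                     // assumed setter; findNest maps 1 to ascending bucket order by the avg
        req.setCls(DespatchEs.class);       // assumed setter; DespatchEs is the entity used later in this post
        List<Map<String, Object>> buckets = esUtils.findNest(req);
        for (Map<String, Object> bucket : buckets) {
            System.out.println(bucket.get("state") + " -> count=" + bucket.get("count") + ", avg=" + bucket.get("avg"));
        }
    }
}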
RestHighLevelClient helper class
package com.xxl.job.executor.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xxl.job.executor.es.QueryCondition;
import com.xxl.job.executor.es.TermQueryReq;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
@Slf4j
@Component
public class RestHighLeveEsUtils {
@Autowired
private RestHighLevelClient restHighLevelClient;
public boolean createIndex(String indexName) {
boolean exists = true;
try {
CreateIndexRequest request = new CreateIndexRequest(indexName);
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
exists = createIndexResponse.isAcknowledged();
} catch (IOException e) {
exists = false;
}
return exists;
}
public boolean isIndexExists(String indexName) {
boolean exists = true;
try {
GetIndexRequest request = new GetIndexRequest(indexName);
exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
exists = false;
}
return exists;
}
public boolean delIndex(String indexName) {
boolean exists = true;
try {
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
AcknowledgedResponse delete = restHighLevelClient.indices().delete(request, RequestOptions.DEFAULT);
exists = delete.isAcknowledged();
} catch (IOException e) {
exists = false;
}
return exists;
}
public boolean updateDocument(String indexName, Object obj, String id) {
boolean exists = true;
try {
UpdateRequest updateRequest = new UpdateRequest(indexName, id);
updateRequest.timeout("1s");
updateRequest.doc(JSON.toJSONString(obj), XContentType.JSON);
UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
if (updateResponse.status() != RestStatus.OK) {
exists = false;
}
} catch (IOException e) {
e.printStackTrace();
}
return exists;
}
public boolean deleteRequest(String indexName, String id) {
boolean exists = true;
try {
DeleteRequest request = new DeleteRequest(indexName, id);
request.timeout("1s");
DeleteResponse delete = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
if (delete.status() != RestStatus.OK) {
exists = false;
}
} catch (IOException e) {
e.printStackTrace();
}
return exists;
}
public Map getDocument(String indexName, String id) {
Map strToMap = null;
try {
GetRequest request = new GetRequest(indexName, id);
GetResponse getResponse = restHighLevelClient.get(request, RequestOptions.DEFAULT);
strToMap = JSONObject.parseObject(getResponse.getSourceAsString());
} catch (IOException e) {
e.printStackTrace();
}
return strToMap;
}
private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
BulkProcessor bulkProcessor = null;
try {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
log.info("Try to insert data number : " + request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
log.info("************** Success insert data number : " + request.numberOfActions() + " , id: "
+ executionId);
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
log.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
}
};
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
builder.setBulkActions(1000);
builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
builder.setConcurrentRequests(2);
builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
bulkProcessor = builder.build();
} catch (Exception e) {
e.printStackTrace();
try {
if (bulkProcessor != null) {
bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
}
client.close();
} catch (Exception e1) {
log.error(e1.getMessage());
}
}
return bulkProcessor;
}
public boolean bulkRequest(ArrayList<Map<String, Object>> objectArrayList, String indexName, String value) {
boolean exists = true;
BulkProcessor bulkProcessor = getBulkProcessor(restHighLevelClient);
try {
for (int i = 0; i < objectArrayList.size(); i++) {
bulkProcessor.add(new IndexRequest(indexName)
.id(objectArrayList.get(i).get(value).toString())
.source(JSON.toJSONString(objectArrayList.get(i)), XContentType.JSON));
}
bulkProcessor.flush();
} catch (Exception e) {
log.error(e.getMessage());
} finally {
try {
boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
log.info(String.valueOf(terminatedFlag));
} catch (Exception e) {
log.error(e.getMessage());
}
}
return exists;
}
public boolean addDocument(String indexName, Object obj, String id) {
boolean exists = true;
IndexResponse indexResponse = null;
try {
IndexRequest request = new IndexRequest(indexName);
request.id(id);
request.timeout(TimeValue.timeValueDays(1));
request.source(JSON.toJSONString(obj), XContentType.JSON);
indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
if (indexResponse.status() != RestStatus.CREATED) {
exists = false;
}
} catch (IOException e) {
exists = false;
}
return exists;
}
public boolean isExists(String indexName, String id) {
boolean exists = true;
try {
GetRequest request = new GetRequest(indexName, id);
request.fetchSourceContext(new FetchSourceContext(false));
request.storedFields("_none_");
exists = restHighLevelClient.exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
return exists;
}
public List<Map<String, Object>> searchMatch(String indexName, String key, String value) throws IOException {
List<Map<String, Object>> map = new ArrayList<>();
SearchRequest searchRequest = new SearchRequest(indexName);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder(key, value);
matchQueryBuilder.fuzziness(Fuzziness.AUTO);
sourceBuilder.query(matchQueryBuilder);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
for (SearchHit documentFields : searchResponse.getHits().getHits()) {
map.add(documentFields.getSourceAsMap());
}
return map;
}
public List<Map<String, Object>> searchQuery(String indexName, String key, String value) throws IOException {
List<Map<String, Object>> map = new ArrayList<>();
SearchRequest searchRequest = new SearchRequest(indexName);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(key, value);
sourceBuilder.query(termQueryBuilder);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
for (SearchHit documentFields : searchResponse.getHits().getHits()) {
map.add(documentFields.getSourceAsMap());
}
return map;
}
public void EsupdateTimeliness(String catalogId,String timeliness,String indexName,String catalogName){
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery(catalogName, catalogId));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(boolQueryBuilder);
SearchRequest request = new SearchRequest(indexName);
request.source(sourceBuilder);
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName)
.setQuery(boolQueryBuilder)
.setScript(new Script("ctx._source['timeliness']='" + timeliness + "'"));
try {
SearchResponse search = restHighLevelClient.search(request, RequestOptions.DEFAULT);
long docNum = search.getHits().getTotalHits().value;
if (docNum != 0) {
restHighLevelClient.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT).getUpdated();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private SearchResponse getCommodityList(String index,TermQueryReq req) {
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.sort("id", SortOrder.ASC);
BoolQueryBuilder mustQuery = QueryBuilders.boolQuery();
List<QueryCondition> mapList = req.getMapList();
for (QueryCondition queryCondition : mapList) {
if (queryCondition.getExtractive() == 1){
if (queryCondition.getValue() != null){
mustQuery.must(QueryBuilders.termQuery(queryCondition.getKey(), queryCondition.getValue()));
}
}else{
Integer[] pulldown = (Integer[]) queryCondition.getValue();
for (Integer integer : pulldown) {
mustQuery.should(QueryBuilders.matchQuery(queryCondition.getKey(), integer));
}
}
}
StringBuffer fields = new StringBuffer();
for (QueryCondition queryCondition : mapList) {
fields.append(queryCondition.getKey()).append(",");
}
sourceBuilder.fetchSource(new FetchSourceContext(true, fields.toString().split(","), Strings.EMPTY_ARRAY));
sourceBuilder.query(mustQuery);
sourceBuilder.from((req.getCurrentPage() - 1) * req.getPageSize()); // offset = (page - 1) * pageSize, assuming 1-based pages
sourceBuilder.size(req.getPageSize());
sourceBuilder.trackTotalHits(true);
searchRequest.source(sourceBuilder);
SearchResponse response;
try {
response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
e.printStackTrace();
return null;
}
return response;
}
}
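A hedged usage sketch of the RestHighLevelClient helper above. The index name pre_package and the cargoOwnerName value are taken from the query DSL examples earlier; the demo bean itself is hypothetical.

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.xxl.job.executor.utils.RestHighLeveEsUtils;

@Component
public class RestHighLevelUsageDemo { // hypothetical demo bean

    @Autowired
    private RestHighLeveEsUtils esUtils;

    // Create the index if needed, write one document, then query it back with a match query.
    public void demo() throws IOException {
        if (!esUtils.isIndexExists("pre_package")) {
            esUtils.createIndex("pre_package");
        }
        Map<String, Object> doc = new HashMap<>();
        doc.put("id", 1L);
        doc.put("cargoOwnerName", "联合利华测试"); // field/value pair taken from the DSL examples above
        esUtils.addDocument("pre_package", doc, "1");
        List<Map<String, Object>> hits = esUtils.searchMatch("pre_package", "cargoOwnerName", "联合利华测试");
        hits.forEach(System.out::println);
    }
}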
Secondary aggregation
People online say that since Elasticsearch 5.0, a secondary aggregation done with a script requires extra configuration, but it did not work when I tried it; if anyone has a better solution, please share it. Here I use the nested style, i.e. an aggregation inside another aggregation (plus a bucket_sort pipeline aggregation for paging), or I concatenate the grouping fields while cleaning the data.
PageInfo<DespatchRes> pageInfo = new PageInfo<>();
List<DespatchRes> relist = Lists.newArrayList();
int from = (req.getPageNum() - 1) * req.getPageSize();
int limit = req.getPageSize(); // bucket_sort page size: number of buckets returned per page
TermsAggregationBuilder stateTime = AggregationBuilders.terms("stateTime").field("productGroupId").size(Integer.MAX_VALUE);
TermsAggregationBuilder stateAgg = AggregationBuilders.terms("GroupId").field("pickupTime").size(Integer.MAX_VALUE);
stateAgg.order(BucketOrder.aggregation("_count", true));
stateAgg.subAggregation(AggregationBuilders.topHits("Group").size(1));
stateAgg.subAggregation(new BucketSortPipelineAggregationBuilder("pageInfo", null).from(from).size(limit));
stateAgg.subAggregation(AggregationBuilders.sum("perSum").field("isPrePackage"));
stateAgg.subAggregation(stateTime);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
if (StringUtils.isNotBlank(req.getPickupStartTime()) && StringUtils.isNotBlank(req.getPickupEndTime())){
RangeQueryBuilder date1 = QueryBuilders.rangeQuery("pickupTime").from(req.getPickupStartTime()).to(req.getPickupEndTime());
boolQueryBuilder.filter(date1);
}
boolQueryBuilder.must(QueryBuilders.matchQuery("sts", 0));
boolQueryBuilder.must(QueryBuilders.matchQuery("tenantId", user.getTenantId()));
if (ObjectUtils.isNotEmpty(req.getCargoOwnerId())){
boolQueryBuilder.must(QueryBuilders.matchQuery("cargoOwnerId", req.getCargoOwnerId()));
}
if (ObjectUtils.isNotEmpty(req.getWarehouseId())){
List<Long> warehouseId = req.getWarehouseId();
for (Long aLong : warehouseId) {
boolQueryBuilder.should(QueryBuilders.matchQuery("warehouseId", aLong));
}
}
if (ObjectUtils.isNotEmpty(req.getWarehouseCodeId())){
List<Long> warehouseCodeId = req.getWarehouseCodeId();
for (Long aLong : warehouseCodeId) {
boolQueryBuilder.should(QueryBuilders.matchQuery("referenceThree", aLong));
}
}
if (ObjectUtils.isNotEmpty(req.getOrderTypeId())){
List<Long> warehouseCodeId = req.getOrderTypeId();
for (Long aLong : warehouseCodeId) {
boolQueryBuilder.should(QueryBuilders.matchQuery("orderTypeId", aLong));
}
}
NativeSearchQuery build = new NativeSearchQueryBuilder()
.withQuery(boolQueryBuilder)
.withTrackTotalHits(true)
.addAggregation(stateAgg)
.build();
SearchHits<DespatchEs> searchHits = elasticsearchTemplate.search(build, DespatchEs.class);
Aggregations aggregations = searchHits.getAggregations();
Terms terms = (Terms) aggregations.asMap().get("GroupId");
for (Terms.Bucket bucket : terms.getBuckets()) {
DespatchRes rs = new DespatchRes();
rs.setNumber(bucket.getDocCount());
Aggregations bucketAggregations = bucket.getAggregations();
TopHits ns = (TopHits) bucketAggregations.asMap().get("Group");
SearchHit[] hits = ns.getHits().getHits();
for (SearchHit hit : hits) {
rs.setId(Long.parseLong(hit.getId()));
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
rs.setPickupTime(sourceAsMap.get("pickupTime") == null ? "" :sourceAsMap.get("pickupTime").toString());
rs.setProductGroupDetail(sourceAsMap.get("productGroupDetail") == null ? "" : sourceAsMap.get("productGroupDetail").toString());
rs.setProductGroupId(sourceAsMap.get("productGroupId") == null ? "":sourceAsMap.get("productGroupId").toString());
rs.setReferenceThree(sourceAsMap.get("referenceThree") == null ? "":sourceAsMap.get("referenceThree").toString());
}
Sum perSum = bucketAggregations.get("perSum");
double v = bucket.getDocCount() - perSum.getValue();
rs.setIsPrePackageNumber(v);
relist.add(rs);
}
pageInfo.setList(relist);
pageInfo.setTotal(terms.getBuckets().size());
pageInfo.setPageNum(req.getPageNum());
pageInfo.setPageSize(req.getPageSize());
return ResultRes.success(pageInfo);
I have always believed that somewhere in this world there is another me, doing the things I dare not do and living the life I long to live.