目录
桶(Buckets)
指标(Metrics)
将两者结合起来——聚合
常见的聚合查询
聚合查询的使用
1、简单的词频统计
2、数据按时间划分
3、数据按某个字段进行聚合后,再按时间排序
为了掌握聚合,要先了解两个主要概念:
Buckets(桶):满足某个条件的文档集合。
Metrics(指标):为某个桶中的文档计算得到的统计信息。
就是这样!每个聚合只是简单地由一个或者多个桶,零个或者多个指标组合而成。可以将它粗略地转换为SQL:
SELECT COUNT(field) FROM table?GROUP BY field
以上的COUNT(field )就相当于一个指标。GROUP BY field 则相当于一个桶。
桶和SQL中的组(Grouping)拥有相似的概念,而指标则与COUNT(),SUM(),MAX()等相似。
桶(Buckets)
一个桶就是满足特定条件的一个文档集合,例如:
- 一名员工要么属于男性桶,或者女性桶。
- 长宁区属于上海市这个桶。
- 日期2021-08-28属于八月份这个桶。
随着聚合被执行,每份文档中的值会被计算来决定它们是否匹配了桶的条件。如果匹配成功,那么该文档会被置入该桶中,同时聚合会继续执行。
桶也能够嵌套在其它桶中,能让你完成层次或者条件划分这些需求。比如,娄山关地铁站可以被放置在长宁区这个桶中,而整个长宁区则能够被放置在上海市这个桶中。
ES中有很多类型的桶,让你可以将文档通过多种方式进行划分(按日期,关键词,标签等),从根本上来说,都是根据相同的原理运作:按照条件对文档进行划分。
指标(Metrics)
分桶是达到最终目的的手段:提供了对文档进行划分的方法,从而让你能够计算需要的指标。
多数指标仅仅是简单的数学运算(比如,min,mean,max以及sum),它们使用文档中的值进行计算。
在实际应用中,指标能够让你计算例如平均价格,最高出售价格等。
将两者结合起来——聚合
一个聚合就是一些桶和指标的组合。一个聚合可以只有一个桶,或者一个指标,或者每样一个。在桶中甚至可以有多个嵌套的桶。比如,我们可以将文档按照其所属国家进行分桶,然后对每个桶计算其平均薪资(一个指标)。
因为桶是可以嵌套的,我们能够实现一个更加复杂的聚合操作:
- 将文档按照国家进行分桶。(桶)
- 然后将每个国家的桶再按照性别分桶。(桶)
- 然后将每个性别的桶按照年龄区间进行分桶。(桶)
- 最后,为每个年龄区间计算平均薪资。(指标)
常见的聚合查询
聚合查询都是由AggregationBuilders创建的,一些常见的聚合查询如下
??????(1)统计某个字段的数量
ValueCountBuilder vcb= AggregationBuilders.count("count_uid").field("uid");
(2)去重统计某个字段的数量(有少量误差)
CardinalityBuilder cb= AggregationBuilders.cardinality("distinct_count_uid").field("uid");
(3)聚合过滤
FilterAggregationBuilder fab= AggregationBuilders.filter("uid_filter").filter(QueryBuilders.queryStringQuery("uid:001"));
(4)按某个字段分组
TermsBuilder tb= AggregationBuilders.terms("group_name").field("name");
(5)求和
SumBuilder sumBuilder= AggregationBuilders.sum("sum_price").field("price");
(6)求平均
AvgBuilder ab= AggregationBuilders.avg("avg_price").field("price");
(7)求最大值
MaxBuilder mb= AggregationBuilders.max("max_price").field("price");
(8)求最小值
MinBuilder min= AggregationBuilders.min("min_price").field("price");
(9)按日期间隔分组
DateHistogramBuilder dhb= AggregationBuilders.dateHistogram("dh").field("date");
(10)获取聚合里面的结果
TopHitsBuilder thb= AggregationBuilders.topHits("top_result");
(11)嵌套的聚合
NestedBuilder nb= AggregationBuilders.nested("negsted_path").path("quests");
(12)反转嵌套
AggregationBuilders.reverseNested("res_negsted").path("kps ");
聚合查询的使用
1、简单的词频统计
public void test(){
//构建数据筛选条件
BoolQueryBuilder root = QueryBuilders.boolQuery();
root.must(QueryBuilders.rangeQuery("publish_time")
.gte(startTime)
.lte(endTime));
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.from(0).size(0)
.timeout(new TimeValue(60, TimeUnit.SECONDS))
.query(root);
//构建聚合桶
TermsAggregationBuilder termsAggregationBuilder =
AggregationBuilders.terms("terms_aggr")
.field("word_cloud")
.includeExclude(new IncludeExclude(".{2,}", null))
.size(50);
builder.aggregation(termsAggregationBuilder);
//构建查询对象
SearchRequest searchRequest = new SearchRequest(ES_INDEX_NAME);
searchRequest.source(builder);
//查询,获取查询结果
SearchResponse response;
try {
response = ES.getHighLevelClient().search(searchRequest);
} catch (IOException e) {
LOGGER.warn("Search ES Error", e);
return Collections.emptyList();
}
//取出查询结果
List<Map<String, Object>> results = new ArrayList<>();
Terms aggr = response.getAggregations().get("terms_aggr");
for (Terms.Bucket bucket : aggr.getBuckets()) {
String key = bucket.getKeyAsString().toLowerCase().replaceFirst("'s$", "");
Map<String, Object> map = new HashMap<>(2);
map.put("name", key);
map.put("value", bucket.getDocCount());
results.add(map);
}
System.out.println("results:" + results);
}
2、数据按时间划分
public void test(){
//构建数据筛选条件,本测试仅进行时间筛选
BoolQueryBuilder root = QueryBuilders.boolQuery();
root.must(QueryBuilders.rangeQuery("publish_time")
.gte(startTime)
.lte(endTime));
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.from(0).size(0)
.timeout(new TimeValue(60, TimeUnit.SECONDS))
.query(root);
//构建聚合对象
DateHistogramAggregationBuilder histogramBuilder =
AggregationBuilders.dateHistogram("date_hist")
.field("publish_time")
.dateHistogramInterval(DateHistogramInterval.DAY)
.extendedBounds(new ExtendedBounds(startTime,endTime))
.minDocCount(0);
builder.aggregation(histogramBuilder );
//构建查询对象
SearchRequest searchRequest = new SearchRequest(ES_INDEX_NAME);
searchRequest.source(builder);
//查询,获取查询结果
SearchResponse response;
try {
response = ES.getHighLevelClient().search(searchRequest);
} catch (IOException e) {
LOGGER.warn("Search ES Error", e);
return Collections.emptyList();
}
//取出查询结果
List<Map<String, Object>> results = new ArrayList<>();
Terms aggr = response.getAggregations().get("date_hist");
for (Terms.Bucket bucket : aggr.getBuckets()) {
String key = bucket.getKeyAsString();
Map<String, Object> map = new HashMap<>(2);
map.put("name", key);
map.put("value", bucket.getDocCount());
results.add(map);
}
System.out.println("results:" + results);
}
3、数据按某个字段进行聚合后,再按时间排序
//排序,倒序:false,正序:true
boolean sortASC = false ;
public void test(){
//构建数据筛选条件,本测试仅进行时间筛选
BoolQueryBuilder root = QueryBuilders.boolQuery();
root.must(QueryBuilders.rangeQuery("publish_time")
.gte(startTime)
.lte(endTime));
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.from(0).size(0)
.timeout(new TimeValue(60, TimeUnit.SECONDS))
.query(root);
//构建聚合的Order对象
Terms.Order sortOrder = Terms.Order.aggregation("sort_aggs_publish_time",sortASC);
//构建排序子聚合,实现排序,正序就用min函数,倒序就用max函数
AggregationBuilder sortAggregationBuilder =
sortASC?AggregationBuilders.min("sort_aggs_publish_time").field("publish_time"):
AggregationBuilders.max("sort_aggs_publish_time").field("publish_time");
//构建聚合对象
TermsAggregationBuilder termsAggregationBuilder =
AggregationBuilders.terms("aggr_cluster_id")
.field("cluster_id")
.size(50)
.order(sortOrder)
.subAggregation(sortAggregationBuilder)
;
builder.aggregation(termsAggregationBuilder);
//构建查询对象
SearchRequest searchRequest = new SearchRequest(ES_INDEX_NAME);
searchRequest.source(builder);
//查询,获取查询结果
SearchResponse response;
try {
response = ES.getHighLevelClient().search(searchRequest);
} catch (IOException e) {
LOGGER.warn("Search ES Error", e);
return Collections.emptyList();
}
//取出查询结果
List<Map<String, Object>> results = new ArrayList<>();
Terms aggr = response.getAggregations().get("date_hist");
for (Terms.Bucket bucket : aggr.getBuckets()) {
String key = bucket.getKeyAsString();
Map<String, Object> map = new HashMap<>(2);
map.put("name", key);
map.put("value", bucket.getDocCount());
results.add(map);
}
System.out.println("results:" + results);
}
|