Background:
Problem: using the ES cardinality aggregation for deduplicated counts introduces an error of roughly ±5% in the result.
Query:
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "rpt_dt": {
              "gte": "2022-01-01",
              "lt": "2022-12-13"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "01": {
      "filter": {
        "terms": {
          "data_sources": [
            "01"
          ]
        }
      },
      "aggs": {
        "01": {
          "cardinality": {
            "field": "mac_id"
          }
        }
      }
    }
  }
}
Return:
{
  "01" : {
    "doc_count" : 10901762,
    "01" : {
      "value" : 1425288
    }
  }
}
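Note: cardinality is implemented with HyperLogLog++, so it is approximate by design. Raising precision_threshold (default 3000, maximum 40000) shrinks the error at the cost of memory, but cannot make the count exact. A minimal Python sketch, assuming the elasticsearch-py client and an index named test (both illustrative):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

body = {
    "size": 0,
    "query": {"bool": {"filter": [
        {"range": {"rpt_dt": {"gte": "2022-01-01", "lt": "2022-12-13"}}}
    ]}},
    "aggs": {
        "01": {
            "filter": {"terms": {"data_sources": ["01"]}},
            "aggs": {
                "01": {
                    "cardinality": {
                        "field": "mac_id",
                        # counts below the threshold are expected to be close to exact
                        "precision_threshold": 40000
                    }
                }
            }
        }
    },
}

resp = es.search(index="test", body=body)
print(resp["aggregations"]["01"]["01"]["value"])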
Solution 1:
- Use a composite aggregation over multiple fields, similar to MySQL: select count(commissionamount) from xxx_table group by timeperiod, orgId
Query:
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "rpt_dt": {
              "gte": "2022-01-01",
              "lt": "2022-12-13"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "NAME": {
      "composite": {
        "sources": [
          {
            "fault_name": {
              "terms": {
                "field": "fault_name"
              }
            }
          },
          {
            "mac_id": {
              "terms": {
                "field": "mac_id"
              }
            }
          }
        ]
      }
    }
  }
}
Return:
{
  "key" : {
    "fault_name" : "***",
    "mac_id" : "***"
  },
  "doc_count" : 2
}
- Loop over the composite pages with the after key to collect every returned fault_name / mac_id combination, then do the ranking and deduplicated counting in Python (see the sketch after this list).
- Pros:
  - Exact deduplicated counts (no approximation error)
- Issues:
  - Requires many round-trip queries (one per page of key combinations) and substantial code changes
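A sketch of that loop, assuming the elasticsearch-py client and an index named test (both illustrative): it pages through the composite buckets with after_key and computes the deduplicated mac_id count per fault_name in Python:

from collections import Counter
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

base_query = {"bool": {"filter": [
    {"range": {"rpt_dt": {"gte": "2022-01-01", "lt": "2022-12-13"}}}
]}}
composite = {
    "size": 1000,  # page size; tune as needed
    "sources": [
        {"fault_name": {"terms": {"field": "fault_name"}}},
        {"mac_id": {"terms": {"field": "mac_id"}}},
    ],
}

per_fault_macs = {}  # fault_name -> set of distinct mac_id values
after_key = None
while True:
    if after_key:
        composite["after"] = after_key  # resume from the previous page
    resp = es.search(index="test", body={
        "size": 0,
        "query": base_query,
        "aggs": {"NAME": {"composite": composite}},
    })
    agg = resp["aggregations"]["NAME"]
    for bucket in agg["buckets"]:
        key = bucket["key"]
        per_fault_macs.setdefault(key["fault_name"], set()).add(key["mac_id"])
    after_key = agg.get("after_key")
    if not after_key:  # no more pages
        break

# rank fault_names by their deduplicated mac_id count
ranking = Counter({k: len(v) for k, v in per_fault_macs.items()}).most_common()
print(ranking)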
Solution 2:
Raise the ES terms aggregation bucket limit.
PUT _cluster/settings
{
  "persistent": {
    "search.max_buckets": 2000000
  }
}
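The same setting can be pushed from Python (a sketch; the client instance is an assumption, and search.max_buckets should only be raised as far as actually needed):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
es.cluster.put_settings(body={"persistent": {"search.max_buckets": 2000000}})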
GET test/_search?filter_path=aggregations.**.count,aggregations.mac_id.buckets.key,aggregations.mac_id.buckets.key_as_string
{
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "rpt_dt": {
              "gte": "2022-06-01",
              "lt": "2022-06-04"
            }
          }
        }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "mac_id": {
      "terms": {
        "field": "rpt_dt",
        "size": 20
      },
      "aggs": {
        "fault_name": {
          "terms": {
            "field": "mac_id",
            "size": 100000000
          }
        },
        "count": {
          "stats_bucket": {
            "buckets_path": "fault_name._count"
          }
        }
      }
    }
  }
}
Details: Stats Bucket Aggregation - elasticsearch中文文档
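To read the deduplicated count out of the response: the stats_bucket named count reports count/min/max/avg/sum over the inner mac_id terms buckets, and its count field is exactly the number of distinct mac_id values in each rpt_dt bucket. A Python sketch (client and index name are assumptions):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# Same structure as the query above: outer terms buckets per rpt_dt, inner terms
# on mac_id, and a stats_bucket whose "count" equals the distinct mac_id count.
body = {
    "size": 0,
    "query": {"bool": {"filter": [
        {"range": {"rpt_dt": {"gte": "2022-06-01", "lt": "2022-06-04"}}}
    ]}},
    "aggs": {
        "mac_id": {
            "terms": {"field": "rpt_dt", "size": 20},
            "aggs": {
                "fault_name": {"terms": {"field": "mac_id", "size": 100000000}},
                "count": {"stats_bucket": {"buckets_path": "fault_name._count"}},
            },
        }
    },
}

resp = es.search(index="test", body=body)
for bucket in resp["aggregations"]["mac_id"]["buckets"]:
    # "count" is the number of inner buckets, i.e. distinct mac_ids in this rpt_dt bucket
    print(bucket["key_as_string"], bucket["count"]["count"])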
Solution 3:
Use scripted_metric to implement a custom aggregation.
init_script defines the state; map_script checks each document and records it; combine_script returns the per-shard record; reduce_script returns the summed value.
Query:
{
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "rpt_dt": {
              "gte": "2022-06-01",
              "lt": "2022-06-04"
            }
          }
        }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "mac_id": {
      "terms": {
        "field": "fault_name",
        "size": 10
      },
      "aggs": {
        "spu": {
          "scripted_metric": {
            "init_script": {
              "source": "state.numas = new HashMap();",
              "lang": "painless"
            },
            "map_script": {
              "source": """
                if (doc.mac_id.length >= 1) {
                  String houseKey = doc.mac_id.value;
                  state.numas.put(houseKey, 1);
                }
              """,
              "lang": "painless"
            },
            "combine_script": {
              "source": """
                double item_finish_count = 0;
                for (key in state.numas.keySet()) {
                  item_finish_count += 1;
                }
                return item_finish_count;
              """,
              "lang": "painless"
            },
            "reduce_script": {
              "source": """
                double result = 0;
                for (e in states) {
                  if (!Objects.isNull(e)) {
                    result += e;
                  }
                }
                return result;
              """,
              "lang": "painless"
            },
            "params": {
              "close_sum_key": "close_sum3",
              "house_sum_key": "house_sum3"
            }
          }
        }
      }
    }
  }
}
Return:
{
  "key_as_string" : "2022-06-01 00:00:00",
  "key" : 1654041600000,
  "doc_count" : 274282,
  "spu" : {
    "value" : 268144.0
  }
}
- ES lets scripted_metric define a custom aggregation with Java-style (Painless) scripts.
- Here a HashMap is used: within each first-level bucket the documents are traversed, the field to deduplicate is put into the HashMap as a key, and the number of HashMap keys at the end is the deduplicated count.
Pros:
- Accurate deduplicated aggregation results with little code change
- Fast: less than 20% slower than the original cardinality query
Cons:
- With multiple shards the script gets more complicated, because each shard keeps its own HashMap and the per-shard results must be merged (a possible fix is sketched after this list); using a single shard avoids the issue
- Huge data volumes may use a lot of memory; for 2-3 years of data this is not a problem
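One possible way around the first drawback (a sketch only, not what was deployed here): have combine_script return each shard's key map and let reduce_script merge the maps before counting, so a mac_id that appears on several shards is counted once; the trade-off is more data shipped to, and held on, the coordinating node. Expressed as a Python dict that could replace the spu aggregation body above:

dedup_agg = {
    "scripted_metric": {
        "init_script": "state.keys = new HashMap();",
        "map_script": """
            if (doc['mac_id'].size() > 0) {
                state.keys.put(doc['mac_id'].value, 1);
            }
        """,
        # ship the whole per-shard key map instead of a per-shard count
        "combine_script": "return state.keys;",
        "reduce_script": """
            Map merged = new HashMap();
            for (m in states) {
                if (m != null) {
                    merged.putAll(m);
                }
            }
            return merged.size();
        """,
    }
}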