
[Big Data] Elasticsearch metric, bucket, and pipeline aggregations, with ElasticsearchRestTemplate and RestHighLevelClient helper classes and DSL

Author: token keyword

Aggregation concepts

An aggregation is roughly the counterpart of grouping (GROUP BY) in a relational database, but considerably more powerful.

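The three main aggregation types

Bucketing (bucket aggregations)

  • Date Histogram Aggregation: groups by date interval; for example, with an interval of one week, each week automatically becomes one group
  • Histogram Aggregation: groups by numeric interval, analogous to the date version
  • Terms Aggregation: groups by term; documents whose term matches exactly fall into one group
  • Range Aggregation: groups numeric or date values by range; you specify the start and end of each range
  • Missing Aggregation: counts the documents that lack a field, including those where the field value is null
  • Filter Aggregation: aggregates over the result set that passes a filter condition

Each bucket is associated with a key and a document criterion. A bucket aggregation returns a list of buckets, each holding the set of documents that satisfy its criterion; in other words, it groups the data in some way. For example, a Terms aggregation over the values 1, 2, 3, 1, 3 yields three buckets: 1, 2, and 3.

As this shows, Elasticsearch grouping is quite powerful: a plain GROUP BY on a column in MySQL gives roughly the effect of a Terms Aggregation, whereas Elasticsearch can also group by interval and by range out of the box.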
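
The Terms example above (1, 2, 3, 1, 3) can be sketched in plain Java. This is only an illustration of the grouping idea and does not call Elasticsearch; the class and method names (TermsBucketSketch, bucketCounts) are made up for this example.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java illustration only; nothing here calls Elasticsearch.
class TermsBucketSketch {

    // Puts identical values into the same bucket and counts them,
    // which mirrors the key / doc_count pairs a terms aggregation returns.
    static Map<Integer, Long> bucketCounts(List<Integer> values) {
        return values.stream()
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(bucketCounts(List.of(1, 2, 3, 1, 3))); // prints {1=2, 2=1, 3=2}
    }
}
```

Five input values produce three buckets, and each bucket carries the number of values that landed in it.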

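Pipeline
A pipeline aggregation performs a second round of aggregation on the output of other aggregations or on their metrics. For example, after a Terms aggregation has produced buckets 1, 2, and 3, we can aggregate again on other properties inside each bucket, that is, aggregate the result once more, which is similar to grouping by several columns in a database. (Strictly speaking, nesting one aggregation inside another is a sub-aggregation, while Elasticsearch's pipeline aggregations, such as avg_bucket, consume the results of other aggregations.) Multi-field grouping can also be done with a script, which needs extra configuration, but I tried it without much success; if anyone has a better solution, please share it. What I use here is an aggregation nested inside an aggregation, or alternatively concatenating the grouping fields into one field when cleaning the data.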
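
To make the difference between nesting and a pipeline step concrete, here is a plain-Java sketch under assumed data. It does not call Elasticsearch: the Doc class, its state and balance fields, and both method names are hypothetical. The first method imitates a terms aggregation with a nested avg; the second loosely imitates what a pipeline aggregation such as avg_bucket does, working on bucket results instead of documents.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java illustration only; nothing here calls Elasticsearch.
class SubAggregationSketch {

    // Hypothetical document with a grouping key and a numeric field.
    static final class Doc {
        private final String state;
        private final double balance;

        Doc(String state, double balance) {
            this.state = state;
            this.balance = balance;
        }

        String getState() {
            return state;
        }

        double getBalance() {
            return balance;
        }
    }

    // Sub-aggregation: bucket by state, then average the balance inside each bucket.
    static Map<String, Double> avgBalanceByState(List<Doc> docs) {
        return docs.stream().collect(
                Collectors.groupingBy(Doc::getState, TreeMap::new, Collectors.averagingDouble(Doc::getBalance)));
    }

    // Pipeline-style step: averages the per-bucket results, not the documents.
    static double avgOfBucketAverages(Map<String, Double> bucketAverages) {
        return bucketAverages.values().stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(Double.NaN);
    }

    public static void main(String[] args) {
        List<Doc> docs = List.of(new Doc("A", 10), new Doc("A", 30), new Doc("B", 40));
        Map<String, Double> perBucket = avgBalanceByState(docs);
        System.out.println(perBucket);                      // prints {A=20.0, B=40.0}
        System.out.println(avgOfBucketAverages(perBucket)); // prints 30.0
    }
}
```

Note that the average of the bucket averages (30.0) differs from the average over all three documents (about 26.67), which is why the two kinds of aggregation are worth keeping apart.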

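Metric
Metric aggregations are similar to statistical functions such as COUNT(), SUM(), and MAX().

  • Avg Aggregation: computes the average
  • Max Aggregation: computes the maximum
  • Min Aggregation: computes the minimum
  • Percentiles Aggregation: computes percentiles
  • Stats Aggregation: returns avg, max, min, sum, and count together
  • Sum Aggregation: computes the sum
  • Top hits Aggregation: returns the top matching documents
  • Value Count Aggregation: counts the number of values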
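
As a rough analogy for the Stats Aggregation, the JDK's DoubleSummaryStatistics also produces count, min, max, average, and sum in a single pass. The sketch below is plain Java and does not call Elasticsearch; StatsSketch and its stats method are names invented for this example.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;

// Plain-Java illustration only; nothing here calls Elasticsearch.
class StatsSketch {

    // One pass over the values yields count, min, max, average and sum together.
    static DoubleSummaryStatistics stats(List<Double> values) {
        return values.stream().mapToDouble(Double::doubleValue).summaryStatistics();
    }

    public static void main(String[] args) {
        DoubleSummaryStatistics s = stats(List.of(1.0, 2.0, 3.0, 6.0));
        System.out.println(s.getCount() + " " + s.getMin() + " " + s.getMax()
                + " " + s.getAverage() + " " + s.getSum()); // prints 4 1.0 6.0 3.0 12.0
    }
}
```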

Term Aggregation

GET pre_package/_search
{
  "aggs": {
    "Group": {
      "terms": {
        "field": "productGroupId",
        "size": 10
      }
    }
  }
}

This queries the documents in the pre_package index and aggregates them by productGroupId under the name Group, returning only the top 10 buckets.

Range Aggregation

GET pre_package/_search
{
  "aggs": {
    "Group": {
      "range": {
        "field": "isPrePackage",
        "ranges": [
          {
            "to": 1
          },
          {
            "from": 1,
            "to": 2
          },
          {
            "from": 2
          }
        ]
      }
    }
  }
}

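This splits the documents into three buckets by the isPrePackage field: less than 1, from 1 up to but not including 2, and 2 or greater (from is inclusive, to is exclusive).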
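
The boundary rule (from inclusive, to exclusive) is easy to get wrong, so here is a plain-Java sketch of how values would be assigned to the three ranges above. It does not call Elasticsearch; RangeBucketSketch, Range, and docCounts are hypothetical names, and the sample values are invented.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; nothing here calls Elasticsearch.
class RangeBucketSketch {

    // One range entry: "from" is inclusive, "to" is exclusive, null means unbounded.
    static final class Range {
        final Double from;
        final Double to;

        Range(Double from, Double to) {
            this.from = from;
            this.to = to;
        }

        boolean contains(double value) {
            return (from == null || value >= from) && (to == null || value < to);
        }
    }

    // Counts how many values fall into each range, in the order the ranges are given.
    static long[] docCounts(List<Range> ranges, double[] values) {
        long[] counts = new long[ranges.size()];
        for (double value : values) {
            for (int i = 0; i < ranges.size(); i++) {
                if (ranges.get(i).contains(value)) {
                    counts[i]++;
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Range> ranges = List.of(new Range(null, 1.0), new Range(1.0, 2.0), new Range(2.0, null));
        long[] counts = docCounts(ranges, new double[] {0, 1, 1, 2, 5});
        System.out.println(Arrays.toString(counts)); // prints [1, 2, 2]
    }
}
```

The value 1 lands in the second bucket and the value 2 in the third, because each range includes its lower bound and excludes its upper bound.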

Date Range Aggregation

GET pre_package/_search
{
  "aggs": {
    "Group": {
      "date_range": {
        "field": "update_date",
        
        "ranges": [
          {
            "to": "2020-05-01 00:00:00"
          },
          {
            "from": "2020-05-02 00:00:00",
            "to": "2020-08-01 00:00:00"
          },
          {
            "from": "2020-08-02 00:00:00"
          }
        ]
      }
    }
  }
}

An aggregation over date ranges.

Filter Aggregation

GET pre_package/_search
{
  "aggs": {
    "flight_Miles": {
      "filter": {
         "term": {
          "cargoOwnerName": "联合利华测试"
        }
      }
    }
  }
}

Aggregates over the result set that passes the filter condition.

Missing Aggregation

GET pre_package/_search
{
  "aggs": {
    "without_age": {
      "missing": {
        "field": "orderTypeName"
      }
    }
  }
}

Counts the documents that lack the field, including those where the field value is null.
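
In plain Java, the same rule (absent field or null value both count as missing) could be sketched as follows. This does not call Elasticsearch; MissingSketch, countMissing, and the sample documents are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; nothing here calls Elasticsearch.
class MissingSketch {

    // Map.get returns null both when the key is absent and when its value is null,
    // so one check covers both cases described above.
    static long countMissing(List<Map<String, Object>> docs, String field) {
        return docs.stream().filter(doc -> doc.get(field) == null).count();
    }

    public static void main(String[] args) {
        Map<String, Object> withValue = new HashMap<>();
        withValue.put("orderTypeName", "normal");
        Map<String, Object> withNull = new HashMap<>();
        withNull.put("orderTypeName", null);
        Map<String, Object> withoutField = new HashMap<>();
        withoutField.put("id", 3);
        System.out.println(countMissing(List.of(withValue, withNull, withoutField), "orderTypeName")); // prints 2
    }
}
```
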
Histogram Aggregation

GET pre_package/_search
{
  "aggs": {
    "test": {
      "histogram": {
        "field": "id",
        "interval": 100
      }
    }
  }
}

A histogram aggregation counts documents over fixed-size intervals of a numeric field; here id is bucketed in steps of 100.
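
The Elasticsearch documentation gives the bucket key as floor((value - offset) / interval) * interval + offset. A plain-Java sketch of that rounding, assuming the default offset of 0 unless stated, looks like this; HistogramKeySketch and bucketKey are hypothetical names and nothing here calls Elasticsearch.

```java
// Plain-Java illustration only; nothing here calls Elasticsearch.
class HistogramKeySketch {

    // Rounds a value down to the start of the interval that contains it.
    static double bucketKey(double value, double interval, double offset) {
        return Math.floor((value - offset) / interval) * interval + offset;
    }

    public static void main(String[] args) {
        System.out.println(bucketKey(250, 100, 0)); // prints 200.0
        System.out.println(bucketKey(99, 100, 0));  // prints 0.0
        System.out.println(bucketKey(100, 100, 0)); // prints 100.0
    }
}
```

With interval 100, an id of 250 is counted in the bucket keyed 200, and an id of exactly 100 starts a new bucket.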

ElasticsearchRestTemplate helper class

package com.xxl.job.executor.utils;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.xxl.job.executor.es.EsSortPage;
import com.xxl.job.executor.es.QueryCondition;
import com.xxl.job.executor.es.TermQueryReq;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedAvg;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
 * @description: ES helper class (ElasticsearchRestTemplate)
 * @Author: 
 * @NAME: ElasticsearchRestEsUtils
 * @date: 2022/7/1 14:17
 */
@Component
public class ElasticsearchRestEsUtils {

    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;


    /**
     * @description: Create the index
     * @author: 
     * @date 2022/7/1 14:07
     * @param: [cla]
     * @return: boolean
     */
    public boolean createIndexOps(Class cla) {
        // Index operations bound to the entity class
        IndexOperations ops = elasticsearchTemplate.indexOps(cla);
        if (!ops.exists()) {
            ops.create();
            ops.refresh();
            ops.putMapping(ops.createMapping());
        }
        return true;
    }

    /**
     * @description: Delete the index
     * @author: 
     * @date 2022/7/1 15:23
     * @param: [cla]
     * @return: void
     */
    public void deleteIndex(Class cla) {
        // Use the template's indexOps() to obtain an IndexOperations object for index management;
        // the repository (DAO) methods do not support index operations
        boolean delete = elasticsearchTemplate.indexOps(cla).delete();
        System.out.println("delete = " + delete);
    }

    /**
     * @description: Page through all documents
     * @author: 
     * @date 2022/7/1 15:37
     * @param: [esSortPage]
     * @return: java.util.List<?>
     */
    public List<?> findByPageable(EsSortPage esSortPage) {
        List<Object> relist = new ArrayList<>();
        // Sort settings (direction, ascending or descending, and the sort field)
        Sort sort = null;
        if (esSortPage.getSort() == 1) {
            sort = Sort.by(Sort.Direction.DESC, esSortPage.getSortField());
        } else {
            sort = Sort.by(Sort.Direction.ASC, esSortPage.getSortField());
        }

        int currentPage = esSortPage.getCurrentPage();
        int pageSize = esSortPage.getPageSize();

        // Build the page request
        PageRequest pageRequest = PageRequest.of(currentPage, pageSize, sort);
        ElasticsearchRepository<?, Long> dao = esSortPage.getDao();
        // Run the paged query
        Page<?> daoAll = dao.findAll(pageRequest);
        List<?> content = daoAll.getContent();
        for (Object o : content) {
            relist.add(o);
        }
        return relist;
    }

    /**
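     * @description: Paged query built from the assembled conditions; depending on the condition type it uses exact matching or analyzed (fuzzy) matching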
     * @author: 
     * @date 2022/7/3 11:07
     * @param: [termQueryReq]
     * @return: java.util.List<?>
     */
    public List<?> findByTermQuery(TermQueryReq termQueryReq) {
        // Sort settings (ascending or descending)
        FieldSortBuilder balance = null;
        if (termQueryReq.getSort() == 1) {
            balance = new FieldSortBuilder(termQueryReq.getSortField()).order(SortOrder.DESC);
        } else {
            balance = new FieldSortBuilder(termQueryReq.getSortField()).order(SortOrder.ASC);
        }
        // Paging parameters
        int currentPage = termQueryReq.getCurrentPage();
        int pageSize = termQueryReq.getPageSize();
        // Build the query conditions
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        List<QueryCondition> mapList = termQueryReq.getMapList();
        for (QueryCondition queryCondition : mapList) {
            if (queryCondition.getFiltration() == 1) { // range filter
                RangeQueryBuilder date = QueryBuilders.rangeQuery(queryCondition.getKey()).gte(queryCondition.getValue()).lte(queryCondition.getEndValue());
                boolQueryBuilder.filter(date);
            } else {
                if (queryCondition.getExtractive() == 1) {
                    if (queryCondition.getValue() != null) {
                        // All conditions must match (AND). text fields are analyzed, so set a
                        // minimum_should_match percentage; the higher it is, the stricter the match
                        boolQueryBuilder.must(QueryBuilders.matchQuery(queryCondition.getKey(), queryCondition.getValue()).minimumShouldMatch(termQueryReq.getScore()));
                    }
                } else {
                    if (queryCondition.getValue() instanceof Integer[]) {
                        Integer[] pulldown = (Integer[]) queryCondition.getValue();
                        for (Integer integer : pulldown) {
                            // Match any one of the values (OR)
                            boolQueryBuilder.should(QueryBuilders.matchQuery(queryCondition.getKey(), integer));
                        }
                    }
                }
            }
        }

        // Paging
        Pageable pageable = PageRequest.of(currentPage, pageSize);

        // Run the query
        NativeSearchQuery query = new NativeSearchQueryBuilder()
                .withQuery(boolQueryBuilder)
                .withPageable(pageable)
                .withSort(balance)
                .build();

        List<Object> resList = new ArrayList<>();
        SearchHits<?> search = elasticsearchTemplate.search(query, termQueryReq.getCls());
        for (SearchHit<?> searchHit : search) {
            resList.add(searchHit.getContent());
        }
        return resList;
    }

    /**
     * @description: Aggregation search, similar to GROUP BY: aggregates on the termQueryReq.getAggregate() field
     * @author: 
     * @date 2022/7/3 11:32
     * @param: [termQueryReq]
     * @return: java.util.List<?>
     */
    public List<Map<String, Object>> findPolymerization(TermQueryReq termQueryReq) {
        NativeSearchQuery query = new NativeSearchQueryBuilder()
                .addAggregation(AggregationBuilders.terms("count").field(termQueryReq.getAggregate() + ".keyword"))
                .build();

        SearchHits<?> searchHits = elasticsearchTemplate.search(query, termQueryReq.getCls());

        // Extract the aggregation result
        Aggregations aggregations = searchHits.getAggregations();
        Terms terms = (Terms) aggregations.asMap().get("count");
        List<Map<String, Object>> mapList = new ArrayList<>();
        for (Terms.Bucket bucket : terms.getBuckets()) {
            Map<String, Object> map = new HashMap<>();
            String keyAsString = bucket.getKeyAsString();   // value of the aggregated field (bucket key)
            long docCount = bucket.getDocCount();           // number of documents in the bucket
            map.put("keyAsString", keyAsString);
            map.put("docCount", docCount);
            mapList.add(map);
        }
        return mapList;
    }

    /**
     * @description: Nested aggregation: counts the documents that share the same termQueryReq.getAggregate() value, then computes the average of termQueryReq.getNes() per bucket, with sorting
     * @author: 
     * @date 2022/7/3 11:36
     * @param: [termQueryReq]
     * @return: java.util.List<?>
     */
    public List<Map<String, Object>> findNest(TermQueryReq termQueryReq) {
        // Build the aggregations
        TermsAggregationBuilder stateAgg = AggregationBuilders.terms("count").field(termQueryReq.getAggregate() + ".keyword");
        AvgAggregationBuilder balanceAgg = AggregationBuilders.avg("avg_" + termQueryReq.getNes()).field(termQueryReq.getNes());
        // Nest the avg aggregation inside the terms aggregation
        stateAgg.subAggregation(balanceAgg);
        // Order the buckets by the average. The second argument of BucketOrder.aggregation is "asc",
        // so sort == 1 (descending, as in findByTermQuery) passes false
        if (termQueryReq.getSort() == 1) {
            stateAgg.order(BucketOrder.aggregation("avg_" + termQueryReq.getNes(), false));
        } else {
            stateAgg.order(BucketOrder.aggregation("avg_" + termQueryReq.getNes(), true));
        }

        NativeSearchQuery build = new NativeSearchQueryBuilder()
                .addAggregation(stateAgg)
                .build();
        // Run the query
        SearchHits<?> searchHits = elasticsearchTemplate.search(build, termQueryReq.getCls());
        // Extract the aggregation result
        Aggregations aggregations = searchHits.getAggregations();
        Terms terms = (Terms) aggregations.asMap().get("count");

        List<Map<String, Object>> mapList = new ArrayList<>();
        for (Terms.Bucket bucket : terms.getBuckets()) {
            // state : count : avg
            Map<String, Object> map = new HashMap<>();
            ParsedAvg avg = bucket.getAggregations().get("avg_" + termQueryReq.getNes());
            map.put("state", bucket.getKeyAsString());
            map.put("count", bucket.getDocCount());
            map.put("avg", avg.getValueAsString());
            mapList.add(map);
        }
        return mapList;
    }

    /**
     * @description: Filtered search
     * @author: 
     * @date 2022/7/3 13:15
     * @param: [termQueryReq]
     * @return: java.util.List<?>
     */
    public List<?> findFiltration(TermQueryReq termQueryReq) {
        // Build the condition
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        RangeQueryBuilder balance = QueryBuilders.rangeQuery("balance").gte(20000).lte(30000);
        boolQueryBuilder.filter(balance);

        NativeSearchQuery query = new NativeSearchQueryBuilder()
                .withQuery(boolQueryBuilder)
                .build();

        List<Object> resList = new ArrayList<>();
        SearchHits<?> search = elasticsearchTemplate.search(query, termQueryReq.getCls());
        for (SearchHit<?> searchHit : search) {
            resList.add(searchHit.getContent());
        }

        return resList;
    }

    /**
     * @description: Check whether the index already exists
     * @author: 
     * @date 2022/7/3 13:21
     * @param: [indexName]
     * @return: boolean
     */
    public boolean indexExist(String indexName) {
        if (StringUtils.isBlank(indexName)) {
            return false;
        }
        IndexCoordinates indexCoordinates = IndexCoordinates.of(indexName);
        return elasticsearchTemplate.indexOps(indexCoordinates).exists();
    }

    /**
     * @description: Delete an index by name
     * @author: 
     * @date 2022/7/3 13:24
     * @param: [index]
     * @return: void
     */
    public void indexDelete(String index) {
        elasticsearchTemplate.indexOps(IndexCoordinates.of(index)).delete();
    }

    /**
     * @description: Add an alias to an index
     * @author: 
     * @date 2022/7/3 13:25
     * @param: [indexName, aliasName]
     * @return: boolean
     */
    public boolean indexAddAlias(String indexName, String aliasName) {
        if (StringUtils.isBlank(indexName) || StringUtils.isBlank(aliasName)) {
            return false;
        }
        // Index coordinates wrapper
        IndexCoordinates indexCoordinates = IndexCoordinates.of(indexName);
        // Check that the index exists
        if (elasticsearchTemplate.indexOps(indexCoordinates).exists()) {
            // Alias definition
            AliasQuery query = new AliasQuery(aliasName);
            // Add the alias
            boolean bool = elasticsearchTemplate.indexOps(indexCoordinates).addAlias(query);
            return bool;
        }
        return false;
    }

    /**
     * @description: Remove an alias from an index
     * @author: 
     * @date 2022/7/3 13:25
     * @param: [indexName, aliasName]
     * @return: boolean
     */
    public boolean indexRemoveAlias(String indexName, String aliasName) {
        if (StringUtils.isBlank(indexName) || StringUtils.isBlank(aliasName)) {
            return false;
        }
        // Index coordinates wrapper
        IndexCoordinates indexCoordinates = IndexCoordinates.of(indexName);
        // Check that the index exists
        if (elasticsearchTemplate.indexOps(indexCoordinates).exists()) {
            // Alias definition
            AliasQuery query = new AliasQuery(aliasName);
            // Remove the alias
            boolean bool = elasticsearchTemplate.indexOps(indexCoordinates).removeAlias(query);
            return bool;
        }
        return false;
    }

    /**
     * @description: Index a new document
     * @author: 
     * @date 2022/7/3 13:26
     * @param: [t]
     * @return: void
     */
    public <T> void save(T t) {
        // The target index is resolved from the entity class
        elasticsearchTemplate.save(t);
    }

    /**
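     * @description: Bulk insert into a named index
     * @author: 
     * @date 2022/7/3 13:26
     * @param: [queries, index]
     * @return: void
     */
    public void bulkIndex(List<IndexQuery> queries, String index) {
        // Index coordinates wrapper
        IndexCoordinates indexCoordinates = IndexCoordinates.of(index);
        // Bulk insert. Keep each request under 100 MB, the default Elasticsearch limit on request size (http.max_content_length); raising it may hurt performance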
        elasticsearchTemplate.bulkIndex(queries, indexCoordinates);
    }

    /**
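     * @description: Bulk insert by entity class (creates the index first if it does not exist)
     * @author: 
     * @date 2022/7/3 13:26
     * @param: [queries, cls]
     * @return: void
     */
    public void bulkCls(List<IndexQuery> queries, Class cls) {
        // Bulk insert. Keep each request under 100 MB, the default Elasticsearch limit on request size (http.max_content_length); raising it may hurt performance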
        createIndexOps(cls);
        elasticsearchTemplate.bulkIndex(queries, cls);
    }

    /**
     * Delete the documents in the given index that match a condition
     *
     * @param c         entity class of the index
     * @param filedName field name in the index
     * @param val       value to match
     * @param index     index name
     */
    public void delete(Class c, String filedName, Object val, String index) {
        // Term query on the field
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(filedName, val);
        NativeSearchQuery nativeSearchQuery = new NativeSearchQuery(termQueryBuilder);
        // Delete the matching documents
        elasticsearchTemplate.delete(nativeSearchQuery, c, IndexCoordinates.of(index));
    }

    /**
     * Delete a document by id
     *
     * @param id    document id
     * @param index index name
     */
    public void deleteById(Object id, String index) {
        if (null != id && StringUtils.isNotBlank(index)) {
            // Delete the document with this id from the index
            elasticsearchTemplate.delete(id.toString(), IndexCoordinates.of(index));
        }
    }

    /**
     * Update a document by id; index it if it does not exist
     *
     * @param t     entity to write
     * @param id    document id
     * @param index index name
     * @param <T>   entity type
     */
    public <T> void update(T t, Integer id, String index) {
        // Check whether the document already exists
        Object data = elasticsearchTemplate.get(id.toString(), t.getClass(), IndexCoordinates.of(index));
        if (data != null) {
            // Exists: update it
            UpdateQuery build = UpdateQuery.builder(id.toString()).withDocument(Document.parse(JSON.toJSONString(t))).build();
            elasticsearchTemplate.update(build, IndexCoordinates.of(index));
        } else {
            // Does not exist: create it
            elasticsearchTemplate.save(t);
        }
    }

    /**
     * Build the IndexQuery used to push an object to Elasticsearch
     *
     * @param obj
     * @return
     */
    public IndexQuery assembleDataEs(Object obj) {
        // Assemble the data
        IndexQuery indexQuery = new IndexQuery();
        try {
            Field field = getDeclareField(obj, "id");
            field.setAccessible(true);
            indexQuery.setId(field.get(obj).toString());
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        indexQuery.setSource(JSON.toJSONString(obj));
        return indexQuery;
    }

    /**
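     * Look up a Field by name (including fields declared on superclasses)
     *
     * @param o
     * @param fieldName
     * @return the field, or null if no class in the hierarchy declares it
     */
    public static Field getDeclareField(Object o, String fieldName) {
        Field field = null;
        // Walk up the class hierarchy and stop at the top, so a missing field returns null
        // instead of throwing a NullPointerException
        for (Class<?> clazz = o.getClass(); clazz != null && field == null; clazz = clazz.getSuperclass()) {
            try {
                field = clazz.getDeclaredField(fieldName);
            } catch (NoSuchFieldException e) {
                // Not declared on this class; try the superclass
            }
        }
        return field;
    }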
}

RestHighLevelClient helper class

package com.xxl.job.executor.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xxl.job.executor.es.QueryCondition;
import com.xxl.job.executor.es.TermQueryReq;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
//import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/**
 * @description: ES helper class (RestHighLevelClient)
 * @Author: 
 * @NAME: EsUtils
 * @date: 2022/7/1 13:39
 */
@Slf4j
@Component
public class RestHighLeveEsUtils {

    @Autowired
    private  RestHighLevelClient restHighLevelClient;

    /**
     * @description: Create the index
     * @author: 
     * @date 2022/7/1 14:02
     * @param: [indexName]
     * @return: boolean
     */
    public  boolean createIndex(String indexName) {
        // Result flag
        boolean exists = true;
        try {
            // 1. Build the create-index request
            CreateIndexRequest request = new CreateIndexRequest(indexName);
            // 2. Execute the request and read the response
            CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
            // Report failure if the cluster did not acknowledge the request
            if (createIndexResponse == null || !createIndexResponse.isAcknowledged()) {
                exists = false;
            }
        } catch (IOException e) {
            exists = false;
        }
        return exists;
    }

    /**
     * @description: Check whether the index exists
     * @author: 
     * @date 2022/7/1 14:03
     * @param: [indexName]
     * @return: boolean
     */
    public  boolean isIndexExists(String indexName) {
        boolean exists = true;
        try {
            GetIndexRequest request = new GetIndexRequest(indexName);
            exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            exists = false;
        }
        return exists;
    }

    /**
     * @description: Delete the index
     * @author: 
     * @date 2022/7/1 14:03
     * @param: [indexName]
     * @return: boolean
     */
    public  boolean delIndex(String indexName) {
        boolean exists = true;

        try {
            DeleteIndexRequest request = new DeleteIndexRequest(indexName);
            AcknowledgedResponse delete = restHighLevelClient.indices().delete(request, RequestOptions.DEFAULT);
            exists = delete.isAcknowledged();
        } catch (IOException e) {
            exists = false;
        }

        return exists;
    }

    /**
     * @description: Update a document
     * @author: 
     * @date 2022/7/1 14:28
     * @param: [indexName, obj, id]
     * @return: boolean
     */
    public  boolean updateDocument(String indexName, Object obj, String id) {
        boolean exists = true;
        try {
            UpdateRequest updateRequest = new UpdateRequest(indexName, id);
            updateRequest.timeout("1s");
            updateRequest.doc(JSON.toJSONString(obj), XContentType.JSON);
            UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            // status() returns a RestStatus enum, so compare with the enum constant, not a String
            if (updateResponse.status() != org.elasticsearch.rest.RestStatus.OK) {
                exists = false;
            }
        } catch (IOException e) {
            e.printStackTrace();
            exists = false;
        }
        return exists;
    }

    /**
     * @description: Delete a document
     * @author: 
     * @date 2022/7/1 14:28
     * @param: [indexName, id]
     * @return: boolean
     */
    public  boolean deleteRequest(String indexName, String id) {
        boolean exists = true;
        try {
            DeleteRequest request = new DeleteRequest(indexName, id);
            request.timeout("1s");
            DeleteResponse delete = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
            // status() returns a RestStatus enum, so compare with the enum constant, not a String
            if (delete.status() != org.elasticsearch.rest.RestStatus.OK) {
                exists = false;
            }
        } catch (IOException e) {
            e.printStackTrace();
            exists = false;
        }
        return exists;
    }


    /**
     * @description: Get a document by id
     * @author: 
     * @date 2022/7/1 14:29
     * @param: [indexName, id]
     * @return: java.util.Map
     */
    public Map getDocument(String indexName, String id) {
        Map strToMap = null;
        try {
            GetRequest request = new GetRequest(indexName, id);
            GetResponse getResponse = restHighLevelClient.get(request, RequestOptions.DEFAULT);
            strToMap = JSONObject.parseObject(getResponse.getSourceAsString());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return strToMap;
    }


    /**
     * @description: Create and configure the BulkProcessor
     * @author: River
     * @date 2022/7/1 14:30
     * @param: [client]
     * @return: org.elasticsearch.action.bulk.BulkProcessor
     */
    private  BulkProcessor getBulkProcessor(RestHighLevelClient client) {

        BulkProcessor bulkProcessor = null;
        try {

            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    log.info("Try to insert data number : " + request.numberOfActions());
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    log.info("************** Success insert data number : " + request.numberOfActions() + " , id: "
                            + executionId);
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    log.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
                }
            };

            BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
                    .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

            BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
            // Flush once this many actions have been buffered
            builder.setBulkActions(1000);
            builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
            // Maximum number of concurrent bulk requests
            builder.setConcurrentRequests(2);
            builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
            builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
            bulkProcessor = builder.build();

        } catch (Exception e) {
            e.printStackTrace();
            try {
                bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
                client.close();
            } catch (Exception e1) {
                log.error(e1.getMessage());
            }
        }
        return bulkProcessor;
    }

    /**
     * @description: Bulk insert
     * @author: 
     * @date 2022/7/1 14:31
     * @param: [objectArrayList, indexName, value]
     * @return: boolean
     */
    public  boolean bulkRequest(ArrayList<Map<String, Object>> objectArrayList, String indexName, String value) {
        boolean exists = true;
        BulkProcessor bulkProcessor = getBulkProcessor(restHighLevelClient);
        try {
            for (int i = 0; i < objectArrayList.size(); i++) {
                bulkProcessor.add(new IndexRequest(indexName)
                        .id(objectArrayList.get(i).get(value).toString())
                        .source(JSON.toJSONString(objectArrayList.get(i)), XContentType.JSON));
            }
            // Flush the buffered requests to Elasticsearch
            bulkProcessor.flush();
        } catch (Exception e) {
            log.error(e.getMessage());
            // Report the failure to the caller instead of always returning true
            exists = false;
        } finally {
            try {
                boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
                log.info(String.valueOf(terminatedFlag));
            } catch (Exception e) {
                log.error(e.getMessage());
            }
        }
        return exists;
    }

    /**
     * @description: Index (create) a document
     * @author: 
     * @date 2022/7/3 16:03
     * @param: [indexName, obj, id]
     * @return: boolean
     */
    public  boolean addDocument(String indexName, Object obj, String id) {
        boolean exists = true;
        IndexResponse indexResponse = null;

        try {
            // Build the request
            IndexRequest request = new IndexRequest(indexName);
            // REST equivalent: PUT /kuang_index/_doc/1
            request.id(id);
            request.timeout(TimeValue.timeValueDays(1));
            // Put the object into the request as JSON
            request.source(JSON.toJSONString(obj), XContentType.JSON);
            // Send the request and read the response
            indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
            // status() is CREATED only when a new document was created; compare enums, not Strings
            if (indexResponse.status() != org.elasticsearch.rest.RestStatus.CREATED) {
                exists = false;
            }
        } catch (IOException e) {
            exists = false;
        }
        return exists;
    }

    /**
     * @description: Check whether a document exists
     * @author: 
     * @date 2022/7/3 16:03
     * @param: [indexName, id]
     * @return: boolean
     */
    public  boolean isExists(String indexName, String id) {
        boolean exists = true;
        try {
            GetRequest request = new GetRequest(indexName, id);
            // Do not fetch _source; only existence matters here
            request.fetchSourceContext(new FetchSourceContext(false));
            request.storedFields("_none_");
            exists = restHighLevelClient.exists(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
            exists = false;
        }
        return exists;
    }

    /**
     * @description: Fuzzy match query
     * @author: 
     * @date 2022/7/3 16:05
     * @param: [indexName, key, value]
     * @return: java.util.List<java.util.Map<java.lang.String,java.lang.Object>>
     */
    public List<Map<String, Object>> searchMatch(String indexName, String key, String value) throws IOException {
        List<Map<String, Object>> map = new ArrayList<>();
        SearchRequest searchRequest = new SearchRequest(indexName);
        // Build the search source
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

        MatchQueryBuilder termQueryBuilder = new MatchQueryBuilder(key, value);
        termQueryBuilder.fuzziness(Fuzziness.AUTO);
        sourceBuilder.query(termQueryBuilder);
        sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        searchRequest.source(sourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        for (SearchHit documentFields : searchResponse.getHits().getHits()) {
            map.add(documentFields.getSourceAsMap());
        }
        return map;
    }

    /**
     * @description: Exact query (term query, the value is not analyzed)
     * @author: 
     * @date 2022/7/3 16:05
     * @param: [indexName, key, value]
     * @return: java.util.List<java.util.Map<java.lang.String,java.lang.Object>>
     */
    public List<Map<String, Object>> searchQuery(String indexName, String key, String value) throws IOException {
        List<Map<String, Object>> map = new ArrayList<>();
        SearchRequest searchRequest = new SearchRequest(indexName);
        // Build the search conditions
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(key, value);
        sourceBuilder.query(termQueryBuilder);
        sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        searchRequest.source(sourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        // Collect the _source of every hit
        for (SearchHit documentFields : searchResponse.getHits().getHits()) {
            map.add(documentFields.getSourceAsMap());
        }
        return map;
    }

    /**
     * @description: Update documents that match a condition
     * @author: 
     * @date 2022/7/3 16:05
     * @param: [catalogId, timeliness, indexName, catalogName]
     * @return: void
     */
    public void EsupdateTimeliness(String catalogId, String timeliness, String indexName, String catalogName) {
        // Build the condition: the field named by catalogName must equal catalogId
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        boolQueryBuilder.must(QueryBuilders.termQuery(catalogName, catalogId));
        // Search first; if nothing matches, the update is skipped
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(boolQueryBuilder);
        SearchRequest request = new SearchRequest(indexName);
        request.source(sourceBuilder);

        // Update-by-query request. The new value is passed as a script parameter instead of being
        // concatenated into the script source, so a quote inside timeliness cannot break the script
        // (needs org.elasticsearch.script.ScriptType and java.util.Collections)
        UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName)
                .setQuery(boolQueryBuilder)
                .setScript(new Script(ScriptType.INLINE, "painless",
                        "ctx._source['timeliness'] = params.timeliness",
                        Collections.<String, Object>singletonMap("timeliness", timeliness)));
        try {
            SearchResponse search = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            long docNum = search.getHits().getTotalHits().value;
            if (docNum != 0) {
                restHighLevelClient.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT).getUpdated();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }


    private SearchResponse getCommodityList(String index, TermQueryReq req) {
        SearchRequest searchRequest = new SearchRequest(index);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Sort by id
        sourceBuilder.sort("id", SortOrder.ASC);
        // Query conditions
        BoolQueryBuilder mustQuery = QueryBuilders.boolQuery();
        List<QueryCondition> mapList = req.getMapList();
        for (QueryCondition queryCondition : mapList) {
            if (queryCondition.getExtractive() == 1) { // AND
                if (queryCondition.getValue() != null) {
                    mustQuery.must(QueryBuilders.termQuery(queryCondition.getKey(), queryCondition.getValue()));
                }
            } else {
                // OR: match any one of the values. Note that when must clauses are present,
                // should clauses only affect scoring unless minimumShouldMatch is set
                Integer[] pulldown = (Integer[]) queryCondition.getValue();
                for (Integer integer : pulldown) {
                    mustQuery.should(QueryBuilders.matchQuery(queryCondition.getKey(), integer));
                }
            }
        }
        // Fields to return in _source
        StringBuilder fields = new StringBuilder();
        for (QueryCondition queryCondition : mapList) {
            fields.append(queryCondition.getKey()).append(",");
        }
        sourceBuilder.fetchSource(new FetchSourceContext(true, fields.toString().split(","), Strings.EMPTY_ARRAY));
        sourceBuilder.query(mustQuery);
        // Paging: the offset is (pageNum - 1) * pageSize
        // (assumes TermQueryReq exposes getPageNum(), which is not shown in this article)
        sourceBuilder.from((req.getPageNum() - 1) * req.getPageSize());
        sourceBuilder.size(req.getPageSize());
        sourceBuilder.trackTotalHits(true);
        searchRequest.source(sourceBuilder);
        // Run the query
        SearchResponse response;
        try {
            response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
        return response;
    }

}
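
Offset paging with from and size, as used in getCommodityList above, depends on one small piece of arithmetic: for a 1-based page number, the first hit on the page sits at offset (pageNum - 1) * pageSize. A minimal sketch of that calculation follows. PageWindow is a hypothetical helper name, not part of the original utility class, and the argument checks are an assumption about how invalid input should be handled.

```java
// Hypothetical helper (not in the original class): computes the "from" offset for offset paging.
final class PageWindow {
    private PageWindow() {
    }

    /** Zero-based offset of the first hit on a 1-based page. */
    static int from(int pageNum, int pageSize) {
        if (pageNum < 1 || pageSize < 1) {
            throw new IllegalArgumentException("pageNum and pageSize must be >= 1");
        }
        // multiplyExact throws ArithmeticException instead of silently overflowing
        return Math.multiplyExact(pageNum - 1, pageSize);
    }

    public static void main(String[] args) {
        System.out.println(from(1, 10)); // prints 0
        System.out.println(from(3, 10)); // prints 20
    }
}
```

The result would be passed to sourceBuilder.from(...), with pageSize going to sourceBuilder.size(...). Keep in mind that Elasticsearch rejects requests where from + size exceeds index.max_result_window (10000 by default).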

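Secondary aggregation

I read online that from ES 5.0 onward, doing a secondary aggregation through a script requires extra configuration, but when I tried it, it made no difference. If anyone has a good solution, please share it. What I use here is what I call pipeline aggregation, that is, one aggregation nested inside another (sub-aggregations plus a bucket_sort pipeline step). The alternative is to concatenate the grouping fields into a single field when cleaning the data.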
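
The second option mentioned above, concatenating the grouping fields while cleaning the data, can be sketched in plain Java without any Elasticsearch dependency. The idea is to build one combined key per document before indexing, so that a single terms aggregation on that key behaves like a multi-field GROUP BY. The class name, the Doc shape, and the "|" delimiter below are all assumptions for illustration; the field names are borrowed from the index used in this article.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: emulates grouping on a pre-concatenated key.
final class CompositeKeyGrouping {
    /** Assumed document shape with the two fields we want to group by. */
    static final class Doc {
        final String pickupTime;
        final String productGroupId;

        Doc(String pickupTime, String productGroupId) {
            this.pickupTime = pickupTime;
            this.productGroupId = productGroupId;
        }
    }

    /** Builds the combined key; the delimiter must not occur inside either field. */
    static String groupKey(Doc d) {
        return d.pickupTime + "|" + d.productGroupId;
    }

    /** Counts documents per combined key, which is what a terms aggregation on that key would return. */
    static Map<String, Long> countByCompositeKey(List<Doc> docs) {
        return docs.stream().collect(Collectors.groupingBy(
                CompositeKeyGrouping::groupKey, LinkedHashMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Doc> docs = java.util.Arrays.asList(
                new Doc("2022-07-01", "A"),
                new Doc("2022-07-01", "A"),
                new Doc("2022-07-01", "B"));
        System.out.println(countByCompositeKey(docs));
    }
}
```

In a real pipeline the combined key would be written into its own keyword field at indexing time, and the terms aggregation would then target that field.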

        PageInfo<DespatchRes> pageInfo = new PageInfo<>();
        List<DespatchRes> relist = Lists.newArrayList();

        // Offset of the first bucket on the requested page
        int from = (req.getPageNum() - 1) * req.getPageSize();
        // Number of buckets to return (bucket_sort's size is a count, not an end index)
        int size = req.getPageSize();
        // Bucket aggregations: the inner one (stateTime) groups by productGroupId,
        // the outer one (GroupId) groups by pickupTime
        TermsAggregationBuilder stateTime = AggregationBuilders.terms("stateTime").field("productGroupId").size(Integer.MAX_VALUE);
        TermsAggregationBuilder stateAgg = AggregationBuilders.terms("GroupId").field("pickupTime").size(Integer.MAX_VALUE);
        // Order buckets by document count, ascending
        stateAgg.order(BucketOrder.count(true));

        // Take the first document of each bucket as the row to display
        stateAgg.subAggregation(AggregationBuilders.topHits("Group").size(1));
        // Page through the buckets after aggregation
        stateAgg.subAggregation(new BucketSortPipelineAggregationBuilder("pageInfo", null).from(from).size(size));

        // Metric aggregation: the number of non-pre-packaged documents in each bucket
        stateAgg.subAggregation(AggregationBuilders.sum("perSum").field("isPrePackage"));
        // Nest the stateTime buckets inside the stateAgg buckets (the secondary aggregation)
        stateAgg.subAggregation(stateTime);

        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

        // Time range
        if (StringUtils.isNotBlank(req.getPickupStartTime()) && StringUtils.isNotBlank(req.getPickupEndTime())) {
            RangeQueryBuilder date1 = QueryBuilders.rangeQuery("pickupTime").from(req.getPickupStartTime()).to(req.getPickupEndTime());
            boolQueryBuilder.filter(date1);
        }
        boolQueryBuilder.must(QueryBuilders.matchQuery("sts", 0));
        boolQueryBuilder.must(QueryBuilders.matchQuery("tenantId", user.getTenantId()));
        // Add must conditions
        if (ObjectUtils.isNotEmpty(req.getCargoOwnerId())) {
            boolQueryBuilder.must(QueryBuilders.matchQuery("cargoOwnerId", req.getCargoOwnerId()));
        }

        // Add should conditions. Because must clauses are present, these only affect scoring
        // unless minimumShouldMatch is set on the bool query
        if (ObjectUtils.isNotEmpty(req.getWarehouseId())) {
            List<Long> warehouseId = req.getWarehouseId();
            for (Long aLong : warehouseId) {
                boolQueryBuilder.should(QueryBuilders.matchQuery("warehouseId", aLong));
            }
        }
        if (ObjectUtils.isNotEmpty(req.getWarehouseCodeId())) {
            List<Long> warehouseCodeId = req.getWarehouseCodeId();
            for (Long aLong : warehouseCodeId) {
                boolQueryBuilder.should(QueryBuilders.matchQuery("referenceThree", aLong));
            }
        }
        if (ObjectUtils.isNotEmpty(req.getOrderTypeId())) {
            List<Long> orderTypeIds = req.getOrderTypeId();
            for (Long aLong : orderTypeIds) {
                boolQueryBuilder.should(QueryBuilders.matchQuery("orderTypeId", aLong));
            }
        }

        // Build the query
        NativeSearchQuery build = new NativeSearchQueryBuilder()
                .withQuery(boolQueryBuilder)
                .withTrackTotalHits(true)
                .addAggregation(stateAgg)
                .build();

        // Run the query
        SearchHits<DespatchEs> searchHits = elasticsearchTemplate.search(build, DespatchEs.class);
        Aggregations aggregations = searchHits.getAggregations();
        Terms terms = (Terms) aggregations.asMap().get("GroupId");
        for (Terms.Bucket bucket : terms.getBuckets()) {
            DespatchRes rs = new DespatchRes();
            // Document count of the bucket
            rs.setNumber(bucket.getDocCount());
            Aggregations bucketAggregations = bucket.getAggregations();
            // Representative document of the bucket
            TopHits ns = (TopHits) bucketAggregations.asMap().get("Group");
            SearchHit[] hits = ns.getHits().getHits();
            for (SearchHit hit : hits) {
                rs.setId(Long.parseLong(hit.getId()));
                Map<String, Object> sourceAsMap = hit.getSourceAsMap();
                rs.setPickupTime(sourceAsMap.get("pickupTime") == null ? "" : sourceAsMap.get("pickupTime").toString());
                rs.setProductGroupDetail(sourceAsMap.get("productGroupDetail") == null ? "" : sourceAsMap.get("productGroupDetail").toString());
                rs.setProductGroupId(sourceAsMap.get("productGroupId") == null ? "" : sourceAsMap.get("productGroupId").toString());
                rs.setReferenceThree(sourceAsMap.get("referenceThree") == null ? "" : sourceAsMap.get("referenceThree").toString());
            }
            // Non-pre-packaged count
            Sum perSum = bucketAggregations.get("perSum");
            // Total count - non-pre-packaged count = pre-packaged count
            double v = bucket.getDocCount() - perSum.getValue();
            rs.setIsPrePackageNumber(v);
            relist.add(rs);
        }
        pageInfo.setList(relist);
        // Note: after bucket_sort, getBuckets() holds only the current page,
        // so this is the number of buckets on the page rather than the overall total
        pageInfo.setTotal(terms.getBuckets().size());
        pageInfo.setPageNum(req.getPageNum());
        pageInfo.setPageSize(req.getPageSize());
        return ResultRes.success(pageInfo);
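
To make the nested aggregation above easier to follow, here is an in-memory sketch in plain Java of what each outer bucket ends up holding: the document count, the sum of isPrePackage, and the derived pre-packaged count, followed by count-ascending ordering and from/size paging over the buckets. It only emulates the idea and is not how Elasticsearch computes it. BucketSketch, Doc, and Bucket are hypothetical names, and the meaning of isPrePackage (1 for a non-pre-packaged document) is taken from the comments in the code above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory emulation of: terms(pickupTime) -> sum(isPrePackage) -> bucket_sort(from, size)
final class BucketSketch {
    static final class Doc {
        final String pickupTime;
        final int isPrePackage; // assumed: 1 = not pre-packaged, 0 = pre-packaged

        Doc(String pickupTime, int isPrePackage) {
            this.pickupTime = pickupTime;
            this.isPrePackage = isPrePackage;
        }
    }

    static final class Bucket {
        final String key;
        long docCount;
        long perSum;

        Bucket(String key) {
            this.key = key;
        }

        /** Total count minus non-pre-packaged count. */
        long prePackaged() {
            return docCount - perSum;
        }
    }

    static List<Bucket> aggregate(List<Doc> docs, int from, int size) {
        // Terms bucketing plus the sum metric
        Map<String, Bucket> byKey = new LinkedHashMap<>();
        for (Doc d : docs) {
            Bucket b = byKey.computeIfAbsent(d.pickupTime, Bucket::new);
            b.docCount++;
            b.perSum += d.isPrePackage;
        }
        // Order by document count, ascending (stable sort keeps first-seen order for ties)
        List<Bucket> sorted = new ArrayList<>(byKey.values());
        sorted.sort(Comparator.comparingLong((Bucket b) -> b.docCount));
        // Paging over buckets: skip "from" buckets, keep at most "size"
        int start = Math.min(Math.max(from, 0), sorted.size());
        int end = (int) Math.min((long) start + Math.max(size, 0), sorted.size());
        return new ArrayList<>(sorted.subList(start, end));
    }

    public static void main(String[] args) {
        List<Doc> docs = java.util.Arrays.asList(
                new Doc("2022-07-01", 1), new Doc("2022-07-01", 0),
                new Doc("2022-07-01", 0), new Doc("2022-07-02", 1));
        for (Bucket b : aggregate(docs, 0, 10)) {
            System.out.println(b.key + " count=" + b.docCount + " prePackaged=" + b.prePackaged());
        }
    }
}
```

Because paging happens over buckets and not over documents, the overall number of buckets has to be obtained separately if a total is needed for the page info.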

I have always believed that somewhere in this world there is another me, doing the things I dare not do and living the life I want to live.

Added: 2022-09-04 01:18:22  Updated: 2022-09-04 01:22:37
 