IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> springboot elasticsearch整合 (整合es操作) -> 正文阅读

[大数据]springboot elasticsearch整合 (整合es操作)

版本

**Spring Boot 2.6.0**
**spring-data-elasticsearch 4.3.0**(由 spring-boot-starter-data-elasticsearch 2.6.0 引入)

配置

spring.elasticsearch.uris=http://localhost:9201,http://localhost:9202,http://localhost:9203

RestService.java

package com.sjy.elasticsearch.config;

import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CloseIndexRequest;
import org.elasticsearch.client.indices.CloseIndexResponse;
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Setting;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.SearchScrollHits;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * Convenience wrapper around {@link ElasticsearchRestTemplate} for index administration
 * (exists / create / delete / open / close / mapping / template) and for document
 * indexing and searching.
 *
 * <p>Index names are resolved from the {@code @Document(indexName = ...)} annotation of the
 * entity class. When that name contains {@code %s} it is treated as a
 * {@link String#format(String, Object...)} pattern and filled with the {@code placeholders}
 * passed to each method.
 *
 * <p>Most operations catch every exception, log it and report the failure through the return
 * value ({@code false} or {@code null}) instead of propagating it.
 *
 * @author shijingyang
 */
@Component
public class RestService {
    protected static final Logger log = LoggerFactory.getLogger(RestService.class);

    /** Marker whose presence in an index name means the name is a format pattern. */
    private static final String PLACEHOLDERS = "%s";

    /** Value written to the {@code max_result_window} setting of indices created here. */
    private static final int MAX_RESULT_WINDOW = 10000000;

    /** Scroll keep-alive, in milliseconds, used for both starting and continuing a scroll. */
    private static final long SCROLL_TIME_IN_MILLIS = 60000L;

    /** Shard count used when the entity class carries no {@code @Setting} annotation. */
    private static final long DEFAULT_SHARDS = 1L;

    /** Replica count used when the entity class carries no {@code @Setting} annotation. */
    private static final long DEFAULT_REPLICAS = 1L;

    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    /**
     * Checks whether an index exists.
     *
     * @param indexName concrete index name (no placeholder expansion is applied)
     * @return {@code true} if the index exists; {@code false} if it does not or the check failed
     */
    public boolean indexExists(String indexName) {
        try {
            return elasticsearchRestTemplate.indexOps(IndexCoordinates.of(indexName)).exists();
        } catch (Exception e) {
            log.error("RestService##indexExists:", e);
            return false;
        }
    }

    /**
     * Creates the index of an entity class if it does not exist yet. Shard and replica counts
     * are taken from the class's {@code @Setting} annotation.
     *
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers of the index name, if any
     * @return {@code true} if the index was created; {@code false} if it already existed or
     *         creation failed
     */
    public boolean createIndex(Class<?> clazz, String... placeholders) {
        try {
            String indexName = getIndexName(clazz, placeholders);
            if (this.indexExists(indexName)) {
                log.info("{} 索引已经存在", indexName);
                return false;
            }
            HashMap<String, Object> settingMap = new HashMap<>();
            settingMap.put("number_of_shards", getShardsFromClass(clazz));
            settingMap.put("number_of_replicas", getReplicasFromClass(clazz));
            settingMap.put("max_result_window", MAX_RESULT_WINDOW);
            Document settings = Document.from(settingMap);
            return elasticsearchRestTemplate.indexOps(IndexCoordinates.of(indexName)).create(settings);
        } catch (Exception e) {
            log.error("RestService##createIndex:", e);
        }
        return false;
    }

    /**
     * Deletes the index of an entity class if it exists.
     *
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers of the index name, if any
     * @return {@code true} if the index was deleted; {@code false} if it did not exist or
     *         deletion failed
     */
    public boolean deleteIndex(Class<?> clazz, String... placeholders) {
        try {
            String indexName = getIndexName(clazz, placeholders);
            if (!this.indexExists(indexName)) {
                log.info("{} 索引不存在", indexName);
                return false;
            }
            return elasticsearchRestTemplate.indexOps(IndexCoordinates.of(indexName)).delete();
        } catch (Exception e) {
            log.error("RestService##deleteIndex:", e);
        }
        return false;
    }

    /**
     * Closes the index of an entity class.
     *
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers of the index name, if any
     * @return {@code true} if the cluster acknowledged the request; {@code false} otherwise
     */
    public boolean closeIndex(Class<?> clazz, String... placeholders) {
        try {
            String indexName = getIndexName(clazz, placeholders);
            CloseIndexRequest request = new CloseIndexRequest(indexName);
            CloseIndexResponse response = elasticsearchRestTemplate.execute(
                    client -> client.indices().close(request, RequestOptions.DEFAULT));
            boolean acknowledged = response.isAcknowledged();
            if (acknowledged) {
                log.info("close index {} success", indexName);
            } else {
                log.info("close index {} fail", indexName);
            }
            return acknowledged;
        } catch (Exception e) {
            // Tag names this method so failures are not misattributed to deleteIndex.
            log.error("RestService##closeIndex:", e);
            return false;
        }
    }

    /**
     * Opens the index of an entity class.
     *
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers of the index name, if any
     * @return {@code true} if the cluster acknowledged the request; {@code false} otherwise
     */
    public boolean openIndex(Class<?> clazz, String... placeholders) {
        try {
            String indexName = getIndexName(clazz, placeholders);
            OpenIndexRequest request = new OpenIndexRequest(indexName);
            OpenIndexResponse response = elasticsearchRestTemplate.execute(
                    client -> client.indices().open(request, RequestOptions.DEFAULT));
            boolean acknowledged = response.isAcknowledged();
            if (acknowledged) {
                log.info("open index {} success", indexName);
            } else {
                log.info("open index {} fail", indexName);
            }
            return acknowledged;
        } catch (Exception e) {
            log.error("RestService##openIndex:", e);
            return false;
        }
    }

    /**
     * Applies a mapping to the index of an entity class.
     *
     * @param clazz        entity class annotated with {@code @Document}
     * @param json         mapping definition as a JSON string
     * @param placeholders values for the {@code %s} markers of the index name, if any
     * @return the result reported by the template; {@code false} if the call failed
     */
    public boolean putMapping(Class<?> clazz, String json, String... placeholders) {
        try {
            String indexName = getIndexName(clazz, placeholders);
            return elasticsearchRestTemplate.indexOps(IndexCoordinates.of(indexName)).putMapping(Document.parse(json));
        } catch (Exception e) {
            log.error("RestService##putMapping:", e);
            return false;
        }
    }

    /**
     * Creates or updates an index template.
     *
     * @param name   template name
     * @param source template body as a JSON string
     * @return {@code true} if the cluster acknowledged the request; {@code false} otherwise
     */
    public boolean putTemplate(String name, String source) {
        try {
            PutIndexTemplateRequest request = new PutIndexTemplateRequest(name);
            request.source(source, XContentType.JSON);
            AcknowledgedResponse response = elasticsearchRestTemplate.execute(
                    client -> client.indices().putTemplate(request, RequestOptions.DEFAULT));
            return response.isAcknowledged();
        } catch (Exception e) {
            log.error("RestService##putTemplate:", e);
            return false;
        }
    }

    /**
     * Indexes a single document. Unlike most other methods of this class, exceptions are
     * propagated to the caller.
     *
     * @param clazz        entity class annotated with {@code @Document}
     * @param source       document to index; its {@code @Id} field value is used as document id
     * @param placeholders values for the {@code %s} markers of the index name, if any
     * @param <T>          entity type
     * @return the value returned by {@link ElasticsearchRestTemplate#index}
     */
    public <T> String index(Class<T> clazz, T source, String... placeholders) {
        String indexName = getIndexName(clazz, placeholders);
        IndexQuery indexQuery = new IndexQueryBuilder()
                .withId(getIdFromSource(source))
                .withObject(source)
                .build();
        return elasticsearchRestTemplate.index(indexQuery, IndexCoordinates.of(indexName));
    }

    /**
     * Indexes a list of documents with one bulk request. A {@code null} or empty list is a no-op
     * that still reports success.
     *
     * @param list         documents to index
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers of the index name, if any
     * @return {@code false} if the bulk request threw an exception, {@code true} otherwise
     */
    public boolean bulkIndex(List<?> list, Class<?> clazz, String... placeholders) {
        String indexName = getIndexName(clazz, placeholders);
        try {
            if (list != null && !list.isEmpty()) {
                List<IndexQuery> indexQueries = new ArrayList<>(list.size());
                for (Object source : list) {
                    indexQueries.add(new IndexQueryBuilder()
                            .withId(getIdFromSource(source))
                            .withObject(source)
                            .build());
                }
                elasticsearchRestTemplate.bulkIndex(indexQueries, IndexCoordinates.of(indexName));
            }
        } catch (Exception e) {
            log.error("RestService##bulkIndex:", e);
            return false;
        }
        return true;
    }

    /**
     * Starts a scroll search and returns its first page.
     *
     * @param query        search query; its page size determines the scroll batch size
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers of the index name, if any
     * @return the first page of hits, or {@code null} if the search failed
     */
    public SearchScrollHits<?> scrollFirst(Query query, Class<?> clazz, String... placeholders) {
        String indexName = getIndexName(clazz, placeholders);
        try {
            return elasticsearchRestTemplate.searchScrollStart(
                    SCROLL_TIME_IN_MILLIS, query, clazz, IndexCoordinates.of(indexName));
        } catch (Exception e) {
            log.error("RestService##scrollFirst:", e);
        }
        return null;
    }

    /**
     * Fetches the next page of a scroll search.
     *
     * @param scrollId     scroll id returned by the previous page
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers of the index name, if any
     * @return the next page of hits, or {@code null} if the request failed
     */
    public SearchScrollHits<?> scroll(String scrollId, Class<?> clazz, String... placeholders) {
        String indexName = getIndexName(clazz, placeholders);
        try {
            return elasticsearchRestTemplate.searchScrollContinue(
                    scrollId, SCROLL_TIME_IN_MILLIS, clazz, IndexCoordinates.of(indexName));
        } catch (Exception e) {
            log.error("RestService##scroll:", e);
        }
        return null;
    }

    /**
     * Runs a regular (non-scroll) search.
     *
     * @param query        search query
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers of the index name, if any
     * @return the search hits, or {@code null} if the search failed
     */
    public SearchHits<?> search(Query query, Class<?> clazz, String... placeholders) {
        try {
            String indexName = getIndexName(clazz, placeholders);
            return elasticsearchRestTemplate.search(query, clazz, IndexCoordinates.of(indexName));
        } catch (Exception e) {
            log.error("RestService##search:", e);
        }
        return null;
    }

    /**
     * Reads the value of the first field annotated with {@link Id}, looking at the object's own
     * class first and then at its superclasses.
     *
     * @param source document instance, may be {@code null}
     * @return the id as a string, or {@code null} if the source is {@code null}, has no
     *         readable {@code @Id} field, or the field value is {@code null}
     */
    private String getIdFromSource(Object source) {
        if (source == null) {
            return null;
        }
        for (Class<?> type = source.getClass(); type != null && type != Object.class; type = type.getSuperclass()) {
            for (Field field : type.getDeclaredFields()) {
                if (!field.isAnnotationPresent(Id.class)) {
                    continue;
                }
                try {
                    field.setAccessible(true);
                    Object value = field.get(source);
                    return value == null ? null : value.toString();
                } catch (IllegalAccessException e) {
                    // Keep scanning for another @Id field, but leave a trace of the failure.
                    log.warn("RestService##getIdFromSource: cannot read @Id field {}", field.getName(), e);
                }
            }
        }
        return null;
    }

    /**
     * Reads the raw index name from the class's {@code @Document} annotation.
     *
     * @param source entity class
     * @return the declared index name, or {@code null} if the class is {@code null} or not
     *         annotated with {@code @Document}
     */
    private String getIndexFromClass(Class<?> source) {
        org.springframework.data.elasticsearch.annotations.Document document = source == null
                ? null
                : source.getAnnotation(org.springframework.data.elasticsearch.annotations.Document.class);
        if (document == null) {
            log.error("RestService##getIndexFromClass: no @Document annotation on {}", source);
            return null;
        }
        return document.indexName();
    }

    /**
     * Reads the shard count from the class's {@code @Setting} annotation.
     *
     * @param source entity class
     * @return the declared shard count, or 1 when the annotation is absent
     */
    private long getShardsFromClass(Class<?> source) {
        Setting setting = source == null ? null : source.getAnnotation(Setting.class);
        return setting == null ? DEFAULT_SHARDS : setting.shards();
    }

    /**
     * Reads the replica count from the class's {@code @Setting} annotation.
     *
     * @param source entity class
     * @return the declared replica count, or 1 when the annotation is absent
     */
    private long getReplicasFromClass(Class<?> source) {
        Setting setting = source == null ? null : source.getAnnotation(Setting.class);
        return setting == null ? DEFAULT_REPLICAS : setting.replicas();
    }

    /**
     * Resolves the concrete index name for an entity class, expanding {@code %s} markers with
     * the given placeholders.
     *
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers; required when the name has any
     * @return the concrete index name
     * @throws IllegalArgumentException if the class declares no index name, or the name
     *                                  contains {@code %s} but no placeholders were supplied
     */
    private String getIndexName(Class<?> clazz, String... placeholders) {
        String indexName = getIndexFromClass(clazz);
        Assert.notNull(indexName, "indexName must not be null");
        if (indexName.contains(PLACEHOLDERS)) {
            Assert.notEmpty(placeholders, "placeholders must not be empty");
            // Explicit cast: pass the array elements as individual format arguments.
            indexName = String.format(indexName, (Object[]) placeholders);
        }
        return indexName;
    }
}

实体

LogMessage.java

package com.sjy.elasticsearch.entity;

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.Setting;

import java.io.Serializable;
import java.util.Date;

/**
 * Log entry document stored in Elasticsearch.
 *
 * <p>The index name is a format pattern with two {@code %s} markers. Examples of resolved
 * names:
 * <ul>
 *   <li>log-message-test-2021-11-24</li>
 *   <li>log-message-dev-2021-11-24</li>
 * </ul>
 *
 * @author shijingyang
 */
@Document(indexName = "log-message-%s-%s")
@Setting(shards = 2, replicas = 2)
public class LogMessage implements Serializable {

    // Document id, mapped as a keyword.
    @Id
    @Field(type = FieldType.Keyword)
    private String id;
    // Log level, mapped as a keyword (the sample tests store "INFO").
    @Field(type = FieldType.Keyword)
    private String level;
    // Creation time, mapped as a long (the sample tests store System.currentTimeMillis()).
    @Field(type = FieldType.Long)
    private Long createTime;
    // Log message, mapped as full-text.
    @Field(type = FieldType.Text)
    private String msg;

    ...getter and setter...
}

Student.java

package com.sjy.elasticsearch.entity;

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.Setting;

import java.io.Serializable;

/**
 * Student document stored in the {@code student} index, declared with 2 shards and
 * 2 replicas.
 *
 * @author shijingyang
 */
@Document(indexName = "student")
@Setting(shards = 2, replicas = 2)
public class Student implements Serializable {

    /**
     * Creates a student with every field set.
     *
     * @param id   document id
     * @param name student name
     * @param age  student age
     * @param sex  student sex
     * @param desc free-text description
     */
    public Student(long id, String name, Integer age, String sex, String desc) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.sex = sex;
        this.desc = desc;
    }

    // Document id; carries no explicit @Field mapping.
    @Id
    private long id;
    // Student name, mapped as a keyword.
    @Field(type = FieldType.Keyword)
    private String name;
    // Student age, mapped as an integer.
    @Field(type = FieldType.Integer)
    private Integer age;
    // Student sex, mapped as a keyword.
    @Field(type = FieldType.Keyword)
    private String sex;
    // Free-text description, mapped as full-text.
    @Field(type = FieldType.Text)
    private String desc;
   ...getter and setter...  
}

StudentDao.java

package com.sjy.elasticsearch;

import com.sjy.elasticsearch.entity.Student;
import org.elasticsearch.client.ElasticsearchClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
 * Spring Data repository for {@link Student} documents, keyed by a {@code Long} id.
 *
 * <p>Declares no methods of its own; all operations are inherited from
 * {@link ElasticsearchRepository}.
 *
 * @author shijingyang
 */
public interface StudentDao extends ElasticsearchRepository<Student, Long> {
}

测试

ElasticsearchApplicationTests.java

package com.sjy.elasticsearch;

import com.sjy.elasticsearch.config.RestService;
import com.sjy.elasticsearch.entity.LogMessage;
import com.sjy.elasticsearch.entity.Student;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.SearchScrollHits;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;

import java.time.LocalDate;
import java.util.ArrayList;
import java.util.UUID;

/**
 * Manual integration tests for {@link StudentDao} and {@link RestService}.
 *
 * <p>They start the Spring context and talk to the configured Elasticsearch cluster. Results
 * are printed rather than asserted, so the tests are meant to be run and inspected one by one.
 */
@SpringBootTest
class ElasticsearchApplicationTests {

    /** First placeholder of the {@code log-message-%s-%s} index name. */
    private static final String ENV = "dev";

    @Autowired
    private StudentDao studentDao;

    @Autowired
    RestService restService;

    /** Second placeholder of the {@code log-message-%s-%s} index name: today's ISO date. */
    private static String today() {
        return LocalDate.now().toString();
    }

    /**
     * Fails the test with a readable message when {@link RestService} reported a failure by
     * returning {@code null}, instead of hitting a bare NullPointerException later.
     */
    private static void requireResult(Object result, String operation) {
        if (result == null) {
            throw new IllegalStateException(operation + " returned null, see the RestService error log");
        }
    }

    /** Saves 100 batches of 500 students through the repository and prints the time per batch. */
    @Test
    void saveAll() {
        int batchSize = 500;
        for (int j = 1; j <= 100; j++) {
            ArrayList<Student> students = new ArrayList<>(batchSize);
            for (int i = 1; i <= batchSize; i++) {
                // Sequential id, unique across batches, so later batches do not overwrite
                // documents written by earlier ones.
                int seq = (j - 1) * batchSize + i;
                students.add(new Student(seq, "student name" + seq, seq, seq + "", "I don't know"));
            }
            long start = System.currentTimeMillis();
            studentDao.saveAll(students);
            long end = System.currentTimeMillis();
            System.out.println(j + " : " + (end - start));
        }
    }

    /** Prints whether the indices student, student11 and student22 exist. */
    @Test
    void indexExists() {
        boolean student = restService.indexExists("student");
        boolean student11 = restService.indexExists("student11");
        boolean student22 = restService.indexExists("student22");
        System.out.println("student11 = " + student11);
        System.out.println("student22 = " + student22);
        System.out.println("student = " + student);
    }

    /** Intentionally empty placeholder. */
    @Test
    void test2() {

    }

    /** Registers the template applied to every {@code log-message-*} index. */
    @Test
    void putTemplate() {
        // restService.putTemplate("content-template", "{\"index_patterns\":\"content_*\",\"order\":1,\"settings\":{\"number_of_shards\":4,\"number_of_replicas\":1},\"mappings\":{\"properties\":{\"@timestamp\":{\"type\":\"date\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}}");
        System.out.println(restService.putTemplate("log-template", "{\"index_patterns\":\"log-message-*\",\"order\":1,\"settings\":{\"number_of_shards\":2,\"number_of_replicas\":0,\"max_result_window\":10000000},\"mappings\":{\"properties\":{\"id\":{\"index\":true,\"type\":\"keyword\"},\"level\":{\"index\":true,\"type\":\"keyword\"},\"createTime\":{\"index\":true,\"type\":\"long\"},\"msg\":{\"index\":true,\"type\":\"text\"}}}}"));
    }

    /** Deletes today's dev log index. */
    @Test
    void deleteIndex() {
        System.out.println(restService.deleteIndex(LogMessage.class, ENV, today()));
    }

    /** Creates today's dev log index. */
    @Test
    void createIndex() {
        System.out.println(restService.createIndex(LogMessage.class, ENV, today()));
    }

    /** Closes today's dev log index. */
    @Test
    void closeIndex() {
        System.out.println(restService.closeIndex(LogMessage.class, ENV, today()));
    }

    /** Opens today's dev log index. */
    @Test
    void openIndex() {
        System.out.println(restService.openIndex(LogMessage.class, ENV, today()));
    }

    /** Indexes a single log message and prints the value returned by the service. */
    @Test
    void index() {
        LogMessage logMessage = new LogMessage();
        logMessage.setId(UUID.randomUUID().toString());
        logMessage.setCreateTime(System.currentTimeMillis());
        logMessage.setMsg("elasticsearch yyds");
        logMessage.setLevel("INFO");
        System.out.println(restService.index(LogMessage.class, logMessage, ENV, today()));
    }

    /** Bulk-indexes 100 batches of 500 log messages into today's dev log index. */
    @Test
    void bulkIndex() {
        for (int j = 0; j < 100; j++) {
            ArrayList<LogMessage> logMessages = new ArrayList<>(500);
            for (int i = 0; i < 500; i++) {
                LogMessage logMessage = new LogMessage();
                logMessage.setId(UUID.randomUUID().toString());
                logMessage.setCreateTime(System.currentTimeMillis());
                logMessage.setMsg("elasticsearch yyds " + i + "-" + j);
                logMessage.setLevel("INFO");
                logMessages.add(logMessage);
            }
            restService.bulkIndex(logMessages, LogMessage.class, ENV, today());
        }
    }

    /** Scrolls through all matching log messages in pages of 100 and prints each page size. */
    @Test
    void scroll() {
        QueryBuilder msg = QueryBuilders.matchQuery("msg", "elasticsearch yyds 0");
        RangeQueryBuilder createTime = QueryBuilders.rangeQuery("createTime").gte(1637754072221L).lte(1637754072221L);
        // withQuery() replaces any previously set query, so both conditions must be combined
        // into one bool query; chaining two withQuery() calls would keep only the range.
        QueryBuilder condition = QueryBuilders.boolQuery().must(msg).filter(createTime);

        int pageSize = 100;
        Query query = new NativeSearchQueryBuilder().withQuery(condition).withPageable(Pageable.ofSize(pageSize)).build();
        String dateStr = today();

        SearchScrollHits<?> page = restService.scrollFirst(query, LogMessage.class, ENV, dateStr);
        requireResult(page, "scrollFirst");
        int hitsInPage = page.getSearchHits().size();
        System.out.println(hitsInPage);

        // A page shorter than pageSize means the scroll is exhausted.
        while (hitsInPage == pageSize) {
            page = restService.scroll(page.getScrollId(), LogMessage.class, ENV, dateStr);
            requireResult(page, "scroll");
            hitsInPage = page.getSearchHits().size();
            System.out.println(hitsInPage);
        }
        System.out.println("down");
    }

    /** Runs a single sorted search of up to 10000 hits and prints them. */
    @Test
    void search() {
        QueryBuilder msg = QueryBuilders.matchQuery("msg", "elasticsearch yyds 0");
        RangeQueryBuilder createTime = QueryBuilders.rangeQuery("createTime").gte(1637754072221L);
        // Combined into one bool query because withQuery() keeps only the last query it is given.
        QueryBuilder condition = QueryBuilders.boolQuery().must(msg).filter(createTime);

        int pageSize = 10000;
        PageRequest pageRequest = PageRequest.of(0, pageSize, Sort.Direction.ASC, "createTime");
        Query query = new NativeSearchQueryBuilder().withQuery(condition).withPageable(pageRequest).build();
        SearchHits<?> search = restService.search(query, LogMessage.class, ENV, today());
        requireResult(search, "search");
        System.out.println(search.getSearchHits());
    }
}

参考

https://docs.spring.io/spring-data/elasticsearch/docs/4.3.0/reference/html/#

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-25 08:11:03  更:2021-11-25 08:11:25 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 15:59:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码