版本
**Spring Boot 2.6.0** **Spring Data Elasticsearch 4.3.0**(由 spring-boot-starter-data-elasticsearch 2.6.0 引入)
配置
spring.elasticsearch.uris=http://localhost:9201,http://localhost:9202,http://localhost:9203
RestService.java
package com.sjy.elasticsearch.config;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CloseIndexRequest;
import org.elasticsearch.client.indices.CloseIndexResponse;
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Setting;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.SearchScrollHits;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
 * Convenience wrapper around {@link ElasticsearchRestTemplate} for index administration
 * (exists / create / delete / open / close / mapping / template) and for document
 * indexing and searching.
 *
 * <p>The target index name is read from the entity's {@code @Document(indexName = ...)}
 * annotation. When that name contains {@code %s} markers, they are filled in with the
 * {@code placeholders} arguments via {@link String#format(String, Object...)}.
 *
 * <p>Most methods catch every exception, log it and return {@code false} / {@code null};
 * callers must therefore check the return value. {@link #index}, {@link #bulkIndex},
 * {@link #scrollFirst} and {@link #scroll} resolve the index name outside their
 * {@code try} block, so an unresolvable index name surfaces as an
 * {@link IllegalArgumentException} from those methods.
 */
@Component
public class RestService {

    protected static final Logger log = LoggerFactory.getLogger(RestService.class);

    /** Marker whose presence in an index name means format arguments are required. */
    private static final String PLACEHOLDERS = "%s";

    /** Keep-alive, in milliseconds, requested for scroll contexts. */
    private static final long SCROLL_TIME_MILLIS = 60000L;

    /**
     * Value written to the {@code max_result_window} index setting on creation.
     * NOTE(review): this is far above the Elasticsearch default and allows very deep
     * from/size paging; confirm the cluster has the heap for it or prefer scrolling.
     */
    private static final int MAX_RESULT_WINDOW = 10000000;

    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    /**
     * Checks whether an index exists.
     *
     * @param indexName concrete index name (no placeholder resolution is performed)
     * @return {@code true} if the index exists; {@code false} if it does not or the check failed
     */
    public boolean indexExists(String indexName) {
        try {
            return elasticsearchRestTemplate.indexOps(IndexCoordinates.of(indexName)).exists();
        } catch (Exception e) {
            log.error("RestService##indexExists:", e);
            return false;
        }
    }

    /**
     * Creates the index for an entity class if it does not exist yet, using the shard and
     * replica counts from the class's {@code @Setting} annotation (1 each when absent).
     *
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers in the index name, if any
     * @return {@code true} if the index was created; {@code false} if it already existed
     *         or creation failed
     */
    public boolean createIndex(Class<?> clazz, String... placeholders) {
        try {
            String indexName = getIndexName(clazz, placeholders);
            if (this.indexExists(indexName)) {
                log.info("{} 索引已经存在", indexName);
                return false;
            }
            HashMap<String, Object> settingMap = new HashMap<>();
            settingMap.put("number_of_shards", getShardsFromClass(clazz));
            settingMap.put("number_of_replicas", getReplicasFromClass(clazz));
            settingMap.put("max_result_window", MAX_RESULT_WINDOW);
            Document settings = Document.from(settingMap);
            return elasticsearchRestTemplate.indexOps(IndexCoordinates.of(indexName)).create(settings);
        } catch (Exception e) {
            log.error("RestService##createIndex:", e);
        }
        return false;
    }

    /**
     * Deletes the index for an entity class if it exists.
     *
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers in the index name, if any
     * @return {@code true} if the index was deleted; {@code false} if it did not exist
     *         or deletion failed
     */
    public boolean deleteIndex(Class<?> clazz, String... placeholders) {
        try {
            String indexName = getIndexName(clazz, placeholders);
            if (!this.indexExists(indexName)) {
                log.info("{} 索引不存在", indexName);
                return false;
            }
            return elasticsearchRestTemplate.indexOps(IndexCoordinates.of(indexName)).delete();
        } catch (Exception e) {
            log.error("RestService##deleteIndex:", e);
        }
        return false;
    }

    /**
     * Closes the index for an entity class.
     *
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers in the index name, if any
     * @return {@code true} if the cluster acknowledged the request; {@code false} otherwise
     *         or on error
     */
    public boolean closeIndex(Class<?> clazz, String... placeholders) {
        try {
            String indexName = getIndexName(clazz, placeholders);
            CloseIndexRequest request = new CloseIndexRequest(indexName);
            CloseIndexResponse response = elasticsearchRestTemplate
                    .execute(client -> client.indices().close(request, RequestOptions.DEFAULT));
            boolean acknowledged = response.isAcknowledged();
            if (acknowledged) {
                log.info("close index {} success", indexName);
            } else {
                log.info("close index {} fail", indexName);
            }
            return acknowledged;
        } catch (Exception e) {
            // Label previously said "deleteIndex" (copy-paste), which misdirected debugging.
            log.error("RestService##closeIndex:", e);
            return false;
        }
    }

    /**
     * Opens (re-opens) the index for an entity class.
     *
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers in the index name, if any
     * @return {@code true} if the cluster acknowledged the request; {@code false} otherwise
     *         or on error
     */
    public boolean openIndex(Class<?> clazz, String... placeholders) {
        try {
            String indexName = getIndexName(clazz, placeholders);
            OpenIndexRequest request = new OpenIndexRequest(indexName);
            OpenIndexResponse response = elasticsearchRestTemplate
                    .execute(client -> client.indices().open(request, RequestOptions.DEFAULT));
            boolean acknowledged = response.isAcknowledged();
            if (acknowledged) {
                log.info("open index {} success", indexName);
            } else {
                log.info("open index {} fail", indexName);
            }
            return acknowledged;
        } catch (Exception e) {
            log.error("RestService##openIndex:", e);
            return false;
        }
    }

    /**
     * Applies a mapping, given as a JSON string, to the index of an entity class.
     *
     * @param clazz        entity class annotated with {@code @Document}
     * @param json         mapping definition as JSON
     * @param placeholders values for the {@code %s} markers in the index name, if any
     * @return the result reported by the template; {@code false} on error
     */
    public boolean putMapping(Class<?> clazz, String json, String... placeholders) {
        try {
            String indexName = getIndexName(clazz, placeholders);
            return elasticsearchRestTemplate.indexOps(IndexCoordinates.of(indexName))
                    .putMapping(Document.parse(json));
        } catch (Exception e) {
            log.error("RestService##putMapping:", e);
            return false;
        }
    }

    /**
     * Creates or updates an index template.
     *
     * @param name   template name
     * @param source template body as JSON
     * @return {@code true} if the cluster acknowledged the request; {@code false} otherwise
     *         or on error
     */
    public boolean putTemplate(String name, String source) {
        try {
            PutIndexTemplateRequest request = new PutIndexTemplateRequest(name);
            request.source(source, XContentType.JSON);
            AcknowledgedResponse response = elasticsearchRestTemplate
                    .execute(client -> client.indices().putTemplate(request, RequestOptions.DEFAULT));
            return response.isAcknowledged();
        } catch (Exception e) {
            log.error("RestService##putTemplate:", e);
            return false;
        }
    }

    /**
     * Indexes a single document.
     *
     * @param clazz        entity class annotated with {@code @Document}
     * @param source       document to index; the value of its {@code @Id} field, if any,
     *                     is passed as the document id
     * @param placeholders values for the {@code %s} markers in the index name, if any
     * @param <T>          entity type
     * @return the value returned by {@code ElasticsearchRestTemplate#index}
     * @throws IllegalArgumentException if the index name cannot be resolved
     */
    public <T> String index(Class<T> clazz, T source, String... placeholders) {
        String indexName = getIndexName(clazz, placeholders);
        IndexQuery indexQuery = new IndexQueryBuilder()
                .withId(getIdFromSource(source))
                .withObject(source)
                .build();
        return elasticsearchRestTemplate.index(indexQuery, IndexCoordinates.of(indexName));
    }

    /**
     * Indexes a list of documents with one bulk request.
     *
     * @param list         documents to index; {@code null} or empty is a no-op
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers in the index name, if any
     * @return {@code true} on success or when there was nothing to index; {@code false}
     *         if the bulk request failed
     * @throws IllegalArgumentException if the index name cannot be resolved
     */
    public boolean bulkIndex(List<?> list, Class<?> clazz, String... placeholders) {
        String indexName = getIndexName(clazz, placeholders);
        if (list == null || list.isEmpty()) {
            return true;
        }
        try {
            List<IndexQuery> indexQueries = new ArrayList<>(list.size());
            for (Object source : list) {
                indexQueries.add(new IndexQueryBuilder()
                        .withId(getIdFromSource(source))
                        .withObject(source)
                        .build());
            }
            elasticsearchRestTemplate.bulkIndex(indexQueries, IndexCoordinates.of(indexName));
        } catch (Exception e) {
            log.error("RestService##bulkIndex:", e);
            return false;
        }
        return true;
    }

    /**
     * Starts a scroll search.
     *
     * @param query        query to run
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers in the index name, if any
     * @return the first page of hits including the scroll id, or {@code null} on error
     * @throws IllegalArgumentException if the index name cannot be resolved
     */
    public SearchScrollHits<?> scrollFirst(Query query, Class<?> clazz, String... placeholders) {
        String indexName = getIndexName(clazz, placeholders);
        try {
            return elasticsearchRestTemplate.searchScrollStart(
                    SCROLL_TIME_MILLIS, query, clazz, IndexCoordinates.of(indexName));
        } catch (Exception e) {
            log.error("RestService##scrollFirst:", e);
        }
        return null;
    }

    /**
     * Fetches the next page of an existing scroll.
     *
     * @param scrollId     scroll id returned by the previous page
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers in the index name, if any
     * @return the next page of hits, or {@code null} on error
     * @throws IllegalArgumentException if the index name cannot be resolved
     */
    public SearchScrollHits<?> scroll(String scrollId, Class<?> clazz, String... placeholders) {
        String indexName = getIndexName(clazz, placeholders);
        try {
            return elasticsearchRestTemplate.searchScrollContinue(
                    scrollId, SCROLL_TIME_MILLIS, clazz, IndexCoordinates.of(indexName));
        } catch (Exception e) {
            log.error("RestService##scroll:", e);
        }
        return null;
    }

    /**
     * Runs a regular (non-scroll) search.
     *
     * @param query        query to run
     * @param clazz        entity class annotated with {@code @Document}
     * @param placeholders values for the {@code %s} markers in the index name, if any
     * @return the search hits, or {@code null} on error
     */
    public SearchHits<?> search(Query query, Class<?> clazz, String... placeholders) {
        try {
            String indexName = getIndexName(clazz, placeholders);
            return elasticsearchRestTemplate.search(query, clazz, IndexCoordinates.of(indexName));
        } catch (Exception e) {
            log.error("RestService##search:", e);
        }
        return null;
    }

    /**
     * Reads the value of the first {@code @Id}-annotated field of {@code source} via
     * reflection, looking at the object's own class first and then its superclasses.
     *
     * @return the id as a string, or {@code null} when {@code source} is {@code null},
     *         has no readable {@code @Id} field, or the id value itself is {@code null}
     */
    private String getIdFromSource(Object source) {
        if (source == null) {
            return null;
        }
        for (Class<?> type = source.getClass(); type != null && type != Object.class; type = type.getSuperclass()) {
            for (Field field : type.getDeclaredFields()) {
                if (!field.isAnnotationPresent(Id.class)) {
                    continue;
                }
                try {
                    field.setAccessible(true);
                    Object value = field.get(source);
                    return value == null ? null : value.toString();
                } catch (IllegalAccessException e) {
                    // Keep looking at the remaining fields, but leave a trace instead of
                    // swallowing the failure silently.
                    log.warn("RestService##getIdFromSource: cannot read @Id field {}", field, e);
                }
            }
        }
        return null;
    }

    /**
     * Returns the raw {@code indexName} of the class's {@code @Document} annotation,
     * or {@code null} (after logging) when the class is {@code null} or not annotated.
     */
    private String getIndexFromClass(Class<?> source) {
        org.springframework.data.elasticsearch.annotations.Document document = source == null
                ? null
                : source.getAnnotation(org.springframework.data.elasticsearch.annotations.Document.class);
        if (document == null) {
            log.error("RestService##getIndexFromClass: no @Document annotation on {}", source);
            return null;
        }
        return document.indexName();
    }

    /** Returns the shard count from the class's {@code @Setting} annotation, or 1 when absent. */
    private long getShardsFromClass(Class<?> source) {
        Setting setting = source == null ? null : source.getAnnotation(Setting.class);
        if (setting == null) {
            log.error("RestService##getShardsFromClass: no @Setting annotation on {}", source);
            return 1;
        }
        return setting.shards();
    }

    /** Returns the replica count from the class's {@code @Setting} annotation, or 1 when absent. */
    private long getReplicasFromClass(Class<?> source) {
        Setting setting = source == null ? null : source.getAnnotation(Setting.class);
        if (setting == null) {
            log.error("RestService##getReplicasFromClass: no @Setting annotation on {}", source);
            return 1;
        }
        return setting.replicas();
    }

    /**
     * Resolves the concrete index name for an entity class.
     *
     * @throws IllegalArgumentException if the class has no index name, or the name contains
     *                                  {@code %s} and no placeholders were supplied
     * @throws java.util.IllegalFormatException if fewer placeholders than {@code %s} markers
     *                                          were supplied
     */
    private String getIndexName(Class<?> clazz, String... placeholders) {
        String indexName = getIndexFromClass(clazz);
        Assert.notNull(indexName, "indexName must not be null");
        if (indexName.contains(PLACEHOLDERS)) {
            Assert.notEmpty(placeholders, "placeholders must not be empty");
            // Explicit cast: pass the array as the varargs list, not as one argument.
            indexName = String.format(indexName, (Object[]) placeholders);
        }
        return indexName;
    }
}
实体
LogMessage.java
package com.sjy.elasticsearch.entity;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.Setting;
import java.io.Serializable;
import java.util.Date;
/**
 * Log entry entity stored in Elasticsearch.
 *
 * <p>The index name is a pattern with two {@code %s} markers, so a concrete index name
 * has to be produced by formatting it with two values before use (the accompanying
 * tests pass an environment name and a date).
 */
@Document(indexName = "log-message-%s-%s")
@Setting(shards = 2, replicas = 2)
public class LogMessage implements Serializable {
// Document id, mapped as a keyword field.
@Id
@Field(type = FieldType.Keyword)
private String id;
// Log level, mapped as a keyword field.
@Field(type = FieldType.Keyword)
private String level;
// Creation time stored as a long; presumably epoch milliseconds — TODO confirm with writers.
@Field(type = FieldType.Long)
private Long createTime;
// Log message body, mapped as an analyzed text field.
@Field(type = FieldType.Text)
private String msg;
...getter and setter...
}
Student.java
package com.sjy.elasticsearch.entity;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.Setting;
import java.io.Serializable;
/**
 * Student entity stored in the {@code student} index, declared with 2 shards and
 * 2 replicas via {@code @Setting}.
 */
@Document(indexName = "student")
@Setting(shards = 2, replicas = 2)
public class Student implements Serializable {
/**
 * Creates a student with every field set.
 *
 * @param id   document id
 * @param name student name
 * @param age  student age
 * @param sex  student sex
 * @param desc free-text description
 */
public Student(long id, String name, Integer age, String sex, String desc) {
this.id = id;
this.name = name;
this.age = age;
this.sex = sex;
this.desc = desc;
}
// Document id.
@Id
private long id;
// Name, mapped as a keyword field.
@Field(type = FieldType.Keyword)
private String name;
// Age, mapped as an integer field.
@Field(type = FieldType.Integer)
private Integer age;
// Sex, mapped as a keyword field.
@Field(type = FieldType.Keyword)
private String sex;
// Description, mapped as an analyzed text field.
@Field(type = FieldType.Text)
private String desc;
...getter and setter...
}
StudentDao.java
package com.sjy.elasticsearch;
import com.sjy.elasticsearch.entity.Student;
import org.elasticsearch.client.ElasticsearchClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
/**
 * Spring Data Elasticsearch repository for {@link Student} documents keyed by {@code Long}.
 * Declares no methods of its own; everything comes from {@link ElasticsearchRepository}.
 *
 * @author shijingyang
 */
public interface StudentDao extends ElasticsearchRepository<Student,Long> {
}
TEST
package com.sjy.elasticsearch;
import com.sjy.elasticsearch.config.RestService;
import com.sjy.elasticsearch.entity.LogMessage;
import com.sjy.elasticsearch.entity.Student;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.SearchScrollHits;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.UUID;
/**
 * Manual integration tests for {@link StudentDao} and {@link RestService}.
 *
 * <p>These tests talk to the Elasticsearch cluster configured for the application and
 * mostly print results rather than asserting on them. The {@link LogMessage} tests target
 * the index {@code log-message-dev-<today>}.
 */
@SpringBootTest
class ElasticsearchApplicationTests {

    @Autowired
    private StudentDao studentDao;

    @Autowired
    RestService restService;

    /** Saves 100 batches of 500 students and prints the time each batch took in milliseconds. */
    @Test
    void saveAll() {
        final int batchSize = 500;
        for (int j = 1; j <= 100; j++) {
            ArrayList<Student> students = new ArrayList<>(batchSize);
            for (int i = 1; i <= batchSize; i++) {
                // Unique running number across all batches. The previous j * (i + 1)
                // collided between batches (e.g. j=1,i=3 and j=2,i=1 both gave 4), so
                // later saves silently overwrote earlier documents.
                int seq = (j - 1) * batchSize + i;
                students.add(new Student(seq, "student name" + seq, seq, seq + "", "I don't know"));
            }
            long start = System.currentTimeMillis();
            studentDao.saveAll(students);
            long end = System.currentTimeMillis();
            System.out.println(j + " : " + (end - start));
        }
    }

    /** Prints whether a few sample indices exist. */
    @Test
    void indexExists() {
        boolean student = restService.indexExists("student");
        boolean student22 = restService.indexExists("student11");
        boolean student33 = restService.indexExists("student22");
        System.out.println("student22 = " + student22);
        System.out.println("student33 = " + student33);
        System.out.println("student = " + student);
    }

    /** Intentionally empty placeholder. */
    @Test
    void test2() {
    }

    /** Registers the index template matching {@code log-message-*}. */
    @Test
    void putTemplate() {
        System.out.println(restService.putTemplate("log-template", "{\"index_patterns\":\"log-message-*\",\"order\":1,\"settings\":{\"number_of_shards\":2,\"number_of_replicas\":0,\"max_result_window\":10000000},\"mappings\":{\"properties\":{\"id\":{\"index\":true,\"type\":\"keyword\"},\"level\":{\"index\":true,\"type\":\"keyword\"},\"createTime\":{\"index\":true,\"type\":\"long\"},\"msg\":{\"index\":true,\"type\":\"text\"}}}}"));
    }

    @Test
    void deleteIndex() {
        System.out.println(restService.deleteIndex(LogMessage.class, "dev", LocalDate.now().toString()));
    }

    @Test
    void createIndex() {
        System.out.println(restService.createIndex(LogMessage.class, "dev", LocalDate.now().toString()));
    }

    @Test
    void closeIndex() {
        System.out.println(restService.closeIndex(LogMessage.class, "dev", LocalDate.now().toString()));
    }

    @Test
    void openIndex() {
        System.out.println(restService.openIndex(LogMessage.class, "dev", LocalDate.now().toString()));
    }

    /** Indexes one log message with a random id and prints the returned value. */
    @Test
    void index() {
        LogMessage logMessage = new LogMessage();
        logMessage.setId(UUID.randomUUID().toString());
        logMessage.setCreateTime(System.currentTimeMillis());
        logMessage.setMsg("elasticsearch yyds");
        logMessage.setLevel("INFO");
        System.out.println(restService.index(LogMessage.class, logMessage, "dev", LocalDate.now().toString()));
    }

    /** Bulk-indexes 100 batches of 500 log messages. */
    @Test
    void bulkIndex() {
        for (int j = 0; j < 100; j++) {
            ArrayList<LogMessage> logMessages = new ArrayList<>(500);
            for (int i = 0; i < 500; i++) {
                LogMessage logMessage = new LogMessage();
                logMessage.setId(UUID.randomUUID().toString());
                logMessage.setCreateTime(System.currentTimeMillis());
                logMessage.setMsg("elasticsearch yyds " + i + "-" + j);
                logMessage.setLevel("INFO");
                logMessages.add(logMessage);
            }
            restService.bulkIndex(logMessages, LogMessage.class, "dev", LocalDate.now().toString());
        }
    }

    /**
     * Scrolls through all matching log messages page by page and prints each page's size.
     * Paging stops at the first page that is not full.
     */
    @Test
    void scroll() {
        QueryBuilder msgMatch = QueryBuilders.matchQuery("msg", "elasticsearch yyds 0");
        RangeQueryBuilder createTime = QueryBuilders.rangeQuery("createTime").gte(1637754072221L).lte(1637754072221L);
        // withQuery() replaces any previously set query, so calling it twice dropped the
        // match query. Combine both conditions in one bool query instead.
        QueryBuilder combined = QueryBuilders.boolQuery().must(msgMatch).filter(createTime);
        int pageSize = 100;
        Query query = new NativeSearchQueryBuilder().withQuery(combined).withPageable(Pageable.ofSize(pageSize)).build();
        String dateStr = LocalDate.now().toString();
        SearchScrollHits<?> searchHits = restService.scrollFirst(query, LogMessage.class, "dev", dateStr);
        // RestService returns null on failure; fail with a clear message instead of an NPE.
        org.junit.jupiter.api.Assertions.assertNotNull(searchHits, "scrollFirst failed, see RestService log");
        int pageHits = searchHits.getSearchHits().size();
        System.out.println(pageHits);
        String scrollId = searchHits.getScrollId();
        while (pageHits == pageSize) {
            SearchScrollHits<?> page = restService.scroll(scrollId, LogMessage.class, "dev", dateStr);
            org.junit.jupiter.api.Assertions.assertNotNull(page, "scroll failed, see RestService log");
            scrollId = page.getScrollId();
            pageHits = page.getSearchHits().size();
            System.out.println(pageHits);
        }
        System.out.println("down");
    }

    /** Runs a paged, sorted search and prints the hits. */
    @Test
    void search() {
        QueryBuilder msgMatch = QueryBuilders.matchQuery("msg", "elasticsearch yyds 0");
        RangeQueryBuilder createTime = QueryBuilders.rangeQuery("createTime").gte(1637754072221L);
        // Same fix as in scroll(): a second withQuery() call would discard the match query.
        QueryBuilder combined = QueryBuilders.boolQuery().must(msgMatch).filter(createTime);
        int pageSize = 10000;
        PageRequest pageRequest = PageRequest.of(0, pageSize, Sort.Direction.ASC, "createTime");
        Query query = new NativeSearchQueryBuilder().withQuery(combined).withPageable(pageRequest).build();
        SearchHits<?> search = restService.search(query, LogMessage.class, "dev", LocalDate.now().toString());
        org.junit.jupiter.api.Assertions.assertNotNull(search, "search failed, see RestService log");
        System.out.println(search.getSearchHits());
    }
}
参考
https://docs.spring.io/spring-data/elasticsearch/docs/4.3.0/reference/html/