本文简单记录 Spring Boot 集成 Elasticsearch 7.x 的方法。
引入maven依赖
这里没有使用 spring-boot-starter-data-elasticsearch(它的客户端版本与 Spring Boot 版本绑定,容易和实际使用的 Elasticsearch 版本不一致),而是直接使用 elasticsearch-rest-high-level-client。
<!-- Elasticsearch core library, pinned to the same version as the clients below -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.9.3</version>
</dependency>
<!-- Low-level REST client. It is pulled in transitively by the high-level client, but is
     pinned here so that Spring Boot's dependency management cannot resolve it to a
     different version than the other two artifacts. -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.9.3</version>
</dependency>
<!-- High-level REST client used by all examples in this note -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.9.3</version>
</dependency>
将RestHighLevelClient纳入spring容器管理
/**
 * Spring configuration that registers a {@link RestHighLevelClient} bean.
 */
@Configuration
public class ElasticSearchClientConfig {

    /**
     * Builds a high-level REST client that talks to a single node over plain
     * HTTP at 127.0.0.1:9200.
     *
     * @return the client instance exposed as a Spring bean
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        HttpHost localNode = new HttpHost("127.0.0.1", 9200, "http");
        return new RestHighLevelClient(RestClient.builder(localNode));
    }
}
之后,在其他地方就能直接使用了:
// Injects the RestHighLevelClient bean, selected by the bean name "restHighLevelClient".
@Autowired
@Qualifier("restHighLevelClient")
private RestHighLevelClient client;
文档操作
根据ID查询文档
使用 GetRequest
/**
 * Fetches the document with id "1" from index "my-index" and logs whether it
 * exists, followed by its source.
 *
 * @throws IOException if the call to Elasticsearch fails
 */
@Test
public void testGetById() throws IOException {
    GetResponse found = client.get(new GetRequest("my-index", "1"), RequestOptions.DEFAULT);

    boolean exists = found.isExists();
    log.info("result exists : " + exists);

    // The source is returned as a JSON string.
    String sourceJson = found.getSourceAsString();
    log.info("response : " + sourceJson);
}
创建文档
使用 IndexRequest
/**
 * Indexes a single DataEntity into "my-index" and logs the id and result
 * reported in the response.
 *
 * @throws IOException declared for the index call
 */
@Test
public void testCreateDocument() throws IOException {
DataEntity entity = new DataEntity();
// Set the entity's properties (elided in this example)
......
// The entity's own id is used as the document id
IndexRequest indexRequest = new IndexRequest("my-index").id(entity.getId());
// The document body is the entity serialized to a JSON string
indexRequest.source(JSON.toJSONString(entity), XContentType.JSON);
IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
log.info("id : " + response.getId());
log.info("operate result : " + response.getResult());
}
批量创建文档
比如将数据库中的数据导入es中时,可以使用批量创建
通过 BulkRequest 来操作
/**
 * Bulk-indexes a list of entities into "my-index" with a single BulkRequest,
 * e.g. when importing rows from a database into Elasticsearch.
 *
 * @throws IOException if the bulk call to Elasticsearch fails
 */
@Test
public void testBulkCreateDocument() throws IOException {
    // Initialise by hand, or load from the database
    List<DataEntity> list = ......
    BulkRequest bulkRequest = new BulkRequest();
    list.forEach(entity -> {
        EsDataEntity esDataEntity = new EsDataEntity();
        BeanUtils.copyProperties(entity, esDataEntity);
        // Use the business id as the document id; it can be left unset so that ES generates one
        IndexRequest indexRequest = new IndexRequest("my-index").id(esDataEntity.getId());
        // The source is passed as a JSON string built from the ES-side copy
        indexRequest.source(JSON.toJSONString(esDataEntity), XContentType.JSON);
        // Queue the IndexRequest on the BulkRequest
        bulkRequest.add(indexRequest);
    });
    // A bulk request without any action is rejected by request validation, so skip the call
    if (bulkRequest.numberOfActions() == 0) {
        log.info("BulkRequest is empty, nothing to index");
        return;
    }
    // Execute the bulk insert
    BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
    // Individual items can fail even though the bulk call itself returned normally
    if (response.hasFailures()) {
        log.error("bulk failures : " + response.buildFailureMessage());
    }
    log.info("BulkResponse : " + JSON.toJSONString(response));
}
根据ID更新文档
使用 UpdateRequest
/**
 * Reads document "1" from "my-index", changes one field on the deserialized
 * object and writes it back through an UpdateRequest. Logs an error and does
 * nothing else when the document does not exist.
 *
 * @throws IOException if a call to Elasticsearch fails
 */
@Test
public void testUpdateDocument() throws IOException {
    GetResponse current = client.get(new GetRequest("my-index", "1"), RequestOptions.DEFAULT);
    if (current.isExists()) {
        // JSON source -> Java object
        EsDataEntity stored = JSON.parseObject(current.getSourceAsString(), EsDataEntity.class);
        // Change an arbitrary field
        stored.setRemark("Vehicle");

        // Target the index and document id reported by the lookup
        UpdateRequest update = new UpdateRequest(current.getIndex(), current.getId());
        // Java object -> JSON, sent as the update document
        update.doc(JSON.toJSONString(stored), XContentType.JSON);

        // Run the update
        UpdateResponse result = client.update(update, RequestOptions.DEFAULT);
        log.info("id : " + result.getId());
        log.info("operate result : " + result.getResult());
    } else {
        log.error("id:[1]不存在!");
    }
}
根据ID删除文档
使用 DeleteRequest
/**
 * Deletes document "1" from "my-index" and logs the id and result reported
 * in the response.
 *
 * @throws IOException if the call to Elasticsearch fails
 */
@Test
public void testDeleteDocument() throws IOException {
    DeleteResponse outcome =
            client.delete(new DeleteRequest("my-index", "1"), RequestOptions.DEFAULT);

    log.info("id : " + outcome.getId());
    log.info("operate result : " + outcome.getResult());
}
查询
查询的写法很多,这里只贴一个 bool 查询的例子作为记录:
/**
 * Runs a bool query against "my-index": two match clauses plus a range on
 * "@timestamp", sorted by "@timestamp" descending and limited to 120 hits.
 * Each hit is converted to a Metrics object and logged.
 *
 * An IOException from the search call is logged with its stack trace and not
 * rethrown.
 */
@Test
void query() {
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    // Bool query: every clause added with must() has to match
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    boolQueryBuilder.must(QueryBuilders.matchQuery("labels.container_name", "train-py-test"));
    boolQueryBuilder.must(QueryBuilders.matchQuery("name", "container_memory_rss"));
    // The format() call tells ES how to parse the gte/lte bounds
    boolQueryBuilder.must(QueryBuilders.rangeQuery("@timestamp")
            .gte("2022-03-31 07:59:00")
            .lte("2022-03-31 08:59:00")
            .format("yyyy-MM-dd HH:mm:ss"));
    sourceBuilder.query(boolQueryBuilder);
    sourceBuilder.from(0); // offset of the first hit
    sourceBuilder.size(120); // number of hits to fetch
    sourceBuilder.sort("@timestamp", SortOrder.DESC); // newest first

    SearchRequest request = new SearchRequest();
    request.indices("my-index"); // index to search
    request.source(sourceBuilder);

    try {
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // Converted business objects
        List<Metrics> metricsList = new ArrayList<>();
        for (SearchHit hit : response.getHits().getHits()) {
            // Each hit's source is a JSON string
            String json = hit.getSourceAsString();
            // Raw JSON view, used to read the "@timestamp" key directly
            JSONObject jsonObject = JSONObject.parseObject(json);
            // JSON -> business object
            Metrics metrics = JSON.parseObject(json, Metrics.class);
            // NOTE(review): the timestamp is set by hand, presumably because the "@timestamp"
            // key is not bound to the Metrics field automatically; confirm against Metrics.
            metrics.setTimestamp(DateUtil.parseUTC(jsonObject.getString("@timestamp")));
            log.info(JSON.toJSONString(metrics));
            metricsList.add(metrics);
        }
        log.info("converted hits : " + metricsList.size());
    } catch (IOException e) {
        // Keep the cause attached so the failure is visible in the log
        log.error("search on index [my-index] failed", e);
    }
}
|