ElasticSearch核心概念
(1)索引
一个索引就是一个拥有几分相似特征的文档的集合。比如说,你可以有一个客户数据的索引,另一个产品目录的索引,还有一个订单数据的索引。一个索引由一个名字来标识(必须全部是小写字母的),并且当我们要对对应于这个索引中的文档进行索引、搜索、更新和删除的时候,都要使用到这个名字。在一个集群中,可以定义任意多的索引。
(2)类型type
在一个索引中,你可以定义一种或多种类型。一个类型是你的索引的一个逻辑上的分类/分区,其语义完全由你来定。通常,会为具有一组共同字段的文档定义一个类型。比如说,我们假设你运营一个博客平台并且将你所有的数据存储到一个索引中。在这个索引中,你可以为用户数据定义一个类型,为博客数据定义另一个类型,当然,也可以为评论数据定义另一个类型。
(3)文档document
一个文档是一个可被索引的基础信息单元。比如,你可以拥有某一个客户的文档,某一个产品的一个文档,当然,也可以拥有某个订单的一个文档。文档以JSON(Javascript Object Notation)格式来表示,而JSON是一个到处存在的互联网数据交互格式。 在一个index/type里面,你可以存储任意多的文档。注意,尽管一个文档,物理上存在于一个索引之中,文档必须被索引/赋予一个索引的type。
(4)字段 Field(域)
相当于是数据表的字段,对文档数据根据不同属性进行的分类标识
(5)映射 mapping
mapping是处理数据的方式和规则方面做一些限制,如某个字段的数据类型、默认值、分词器、是否被索引等等,这些都是映射里面可以设置的,其它就是处理es里面数据的一些使用规则设置也叫做映射,按着最优规则处理数据对性能提高很大,因此才需要建立映射,并且需要思考如何建立映射才能对性能更好。
(6)接近实时 NRT
Elasticsearch是一个接近实时的搜索平台。这意味着,从索引一个文档直到这个文档能够被搜索到有一个轻微的延迟(通常是1秒以内)
(7)集群 cluster??
一个集群就是由一个或多个节点组织在一起,它们共同持有整个的数据,并一起提供索引和搜索功能。一个集群由一个唯一的名字标识,这个名字默认就是“elasticsearch”。这个名字是重要的,因为一个节点只能通过指定某个集群的名字,来加入这个集群
(8)节点 node
一个节点是集群中的一个服务器,作为集群的一部分,它存储数据,参与集群的索引和搜索功能。和集群类似,一个节点也是由一个名字来标识的,默认情况下,这个名字是一个随机的角色的名字,这个名字会在启动的时候赋予节点。这个名字对于管理工作来说挺重要的,因为在这个管理过程中,你会去确定网络中的哪些服务器对应于Elasticsearch集群中的哪些节点。 一个节点可以通过配置集群名称的方式来加入一个指定的集群。默认情况下,每个节点都会被安排加入到一个叫做“elasticsearch”的集群中,这意味着,如果你在你的网络中启动了若干个节点,并假定它们能够相互发现彼此,它们将会自动地形成并加入到一个叫做“elasticsearch”的集群中。 在一个集群里,只要你想,可以拥有任意多个节点。而且,如果当前你的网络中没有运行任何Elasticsearch节点,这时启动一个节点,会默认创建并加入一个叫做“elasticsearch”的集群。
(9)分片和复制 shards&replicas
一个索引可以存储超出单个节点硬件限制的大量数据。比如,一个具有10亿文档的索引占据1TB的磁盘空间,而任一节点都没有这样大的磁盘空间;或者单个节点处理搜索请求,响应太慢。为了解决这个问题,Elasticsearch提供了将索引划分成多份的能力,这些份就叫做分片。当你创建一个索引的时候,你可以指定你想要的分片的数量。每个分片本身也是一个功能完善并且独立的“索引”,这个“索引”可以被放置到集群中的任何节点上。分片很重要,主要有两方面的原因: 1)允许你水平分割/扩展你的内容容量。 2)允许你在分片(潜在地,位于多个节点上)之上进行分布式的、并行的操作,进而提高性能/吞吐量。 至于一个分片怎样分布,它的文档怎样聚合回搜索请求,是完全由Elasticsearch管理的,对于作为用户的你来说,这些都是透明的。 在一个网络/云的环境里,失败随时都可能发生,在某个分片/节点不知怎么的就处于离线状态,或者由于任何原因消失了,这种情况下,有一个故障转移机制是非常有用并且是强烈推荐的。为此目的,Elasticsearch允许你创建分片的一份或多份拷贝,这些拷贝叫做复制分片,或者直接叫复制。 复制之所以重要,有两个主要原因: 在分片/节点失败的情况下,提供了高可用性。因为这个原因,注意到复制分片从不与原/主要(original/primary)分片置于同一节点上是非常重要的。扩展你的搜索量/吞吐量,因为搜索可以在所有的复制上并行运行。总之,每个索引可以被分成多个分片。一个索引也可以被复制0次(意思是没有复制)或多次。一旦复制了,每个索引就有了主分片(作为复制源的原来的分片)和复制分片(主分片的拷贝)之别。分片和复制的数量可以在索引创建的时候指定。在索引创建之后,你可以在任何时候动态地改变复制的数量,但是你事后不能改变分片的数量。 默认情况下,Elasticsearch中的每个索引被分片5个主分片和1个复制,这意味着,如果你的集群中至少有两个节点,你的索引将会有5个主分片和另外5个复制分片(1个完全拷贝),这样的话每个索引总共就有10个分片。
?
使用RestHighLevelClient操作ES
(1)导入maven依赖
<properties>
<java.version>1.8</java.version>
<elasticsearch.version>7.6.2</elasticsearch.version>
</properties>
<dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
(2)配置RestHighLevelClient
@Configuration
public class ElasticSearchClientConfig {
@Bean
public RestHighLevelClient restHighLevelClient() {
return new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http")));
}
}
(3)创建实体类User.java
@Data
@AllArgsConstructor
@NoArgsConstructor
@Component
public class User {
private String name;
private int age;
}
(4)基本CURD操作
package com.zhuang;
import com.google.gson.Gson;
import com.zhuang.pojo.User;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@SpringBootTest
class ElasticsearchDemo02ApplicationTests {
@Autowired
@Qualifier("restHighLevelClient")
private RestHighLevelClient client;
/**
* 创建索引
*/
@Test
public void testCreateIndex() throws IOException {
//创建索引请求
CreateIndexRequest request = new CreateIndexRequest("zhuang_index");
//设置分片和复制
request.settings(Settings.builder()
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 0));
//执行创建请求 获取响应
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
System.out.println(response);
}
/**
* 获取索引
*/
@Test
public void testIndexExists() throws IOException {
//创建获取索引请求
GetIndexRequest request = new GetIndexRequest("zhuang_index");
//执行请求,获取结果
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
System.out.println("zhuang_index的索引是否存在: " + exists);
}
/**
* 删除索引
*/
@Test
public void deleteIndex() throws IOException {
//创建删除索引请求
DeleteIndexRequest request = new DeleteIndexRequest("zhuang_index");
//执行请求
AcknowledgedResponse acknowledgedResponse = client.indices().delete(request, RequestOptions.DEFAULT);
System.out.println(acknowledgedResponse.isAcknowledged());
}
/**
* 添加文档
*/
@Test
public void testAddDocument() throws IOException {
User user = new User("唐小壮", 3);
//创建请求
IndexRequest request = new IndexRequest("zhuang_index");
//规则 put zhuang_index/_doc/1
request.id("1").timeout(TimeValue.timeValueSeconds(1));
//将数据放入请求
Gson gson = new Gson();
request.source(gson.toJson(user), XContentType.JSON);
//发送请求
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
System.out.println(response.toString());
//对应命令返回的状态
System.out.println(response.status());
}
/**
* 判断文档是否存在
*/
@Test
public void testDocumentExists() throws IOException {
//get /zhuang_index/_doc/1
GetRequest request = new GetRequest("zhuang_index", "1");
//不获取返回的 _source 的上下文数据
request.fetchSourceContext(new FetchSourceContext(false));
request.storedFields("_none_");
boolean exists = client.exists(request, RequestOptions.DEFAULT);
System.out.println(exists);
}
/**
* 获取文档的信息
*/
@Test
public void testGetDocument() throws IOException {
GetRequest request = new GetRequest("zhuang_index", "1");
GetResponse response = client.get(request, RequestOptions.DEFAULT);
//获取source内容
System.out.println(response.getSourceAsString());
System.out.println(response);
}
/**
* 更新文档信息
*/
@Test
public void testUpdateDocument() throws IOException {
UpdateRequest updateRequest = new UpdateRequest("zhuang_index", "1");
//设置超时时间
updateRequest.timeout("1s");
User user = new User("唐小壮Java", 18);
//设置需要更新的内容
updateRequest.doc(new Gson().toJson(user), XContentType.JSON);
UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
System.out.println(response.toString());
}
/**
* 删除文档
*/
@Test
public void testDeleteDocument() throws IOException {
DeleteRequest deleteRequest = new DeleteRequest("zhuang_index", "1");
deleteRequest.timeout("1s");
//删除文档
DeleteResponse response = client.delete(deleteRequest, RequestOptions.DEFAULT);
System.out.println(response.status());
}
/**
* 批量插入数据
*/
@Test
public void testBulkRequest() throws IOException {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.timeout("10s");
List<User> users = new ArrayList<User>() {
private static final long serialVersionUID = -5642486388847127074L;
{
add(new User("xiaozhuang1", 3));
add(new User("xiaozhuang2", 3));
add(new User("xiaozhuang3", 3));
add(new User("zhangsan1", 3));
add(new User("zhangsan2", 3));
add(new User("zhangsan3", 3));
}
};
for (int i = 0; i < users.size(); i++) {
bulkRequest.add(new IndexRequest("zhuang_index").id(i + 1 + "").source(new Gson().toJson(users.get(i)), XContentType.JSON));
}
//执行批处理请求
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
//是否执行失败
System.out.println(response.hasFailures());
}
/**
* 查询
*/
@Test
public void testQuery() throws IOException {
SearchRequest searchRequest = new SearchRequest("zhuang_index");
//构建搜索条件
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
/**
* 使用QueryBuilders进行快速匹配
* matchAllQuery: 匹配所有
*/
MatchAllQueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
sourceBuilder.query(queryBuilder);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
//放入请求
searchRequest.source(sourceBuilder);
//执行请求
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
//从hit中获取数据
System.out.println(new Gson().toJson(response));
System.out.println("====================");
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsMap());
}
}
}
|