IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Elasticsearch基础和原理 -> 正文阅读

[大数据]Elasticsearch基础和原理

安装和运行

本地环境使用docker部署(减少踩坑,参考官网

拉取官方镜像(采用7.15.2版本)
docker pull docker.elastic.co/elasticsearch/elasticsearch:7.15.2

单节点部署

docker run -p 127.0.0.1:9200:9200 -p 127.0.0.1:9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.15.2

基本概念

和Mysql概念对比

Mysql数据库 Database表 Table行 Row列 Column
Elasticsearch索引 index类型 Type文档 Document字段 Fields

在Elasticsearch中:

  1. 一个index就表示一个数据库,是数据存储的地方
  2. 每个index可以拥有多个类型type, 表示一个数据表
  3. 每个类型包含多个文档document,表示一行数据
  4. 每行数据有多个字段field,表示多列

集群的概念

  1. 节点Node: 一个Elasticsearch实例,即为一个节点
  2. 集群Cluster: 一个或者多个节点共同协作,即组成一个集群
    • 集群中所有的节点具有相同的cluster.name
    • 集群中的一个节点会被选举为主节点master,临时管理集群级别的变更,如新增或删除节点,新建或删除索引等
    • 集群中每个节点都知道文档存在于哪个节点上,每个节点都可以转发请求到真正存储数据的节点上
      • 作为用户,我们可以访问任意节点(称作请求节点),请求节点负责收集各节点返回的数据,并聚合、处理后返回客户端

集群状态

使用/_cluster/health接口,可以获取当前集群的状态信息:

请求:
curl -XGET 'http://localhost:9200/_cluster/health?pretty'

响应:
{
  "cluster_name" : "docker-cluster", // 集群名
  "status" : "green", // 集群状态: 
  						1. grean, 所有主要分片(primary shard)和复制分片(replica shard)都可用
  						2. yellow, 所有主要分片可用,部分复制分片不可用
  						3. red, 部分主要分片不可用
  "timed_out" : false,
  "number_of_nodes" : 1,  // 集群节点数量;我本地只起了一个节点
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 1,
  "active_shards" : 1,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}

主分片和复制分片

  1. 分片(shard):索引index仅仅是一个逻辑命名空间,一个索引的数据被分配到多个分片(shards)中存储,每个分片存储该索引的一部分数据;分片是Elasticsearch在集群中分发数据的关键
  2. 主分片(primary shard): 一个索引的主分片数量是一定的(索引创建时指定,重建索引可修改)。索引根据主分片数量划分所有数据,每个文档隶属于一个主分片
  3. 复制分片(replica shard): 复制分片是主分片的副本,复制分片可以负载均衡主分片的读流量,同时也提供主分片的故障转移能力

假设现在集群中有3个节点,我们创建一个索引并指定其有3个主分片,2个复制分片

请求:
curl -XPUT -H "Content-Type:application/json" 'http://localhost:9200/blogs/' -d '{"settings": {"number_of_shards":3,"number_of_replicas":2}}'

其分片的分布可能如下:
在这里插入图片描述

其中:

  1. P: 表示主分片;P0/1/2表示3个主分片
  2. R: 表示复制分片;R0-0表示第1个主分片的第一个复制分片;

文档

什么是文档?简单讲,Elasticsearch是使用json作为文档序列化方式存储的,一个文档,可以理解成一个json对象(object)

文档的元数据

每一个文档除了包含业务定义的字段数据之外,还有三个必须的元数据metadata:

  1. _index: 文档存储的地方(索引名)
  2. _type: 文档代表的对象的类(类型名)
  3. _id: 文档的位移标识

我随手从es上查询一条日志,查看其Json内容如下,可以看到这个文档的上述3个元数据信息,同时,还有其他元数据,如版本_veriosn等,fields即为日志业务定义的字段内容,包括时间戳、日志内容,日志等级等等等

{
  "_index": "uat:billions-main.archive.aegis-admin-@-w-2021.47-uat01-0-0",
  "_type": "_doc",
  "_id": "3Ww9YH0Bh_7MgDoxV17G",
  "_version": 1,
  "_score": null,
  "fields": {
    "@timestamp": [
      "2021-11-27T07:12:44.808Z"
    ],
    "log": [
      "SearchESSQL Get failed, resp.code(-400) resp.message(business name (aegis_resource_archive) not found) ....."
    ],
    "zone": [
          "sh007"
    ],
    "level": [
      "ERROR"
    ],
    ...
}

文档的增删改查

这篇笔记更多关注Elasticsearch基础和原理,对于查询和更新的使用暂不过分关注,不管是restful风格的简单http请求,还是比较繁琐的DSL,后面可以作为Elasticsearch的应用方式独立探索,尤其DSL,至今感觉不太会用。。。

版本冲突

前不久,线上日志刚好看到偶发的版本冲突报错:
[base][379393076]: version conflict, current version [7] is different than the one provided [6]
这个问题就和这一部分的内容息息相关了

数据有版本,原因在多进程(线程)并发变更同一数据时,由于获取数据和修改数据存在时间差,会导致结果异常;解决并发变更的常用方式是加悲观锁,比如MySQL的锁机制,另一种方式则是加乐观锁。

悲观锁:在修改一行数据期间,假设它会被其他人访问并修改,于是加上悲观锁,在此期间其他线程对该数据的变更将阻塞
乐观锁:在修改一行数据期间,假设它不会被其他人访问并修改,于是通过版本号实现乐观锁控制,如果更新数据时发现它的版本被更新,则说明改行数据已被修改,需要重新获取最新数据、计算、修改,或者直接返回错误给调用者

Elasticsearch的乐观并发控制

上面在讨论文档的元数据时,提到文档的另一个元数据属性_version,这个属性即表明当前文档的版本,其初始值是1,每次被修改后加1;如果我们更新时提交的版本号(查询时获取)小于当前文档的版本,则会报错409 Conflict

如以下示例:

新建一个blogs: 
curl -XPUT -H "Content-Type:application/json" 'http://localhost:9200/blogs/doc/2' -d '{"title": "doc-1", "text": "12345"}'

当前信息:
?  local curl -XGET 'http://localhost:9200/blogs/doc/2?pretty'
{
  "_index" : "blogs",
  "_type" : "doc",
  "_id" : "2",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "title" : "doc-1",
    "text" : "12345"
  }
}

修改一次:
curl -XPUT -H "Content-Type:application/json" 'http://localhost:9200/blogs/doc/2' -d '{"title": "doc-1", "text": "1234444"}'

当前信息:(版本号为2)
?  local curl -XGET 'http://localhost:9200/blogs/doc/2?pretty'
{
  "_index" : "blogs",
  "_type" : "doc",
  "_id" : "2",
  "_version" : 2,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "title" : "doc-1",
    "text" : "1234444"
  }
}

基于版本1提交:
curl -XPUT -H "Content-Type:application/json" 'http://localhost:9200/blogs/doc/2?if_seq_no=2&if_primary_term=1' -d '{"title": "doc-1", "text": "123444455"}'

报错:
{"error":{"root_cause":[{"type":"version_conflict_engine_exception","reason":"[2]: version conflict, required seqNo [2], primary term [1]. current document has seqNo [1] and primary term [1]","index_uuid":"5BTLSFMDSD69VQJbiVJyuA","shard":"1","index":"blogs"}],"type":"version_conflict_engine_exception","reason":"[2]: version conflict, required seqNo [2], primary term [1]. current document has seqNo [1] and primary term [1]","index_uuid":"5BTLSFMDSD69VQJbiVJyuA","shard":"1","index":"blogs"},"status":409}

分片之间的数据同步

路由

Elasticsearch在接受一个请求时,需要知道对应的文档在哪个分片上;Elasticsearch计算文档所在分片所用的字段即为路由(router),默认为_id,其计算分片值的简单算法如下:

shard = hash(router) % number_of_primary_shards

主分片和复制分片的交互

上面在集群的阐述中,我们说明了一个前提:集群中任一节点都可以接受请求,并转发请求,并最终收集数据返回给用户;

假设我们现在有一个3节点(Node1, Node2, Node3)的集群,如上图。

文档新建、索引、删除

和MySQL一样,文档的写操作都必须在主节点上执行

Elasticsearch文档写操作的执行顺序如下:

  1. 客户端给节点Node1发送写请求
  2. 节点Node1使用文档的路由键(默认_id)计算出其所在分片为P1
  3. Node1知道P1在Node2节点上,于是将请求转发到Node2
  4. Node2在主分片上执行请求,成功后会将请求转发到其他节点上的复制分片同步结果
  5. 当所有的复制分片也成功执行,Node2将结果返回请求节点Node1
    • 这一步不是必须的,和kafka的发送确认机制一样,可以通过参数配置是否要确认所有副本更新成功,可参考kafka专题笔记 - 生产者
  6. 请求节点Node1返回客户端结果

文档检索

这里先看单文档检索。其过程大致如下:

  1. 客户端给任一节点Node1发送Get请求
  2. 节点Node1通过路由键(默认_id)计算它在分片1上
  3. 节点Node1将请求转发到分片1的第二个复制分片R1-1所在节点Node3
  4. 节点Node3查询数据后,返回给节点Node1,请求节点Node1再返回给客户端

对于多文档的检索,请求节点会计算哪些请求应该转发到哪些节点上,然后转发请求 => 获取各节点结果并处理 => 返回客户端

以上是指定一个文档或者多个文档的查询,那如果我想要一个范围查询,那是怎样的流程呢??比如,想获取根据timestamp字段排序的前1000个文档

范围(分页)查询

当我们需要查询某个范围数据的时候,由于没有指定具体的路由键(即不知道具体在哪个索引),这时就需要广播所有的节点,在每个节点上都执行请求;请求节点拿到所有节点的返回数据后,还需要做特定的处理,如重新排序,每个节点的排序只能基于自己节点的数据,无法表示全局;所以,在类似范围查询或者分页查询的时候,需要查询和取回两个阶段(query and fetch),以此查询为例:

GET /_search
{
	"from": 90,
	"size": 10
}

获取所有数据的第90-100条数据(再明确一个具体的场景,根据创建时间排序)

查询阶段

查询阶段,请求节点向每个分片副本所在的节点广播请求;每个分片在本地执行搜索并建立本地优先队列

一个优先队列(priority queue)只能存储前n个匹配文档的有序列表,这个优先队列由参数from、size决定;比如from = 90, size = 10时,优先队列的大小为100;注意,这里虽然最后只取10个,但是优先级队列大小得是100;

同理,如果from = 100000, size = 10, 优先级队列大小为100010,这也是为什么不能深度分页的原因

查询阶段三个步骤:

  1. 客户端请求到某一节点Node1Node1创建大小为from + size的空优先级队列
  2. Node1转发请求到索引中每个分片的原本或者副本所在的节点,每个节点建立本地优先级队列,并把本地查询结果放到优先级队列里,产生局部排序结果
    • 注意,如果Node1也有对应分片,它还要再建一个本地优先级队列
  3. 每个分片返回本地优先级队列里的文档ID和排序字段给协调节点Node1Node1把这些值合并到自己的优先级队列里产生Node1,产生全局排序结果
取回阶段

查询阶段拿到的全局排序中,只获取了必要的文档ID和排序字段信息,需要再次填充文档数据。

取回阶段三个步骤:

  1. 协调节点Node1辨别出那个文档来自哪个分片,并向该分片发起GET请求
  2. 每个分片加载文档信息并根据需要丰富字段信息,并将结果返回给协调节点
  3. 协调节点拿到所有数据后,返回客户端
深度分页

上面说到,每个分页请求需要在协调节点和每个分片节点建立优先队列,且队列大小为from + size,那么当from很大很大时,这个本地队列的大小就会很大,消耗大量CPU(优先级队列的数据处理)、内存(优先级队列)、带宽(节点之间数据传递)等等

报错指南

1、提交请求体时,接口响应406,不支持application/x-www-form-urlencoded

请求:
curl -XGET 'http://localhost:9200/_count?pretty' -d '{"query":{"match_all":{}}}'

响应:
{
  "error" : "Content-Type header [application/x-www-form-urlencoded] is not supported",
  "status" : 406
}

解决: 请求中指定提交数据类型
curl -XGET -H "Content-Type:application/json" 'http://localhost:9200/_count?pretty' -d '{"query":{"match_all":{}}}'

原因:
es6.0之后官方部分调整
  1. 更改数据时指定version=1,报错400, "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use if_seq_no and if_primary_term instead;
请求: 
curl -XPUT -H "Content-Type:application/json" 'http://localhost:9200/blogs/doc/2?version=1' -d '{"title": "doc-1", "text": "123444455"}'

响应:
{"error":{"root_cause":[{"type":"action_request_validation_exception","reason":"Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"}],"type":"action_request_validation_exception","reason":"Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"},"status":400}

解决:使用if_seq_no=2&if_primary_term=1 代替version = 1
curl -XPUT -H "Content-Type:application/json" 'http://localhost:9200/blogs/doc/2?if_seq_no=2&if_primary_term=1' -d '{"title": "doc-1", "text": "123444455"}'

原因:
版本问题。Elasticsearch在6.7版本后不支持version=1的参数指定方式,我本地测试使用7.15.2
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-28 11:22:04  更:2021-11-28 11:23:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 14:03:25-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码