一:近实时搜索原理
先认识几个基本概念:
1、segment
es基本存储单元是shard,index分散在多个shard上。 而每个shard由多个段-segment组成,每次创建一个新Document(一条新数据),就会归属于一个新的segment。 删除数据时,也不会直接删除当前segment,只是标记为已删除状态,后续在合适时机删除。
2、translog
操作日志,用来记录操作动作,防止数据丢失。 每个shard中对应一个translog文件。
3、commit
提交,意味着将多个segment,合并成新的更大的segment,并刷入磁盘。
4、refresh
es索引数据时,先是写入到内存buffer中,默认1s执行一次refresh操作,刷新到一个新的segment中,在segment中数据才具备被检索的结构,才能被查询。当写入segment后,会清空内存buffer。 所以近实时搜索通常指的是: 写入数据1s后才能被检索。
当然,可以改变默认时长(时长为-1代表关闭刷新):
PUT /mytest/_settings? { ? "refresh_interval": "20s" }
或者直接调用refresh的api:
POST /_refresh? ? 刷新所有索引
POST /mytest/_refresh? ? ?刷新某个索引
PUT /mytest/_doc/1?refresh=true? ? ? //刷新具体文档数据 { ? "test": "test" }
5、flush
数据清洗,将内存缓冲区、segments中、translog等全部刷盘,成功后清空原数据和translog。
默认每30分钟执行一次,或者translog变大超过设定值后触发。
commit需要一个fsync同步操作来保证数据物理的被成功刷盘,假如每一个写操作都这样,那么性能会大大下降。 es在内存buffer与磁盘之间,引入了文件系统缓存。 refresh将数据刷到新的segment,这些segment其实是先存在于文件系统缓存,后续再刷盘。
整体流程:
?当es收到写请求后,数据暂时写入内存buffer,并添加translog。默认1s后,refresh数据到file system cache,并清空内存buffer。 30分钟后,执行flush刷数据到磁盘(tanslog大小超过设定阈值也会执行flush)。
分片默认会30分钟执行一次flush,也可手动调用api:
POST /mytest/_flush? ? ? ? 刷新某一个索引
POST /_flush?wait_for_ongoin? ? ? ? ?刷新所有索引直到成功后返回
(手动调用flush情况很少,不过要关闭索引或者重启节点时,最好执行一下。因为es恢复索引或者重新打开索引时,它必须要先把translog里面的所有操作给恢复,所以也就是说translog越小,recovery恢复操作就越快)
上面说了数据的流程,现在看看translog是如何工作的?
当数据被refresh期间,新的操作日志会继续追加到translog,默认每次写请求(如 index, delete, update, bulk)都会刷盘。?这样会有很大的性能问题,所以如果能容忍5s内的数据丢失情况,还是使用每5s异步刷盘的方式。
配置如下:
PUT /mytest/_settings? { ? "index.translog.durability": "async", ? "index.translog.sync_interval": "5s" }
要保证完全可靠,还是使用默认配置:
PUT /mytest/_settings? { ? "index.translog.durability": "request" }
流程图:?
二: 段合并机制?
?在索引数据过程中,每一次的refresh都会创建新的segment,数量会越来越多,影响内存和CPU运行,查询也会在多个段中,影响性能。
所以,es会使用一定的策略,将segment不断的合并为更大的segment,最终被flush刷新到磁盘。
当然,合并会消耗大量IO和CPU,所以要对执行归并任务的线程作限速控制,默认是20MB,如果磁盘转速高,或者SSD等,可以适当调高:
PUT /_cluster/settings? {? ? "persistent" :? ? {? ? ? "indices.store.throttle.max_bytes_per_sec" : "100mb" ? }? }
线程数也可以调整,比如为CPU核心数的一半:
index.merge.scheduler.max_thread_count
下面来看看合并是依据什么策略来执行的:
主要有以下几条:
index.merge.policy.floor_segment? ??
默认
2MB
,小于这个大小的
segment
,优先被归并。
index.merge.policy.max_merge_at_once? ? ?
默认一次最多归并
10
个
segment
index.merge.policy.max_merge_at_once_explicit? ? ??
默认
optimize
时一次最多归并
30
个
segment
。
index.merge.policy.max_merged_segment? ? ?
默认
5 GB
,大于这个大小的
segment
,不用参与归 并,
optimize
除外。
?
?optimize api:
optimize代表手动强制执行合并,它可以通过参数max_num_segments指定,把某个index在每个分片上的segments最终合并为几个(最小是1个),比如日志是按天创建索引存储,可以合并为一个segment,查询就会很快。
POST /logstash-2022-10/_optimize?max_num_segments=1
三:数据的写一致性如何保证
wait_for_active_shards:
在写数据时,可以通过参数wait_for_active_shards,指定多少个分片的数据都成功写入,才算成功。 默认是1,代表主分片成功即可(一条数据肯定只存在于一个主分片)。? 最大值是 1+number_of_replicas,代表主分片和所有副本分片都成功。
timeout:
在多少时间内没有成功,就返回失败。 结合上面
,只要在timeout时间内,
wait_for_active_shards数满足都能成功(比如超时时间内,shard从不可用到可用,最终也会成功)。
?wait_for_active_shards可在索引的setting属性中全局设置,也可对某个document设置:
put /mytest/_doc/1?wait_for_active_shards=2&timeout=10s
{
"name"
:
"xiao mi"
}
|