? 我有一个大胆的想法,假如我们获取数据的需求并不需要对数据排序。我觉得我们完全可以按序从每个分片上依次获取数据。这样我们可以避免一个问题,数据不用在每个分片查询然后再在协调节点上排序。假如我们有200个分片,想要获取全部数据,那就先从第一个分片上获取全部的数据,然后再从第二个分片上滚动获取数据,直到循环获取到第200个分片上的数据。这将避免了一些IO,避免了无用的网络传输,避免了协调节点汇总数据做出的资源浪费操作。因为我们本身并只是想要快速的获取到全部的数据,也不关注数据是否排序了!?
原文地址:
Elasticsearch之SearchScroll原理剖析和性能及稳定性优化 - 知乎Elasticsearch是一款优秀的开源企业级搜索引擎,其查询接口主要为Search接口,提供了丰富的各类查询、排序、统计聚合等功能。本文将要介绍的是另一个查询接口SearchScroll,同时介绍一下我们在这方面做的一些性能…https://zhuanlan.zhihu.com/p/231790621
查询剪枝
SearchScroll多并发场景下,请求刚到协调节点上,会查询出每个shard在哪些节点上,然后将请求转发到这些节点上。当查询请求到达data节点上,根据slice参数重写query时候,会判断该shard应不应该被当前slice进行查询。主要判断逻辑本文上述章节已经介绍。如果该slice不应该查询本shard,则直接返回一个MatchNoDocsQuery这样的filter,相当于该请求在data节点上浪费了一次查询。虽然加了MatchNoDocsQuery的原请求执行速度很快,但是会占用线程池浪费一些cpu时间,而且会浪费线程池的队列空间。
假如用户有512个shard,且用户用512个并发进行访问。需要注意的是,每个并发请求都会转发到所有的shard上,因此在集群的data节点上瞬间会有512*512=26万个任务需要执行,其中仅有512个任务是真正需要执行的,其它的请求都是在浪费集群资源。默认情况下单个节点查询线程池队列是1000,一般集群也没有那么多data节点,难支撑26万个请求。
针对该问题,我们将slice的MatchNoDocsQuery的filter过滤提前到协调节点,不需要再转发这些无用的请求。在协调节点上会计算哪些shard需要真正执行查询任务,因此我们将MatchNoDocsQuery的filter逻辑前置,达到查询剪枝的目的。
除此之外,在并发数和shard数不相等时候,一个并发请求可能会发送到n个shard上。假如用户需要返回m条数据,会向n个shard各请求m条数据,然后在协调节点需要将nm条数据进行排序,选出前m条进行fetch然后再返回给用户,这样相当于浪费了(n-1)m条数据的计算和io资源。因此可以仅从一个shard上获取数据,按顺序将所有shard上的数据拉取结束,在挨个拉取的过程中,还要保持之前在各个shard创建的searchContext,避免SearchContext失效。
查询剪枝后,并发访问方式下,scroll_id也将变得特别短。之前用户拿到的scroll_id特别长,跟用户的shard数成正比,当shard数较多时候,scroll_id也特别长,在传输过程和scroll_id编码解析过程中都会浪费一些系统资源。
shard选择策略
一个索引通常会有很多副本,当请求到达协调节点后,请求应该转发到哪个副本呢?
默认情况下,采用的是随机策略,将所有副本打乱随机拿出一个副本即可。默认的随机策略能够将请求均匀地打散在每一个shard上。假如我们的data节点处理能力不一致,或者由于一些原因造成某些机器负载较高,那么采用随机策略可能不太适用。Elasticsearch提供了一个自适应的选择策略,其能够根据当前的每个节点的状态来选择最佳的副本。参考因素有如下源码列出的,包括节点的client数、队列长度、响应时间、服务时间等。因此,通过"cluster.routing.use_adaptive_replica_selection"参数将副本自适应选择策略打开,能够发挥每一台机器的能力,请求延时能够有效降低,每台机器的负载能够更加均匀。
ComputedNodeStats(int clientNum, NodeStatistics nodeStats) {
this(nodeStats.nodeId, clientNum,(int) nodeStats.queueSize.getAverage(), nodeStats.responseTime.getAverage(), nodeStats.serviceTime);
}
针对SearchScroll请求,如果是频率较高的拉取不同索引的少量数据,那么副本自适应选择策略可以满足需求。但是针对一些大索引拉取数据的case则不再适用。假如某一个索引有512个shard,且需要拉取的数据较多,那么集群资源可能仅够该索引大量拉取,不会再有其他请求过来。当512个并发请求一下子进来协调节点,这时候协调节点会拉取每个data节点的状态来决定把请求发往哪个副本。但是512个并发是一起过来的,因此拿到的nodeStats可能是一致的,会造请求发往相同的data节点,造成一些data节点负载较高,而其他data节点负载较低。SearchScroll的首轮请求会决定了后续请求在哪个data节点执行,因此后续所有请求和首轮一样,造成各个data节点负载不一致。
针对这种情况,如果索引shard较多,且用户是SearchScroll请求,则需要不再使用副本自适应选择策略。
请求支持重试
自Elasticsearch支持SearchScroll以来,scroll_id都是不变的,所有的游标位点信息都是维护在data节点的searchContext中。scroll_id仅仅编码了node_id和context_id。协调节点根据node_id将请求转发到对应的data节点,在data节点上根据context_id拿到searchContext,最后拿到所有相关的具体信息。
当前scroll_id是不支持重试的,强行进行重试可能会造成数据丢失,因此遇到失败则需要全部重新拉取。比如用户有100条数据需要拉取,每次拉10条。当拉取20~30条时候,Elasticsearch已经拿到数据,代表着data节点的游标位点信息已经更新,但是用户网络发生问题,没有取到这10条数据。这时候用户忽略网络异常而继续请求的话,会拿到30~40的10条数据,而20~30的10条数据再也拿不到,造成读取数据丢失。针对这一问题,我们将searchContext中维护的last_emitted_doc编码到scroll_id中,这样在部分场景失败下就可以进行重试。
之前scroll_id的编码是为query_type + array_size + array[context_id + node_id],我们优化后的scroll_id为增加了version、index_name、last_emitted_doc等信息:
- version字段是为了以后做版本兼容使用,当前的scroll_id并没有版本的概念,因此版本兼容难做。
- index_name是索引的名字。虽然该字段对查询没有任何用处,但是在stats监控中需要用到。之前我们仅能统计SearchScroll的整个集群或者Node级别的监控,现在拿到index_name后,可以做到索引级别更细粒度的监控,比如拿到某一个索引Scroll阶段的query、merge、sort、fetch等各项监控信息。
- last_emitted_doc是新增的字段,在Elasticsearch中是ScoreDoc.java,主要编码的是doc和score两个字段。如果ScoreDoc是FieldDoc子类型,则还会编码fields。
scroll_id中编码last_emitted_doc后,用户的每次请求我们都能拿到当前的游标位点信息。在协调节点中,通过InternalScrollSearchRequest将该Request从协调节点发送到data节点,最终data节点不再从searchContext中拿last_emitted_doc,而是从InternalScrollSearchRequest拿到last_emitted_doc游标位点。
除此之外,当前Elasticsearch的SearchContext是不支持并发访问的,且没有给出任何提示,如果并发访问会造成拿到的数据错乱。因此,我们将SearchContext加了状态,如果访问一个正在被访问的SearchContext,则抛出冲突异常。