IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Elasticsearch CCR源码分析 -> 正文阅读

[大数据]Elasticsearch CCR源码分析

本文基于Elasticsearch6.8.5版本

ES使用的是Guice框架,依赖注入和暴露接口的方式和Spring差距较大,可先查看guice框架

节点启动过程:

org/elasticsearch/bootstrap/Elasticsearch.java(main)---》org/elasticsearch/node/Node.java(构造方法)加载插件&module---其中包括---》org/elasticsearch/xpack/ccr/Ccr.java(注册各种action)

CCR是插件的形式:

Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin, RepositoryPlugin

CCR的几个操作:

官网操作API:https://www.elastic.co/guide/en/elasticsearch/reference/7.x/ccr-apis.html

1.本地集群连接远程集群

PUT /_cluster/settings
{
  "persistent" : {
    "cluster" : {
      "remote" : {
        "leader" : {
          "seeds" : [
            "127.0.0.1:9300" 
          ]
        }
      }
    }
  }
}

2.在远程集群创建leader索引

远程集群创建leader索引,和正常创建索引操作一样,需要开启soft_deletes(7.x版本已经默认开启)

3.在本地集群创建follower索引

手动创建:需要指定远程集群和复制的远程索引

PUT /<follower_index>/_ccr/follow?wait_for_active_shards=1
{ 
   "remote_cluster" : "<remote_cluster>", 
   "leader_index" : "<leader_index>" 
} 
自动跟随创建:通过auto_follow API建立自动跟随的模板

PUT /_ccr/auto_follow/<auto_follow_pattern_name>
{
  "remote_cluster" : "<remote_cluster>",
  "leader_index_patterns" :
  [
    "<leader_index_pattern>"
  ],
  "follow_index_pattern" : "<follow_index_pattern>"
}

源码分析:

1.执行 _ccr/follow接口,手动创建一个follow索引,该接口会生成索引并完成全量同步

org.elasticsearch.xpack.ccr.rest.RestPutFollowAction类暴露该接口

/**
     * 暴露 {index}/_ccr/follow 接口
     * @param settings
     * @param controller
     */
    public RestPutFollowAction(Settings settings, RestController controller) {
        super(settings);
        controller.registerHandler(RestRequest.Method.PUT, "/{index}/_ccr/follow", this);
    }

通过import关联到org.elasticsearch.xpack.core.ccr.action.PutFollowAction类

import static org.elasticsearch.xpack.core.ccr.action.PutFollowAction.INSTANCE;

PutFollowAction动作在ccr.java中注册了处理类:TransportPutFollowAction

TransportMasterNodeAction将请求转发给follow集群的master节点

在该方法中org.elasticsearch.action.support.master.TransportMasterNodeAction.AsyncSingleAction#doStart

执行:transportService.sendRequest这一行,实现请求转发(如果当前节点就是master,则不转发)

-->follow集群的master节点执行TransportMasterNodeAction.masterOperation()

-->TransportPutFollowAction.createFollowerIndex()

-->restoreSnapshot() 从"_ccr_$remoteClusterName"的repository的"_latest_"快照恢复

-->afterRestoreStarted()

--->initiateFollowing() 该方法核心主要是构造一个ResumeFollowRequest请求,ResumeFollowRequest请求的handler会对每个主分片构造一个任务,去不断刷新leader的更新数据(由此开始增量同步上面的流程主要是全量同步

---->TransportResumeFollowAction.start() 开始恢复数据并且修改index状态

----->persistentTasksService.sendStartRequest( ShardFollowTask.NAME, shardFollowTask) 作用:Notifies the master node to create new persistent task and to assign it to a node(通知follow集群的主节点创建新的持久性任务---增量同步数据,long pull,并将其分配给各个节点)

// 执行sendStartRequest之前先create了一个task
final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request.getParameters(),
                leaderIndexMetadata, followIndexMetadata, filteredHeaders);
// 循环,对每个shard产生task
for (int shardId = 0; shardId < numShards; shardId++) {
            String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
            final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request.getParameters(),
                leaderIndexMetadata, followIndexMetadata, filteredHeaders);
            persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, handler.getActionListener(shardId));
        }

到此,只剩下增量同步的逻辑,全部由ShardFollowTask完成;

PS:入口类Ccr在getPersistentTasksExecutor方法中,完成了对ShardFollowTasksExecutor的注册,该类为执行ShardFollowTask的执行器

如上sendStartRequest方法发送一个Task时,会触发到org.elasticsearch.cluster.service.ClusterApplierService#runTask监听,开始执行ShardFollowTasksExecutor中的方法;

------>org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#start 开启增量复制

------->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#coordinateReads

---->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#sendShardChangesRequest(long, int, long)?

----->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#innerSendShardChangesRequest

作用:获取setting、mapping、translog、 leaderGlobalCheckpoint、leaderMaxSeqNo并更新

--->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#handleReadResponse

--->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#innerHandleReadResponse

--->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#coordinateWrites

--->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#sendBulkShardOperationsRequest

--->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#

innerSendBulkShardOperationsRequest

作用:写入数据, 得到followerGlobalCheckpoint、followerMaxSeqNo并更新

2.执行 /auto_follow接口(当有新的集群创建时,自动触发全量同步)

1. 执行该API时,集群状态中会保存该pattern

2. Master节点上的AutoFollowCoordinator会一直监听remoteCluster的集群状态,如果发现新创建了索引,且符合pattern,则调用TransportPutFollowAction,执行与上述 _ccr/follow 接口一样的流程

请求接收细节流程

在follow集群执行该api时代码执行流程(只关心CCR逻辑时可忽略)

1.接收到请求,经过netty的各个方法

2.org.elasticsearch.rest.RestController#dispatchRequest

(在RestController类中初始化NodeClient client,往后传递)

3.解析&处理该request

4.到达?org.elasticsearch.xpack.ccr.rest.RestPutFollowAction#getName

5.到org.elasticsearch.xpack.core.ccr.action.PutFollowAction#fromXContent 进一步处理request

6.执行client.execute到?org.elasticsearch.client.support.AbstractClient#execute

7.回到AbstractClient的实现类?org.elasticsearch.client.node.NodeClient#doExecute

8.到org.elasticsearch.client.node.NodeClient#executeLocally方法中执行transportAction(action).execute(request, listener); 到此关联到了transportAction

9.到org.elasticsearch.action.support.TransportAction#execute,继续执行到该行代码:requestFilterChain.proceed(task, actionName, request, listener);在该类继续执行到?this.action.filters[i].apply(task, actionName, request, listener, this);

10.到org.elasticsearch.xpack.security.action.filter.SecurityActionFilter#apply

执行到chain.proceed(task, action, request, listener);

11.到org.elasticsearch.action.support.TransportAction.RequestFilterChain#proceed

执行this.action.doExecute(task, request, listener);

12.到TransportMasterNodeAction类

org.elasticsearch.action.support.master.TransportMasterNodeAction#doExecute(org.elasticsearch.tasks.Task, Request, org.elasticsearch.action.ActionListener<Response>)

开启线程:new AsyncSingleAction(task, request, listener).start();

13.接下来是几个线程相关类

org.elasticsearch.common.util.concurrent.EsExecutors.DirectExecutorService#execute

org.elasticsearch.common.util.concurrent.AbstractRunnable#run

14.开始执行masterOperation方法,执行到org.elasticsearch.action.support.master.TransportMasterNodeAction#masterOperation(org.elasticsearch.tasks.Task, Request, org.elasticsearch.cluster.ClusterState, org.elasticsearch.action.ActionListener<Response>)

15.进一步找到masterOperation方法实现类TransportPutFollowAction

org.elasticsearch.xpack.ccr.action.TransportPutFollowAction#masterOperation

到此:关联到TransportPutFollowAction类

通过入口类的action绑定,完成在action中找到对应实现类的过程

重点:

增量同步是如何实现不断刷新的(long pull模式)???

1.coordinateReads方法读取leader集群信息以及operations

2.org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#start

coordinateReads->sendShardChangesRequest(long, int, long)->sendShardChangesRequest(long, int, long, java.util.concurrent.atomic.AtomicInteger)->handleReadResponse->innerHandleReadResponse(执行了coordinateWrites)->coordinateReads/sendShardChangesRequest

TODO:

http请求(ccr/follow)接收到后,follow集群节点开始全量同步,是以snapshot的模式去拉leader集群数据的,那么是在什么时候将leader集群伪装成snapshot的repository的?理论上应该是在Node初始化的时候...还未验证,后续再补充该逻辑...

PS:

如有缺失或错误,欢迎补充&指正...

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-09 17:33:49  更:2021-07-09 17:34:13 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 17:39:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码