IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Elasticsearch CCR源码分析(补充) -> 正文阅读

[大数据]Elasticsearch CCR源码分析(补充)

接上篇TODO

Elasticsearch CCR源码分析

上篇TODO:

http请求(ccr/follow)接收到后,follow集群节点开始全量同步,是以snapshot的模式去拉leader集群数据的,那么是在什么时候将leader集群伪装成snapshot的repository的?理论上应该是在Node初始化的时候...还未验证,后续再补充该逻辑...
?

如何加载ccr repositoryService

1.node启动:

进入构造方法:

org.elasticsearch.node.Node#Node(org.elasticsearch.env.Environment, java.util.Collection<java.lang.Class<? extends org.elasticsearch.plugins.Plugin>>, boolean)

2.CcrRepositoryManager在节点启动的时候通过调用为每一个remoteCluster注册了名为 “_ccr_” + remoteCluster?的ccrRepository

Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
                .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
                                                 scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,
                                                 namedWriteableRegistry).stream())
                .collect(Collectors.toList());

在createComponents方法(org.elasticsearch.xpack.ccr.Ccr#createComponents)中,调用了

new CcrRepositoryManager(settings, clusterService, (NodeClient) client)

继续加载其他...

开始调用org.elasticsearch.xpack.ccr.CcrRepositoryManager.RemoteSettingsUpdateListener#updateRemoteCluster

@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy, boolean compressionEnabled,
                                           TimeValue pingSchedule) {
            String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias;
            if (addresses.isEmpty()) {
                deleteRepository(repositoryName);
            } else {
                putRepository(repositoryName);
            }
        }

经历一系列方法:

?最终--->org.elasticsearch.xpack.ccr.CcrRepositoryManager#putRepository

3.加载snapshot/restore的module

modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), transportService,
                clusterService, threadPool, xContentRegistry));

4.进入RepositoriesModule构造方法:

Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry);

5.进入getInternalRepositories实现类:

执行重写的?getInternalRepositories方法

org.elasticsearch.xpack.ccr.Ccr#getInternalRepositories

@Override
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
        Repository.Factory repositoryFactory =
            (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool.get());
        return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
    }

6.org.elasticsearch.repositories.RepositoriesModule#RepositoriesModule构造了repositoriesService

?

?7.开始注入初始化的一些bean或者service(相当于Spring的单例池)

injector = modules.createInjector();

8.node初始化的最后,暴露rest API

if (NetworkModule.HTTP_ENABLED.get(settings)) {
                logger.debug("initializing HTTP handlers ...");
                actionModule.initRestHandlers(() -> clusterService.state().nodes());
            }

9.小结

Node初始化的时候会将很多handler、service初始化完成(包括CCR相关service,repository),并且直接为每一个remoteCluster注册了名为 “_ccr_” + remoteCluster的ccrRepository

等到触发全量同步的时候(API触发或者auto API自动触发,检测是否是开启了Ccr且是Ccr的Restore流程,则会走到CcrRepository,否则走正常的repository),会走到org.elasticsearch.xpack.ccr.repository.CcrRepository的逻辑开始restore(即将leader集群视为一个仓库,leader数据视为名为“_latest_”的一个snapshot,获取元数据 & 数据)

void restoreFiles() throws IOException {
            ArrayList<FileInfo> fileInfos = new ArrayList<>();
            for (StoreFileMetaData fileMetaData : sourceMetaData) {
                ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length());
                fileInfos.add(new FileInfo(fileMetaData.name(), fileMetaData, fileSize));
            }
            SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos);
            // 获取数据的核心方法
            restore(snapshotFiles);
        }

TODO

到此CCR的逻辑已经完全通了,CCR借助snapshot的逻辑进行restore的细节仍然不明朗,例如,把leader集群视为一个repository,将leader集群的数据作为snapshot,是如何进行恢复的?直接拷贝文件到follow集群本地,follow集群的shard逐一开始恢复?增量复制的时候直接读取leader集群的operations(translog),是直接读取在内存中直接回放(bulk写入follow集群shard)?还是其他逻辑?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-14 23:05:55  更:2021-07-14 23:06:28 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/28 9:51:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码