IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> skywalking源码分析第七篇一StorageModule模块启动 -> 正文阅读

[大数据]skywalking源码分析第七篇一StorageModule模块启动

存储实现本文默认elasticsearch

数据模型

  • StorageData代表一条数据
  • 每一条数据都会被存储到elasticsearch
  • 数据类上如果标注@Stream注解,则会被构建成Model对象[第五篇annotationScan已经介绍]
  • StorageModuleProvider启动时会基于Model向es创建索引

在这里插入图片描述

存储层架构设计

  • StorageDAO 类似工厂用于获取真正的dao实现
  • 不同dao实现用于读写StorageData
  • StorageBuilder用于完成与es交互的防腐层转换,es层全部基于map传递数据,StorageBuilder支持map和StorageData的相互转换
  • stream注解的扫描会完成Model的创建,model对应es的索引
  • ModelInstaller会根据model对象向es存储发送请求索引创建请求
    在这里插入图片描述

注册数据锁原理图

  • 有一种特殊的Dao 叫做IRegisterLockDAO
  • IRegisterLockDAO用于对注册数据进行加锁
  • 实现原理基于es的版本号[乐观锁机制]
  • 每一个@stream内有一个scopeId,比如endpoint注册信息,其写入es时会获取锁
  • [启动阶段会为EndpointInventory创建endpoint_inventory索引,存储相关数据,同时会为register_lock创建一个id为16的数据,表示EndpointInventory当前的版本号]
  • 将来在对endpoint_inventory写数据需要依据register_lock索引id为16的数据版本号++是否成功来判断是否加锁成功,从而决定写endpoint_inventory索引
    在这里插入图片描述

源码分析一StorageModuleElasticsearchProvider

  • prepare完成esclient的创建和服务注册
  • start完成es索引的创建【init模式】
public class StorageModuleElasticsearch7Provider extends ModuleProvider {

    @Override
    public void prepare() throws ServiceNotProvidedException {
     
        if (!StringUtil.isEmpty(config.getNameSpace())) {
            config.setNameSpace(config.getNameSpace().toLowerCase());
        }
        elasticSearch7Client = new ElasticSearch7Client(config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config.getTrustStorePass(), config.getNameSpace(), config.getUser(), config.getPassword());
        构建esClient
        注册serviceImpl 必须要注册所有StorageModule.services方法的接口
        this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests()));
        this.registerServiceImplementation(StorageDAO.class, new StorageEs7DAO(elasticSearch7Client));
        this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockEs77DAOImpl(elasticSearch7Client));
        this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearch7Client, new ElasticsearchStorageTTL()));

        this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
        this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
        this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEs7DAO(elasticSearch7Client));
        this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));

        this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearch7Client));
        this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEs7DAO(elasticSearch7Client));
        this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize()));
        this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEs7DAO(elasticSearch7Client, config.getMetadataQueryMaxSize()));
        this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client));
        this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEs7DAO(elasticSearch7Client));
        this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearch7Client));
        this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEs7DAO(elasticSearch7Client));
    }

    @Override
    public void start() throws ModuleStartException {
        overrideCoreModuleTTLConfig();

        try {
            elasticSearch7Client.connect();
            根据mode模式决定是否初始化es索引
            StorageEs7Installer installer = new StorageEs7Installer(getManager(), config);
            installer.install(elasticSearch7Client);
            初始化锁索引
            RegisterLockEs7Installer lockInstaller = new RegisterLockEs7Installer(elasticSearch7Client);
            lockInstaller.install();
        } catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) {
            throw new ModuleStartException(e.getMessage(), e);
        }
    }

    @Override
    public void notifyAfterCompleted() {
        无需处理
    }

    @Override
    public String[] requiredModules() {
        return new String[]{CoreModule.NAME};
    }

}

索引创建ModelInstaller.install

  • StorageModels获取es索引映射对象
  • init模式调用esclient创建索引
 public final void install(Client client) throws StorageException {
        IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
        StorageModels获取es索引映射对象
        List<Model> models = modelGetter.getModels();

        no-init 不会创建表 es索引等
        if (RunningMode.isNoInitMode()) {
            for (Model model : models) {
                while (!isExists(client, model)) {
                    try {
                        logger.info("table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.", model.getName());
                        Thread.sleep(3000L);
                    } catch (InterruptedException e) {
                        logger.error(e.getMessage());
                    }
                }
            }
        } else {
            init 索引不存在在创建索引
            for (Model model : models) {
                if (!isExists(client, model)) {
                    logger.info("table: {} does not exist", model.getName());
                    createTable(client, model);
                }
            }
        }
    }

源码分析一@stream注解处理

  • 分为四类StreamProcessor处理数据[agent上报分为4类]
  • StreamProcessor将来处理agent上报的数据
public class StreamAnnotationListener implements AnnotationListener {

    @SuppressWarnings("unchecked")
    @Override public void notify(Class aClass) {
        if (aClass.isAnnotationPresent(Stream.class)) {
            Stream stream = (Stream)aClass.getAnnotation(Stream.class);
            根据不同元组的Stream标记 创建Storage元信息,并构建Model 将来ModelInstaller 会根据Model信息执行es索引初始化
             构建entryWorkers数据的用于流式处理
            if (stream.processor().equals(InventoryStreamProcessor.class)) {
                InventoryStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else if (stream.processor().equals(RecordStreamProcessor.class)) {
                RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else if (stream.processor().equals(MetricsStreamProcessor.class)) {
                MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else if (stream.processor().equals(TopNStreamProcessor.class)) {
                TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else {
                throw new UnexpectedException("Unknown stream processor.");
            }
        } else {
            throw new UnexpectedException("Stream annotation listener could only parse the class present stream annotation.");
        }
    }
}

举例一InventoryStreamProcessor.create

  • StorageModels根据stream注解添加Model[installer根据此model合集在es创建索引]
  • 构建worker链路[RegisterDistinctWorker[l1聚合]-> RegisterRemoteWorker -> RegisterPersistentWorker[l2聚合]]

public void create(ModuleDefineHolder moduleDefineHolder, Stream stream,
    Class<? extends RegisterSource> inventoryClass) {
    StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
    IRegisterDAO registerDAO;
    try {
        registerDAO = storageDAO.newRegisterDao(stream.builder().newInstance());
    } catch (InstantiationException | IllegalAccessException e) {
        throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " register DAO failure.", e);
    }

    IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
    添加到StorageModels 之后installer会根据其包含的model以及init模式进行存储层的table创建[es,db等]
    Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), false);
    最后一个worker 对请求去重 例如多个服务实例一起进行应用名注册  此外服务注册失败会进行去重
    RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId());

    String remoteReceiverWorkerName = stream.name() + "_rec";
    IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
    workerInstanceSetter.put(remoteReceiverWorkerName, persistentWorker, inventoryClass);
    发往其他节点处理
    /*
    core:
      default:
        # Mixed: Receive agent data, Level 1 aggregate, Level 2 aggregate
        # Receiver: Receive agent data, Level 1 aggregate
        # Aggregator: Level 2 aggregate


     */
    RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
    第一个worker
    RegisterDistinctWorker distinctWorker = new RegisterDistinctWorker(moduleDefineHolder, remoteWorker);

    entryWorkers.put(inventoryClass, distinctWorker);
}

总结

  • Storage完成es索引的创建
  • 概述Storage上报数据的分类以及Processor数据处理的分类
  • 概述Storage的架构设计
  • 讲解一个特殊的索引register_lock提供的加锁功能
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-03 16:21:05  更:2022-03-03 16:24:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 20:05:23-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码