IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> skywalking源码分析第十二篇一server-receiver-plugin之Register服务注册发现模块启动 -> 正文阅读

[Java知识库]skywalking源码分析第十二篇一server-receiver-plugin之Register服务注册发现模块启动

原理图

  • 不同信息的服务注册都是同一套流程
  • 先基于缓存和存储获取,获取成功则返回agent交由agent构建字典信息[基于字典传输降低带宽]
  • 信息获取失败则异步创建
  • agent获取失败则轮询发起服务注册请求
    在这里插入图片描述

源码分析一RegisterModuleProvider

  • 为grpc服务器和jetty服务器注册各类handler
  • 其中RegisterServiceHandler用于处理服务注册,服务实例注册以及[endpoint 和networkaddress]同步请求

高版本agent交互基本走grpc,skywalking-ui交互走jetty


public class RegisterModuleProvider extends ModuleProvider {
    @Override public void prepare() {
    }
    高版本agent交互基本走grpc,skywalking-ui交互走jetty
    @Override public void start() {
        GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class);
        grpcHandlerRegister.addHandler(new ApplicationRegisterHandler(getManager()));
        grpcHandlerRegister.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
        grpcHandlerRegister.addHandler(new ServiceNameDiscoveryHandler(getManager()));
        grpcHandlerRegister.addHandler(new NetworkAddressRegisterServiceHandler(getManager()));

        服务注册 服务实例注册 以及[endpoint 和networkaddress]同步请求
        grpcHandlerRegister.addHandler(new RegisterServiceHandler(getManager()));
        心跳请求 更新heatbeat
        grpcHandlerRegister.addHandler(new ServiceInstancePingServiceHandler(getManager()));
        其他handler主要处理低版本协议
        JettyHandlerRegister jettyHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(JettyHandlerRegister.class);
        jettyHandlerRegister.addHandler(new ApplicationRegisterServletHandler(getManager()));
        jettyHandlerRegister.addHandler(new InstanceDiscoveryServletHandler(getManager()));
        jettyHandlerRegister.addHandler(new InstanceHeartBeatServletHandler(getManager()));
        jettyHandlerRegister.addHandler(new NetworkAddressRegisterServletHandler(getManager()));
        jettyHandlerRegister.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
    }

    @Override public void notifyAfterCompleted() {
    }
}

源码分析一服务注册

注册方法概览

其他注册流程类似,不在重复叙述

RegisterServiceHandler方法作用
doServiceRegister服务注册
doServiceInstanceRegister服务实例注册
doEndpointRegister端点注册,服务端生成id,返回agent端,agent维护id和name字典
doNetworkAddressRegisterip注册,同endpoint流量
doServiceAndNetworkAddressMappingRegister维护服务和ip映射关系

什么是端点?

端点名一般由agent插件定义,并且在字典中维护[id和名称的关系],传输时通过id传输避免网络带宽

dubbo的端点为接口名
下图为mysql端点名称在这里插入图片描述
下图为elasticsearch端点名称在这里插入图片描述

什么是字典?

字典维护在agent端,agent注册的EndPoint和ip等信息会在服务端生成唯一id并返回agent,agent依据id和名称的关系构建字典映射

将来agent上报record和metrics等数据则通过字典表将endpoint,ip等信息转成对应的唯一id

作用: 减少网络传输的带宽

源码分析一doServiceRegister服务注册

  • 通过IServiceInventoryRegister获取serviceid
  • 返回agent相关信息
@Override public void doServiceRegister(Services request, StreamObserver<ServiceRegisterMapping> responseObserver) {
    ServiceRegisterMapping.Builder builder = ServiceRegisterMapping.newBuilder();
    request.getServicesList().forEach(service -> {
        String serviceName = service.getServiceName();
        服务注册的service_id 如果不存在则直接返回agentagent会轮询调用直到获取成功
        service_id不存在会异步创建
        int serviceId = serviceInventoryRegister.getOrCreate(serviceName, null);
        if (serviceId != Const.NONE) {
            KeyIntValuePair value = KeyIntValuePair.newBuilder().setKey(serviceName).setValue(serviceId).build();
            builder.addServices(value);
        }
    });
    responseObserver.onNext(builder.build());
    responseObserver.onCompleted();
}

源码分析一ServiceInventoryRegister.getOrCreate服务注册信息缓存获取或者异步创建

  • 通过缓存获取,缓存不存在则通过es获取
  • 存储层也获取不到则异步构建,agent获取不到注册结果会轮询调用
  • InventoryStreamProcessor处理创建serviceId流程
@Override public int getOrCreate(String serviceName, JsonObject properties) {
    通过缓存获取,缓存不存在则通过es获取
    int serviceId = getServiceInventoryCache().getServiceId(serviceName);
    存储层也获取不到则异步构建,返回agent NONE,agent注册后字典填充失败会轮询调用
    if (serviceId == Const.NONE) {
        对应service_inventory索引
        ServiceInventory serviceInventory = new ServiceInventory();
        serviceInventory.setName(serviceName);
        serviceInventory.setAddressId(Const.NONE);
        serviceInventory.setIsAddress(BooleanUtils.FALSE);

        long now = System.currentTimeMillis();
        serviceInventory.setRegisterTime(now);
        serviceInventory.setHeartbeatTime(now);
        serviceInventory.setMappingServiceId(Const.NONE);
        serviceInventory.setLastUpdateTime(now);
        serviceInventory.setProperties(properties);
        不存在异步添加
        InventoryStreamProcessor.getInstance().in(serviceInventory);
    }
    return serviceId;
}

源码分析一ServiceInventoryCache服务注册缓存获取

  • 缓存获取
  • 缓存获取失败则通过elasticSearch获取
public int getServiceId(String serviceName) {
    缓存获取
    Integer serviceId = serviceNameCache.getIfPresent(ServiceInventory.buildId(serviceName));
    if (Objects.isNull(serviceId) || serviceId == Const.NONE) {
        缓存不存在则通过存储层查找
        serviceId = getCacheDAO().getServiceId(serviceName);
        if (serviceId != Const.NONE) {
            serviceNameCache.put(ServiceInventory.buildId(serviceName), serviceId);
        }
    }
    return serviceId;
}

源码分析一InventoryStreamProcessor服务注册注册异步创建

RegisterDistinctWorker------RegisterRemoteWorker------RegisterPersistentWorker

流式处理节点名称作用
RegisterDistinctWorker缓冲注册请求,去重,批量发送下一个worker
RegisterRemoteWorker远程通信,发往其他集群节点处理
RegisterPersistentWorker缓冲注册请求,去重,加锁创建注册信息

为什么注册需要去重?

serviceId会有一个集群多个agent同时发送该serviceid的创建
agent 第一次注册获取serviceId失败会轮询请求

源码分析一RegisterDistinctWorker

  • 流式处理L1聚合阶段主要处理去重,缓冲

private void onWork(RegisterSource source) {
    messageNum++;

    if (!sources.containsKey(source)) {
        第一次出现则加入缓冲区
        sources.put(source, source);
    } else {
        已经存在则合并
        sources.get(source).combine(source);
    }
    消息数量超过1000或者一个批次的数据  【isEndOfBatch不在分析,其原理为,生产者生产时 endOfBatch  =false  消费者消费时,获取缓冲区所有的消息则发送nextWorker】
    也就是生产者在持续生产 则缓存区满1000则发送下一个worker, 生产者不在生产,则消费者将所有的消息的最后一条设置成end,触发nextWorker
    if (messageNum >= 1000 || source.isEndOfBatch()) {
        sources.values().forEach(nextWorker::in);
        sources.clear();
        messageNum = 0;
    }
}

源码分析一RegisterPersistentWorker

  • 流式处理L2聚合阶段主要处理注册请求的创建
  • 通过registerLockDao进行加锁保障线程安全
private void onWork(RegisterSource registerSource) {
    if (!sources.containsKey(registerSource)) {
        sources.put(registerSource, registerSource);
    } else {
        sources.get(registerSource).combine(registerSource);
    }

    try (HistogramMetrics.Timer timer = workerLatencyHistogram.createTimer()) {
        if (sources.size() > 1000 || registerSource.isEndOfBatch()) {
            sources.values().forEach(source -> {
                try {
                    存在则更新心跳时间等
                    RegisterSource dbSource = registerDAO.get(modelName, source.id());
                    if (Objects.nonNull(dbSource)) {
                        if (dbSource.combine(source)) {
                            registerDAO.forceUpdate(modelName, dbSource);
                        }
                    } else {
                        不存在加锁
                        int sequence;
                        返回一个自增id 为一个全局锁功能 确保创建流程的独占性 [乐观锁更新失败返回Const.NONE]
                        if ((sequence = registerLockDAO.getId(scopeId, source)) != Const.NONE) {
                            // --------------类似单例模式的双重锁机制,保障线程安全--------------
                            加锁成功获取
                            try {
                                dbSource = registerDAO.get(modelName, source.id());
                                获取存在则更新
                                if (Objects.nonNull(dbSource)) {
                                    if (dbSource.combine(source)) {
                                        registerDAO.forceUpdate(modelName, dbSource);
                                    }
                                } else {
                                    获取不存在则注册
                                    source.setSequence(sequence);
                                    registerDAO.forceInsert(modelName, source);
                                }
                            } catch (Throwable t) {
                                logger.error(t.getMessage(), t);
                            }
                        } else {
                            加锁失败说明有其他线程在处理注册请求,该线程无需创建
                            logger.info("{} inventory register try lock and increment sequence failure.", DefaultScopeDefine.nameOf(scopeId));
                        }
                    }
                } catch (Throwable t) {
                    logger.error(t.getMessage(), t);
                }
            });
            sources.clear();
        }
    }
}

总结

  • 服务注册将读写分离,读取失败由agent轮询注册
  • 异步写时通过registerLockDAO加锁保障线程安全
  • 其中不同信息的注册流程大同小异

备注: registerLockDAO原理

  • 存储模块启动时会通过RegisterLockEs7Installer完成registerLockDAO对应的es索引初始化
  • registerLockDAO通过对注册场景对应的scopeId进行es乐观锁自增更新
  • 更新成功加锁成功,更新失败加锁失败
 public void install() throws StorageException {
    删除其他代码
   	创建registerLockDAO的底层索引register_lock
    createIndex();
    遍历每个注册信息的scopeId,在register_lock插入一行数据
    主键_document_id = scopeId,乐观锁字段sequence值初始化为1
    for (Class registerSource : InventoryStreamProcessor.getInstance().getAllRegisterSources()) {
        int scopeId = ((Stream)registerSource.getAnnotation(Stream.class)).scopeId();
        putIfAbsent(scopeId);
    }
} 

register_lock索引概览

__document_idsequence注册类说明
141ServiceInventory
  • DefaultScopeDefine.SERVICE_INVENTORY值为14,代表ServiceInventory服务注册的场景编号为14,则在register_lock索引插入一行主键为14的数据
  • sequence为乐观锁机制,初始化为1
  • 以上信息都是基于ServiceInventory类的注解@Stream指定

@Stream(name = ServiceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INVENTORY, builder = ServiceInventory.Builder.class, processor = InventoryStreamProcessor.class)

IRegisterLockDAO乐观锁实现

  • 获取register_lock索引sequence字段
  • 对sequence字段自增并force强刷es
  • 更新成功则加锁成功

 @Override public int getId(int scopeId, RegisterSource registerSource) {
    String id = scopeId + "";
    int sequence = Const.NONE;
    try {
        GetResponse response = getClient().get(RegisterLockIndex.NAME, id);
        if (response.isExists()) {
            Map<String, Object> source = response.getSource();
            sequence = ((Number)source.get(RegisterLockIndex.COLUMN_SEQUENCE)).intValue();
            long version = response.getVersion();
            增加版本号
            sequence++;
            使用es的乐观锁
            lock(id, sequence, version);
        }
    } catch (Throwable t) {
        return Const.NONE;
    }
    return sequence;
}  

private void lock(String id, int sequence, long version) throws IOException {
    XContentBuilder source = XContentFactory.jsonBuilder().startObject();
    es sequence字段
    source.field(RegisterLockIndex.COLUMN_SEQUENCE, sequence);
    source.endObject();

    getClient().forceUpdate(RegisterLockIndex.NAME, id, source, version);
}
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-06 12:48:23  更:2022-03-06 12:51:06 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 13:43:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码