IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> nacos注册中心的注册原理深度解析 -> 正文阅读

[网络协议]nacos注册中心的注册原理深度解析

注册流程

在这里插入图片描述
整个注册中心的注册和发现流程主要有三个方面来完成:服务的提供方(以下简称server)、服务的消费者(以下简称client)、注册中心(nacos)。首先我们来探讨server与nacos的交互过程。

server需要通过nacos官方的OpenApi提供的接口来发起服务注册请求,随后server会定时向nacos发送心跳信息来进行心跳检测,对于使用者来说这一步可以采用ScheduledExecutorService创建定时任务来完成。nacos会异步的处理注册请求任务和心跳任务。

nacos心跳机制
在这里插入图片描述
nacos和client之间采取推拉结合的交互方式,一方面client可以通过定时任务每隔10s向nacos发起查询请求,如果服务列表改变nacos就会返回新列表,另一方面当本地服务实例发生变化时(即server实例注册成功或者心跳停止断开链接),nacos会主动通过UDP协议推送到client,udp协议非常快,不需要保持长连接。在注册中心的场景中client数量往往多于server,如果每一次服务更新,nacos要和成千上万的服务消费者去建立Tcp的话性能肯定是不行的。而如果UDP通知失败,客户端每10秒还会主动去拉一次,客户端拉取和服务器推送是互补的,这样既能保证server实例更新的时效性,又能提高效率。

分析nacos服务注册源码

官方OpenApi服务注册
在这里插入图片描述
我们跟进到接口源码,可以看到他是通过registerInstance()方法完成服务实例注册的

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {

    /**
     * Register new instance.
     *
     * @param request http request
     * @return 'ok' if success
     * @throws Exception any error during register
     */
    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        final Instance instance = parseInstance(request);
        //注册服务实例
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
    
}

在nacos源码中有一个类ServiceManager,它实现了RecordListener接口,我们可以看到registerInstance()方法

  public class ServiceManager implements RecordListener<Service> {
    
     //服务注册
     public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        
        Service service = getService(namespaceId, serviceName);
        
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }
        
     //注册实例
     public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        
        Service service = getService(namespaceId, serviceName);
        
        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            //存放服务实例
            consistencyService.put(key, instances);
        }
    }
    
}

再跟进ConsistencyService,我们找到它的一个实现类DistroConsistencyServiceImpl在这里插入图片描述
跟进源码

public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {   
    
    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
    }
    
    public void onPut(String key, Record value) {
        
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum);
        }
        
        if (!listeners.containsKey(key)) {
            return;
        }
        //notifier是一个阻塞队列,我们把注册任务添加进去
        notifier.addTask(key, DataOperation.CHANGE);
    }
    
     public void addTask(String datumKey, DataOperation action) {
            
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            //将请求放入阻塞队列中
            tasks.offer(Pair.with(datumKey, action));
        }
    
}

Notifier是一个阻塞队列,我们把注册任务添加进去,然后执行,这一步的目的主要是将请求写入和执行请求分离开来,实现异步处理,再跟进Notifier处理请求的源码

public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

      //创建Notifier对象
     private volatile Notifier notifier = new Notifier();

    //当前对象创建的时候调用此方法讲notifier任务加入线程池执行
    @PostConstruct
    public void init() {
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }
    
}

public class GlobalExecutor {
    
      private static final ScheduledExecutorService DISTRO_NOTIFY_EXECUTOR = ExecutorFactory.Managed
           .newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
                    new NameThreadFactory("com.alibaba.nacos.naming.distro.notifier"));

    //将任务加入线程池
    public static void submitDistroNotifyTask(Runnable runnable) {
        DISTRO_NOTIFY_EXECUTOR.submit(runnable);
    }
    
}

 public class Notifier implements Runnable {
     
     @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            
            for (; ; ) {
                try {
                    
                    Pair<String, DataOperation> pair = tasks.take();
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
     
 }

在这里插入图片描述

总结

server向nacos发起注册任务请求,并维持一个心跳检测的定时任务,naocs会通过阻塞队列异步地处理这些请求,并实时的通过UDP推送到client,为防止UDP数据丢失,client也会通过定时任务每隔10s向nacos发送拉取请求,当服务列表改变,nacos再返回。

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-09-10 11:12:15  更:2021-09-10 11:13:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年6日历 -2024/6/18 19:17:39-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码