IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> nacos的学习 -> 正文阅读

[Java知识库]nacos的学习

nacos学习的心历路程

在spring-cloud-alibaba 的全年规划中,杨翊简单介绍了一下Nacos 从1.0的架构图和2.0的架构图,以及解决的问题,一下图片出于杨翊杭州分享。

nacos 1.0 架构

nacos1.0架构设计问题

nacos 2.0架构设计

nacos2.0架构设计问题

nacos 1.0 到2.0 中一直延续的 一致性协议

Distor

Distro是阿里巴巴的私有协议,目前流行的Nacos服务管理框架就采用了Distro协议。Distro 协议被定位为 临时数据的一致性协议 :该类型协议, 不需要把数据存储到磁盘或者数据库 ,因为临时数据通常和服务器保持一个session会话, 该会话只要存在,数据就不会丢失

Raft

该论文主要包含一下模块:

  1. 节点间的选举
  2. log 的复制
  3. 集群配置的改变

参考文档

raft论文中文版本,

开源java raft协议实现框架

[个人文章](解读Raft(一 算法基础) (qq.com))

SOFAJRaft官网

notify

相当于一个监听

spring cloud commons 和Spring Cloud Context

在进行nacos深入分析前,有必要回过头来说一下,spring cloud commons,其为我们提供了一些规约,各个功能提供者只需要实现规约来达到提供功能的目的。详情参见官方文档。

spring cloud commons 官方文档地址

nacos 的client 服务注册

注册的流程都是使用 nameingservice 调用http 请求进行注册的,我们不需要进行区分,但是心跳的话就需要分版本进行考虑了。其实服务注册的所有服务我们关注 NamingService 接口即可。其实还是使用ServiceRegistry 进行服务注册。并使用DiscoveryClient进行服务发现能力。

public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
    //通过监听WebServerInitializedEvent事件,将客户端信息注册到nacos 服务端
     protected void register() {
        this.serviceRegistry.register(this.getRegistration());
    }
}
    
public class NacosServiceRegistry implements ServiceRegistry<Registration> {
    
    private final NacosDiscoveryProperties nacosDiscoveryProperties;
    private final NamingService namingService;
    public void register(Registration registration) {
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
        } else {
            String serviceId = registration.getServiceId();
            String group = this.nacosDiscoveryProperties.getGroup();
            Instance instance = this.getNacosInstanceFromRegistration(registration);

            try {
                //调用nameingservice 进行服务注册
                this.namingService.registerInstance(serviceId, group, instance);
                log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
            } catch (Exception var6) {
                log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var6});
                ReflectionUtils.rethrowRuntimeException(var6);
            }

        }
    }
    //基本信息都在instance 中
     private Instance getNacosInstanceFromRegistration(Registration registration) {
        Instance instance = new Instance();
        instance.setIp(registration.getHost());
        instance.setPort(registration.getPort());
        instance.setWeight((double)this.nacosDiscoveryProperties.getWeight());
        instance.setClusterName(this.nacosDiscoveryProperties.getClusterName());
        instance.setMetadata(registration.getMetadata());
        return instance;
    }
}


public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
        params.put("namespaceId", this.getNamespaceId());
        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
            throw new IllegalArgumentException("no server available");
        } else {
            Exception exception = new Exception();
            if (servers != null && !servers.isEmpty()) {
                Random random = new Random(System.currentTimeMillis());
                int index = random.nextInt(servers.size());
				//随机找一个服务端进行注册
                for(int i = 0; i < servers.size(); ++i) {
                    String server = (String)servers.get(index);

                    try {
                        //进行注册
                        return this.callServer(api, params, server, method);
                    } catch (NacosException var11) {
                        exception = var11;
                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
                    } catch (Exception var12) {
                        exception = var12;
                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
                    }

                    index = (index + 1) % servers.size();
                }
            }
        }
    }
 public String callServer(String api, Map<String, String> params, String curServer, String method) throws NacosException {
        long start = System.currentTimeMillis();
        long end = 0L;
        this.checkSignature(params);
        List<String> headers = this.builderHeaders();
        String url;
        if (!curServer.startsWith("https://") && !curServer.startsWith("http://")) {
            if (!curServer.contains(":")) {
                curServer = curServer + ":" + this.serverPort;
            }

            url = HttpClient.getPrefix() + curServer + api;
        } else {
            url = curServer + api;
        }

        HttpResult result = HttpClient.request(url, headers, params, "UTF-8", method);
        end = System.currentTimeMillis();
        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code)).observe((double)(end - start));
        if (200 == result.code) {
            return result.content;
        } else if (304 == result.code) {
            return "";
        } else {
            throw new NacosException(500, "failed to req API:" + curServer + api + ". code:" + result.code + " msg: " + result.content);
        }
    }

nacos 提供 DiscoveryClient

使用自动配置给我们提供了一个discoveryclient供应用使用。而NacosServiceDiscovery才是真正的能力提供者,其最终还是调用NamgingService 进行能力提供。

@Configuration(proxyBeanMethods = false)
public class NacosDiscoveryClientConfiguration {
	//提供DiscoveryClient
	@Bean
	public DiscoveryClient nacosDiscoveryClient(
			NacosServiceDiscovery nacosServiceDiscovery) {
		return new NacosDiscoveryClient(nacosServiceDiscovery);
	}

	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",
			matchIfMissing = true)
	public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties,
			ObjectProvider<TaskScheduler> taskScheduler) {
		return new NacosWatch(nacosDiscoveryProperties, taskScheduler);
	}
    
    @Bean
	@ConditionalOnMissingBean
	public NacosServiceDiscovery nacosServiceDiscovery(
			NacosDiscoveryProperties discoveryProperties) {
		return new NacosServiceDiscovery(discoveryProperties);
	}
    

}

NamingService 接口

到此为止,我们基本上能通过编程的方式获取服务的地址了,但是我们没有显示的代码调用,这个是怎么做的呢?我猜测和ribbon 有关。再有我们看服务发现客户端是直接调用请求去注册中心查找的服务,但是当我们的注册中心关了之后,服务与服务之前还是可以连通的,这个能力,也就是路由缓存功能是谁提供的呢,肯定是由 nacos 提供的,这是因为subscribe 默认为true,这样的话,就会缓存,并通过udp和server 服务进行通讯,来获取最新的状态。udp的能力是有PushReceiver 提供的

@Override
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {

        ServiceInfo serviceInfo;
        if (subscribe) {
            //默认进入该条件
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
        return list;
    }
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
		//如果缓存中没有
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);

            serviceInfoMap.put(serviceObj.getKey(), serviceObj);

            updatingMap.put(serviceName, new Object());
            //调用http请求获取
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);

        } else if (updatingMap.containsKey(serviceName)) {

            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
		//开启定时任务
        scheduleUpdateIfAbsent(serviceName, clusters);

        return serviceInfoMap.get(serviceObj.getKey());
    }

根据架构图我们了解到,会有一个udp 去和server 进行通讯,来感知服务这就是PushReceiver

public class PushReceiver implements Runnable {
    @Override
    public void run() {
        while (true) {
            try {
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

                udpSocket.receive(packet);

                String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
			
                PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    //调用hostReactor  方法进行对账并存储到 serviceInfoMap 集合中
                    hostReactor.processServiceJSON(pushPacket.data);

                    // send ack to server
                    ack = "{\"type\": \"push-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{\"type\": \"dump-ack\""
                        + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\""
                        + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                        + "\"}";
                } else {
                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
                }

                udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                    ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }
}
    //在自己的构造函数中开启自己
 public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            udpSocket = new DatagramSocket();

            executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });

            executorService.execute(this);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] init udp socket failed", e);
        }
    }

ribbon如何使用我

直接调用我的 namingservice 方法即可

学习资料推广

我已经将springamqp 源码解析录制为视频上传到bibi,分为六个章节详细介绍了各个模块的具体内容

https://www.bilibili.com/video/BV1hN411Z7fn?share_source=copy_web

感兴趣的小伙伴可以看看
学习一下
录制不易,记得三联哦!

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章           查看所有文章
加:2021-08-08 13:34:31  更:2021-08-08 13:34:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/10 9:17:45-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码