IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Spring Cloud Alibaba组件原理分析篇1_Nacos 原理分析 -> 正文阅读

[Java知识库]Spring Cloud Alibaba组件原理分析篇1_Nacos 原理分析

目录

  • Nacos概述
  • 微服务的注册与发现

一、Nacos概述

1.Nacos是什么

What does it do

Nacos (official site: nacos.io) is an easy-to-use platform designed for dynamic service discovery(动态服务发现) and configuration(动态配置) and service management.(服务管理) It helps you to build cloud native applications and microservices platform easily.
Service is a first-class citizen in Nacos. (服务是Nacos的公民)Nacos supports almost all type of services,for example,Dubbo/gRPC service, Spring Cloud RESTFul service or Kubernetes service.

Nacos provides four major functions.

Service Discovery and Service Health Check 服务发现与服务健康

Nacos makes it simple for services to register themselves and to discover other services via a DNS or HTTP interface. Nacos also provides real-time health checks of services to prevent sending requests to unhealthy hosts or service instances.

Dynamic Configuration Management 动态配置管理

Dynamic Configuration Service allows you to manage configurations of all services in a centralized and dynamic manner across all environments. Nacos eliminates 消除 the need to redeploy重发布 applications and services when configurations are updated, which makes configuration changes more efficient and agile.

Dynamic DNS Service 动态DNS路由服务

Nacos supports weighted routing, making it easier for you to implement mid-tier load balancing, flexible routing policies, flow control, and simple DNS resolution services in the production environment within your data center. It helps you to implement DNS-based service discovery easily and prevent applications from coupling to vendor-specific service discovery APIs.

Service and MetaData Management 服务和元数据管理

Nacos provides an easy-to-use service dashboard 控制面板 to help you manage your services metadata 元数据, configuration 配置, kubernetes DNS, service health 服务健康 and metrics statistics指标统计.

官网:https://nacos.io/zh-cn/docs/what-is-nacos.html有很详细点的QuickStrart,下载一个nacos-server然后执行指令即可开启Nacos服务

GitHub地址:https://github.com/alibaba/nacos,这个地址是下载的源码文件,他是一个Maven构建的SpringBoot工程,包含Nacos-server和Nacos-client

在这里插入图片描述

2.Nacos中相关概念

  • 地域
    物理的数据中心,资源创建成功后不能更换。

  • 可用区
    同一地域内,电力和网络互相独立的物理区域。同一可用区内,实例的网络延迟较低。

  • 接入点
    地域的某个服务的入口域名。

  • 命名空间
    用于进行租户粒度的配置隔离。不同的命名空间下,可以存在相同的 Group 或 Data ID 的配置。Namespace 的常用场景之一是不同环境的配置的区分隔离,例如开发测试环境和生产环境的资源(如配置、服务)隔离等。

  • 配置
    在系统开发过程中,开发者通常会将一些需要变更的参数、变量等从代码中分离出来独立管理,以独立的配置文件的形式存在。目的是让静态的系统工件或者交付物(如 WAR,JAR 包等)更好地和实际的物理运行环境进行适配。配置管理一般包含在系统部署的过程中,由系统管理员或者运维人员完成。配置变更是调整系统运行时的行为的有效手段。

  • 配置管理
    系统配置的编辑、存储、分发、变更管理、历史版本管理、变更审计等所有与配置相关的活动。

  • 配置项
    一个具体的可配置的参数与其值域,通常以 param-key=param-value 的形式存在。例如我们常配置系统的日志输出级别(logLevel=INFO|WARN|ERROR) 就是一个配置项。

  • 配置集
    一组相关或者不相关的配置项的集合称为配置集。在系统中,一个配置文件通常就是一个配置集,包含了系统各个方面的配置。例如,一个配置集可能包含了数据源、线程池、日志级别等配置项。

  • 配置集 ID
    Nacos 中的某个配置集的 ID。配置集 ID 是组织划分配置的维度之一。Data ID 通常用于组织划分系统的配置集。一个系统或者应用可以包含多个配置集,每个配置集都可以被一个有意义的名称标识。Data ID 通常采用类 Java 包(如 com.taobao.tc.refund.log.level)的命名规则保证全局唯一性。此命名规则非强制。

  • 配置分组
    Nacos 中的一组配置集,是组织配置的维度之一。通过一个有意义的字符串(如 Buy 或 Trade )对配置集进行分组,从而区分 Data ID 相同的配置集。当您在 Nacos 上创建一个配置时,如果未填写配置分组的名称,则配置分组的名称默认采用 DEFAULT_GROUP 。配置分组的常见场景:不同的应用或组件使用了相同的配置类型,如 database_url 配置和 MQ_topic 配置。

  • 配置快照
    Nacos 的客户端 SDK 会在本地生成配置的快照。当客户端无法连接到 Nacos Server 时,可以使用配置快照显示系统的整体容灾能力。配置快照类似于 Git 中的本地 commit,也类似于缓存,会在适当的时机更新,但是并没有缓存过期(expiration)的概念。

  • 服务
    通过预定义接口网络访问的提供给客户端的软件功能。

  • 服务名
    服务提供的标识,通过该标识可以唯一确定其指代的服务。

  • 服务注册中心
    存储服务实例和服务负载均衡策略的数据库。

  • 服务发现
    在计算机网络上,(通常使用服务名)对服务下的实例的地址和元数据进行探测,并以预先定义的接口提供给客户端进行查询。

  • 元信息
    Nacos数据(如配置和服务)描述信息,如服务版本、权重、容灾策略、负载均衡策略、鉴权配置、各种自定义标签 (label),从作用范围来看,分为服务级别的元信息、集群的元信息及实例的元信息。

  • 应用
    用于标识服务提供方的服务的属性。

  • 服务分组
    不同的服务可以归类到同一分组。

  • 虚拟集群
    同一个服务下的所有服务实例组成一个默认集群, 集群可以被进一步按需求划分,划分的单位可以是虚拟集群。

  • 实例
    提供一个或多个服务的具有可访问网络地址(IP:Port)的进程。

  • 权重
    实例级别的配置。权重为浮点数。权重越大,分配给该实例的流量越大。

  • 健康检查
    以指定方式检查服务下挂载的实例 (Instance) 的健康度,从而确认该实例 (Instance) 是否能提供服务。根据检查结果,实例 (Instance) 会被判断为健康或不健康。对服务发起解析请求时,不健康的实例 (Instance) 不会返回给客户端。

  • 健康保护阈值
    为了防止因过多实例 (Instance) 不健康导致流量全部流向健康实例 (Instance) ,继而造成流量压力把健康实例 (Instance) 压垮并形成雪崩效应,应将健康保护阈值定义为一个 0 到 1 之间的浮点数。当域名健康实例数 (Instance) 占总服务实例数 (Instance) 的比例小于该值时,无论实例 (Instance) 是否健康,都会将这个实例 (Instance) 返回给客户端。这样做虽然损失了一部分流量,但是保证了集群中剩余健康实例 (Instance) 能正常工作。

二、微服务注册与发现

1.Nacos-client提供注册接口原理

服务注册的原理分析

分析微服务模块是如何借助nacos-client提供的接口完成注册的?以及注册的时机?
在这里插入图片描述

Nacos是一个SpringBoot工程,其他微服务模块也是一个SpringBoot工程,当其他微服务模块引入对应的服务注册与发现的依赖之后

<dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
            <version>2.2.5.RELEASE</version>
        </dependency>

查看该依赖,发现会引入nacos-client、nacos-api等依赖
在这里插入图片描述

在这里插入图片描述

然后在服务启动的时候,会调用nacos-client中提供的OpenAPI接口,像nacos-server发起http请求完成服务注册,OpenAPI注册接口举栗

在这里插入图片描述

服务注册的时机分析

当微服务启动的时候就要像nacos-server注册,此处探究的是什么时候完成OpenAPI注册接口的调用的?

分析入口从自动装配开始,自动装配原理参考往期博客:《Spring源码深度解析 郝佳 第2版》SpringBoot体系分析、Starter的原理,导入相应的依赖starter依赖之后,内部包含一个spring.factories文件,该文件内容

在这里插入图片描述

NacosServiceRegistryAutoConfiguration类以及核心的四个类

微服务注册的自动配置类,发现内部主要注册了3个bean,主要涉及的关键类

  • NacosDiscoveryProperties :承载配置文件的实例
  • NacosServiceRegistry :实现ServiceRegistry接口,内部包含真正的注册register方法,需要拿到参数NacosRegistration
  • NacosRegistration :实现Registration接口,内部继承ServiceInstance接口,包含getInstanceId、getSrviceId、getHost、getPort等方法
  • NacosAutoServiceRegistration :继承AbstractAutoServiceRegistration完成注册的核心实现,AbstractAutoServiceRegistration实现了ApplicationListener接口重写onApplicationEvent方法,当监听到NacosAutoServiceRegistration 实例创建的时候会调用相应的bind—>start—>register—>NacosServiceRegistry.register—>NacosNamingService.registerInstance—>NamingProxy.registerService,最后完成配置文件的填充然后注册,注册的话就是调用reqApi—>委托NacosRestTemplate.exchangeForm---->this.requestClient().execute(uri, httpMethod, requestEntity)完成http请求
// 标注配置类
@Configuration(
    proxyBeanMethods = false
)

@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
// 是否开启自动注册,缺省开启
@ConditionalOnProperty(
    value = {"spring.cloud.service-registry.auto-registration.enabled"},
    matchIfMissing = true
)
@AutoConfigureAfter({AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class})

public class NacosServiceRegistryAutoConfiguration {
    public NacosServiceRegistryAutoConfiguration() {
    }

	// 1. 需要注册的serviceDiscovery封装,内部封装 NacosDiscoveryProperties 
    @Bean
    public NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

	// 2. NacosRegistration 
    @Bean
    @ConditionalOnBean({AutoServiceRegistrationProperties.class})
    public NacosRegistration nacosRegistration(ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {
        return new NacosRegistration((List)registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);
    }

	// 3. NacosAutoServiceRegistration 
    @Bean
    @ConditionalOnBean({AutoServiceRegistrationProperties.class})
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
    }
}

// NacosDiscoveryProperties配置信息的封装bean
@ConfigurationProperties("spring.cloud.nacos.discovery")
public class NacosDiscoveryProperties {
    private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryProperties.class);
    public static final String PREFIX = "spring.cloud.nacos.discovery";
    private static final Pattern PATTERN = Pattern.compile("-(\\w)");
    private String serverAddr;
    private String username;
    private String password;
    private String endpoint;
    private String namespace;
    private long watchDelay = 30000L;
    private String logName;
    @Value("${spring.cloud.nacos.discovery.service:${spring.application.name:}}")
    private String service;
    private float weight = 1.0F;
    private String clusterName = "DEFAULT";
    private String group = "DEFAULT_GROUP";
    private String namingLoadCacheAtStart = "false";
    private Map<String, String> metadata = new HashMap();
    private boolean registerEnabled = true;
    private String ip;
    private String networkInterface = "";
    private int port = -1;
    private boolean secure = false;
    private String accessKey;
    private String secretKey;
    private Integer heartBeatInterval;
    private Integer heartBeatTimeout;
    private Integer ipDeleteTimeout;
    private boolean instanceEnabled = true;
    private boolean ephemeral = true;
	
	...
	...
}
// AbstractAutoServiceRegistration完成注册的核心实现

AbstractAutoServiceRegistration实现了ApplicationListener接口重写onApplicationEvent方法,当监听到NacosAutoServiceRegistration 实例创建的时候会调用相应的bind—>start—>register—>NacosServiceRegistry.register—>NacosNamingService.registerInstance—>NamingProxy.registerService,最后完成配置文件的填充然后注册,注册的话就是调用reqApi—>委托NacosRestTemplate.exchangeForm---->this.requestClient().execute(uri, httpMethod, requestEntity)完成http请求

// NamingProxy.registerService方法,内部封装注册service信息,调用reqApi完成接口调用
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
        Map<String, String> params = new HashMap(16);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        params.put("clusterName", instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
		// 像对应的服务注册OpenAPI发请求
        this.reqApi(UtilAndComs.nacosUrlInstance, params, "POST");
    }
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
        params.put("namespaceId", this.getNamespaceId());
        if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(this.nacosDomain)) {
            throw new NacosException(400, "no server available");
        } else {
            NacosException exception = new NacosException();
            if (StringUtils.isNotBlank(this.nacosDomain)) {
                int i = 0;

                while(i < this.maxRetry) {
                    try {
                        return this.callServer(api, params, body, this.nacosDomain, method);
                    } catch (NacosException var12) {
                        exception = var12;
                        if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
                            LogUtils.NAMING_LOGGER.debug("request {} failed.", this.nacosDomain, var12);
                        }

                        ++i;
                    }
                }
            } else {
                Random random = new Random(System.currentTimeMillis());
                int index = random.nextInt(servers.size());
                int i = 0;

                while(i < servers.size()) {
                    String server = (String)servers.get(index);

                    try {
                    	// 调用接口
                        return this.callServer(api, params, body, server, method);
                    } catch (NacosException var13) {
                        exception = var13;
                        if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
                            LogUtils.NAMING_LOGGER.debug("request {} failed.", server, var13);
                        }

                        index = (index + 1) % servers.size();
                        ++i;
                    }
                }
            }

            LogUtils.NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", new Object[]{api, servers, exception.getErrCode(), exception.getErrMsg()});
            throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
        }
    }

后续就是委托NacosRestTemplate.exchangeForm---->this.requestClient().execute(uri, httpMethod, requestEntity)完成http请求

2.Nacos-server的服务注册原理

nacos-server接收到服务注册请求处理过程及原理,在naming模块下可以看到nacos-server处理注册、心跳等请求的controllers

在这里插入图片描述

// 2.1服务注册的接口实现

在InstanceController中,可以看到register方法

// InstanceController

 @CanDistro
    @PostMapping
    @Secured(action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        final Instance instance = HttpRequestInstanceBuilder.newBuilder()
                .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
        // 委托ServiceManager完成注册
        getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

// ServiceManager
/**
     * Register an instance to a service in AP mode.
     *
     * <p>This method creates service or cluster silently if they don't exist.
     *
     * @param namespaceId id of namespace
     * @param serviceName service name
     * @param instance    instance to register
     * @throws Exception any error occurred in the process
     */
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        
        // 1. 初始化一个空服务结构,便于存放服务的信息Service
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        
        Service service = getService(namespaceId, serviceName);
        
        checkServiceIsNull(service, namespaceId, serviceName);
        // 2. 真正的服务注册,注册Instance
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }
// 2.2服务注册的容器实现
// 1. 初始化一个空服务结构,一个Map嵌套Map结构,便于存放服务整体信息Service
  • // 外层map的key可以理解为环境,如dev、prod、test等,对应namespace
  • // 内层的key可以理解为group,如订单服务的组,每个组中可以包含集群部署的微服务,对应groupName
  • // 内层的value可以理解为就是各个的微服务模块Service信息,对应name,其中每个Service又可以对应多个Instance实例

// Service
public class Service implements Serializable {
    
    private static final long serialVersionUID = -3470985546826874460L;
    
    /**
     * service name.
     */
    private String name;
    
    /**
     * protect threshold.
     */
    private float protectThreshold = 0.0F;
    
    /**
     * application name of this service.
     */
    private String appName;
    
    /**
     * Service group to classify services into different sets.
     */
    private String groupName;
    
    private Map<String, String> metadata = new HashMap<String, String>();
}
    
// ServiceManager
 public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
            
            // 像服务注册的容器put,是一个Map嵌套Map结构
            // 委托putService
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }
// ServiceManager

private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

public void putService(Service service) {
        if (!serviceMap.containsKey(service.getNamespaceId())) {
        	// 像容器中添加一个空服务
            serviceMap.putIfAbsent(service.getNamespaceId(), new ConcurrentSkipListMap<>());
        }
        serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
    }
// 2. 真正的服务注册,注册服务每个Instance实例

每个微服务实例需要注册进去的信息由Instance封装,属性有

  • instanceId
  • ip
  • port
  • weight = 1.0D
  • healthy = true
  • enabled = true
  • ephemeral = true
  • clusterName
  • serviceName
  • metadata
public class Instance implements Serializable {
    
    private static final long serialVersionUID = -742906310567291979L;
    
    /**
     * unique id of this instance.
     */
    private String instanceId;
    
    /**
     * instance ip.
     */
    private String ip;
    
    /**
     * instance port.
     */
    private int port;
    
    /**
     * instance weight.
     */
    private double weight = 1.0D;
    
    /**
     * instance health status.
     */
    private boolean healthy = true;
    
    /**
     * If instance is enabled to accept request.
     */
    private boolean enabled = true;
    
    /**
     * If instance is ephemeral.
     *
     * @since 1.0.0
     */
    private boolean ephemeral = true;
    
    /**
     * cluster information of instance.
     */
    private String clusterName;
    
    /**
     * Service information of instance.
     */
    private String serviceName;
    
    /**
     * user extended attributes.
     */
    private Map<String, String> metadata = new HashMap<String, String>();
}
// ServiceManager

 public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
		// 生成一个key
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
		// 当前微服务整体信息
        Service service = getService(namespaceId, serviceName);

        synchronized (service) {
        	// 当前微服务下的多个实例
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
			// 放入map完成注册,委托DelegateConsistencyServiceImpl的put
            consistencyService.put(key, instances);
        }
    }
// DelegateConsistencyServiceImpl
 @Override
    public void put(String key, Record value) throws NacosException {
    
        mapConsistencyService(key).put(key, value);
    }

最终委托给DistroConsistencyServiceImpl的put。内部逻辑:

  • 在onPut方法完成真正的注册进map, 最后保存在DataStore中,他的value为Datum类型,datum.value属性为 T,可以包含Recod类型,也就是Instances类型转化的,对应多个Instance实例。然后将注册的实例信息放进一个同步阻塞队列之后就返回,因此Nacos速度很高
  • 在sync方法中完成Nacos集群中实例信息的同步
//  DistroConsistencyServiceImpl implements EphemeralConsistencyService
 @Override
    public void put(String key, Record value) throws NacosException {
    	// 1. 真正的放入map
        onPut(key, value);
        // If upgrade to 2.0.X, do not sync for v1.
        if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
            return;
        }
        // 2. 一般Nacos都是集群,集群同步信息
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                DistroConfig.getInstance().getSyncDelayMillis());
    }
//DistroConsistencyServiceImpl
// 1. onPut
public void onPut(String key, Record value) {
        
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        	
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            // 1.1 最后保存在DataStore中,datum的value属性为 T,可以包含Recod类型,也就是Instances类型转化的,对应多个Instance实例
            dataStore.put(key, datum);
        }
        
        if (!listeners.containsKey(key)) {
            return;
        }
        
        // 1.2 添加注册的实例到内部类 高速阻塞队列
        notifier.addTask(key, DataOperation.CHANGE);
    }
    

// 内部类Notifier 完成注册实例入队、出队消费等任务
  // 内部类Notifier 
public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
		// 同步阻塞队列,存放需要注册微服务实例
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

        /**
         * Add new notify task to queue.
         *
         * @param datumKey data key
         * @param action   action for data
         */
	 // 1.2 添加注册的实例到内部类 高速阻塞队列
        public void addTask(String datumKey, DataOperation action) {

            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            // 入队
            tasks.offer(Pair.with(datumKey, action));
        }

        public int getTaskSize() {
            return tasks.size();
        }

		// 多线程消费队列中的实例,调用handle处理
        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");

            for (; ; ) {
                try {
                	// 取出
                    Pair<String, DataOperation> pair = tasks.take();
                    // 消费
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
		// 处理
        private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();

                services.remove(datumKey);

                int count = 0;

                if (!listeners.containsKey(datumKey)) {
                    return;
                }

                for (RecordListener listener : listeners.get(datumKey)) {

                    count++;

                    try {
                        if (action == DataOperation.CHANGE) {
                        	// 真正的注册,委托Service完成注册
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }

                        if (action == DataOperation.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        // 日志信息...
                    }
                }

                if (Loggers.DISTRO.isDebugEnabled()) {
                   // 日志信息...
                }
            } catch (Throwable e) {
                // 日志信息...
            }
        }
        
// 真正的注册,委托Service完成注册
// Service
@Override
    public void onChange(String key, Instances value) throws Exception {
        
        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
        
        // 遍历
        for (Instance instance : value.getInstanceList()) {
            
            if (instance == null) {
                // Reject this abnormal instance list:
                throw new RuntimeException("got null instance " + key);
            }
            
            if (instance.getWeight() > 10000.0D) {
                instance.setWeight(10000.0D);
            }
            
            if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
                instance.setWeight(0.01D);
            }
        }
        // 注册
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
        
        recalculateChecksum();
    }
// 异步消费同步阻塞队列实现实例注册的时机

Notifier实现Runnable接口,可以使用多线程处理注册实例,具体的时机就是在DistroConsistencyServiceImpl的init方法,他标注PostConstruct注解,会在Bean完成初始化之后自动调用

@PostConstruct
    public void init() {
    	// 将Notifier 实例交给定时任务线程池处理
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }
// GlobalExecutor
public static void submitDistroNotifyTask(Runnable runnable) {
        DISTRO_NOTIFY_EXECUTOR.submit(runnable);
    }
// GlobalExecutor
private static final ScheduledExecutorService DISTRO_NOTIFY_EXECUTOR = ExecutorFactory.Managed
            .newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
                    new NameThreadFactory("com.alibaba.nacos.naming.distro.notifier"));
// 2.3服务下线的检测实现

在上面ServiceManager.createServiceIfAbsent---->putServiceAndInit内部会调用

  • putService,将服务注册的信息保存到Service中
  • init,将服务的状态更新,包括上线、下线,定时任务健康检测,默认5s执行一次检测遍历全部实例,使用当前系统时间减去上一次获取的心跳时间,若大于阈值默认15s,则更新状态为下线
private void putServiceAndInit(Service service) throws NacosException {
        putService(service);
        service = getService(service.getNamespaceId(), service.getName());
        // 内部创建定时任务,更新服务的状态信息
        service.init();
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }

// core.Service extends pojo.Service
 public void init() {
 		// 健康检测
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        // 集群递归检测
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }

定时检测源码

// HealthCheckReactor
 /**
     * Schedule client beat check task with a delay.
     *
     * @param task client beat check task
     */
    public static void scheduleCheck(BeatCheckTask task) {
        Runnable wrapperTask =
                task instanceof NacosHealthCheckTask ? new HealthCheckTaskInterceptWrapper((NacosHealthCheckTask) task)
                        : task;
        futureMap.computeIfAbsent(task.taskKey(),
                k -> GlobalExecutor.scheduleNamingHealth(wrapperTask, 5000, 5000, TimeUnit.MILLISECONDS));
    }
    
// 定时执行
public static ScheduledFuture<?> scheduleNamingHealth(Runnable command, long initialDelay, long delay,
            TimeUnit unit) {
        return NAMING_HEALTH_EXECUTOR.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

3. 服务心跳检测原理分析

AbstractAutoServiceRegistration#bind—>start—>register—>NacosServiceRegistry.register—>NacosNamingService.registerInstance方法中,发现一个判断临时实例和封装BeatInfo实例的过程

// NacosServiceRegistry
public void register(Registration registration) {
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
        } else {
            NamingService namingService = this.namingService();
            String serviceId = registration.getServiceId();
            String group = this.nacosDiscoveryProperties.getGroup();
            // 获取当前service的信息封装成instance实例
            Instance instance = this.getNacosInstanceFromRegistration(registration);

            try {
            	// 注册
                namingService.registerInstance(serviceId, group, instance);
                log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
            } catch (Exception var7) {
                log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7});
                ReflectionUtils.rethrowRuntimeException(var7);
            }

        }
    }
 // NamingService
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        // 判断是不是临时实例,默认是的,也就是不将service持久化到磁盘
        if (instance.isEphemeral()) {
        	// 封装心跳信息
            BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
            // 进行心跳发送任务设置
            this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }

        this.serverProxy.registerService(groupedServiceName, groupName, instance);
    }

// 封装心跳信息
// BeanReactor
 public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(groupedServiceName);
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        // 默认服务像nacos发送心跳的时间间隔
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        return beatInfo;
    }
// 线程池定时发送心跳请求

// 添加定时发送心跳信息
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }

        this.dom2Beat.put(key, beatInfo);
        // 线程池,执行BeatTask任务,它是一个Runnable接口
        this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
    }

// BeatTask 类
class BeatTask implements Runnable {
        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        public void run() {
            if (!this.beatInfo.isStopped()) {
                long nextTime = this.beatInfo.getPeriod();

                try {
                    JsonNode result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
                    long interval = result.get("clientBeatInterval").asLong();
                    boolean lightBeatEnabled = false;
                    if (result.has("lightBeatEnabled")) {
                        lightBeatEnabled = result.get("lightBeatEnabled").asBoolean();
                    }

                    BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                    if (interval > 0L) {
                        nextTime = interval;
                    }

                    int code = 10200;
                    if (result.has("code")) {
                        code = result.get("code").asInt();
                    }

                    if (code == 20404) {
                        Instance instance = new Instance();
                        instance.setPort(this.beatInfo.getPort());
                        instance.setIp(this.beatInfo.getIp());
                        instance.setWeight(this.beatInfo.getWeight());
                        instance.setMetadata(this.beatInfo.getMetadata());
                        instance.setClusterName(this.beatInfo.getCluster());
                        instance.setServiceName(this.beatInfo.getServiceName());
                        instance.setInstanceId(instance.getInstanceId());
                        instance.setEphemeral(true);

                        try {
                            BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(), NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance);
                        } catch (Exception var10) {
                        }
                    }
                } catch (NacosException var11) {
                    LogUtils.NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", new Object[]{JacksonUtils.toJson(this.beatInfo), var11.getErrCode(), var11.getErrMsg()});
                }

                BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
            }
        }
    }
// 心跳发送接口

在这里插入图片描述

之后才是NamingProxy.registerService完成配置文件的填充然后注册,注册的话就是调用reqApi—>委托NacosRestTemplate.exchangeForm---->this.requestClient().execute(uri, httpMethod, requestEntity)完成http请求

4. 服务发现原理分析

入口NacosDiscoveryAutoConfiguration类服务发现自动配置类



// 声明配置类
@Configuration(
    proxyBeanMethods = false
)
// 服务自动注册发现开关 开启此配置类生效
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {
    public NacosDiscoveryAutoConfiguration() {
    }

	// 1. 承载 NacosDiscoveryProperties配置信息的bean
    @Bean
    @ConditionalOnMissingBean
    public NacosDiscoveryProperties nacosProperties() {
        return new NacosDiscoveryProperties();
    } 
	// 2. 构建一个将要注册的ServiceDiscovery
	// 参数为上面的NacosDiscoveryProperties  和 NacosServiceManager
    @Bean
    @ConditionalOnMissingBean
    public NacosServiceDiscovery nacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties, NacosServiceManager nacosServiceManager) {
        return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);
    }
}

// 2. 构建一个将要注册的ServiceDiscovery

待更新…

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-04-26 11:28:38  更:2022-04-26 11:29:49 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 3:12:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码