IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 五、Nacos服务注册源码分析 -> 正文阅读

[Java知识库]五、Nacos服务注册源码分析

一、准备工作

在我们的微服务里,引入依赖,用于服务注册和发现

<dependency>
	<groupId>com.alibaba.cloud</groupId>
	<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

在配置文件中指定注册中心地址

spring:
  application:
    name: user-server  #微服务名称

  #配置nacos注册中心地址
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848

启动我们的nacos和微服务,我们得微服务是如何注册上去的?
在这里插入图片描述

二、源码分析

1、微服务端端

在我们依赖的nacos的jar包META-INF/spring.factories里我们看到没自动配置类

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\
com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\
com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\
com.alibaba.cloud.nacos.NacosServiceAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

在这里插入图片描述

注册相关的是NacosServiceRegistryAutoConfiguration配置类,它注册三个bean: NacosServiceRegistry,NacosRegistration和NacosAutoServiceRegistration。
NacosAutoServiceRegistration实现ApplicationListener接口

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
		matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
		AutoServiceRegistrationAutoConfiguration.class,
		NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

	@Bean
	public NacosServiceRegistry nacosServiceRegistry(
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosServiceRegistry(nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosRegistration nacosRegistration(
			ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
			NacosDiscoveryProperties nacosDiscoveryProperties,
			ApplicationContext context) {
		return new NacosRegistration(registrationCustomizers.getIfAvailable(),
				nacosDiscoveryProperties, context);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,
				autoServiceRegistrationProperties, registration);
	}

}

spring容器启动过程中核心方法:finishRefresh()->getLifecycleProcessor().onRefresh();
WebServerStartStopLifecycle(实现SmartLifecycle接口)会发布 ServletWebServerInitializedEvent事件。
NacosAutoServiceRegistration的onApplicationEvent方法处理WebServerInitializedEvent事件。

AbstractAutoServiceRegistration实现ApplicationListener

public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
...
}

NacosAutoServiceRegistration接收事件

public void onApplicationEvent(WebServerInitializedEvent event) {
	this.bind(event);
}

bind方法会调用start()

public void bind(WebServerInitializedEvent event) {
	ApplicationContext context = event.getApplicationContext();
	if(!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
		this.port.compareAndSet(0, event.getWebServer().getPort());
		//启动
		this.start();
	}
}

start方法调用register()

public void start() {
	if(!this.isEnabled()) {
		if(logger.isDebugEnabled()) {
			logger.debug("Discovery Lifecycle disabled. Not starting");
		}

	} else {
		if(!this.running.get()) {
			this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
			//注册
			this.register();
			if(this.shouldRegisterManagement()) {
				this.registerManagement();
			}

			this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
			this.running.compareAndSet(false, true);
		}

	}
}

register()方法

protected void register() {
    //用到了前面注册的bean
    //this.serviceRegistry是NacosServiceRegistry
	//this.getRegistration()是NacosRegistration
	this.serviceRegistry.register(this.getRegistration());
}

NacosServiceRegistry的register方法

@Override
public void register(Registration registration) {

	if (StringUtils.isEmpty(registration.getServiceId())) {
		log.warn("No service to register for nacos client...");
		return;
	}

	NamingService namingService = namingService();
	String serviceId = registration.getServiceId();
	String group = nacosDiscoveryProperties.getGroup();

	Instance instance = getNacosInstanceFromRegistration(registration);

	try {
	    //注册实例
		namingService.registerInstance(serviceId, group, instance);
		log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
				instance.getIp(), instance.getPort());
	}
	catch (Exception e) {
		log.error("nacos registry, {} register failed...{},", serviceId,
				registration.toString(), e);
		// rethrow a RuntimeException if the registration is failed.
		// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
		rethrowRuntimeException(e);
	}
}

registerInstance方法,注册心跳和服务

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
	NamingUtils.checkInstanceIsLegal(instance);
	String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
	if (instance.isEphemeral()) {
	    //注册心跳任务
		BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
		beatReactor.addBeatInfo(groupedServiceName, beatInfo);
	}
	//注册服务
	serverProxy.registerService(groupedServiceName, groupName, instance);
}

addBeatInfo注册心跳定时任务

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
	NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
	String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
	BeatInfo existBeat = null;
	//fix #1733
	if ((existBeat = dom2Beat.remove(key)) != null) {
		existBeat.setStopped(true);
	}
	dom2Beat.put(key, beatInfo);
	//线程池执行心跳任务,默认延时5秒执行,BeatTask内run方法再次执行心跳定时(嵌套)/nacos/v1/ns/instance/beat PUT
	executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
	MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

registerService方法,http调用nacos服务的/nacos/v1/ns/instance的POST方法接口


public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        
	NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
			instance);
	
	final Map<String, String> params = new HashMap<String, String>(16);
	params.put(CommonParams.NAMESPACE_ID, namespaceId);
	params.put(CommonParams.SERVICE_NAME, serviceName);
	params.put(CommonParams.GROUP_NAME, groupName);
	params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
	params.put("ip", instance.getIp());
	params.put("port", String.valueOf(instance.getPort()));
	params.put("weight", String.valueOf(instance.getWeight()));
	params.put("enable", String.valueOf(instance.isEnabled()));
	params.put("healthy", String.valueOf(instance.isHealthy()));
	params.put("ephemeral", String.valueOf(instance.isEphemeral()));
	params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
	//组织参数信息,请求地址:  /nacos/v1/ns/instance POST
	reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
	
}

总结:
1、自动配置类导入AbstractAutoServiceRegistration,实现ApplicationListener
2、注册心跳定时任务,默认每5秒执行一次,调用nacos的/nacos/v1/ns/instance/beat的PUT接口
3、注册服务信息,调用/nacos/v1/ns/instance的POST方法接口
在这里插入图片描述

2、Nacos端

找到服务注册接口:/v1/ns/instance的POST方法,在naming模块的InstanceController。

解析request获取Instance信息

@CanDistro
@PostMapping
@Secured(action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
	//namespaceId 命名空间
	final String namespaceId = WebUtils
			.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
	//group@@applicationName形式		
	final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
	//校验格式 :group@@applicationName
	NamingUtils.checkServiceNameFormat(serviceName);
	
	//解析request获取Instance信息
	final Instance instance = HttpRequestInstanceBuilder.newBuilder()
			.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
	getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
	return "ok";
}

InstanceOperatorServiceImpl.registerInstance方法

@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
	com.alibaba.nacos.naming.core.Instance coreInstance = parseInstance(instance);
	//注册实例
	serviceManager.registerInstance(namespaceId, serviceName, coreInstance);
}

ServiceManager.registerInstance方法:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
	//创建空服务
	createEmptyService(namespaceId, serviceName, instance.isEphemeral());
	
	Service service = getService(namespaceId, serviceName);
	
	checkServiceIsNull(service, namespaceId, serviceName);
	//将实例加入内存注册表
	addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

createEmptyService方法调用createServiceIfAbsent。

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
	createServiceIfAbsent(namespaceId, serviceName, local, null);
}

createServiceIfAbsent方法:

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
	//从注册表中获得Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>()		
	Service service = getService(namespaceId, serviceName);
	if (service == null) {
		
		Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
		service = new Service();
		service.setName(serviceName);
		service.setNamespaceId(namespaceId);
		service.setGroupName(NamingUtils.getGroupName(serviceName));
		// now validate the service. if failed, exception will be thrown
		service.setLastModifiedMillis(System.currentTimeMillis());
		service.recalculateChecksum();
		if (cluster != null) {
			cluster.setService(service);
			service.getClusterMap().put(cluster.getName(), cluster);
		}
		service.validate();
		//加入注册表和初始化
		putServiceAndInit(service);
		if (!local) {
			addOrReplaceService(service);
		}
	}
}

getService方法从注册表中获得,Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();注册表是一个双层的ConcurrentHashMap

public Service getService(String namespaceId, String serviceName) {
	if (serviceMap.get(namespaceId) == null) {
		return null;
	}
	return chooseServiceMap(namespaceId).get(serviceName);
}

putServiceAndInit方法,加入注册表和初始化

private void putServiceAndInit(Service service) throws NacosException {
    //放入注册表
	putService(service);
	service = getService(service.getNamespaceId(), service.getName());
	//初始化
	service.init();
	consistencyService
			.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
	consistencyService
			.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
	Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

init初始化方法,绑定服务端连接客户端的心跳任务,默认每五秒执行一次:

public void init() {
    //每5秒绑定一个定时任务
	HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
	for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
		entry.getValue().setService(this);
		entry.getValue().init();
	}
}

clientBeatCheckTask的run方法,判断当前时间减去最后一次心跳时间大于设置的心跳超时时间,默认15s,则健康状态是false。
当前时间减去最后一次心跳时间大于服务剔除时间,默认30s,则剔除服务

@Override
public void run() {
	try {
		// If upgrade to 2.0.X stop health check with v1
		if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
			return;
		}
		if (!getDistroMapper().responsible(service.getName())) {
			return;
		}
		
		if (!getSwitchDomain().isHealthCheckEnabled()) {
			return;
		}
		
		List<Instance> instances = service.allIPs(true);
		
		// first set health status of instances:
		for (Instance instance : instances) {
		    //当前时间减去最后一次心跳时间大于设置的心跳超时时间,默认15s
			if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
				if (!instance.isMarked()) {
					if (instance.isHealthy()) {
					    //设置健康状态是false
						instance.setHealthy(false);
						Loggers.EVT_LOG
								.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
										instance.getIp(), instance.getPort(), instance.getClusterName(),
										service.getName(), UtilsAndCommons.LOCALHOST_SITE,
										instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
						getPushService().serviceChanged(service);
					}
				}
			}
		}
		
		if (!getGlobalConfig().isExpireInstance()) {
			return;
		}
		
		// then remove obsolete instances:
		for (Instance instance : instances) {
			
			if (instance.isMarked()) {
				continue;
			}
			 //当前时间减去最后一次心跳时间大于服务剔除时间,默认30s
			if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
				// delete instance
				Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
						JacksonUtils.toJson(instance));
				//剔除实例		
				deleteIp(instance);
			}
		}
		
	} catch (Exception e) {
		Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
	}
	
}

deleteIp 方法剔除服务,调用自己的/v1/ns/instance的DELETE方法:

private void deleteIp(Instance instance) {
	
	try {
		NamingProxy.Request request = NamingProxy.Request.newRequest();
		request.appendParam(FieldsConstants.IP, instance.getIp())
				.appendParam(FieldsConstants.PORT, String.valueOf(instance.getPort()))
				.appendParam(FieldsConstants.EPHEMERAL, EPHEMERAL)
				.appendParam(FieldsConstants.CLUSTER_NAME, instance.getClusterName())
				.appendParam(FieldsConstants.SERVICE_NAME, service.getName())
				.appendParam(FieldsConstants.NAME_SPACE_ID, service.getNamespaceId());
		//http调用自己的/v1/ns/instance DELETE
		String url = "http://" + InternetAddressUtil.localHostIP() + InternetAddressUtil.IP_PORT_SPLITER + EnvUtil
				.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
				+ UtilsAndCommons.NACOS_NAMING_INSTANCE_CONTEXT + "?" + request.toUrl();
		
		// delete instance asynchronously:
		HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
			@Override
			public void onReceive(RestResult<String> result) {
				if (!result.ok()) {
					Loggers.SRV_LOG
							.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
									instance.toJson(), result.getMessage(), result.getCode());
				}
			}
			
			@Override
			public void onError(Throwable throwable) {
				Loggers.SRV_LOG
						.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(),
								throwable);
			}
			
			@Override
			public void onCancel() {
			
			}
		});
		
	} catch (Exception e) {
		Loggers.SRV_LOG
				.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
	}
}

下面看下,addInstance方法,service的全量实例放入注册表:

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
	
	String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
	
	Service service = getService(namespaceId, serviceName);
	
	synchronized (service) {
		List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
		
		Instances instances = new Instances();
		instances.setInstanceList(instanceList);
		//service的全量实例放入注册表
		consistencyService.put(key, instances);
	}
}

DistroConsistencyServiceImpl.put方法:

@Override
public void put(String key, Record value) throws NacosException {
	mapConsistencyService(key).put(key, value);
}

mapConsistencyService方法

private ConsistencyService mapConsistencyService(String key) {
    //临时还是持久的,我们是临时的。
	return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}

DistroConsistencyServiceImpl.put临时的

@Override
public void put(String key, Record value) throws NacosException {
    //更新内存注册表
	onPut(key, value);
	// If upgrade to 2.0.X, do not sync for v1.
	if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
		return;
	}
	//同步到集群其他节点,除了自己
	distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
			DistroConfig.getInstance().getSyncDelayMillis());
}

onPut方法:

public void onPut(String key, Record value) {
	
	if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
		Datum<Instances> datum = new Datum<>();
		datum.value = (Instances) value;
		datum.key = key;
		datum.timestamp.incrementAndGet();
		dataStore.put(key, datum);
	}
	
	if (!listeners.containsKey(key)) {
		return;
	}
	//加入阻塞队列
	notifier.addTask(key, DataOperation.CHANGE);
}

大致调用流程:
在这里插入图片描述

什么时候会从队列取?

DistroConsistencyServiceImpl 的init方法执行:

@PostConstruct
public void init() {
	GlobalExecutor.submitDistroNotifyTask(notifier);
}
public static void submitDistroNotifyTask(Runnable runnable) {
	DISTRO_NOTIFY_EXECUTOR.submit(runnable);
}

Notifier.run方法,死循环获取任务

@Override
public void run() {
	Loggers.DISTRO.info("distro notifier started");
	//死循环处理
	for (; ; ) {
		try {
		    //阻塞队列里取
			Pair<String, DataOperation> pair = tasks.take();
			//处理
			handle(pair);
		} catch (Throwable e) {
			Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
		}
	}
}

handle方法处理tasks

private void handle(Pair<String, DataOperation> pair) {
	try {
		String datumKey = pair.getValue0();
		DataOperation action = pair.getValue1();
		
		services.remove(datumKey);
		
		int count = 0;
		
		if (!listeners.containsKey(datumKey)) {
			return;
		}
		
		for (RecordListener listener : listeners.get(datumKey)) {
			
			count++;
			
			try {
				if (action == DataOperation.CHANGE) {
				    //listener 就是Servcie实例
					listener.onChange(datumKey, dataStore.get(datumKey).value);
					continue;
				}
				
				if (action == DataOperation.DELETE) {
					listener.onDelete(datumKey);
					continue;
				}
			} catch (Throwable e) {
				Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
			}
		}
		
		if (Loggers.DISTRO.isDebugEnabled()) {
			Loggers.DISTRO
					.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
							datumKey, count, action.name());
		}
	} catch (Throwable e) {
		Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
	}
}

Service的onChange方法

@Override
public void onChange(String key, Instances value) throws Exception {
	
	Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
	
	for (Instance instance : value.getInstanceList()) {
		
		if (instance == null) {
			// Reject this abnormal instance list:
			throw new RuntimeException("got null instance " + key);
		}
		
		if (instance.getWeight() > 10000.0D) {
			instance.setWeight(10000.0D);
		}
		
		if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
			instance.setWeight(0.01D);
		}
	}
	//更新到内存注册表
	updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
	
	recalculateChecksum();
}

updateIps方法

updateIps(List<Instance> ips, boolean ephemeral) {
   
    //老的实例     
	Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
	
	//临时map
	HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
	
	for (Instance ip : toUpdateInstances) {
		oldIpMap.put(ip.getDatumKey(), ip);
	}
	//更新实例list
	List<Instance> updatedIps = updatedIps(ips, oldIpMap.values());
	if (updatedIps.size() > 0) {
		for (Instance ip : updatedIps) {
			Instance oldIP = oldIpMap.get(ip.getDatumKey());
			
			// do not update the ip validation status of updated ips
			// because the checker has the most precise result
			// Only when ip is not marked, don't we update the health status of IP:
			if (!ip.isMarked()) {
				ip.setHealthy(oldIP.isHealthy());
			}
			
			if (ip.isHealthy() != oldIP.isHealthy()) {
				// ip validation status updated
				Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
						(ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
			}
			
			if (ip.getWeight() != oldIP.getWeight()) {
				// ip validation status updated
				Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP, ip);
			}
		}
	}
	//新增的
	List<Instance> newIPs = subtract(ips, oldIpMap.values());
	if (newIPs.size() > 0) {
		Loggers.EVT_LOG
				.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
						getName(), newIPs.size(), newIPs);
		
		for (Instance ip : newIPs) {
			HealthCheckStatus.reset(ip);
		}
	}
	//剔除的实例
	List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
	
	if (deadIPs.size() > 0) {
		Loggers.EVT_LOG
				.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
						getName(), deadIPs.size(), deadIPs);
		
		for (Instance ip : deadIPs) {
			HealthCheckStatus.remv(ip);
		}
	}
	
	toUpdateInstances = new HashSet<>(ips);
	//设置实例
	if (ephemeral) {
		ephemeralInstances = toUpdateInstances;
	} else {
		persistentInstances = toUpdateInstances;
	}
}

更新完注册表还要发布事件,udp的方式推送给订阅端。
getPushService().serviceChanged(this)

@JsonIgnore
public UdpPushService getPushService() {
    //获取UdpPushService
	return ApplicationUtils.getBean(UdpPushService.class);
}

发布ServiceChangeEvent事件

public void serviceChanged(Service service) {
	this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}

大致流程:
在这里插入图片描述

nacos服务端总结:
1、调用/v1/ns/instance的POST方法
2、创建一个空service,初始化一个心跳任务,定时检查客户端的一个状态,默认超过15秒则健康状态设为false,超过30秒,则剔除服务。
3、对应的service的全量实例放入注册表,加入队列tasks
4、一个死循环的线程一直从队列tasks里取,处理更新完注册表,将服务变动以udp方式通知给订阅端的客户端。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-22 20:22:50  更:2022-03-22 20:26:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 7:15:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码