Contents
Preface
Client-side service registration
Server-side service registration
Preface
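This article covers only the source code related to service registration. The walkthrough goes from the client to the server, and the Nacos source version is 1.2.1.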
Client-side service registration
From spring.factories, the auto-configuration class for service registration is NacosServiceRegistryAutoConfiguration.
com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(nacosDiscoveryProperties, context);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
The class that implements service registration is NacosAutoServiceRegistration. It does not itself expose an obvious registration entry point, so look at its parent class, AbstractAutoServiceRegistration.
org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration
public abstract class AbstractAutoServiceRegistration<R extends Registration>
implements AutoServiceRegistration, ApplicationContextAware,
ApplicationListener<WebServerInitializedEvent>
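AbstractAutoServiceRegistration implements ApplicationListener, an interface for listening to events published in the container. Here it listens for WebServerInitializedEvent, which is published after the application context has been refreshed and the WebServer is ready. ApplicationListener requires an onApplicationEvent method, and that method is the entry point for service registration.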
org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#onApplicationEvent
public void onApplicationEvent(WebServerInitializedEvent event) {
this.bind(event);
}
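The trigger can be pictured without Spring: a listener is invoked when a "server ready" event is published, and only then registers the service. The following is a minimal sketch in plain Java. `ServerReadyEvent` and `EventBus` are hypothetical stand-ins for WebServerInitializedEvent and Spring's event publishing, not Spring's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RegistrationListenerSketch {

    /** Hypothetical stand-in for WebServerInitializedEvent: carries the bound port. */
    public static final class ServerReadyEvent {
        final int port;
        public ServerReadyEvent(int port) { this.port = port; }
    }

    /** Hypothetical, minimal event bus: listeners are called synchronously on publish. */
    public static final class EventBus {
        private final List<Consumer<ServerReadyEvent>> listeners = new ArrayList<>();
        public void subscribe(Consumer<ServerReadyEvent> listener) { listeners.add(listener); }
        public void publish(ServerReadyEvent event) { listeners.forEach(l -> l.accept(event)); }
    }

    /** Records "serviceId:port" as registered once the server-ready event arrives. */
    public static List<String> runDemo(String serviceId, int port) {
        List<String> registered = new ArrayList<>();
        EventBus bus = new EventBus();
        // Registration happens inside the listener, not at construction time.
        bus.subscribe(event -> registered.add(serviceId + ":" + event.port));
        bus.publish(new ServerReadyEvent(port));
        return registered;
    }

    public static void main(String[] args) {
        System.out.println(runDemo("order-service", 8080));
    }
}
```

Registering from the event callback means the port is known and the server can already accept traffic by the time other services discover the instance.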
Following bind eventually leads to the register method of NacosServiceRegistry, which performs the service registration.
com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
// Get the service id (spring.application.name)
String serviceId = registration.getServiceId();
// Get the group (spring.cloud.nacos.discovery.group, default DEFAULT_GROUP)
String group = nacosDiscoveryProperties.getGroup();
// Create the instance object
Instance instance = getNacosInstanceFromRegistration(registration);
try {
// This call mainly does two things: 1. starts heartbeats 2. registers the instance
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
// rethrow a RuntimeException if the registration is failed.
// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e);
}
}
com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(java.lang.String, java.lang.String, com.alibaba.nacos.api.naming.pojo.Instance)
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
// Whether this is an ephemeral instance (default true)
if (instance.isEphemeral()) {
// Create the heartbeat object
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
// Schedules a periodic task that sends heartbeat requests to the server (interval is preserved.heart.beat.interval, default 5s); API path: /nacos/v1/ns/instance/beat
this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
// Send the service registration request to the server; API path: /nacos/v1/ns/instance
this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
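The heartbeat task roughly works like this: it calls the server API, and if the server cannot find the instance, the client registers it again. The details are left for a later chapter on heartbeat checks.
Both the heartbeat request and the registration request call server APIs; the source is at com.alibaba.nacos.client.naming.net.NamingProxy#reqAPI(java.lang.String, java.util.Map<java.lang.String,java.lang.String>, java.lang.String, java.util.List<java.lang.String>, java.lang.String)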
public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
if (servers != null && !servers.isEmpty()) {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
// Start from a randomly chosen Nacos server
String server = servers.get(index);
try {
// Call the API over HTTP
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
// On failure, move on to the next server
index = (index + 1) % servers.size();
}
}
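// If every attempt above failed, fall back to nacosDomain (only effective when exactly one Nacos server is configured, because nacosDomain is assigned only when the server list has size 1)
if (StringUtils.isNotBlank(nacosDomain)) {
// Request that single Nacos server, up to REQUEST_DOMAIN_RETRY_COUNT (3) attempts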
for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
try {
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
}
NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}",
api, servers, exception.getErrCode(), exception.getErrMsg());
throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
+ exception.getMessage());
}
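The server-selection loop in reqAPI is a random-start, wrap-around failover: pick a random index, try that server, and on failure step to the next one until every server has been tried once. A minimal standalone sketch of that idea follows. `callWithFailover` and the server names are hypothetical, and the start index is passed in as a parameter so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

public class FailoverSketch {

    /**
     * Tries each server once, starting at startIndex and wrapping around.
     * Returns the first successful response, or rethrows the last failure.
     */
    public static String callWithFailover(List<String> servers, int startIndex,
                                          Function<String, String> call) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = startIndex;
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                last = e;                            // remember the failure, try the next server
            }
            index = (index + 1) % servers.size();    // wrap around to the start of the list
        }
        throw last;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("a:8848", "b:8848", "c:8848");
        List<String> tried = new ArrayList<>();
        int start = new Random().nextInt(servers.size());
        String result = callWithFailover(servers, start, server -> {
            tried.add(server);
            if (!server.startsWith("a")) {
                throw new IllegalStateException(server + " is down");
            }
            return "ok from " + server;
        });
        System.out.println(result + ", tried " + tried);
    }
}
```

The random start spreads load across servers, while the wrap-around guarantees that a single healthy server is always reached.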
Server-side service registration
Before walking through the server-side flow, look at the structure of the Nacos service registry.
/**
* Map<namespace, Map<group::serviceName, Service>>
*/
// The Nacos service registry is a two-level Map: the outer key is namespaceId, the inner key is groupName@@serviceName
private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
// A Service contains its Clusters
private Map<String, Cluster> clusterMap = new HashMap<>();
// A Cluster holds a set of persistent instances and a set of ephemeral instances
@JSONField(serialize = false)
private Set<Instance> persistentInstances = new HashSet<>();
@JSONField(serialize = false)
private Set<Instance> ephemeralInstances = new HashSet<>();
As a diagram, the hierarchy is namespace → Service → Cluster → instance set (persistent or ephemeral).
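The nesting can also be sketched in a few lines of Java. This is a simplified model, not the Nacos classes: Service and Cluster are collapsed into nested maps, instances are plain "ip:port" strings, and the persistent/ephemeral split is left out. `RegistrySketch`, `register` and `lookup` are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RegistrySketch {

    // namespaceId -> (groupName@@serviceName -> (clusterName -> instance addresses))
    private final Map<String, Map<String, Map<String, Set<String>>>> serviceMap =
            new ConcurrentHashMap<>();

    /** Creates every missing level on the way down, then adds the instance. */
    public void register(String namespaceId, String groupName, String serviceName,
                         String clusterName, String address) {
        serviceMap.computeIfAbsent(namespaceId, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(groupName + "@@" + serviceName, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet())
                .add(address);
    }

    /** Returns the instances of one cluster, or an empty set if any level is missing. */
    public Set<String> lookup(String namespaceId, String groupedName, String clusterName) {
        Map<String, Map<String, Set<String>>> services = serviceMap.get(namespaceId);
        if (services == null) {
            return Collections.emptySet();
        }
        Map<String, Set<String>> clusters = services.get(groupedName);
        if (clusters == null) {
            return Collections.emptySet();
        }
        Set<String> instances = clusters.get(clusterName);
        return instances == null ? Collections.emptySet() : instances;
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.register("public", "DEFAULT_GROUP", "order-service", "DEFAULT", "10.0.0.1:8080");
        System.out.println(registry.lookup("public", "DEFAULT_GROUP@@order-service", "DEFAULT"));
    }
}
```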
The API path the client calls to register is /nacos/v1/ns/instance; from that path, the server-side entry point is com.alibaba.nacos.naming.controllers.InstanceController#register
public String register(HttpServletRequest request) throws Exception {
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// Register the instance
serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
return "ok";
}
com.alibaba.nacos.naming.core.ServiceManager#registerInstance
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// Look up the service in the registry; if absent, create it and put it into the registry
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// Get the service
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// Register the instance
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
Start with createEmptyService. Stepping into it shows that the method that does the work is createServiceIfAbsent.
com.alibaba.nacos.naming.core.ServiceManager#createServiceIfAbsent
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
// Look up the service in the registry
Service service = getService(namespaceId, serviceName);
if (service == null) {
// Not found: create a new Service object
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
// Validate the names of the service and its clusters
service.validate();
// Put the service into the registry, start its health-check task, and register consistency listeners
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
com.alibaba.nacos.naming.core.ServiceManager#putServiceAndInit
private void putServiceAndInit(Service service) throws NacosException {
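// Put the service into the registry
putService(service);
// Start the health-check scheduled task
service.init();
// Listen for consistency changes of ephemeral instances
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
// Listen for consistency changes of persistent instances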
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
}
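Next is getService, which, as the name says, fetches the service from the registry. Since createEmptyService in the first step has already created the service, this call finds it directly. Move on to the third method, addInstance.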
com.alibaba.nacos.naming.core.ServiceManager#addInstance
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
// Build the unique key for this service's instance list
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// Get the service
Service service = getService(namespaceId, serviceName);
synchronized (service) {
// All current instances of the service plus the newly registered ones
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
// Wrap them in an Instances object
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// Save the instances
consistencyService.put(key, instances);
}
}
The two important calls in this method are addIpAddresses and consistencyService.put. Start with addIpAddresses; stepping into it shows that the method that does the work is updateIpAddresses.
com.alibaba.nacos.naming.core.ServiceManager#updateIpAddresses
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
// Get the Datum by its unique key
Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
// Get the current instance list
List<Instance> currentIPs = service.allIPs(ephemeral);
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
Set<String> currentInstanceIds = Sets.newHashSet();
for (Instance instance : currentIPs) {
currentInstances.put(instance.toIPAddr(), instance);
currentInstanceIds.add(instance.getInstanceId());
}
Map<String, Instance> instanceMap;
if (datum != null) {
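// Walk the instances stored in the Datum; for each one that also exists in the current instance list, copy the current health state and last heartbeat time onto it, then return the instances in a Map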
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
instanceMap = new HashMap<>(ips.length);
}
// Iterate over the newly registered instances
for (Instance instance : ips) {
// If the service has no cluster with this instance's cluster name, create and initialize one
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName(), service);
cluster.init();
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJSON());
}
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
// Remove action: drop the instance
instanceMap.remove(instance.getDatumKey());
} else {
// Add the newly registered instance
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
instanceMap.put(instance.getDatumKey(), instance);
}
}
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "
+ JSON.toJSONString(instanceMap.values()));
}
return new ArrayList<>(instanceMap.values());
}
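Stripped of the Nacos types, updateIpAddresses is a keyed merge: start from the stored list, refresh health flags from what the server currently holds in memory, then add or remove the changed instances by key. The sketch below illustrates that shape under simplifying assumptions. `Inst`, `merge`, the "add"/"remove" action strings and the "ip:port" keys are hypothetical, and cluster creation and instance id generation are omitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InstanceMergeSketch {

    /** Hypothetical, simplified instance: identified by "ip:port", with a health flag. */
    public static final class Inst {
        final String key;
        boolean healthy;
        public Inst(String key, boolean healthy) { this.key = key; this.healthy = healthy; }
        @Override public String toString() { return key + (healthy ? "(up)" : "(down)"); }
    }

    /**
     * Merges the stored list with the changed instances.
     * liveHealth holds the health flags currently held in memory, keyed by "ip:port".
     */
    public static List<Inst> merge(List<Inst> stored, Map<String, Boolean> liveHealth,
                                   String action, Inst... changed) {
        Map<String, Inst> merged = new LinkedHashMap<>();
        for (Inst inst : stored) {
            Boolean live = liveHealth.get(inst.key);
            if (live != null) {
                inst.healthy = live;          // keep the health state the server already tracks
            }
            merged.put(inst.key, inst);
        }
        for (Inst inst : changed) {
            if ("remove".equals(action)) {
                merged.remove(inst.key);
            } else {
                merged.put(inst.key, inst);   // a re-registered instance replaces the old entry
            }
        }
        if (merged.isEmpty() && "add".equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty");
        }
        return new ArrayList<>(merged.values());
    }

    public static void main(String[] args) {
        List<Inst> stored = new ArrayList<>();
        stored.add(new Inst("10.0.0.1:8080", true));
        Map<String, Boolean> liveHealth = new LinkedHashMap<>();
        liveHealth.put("10.0.0.1:8080", false);
        System.out.println(merge(stored, liveHealth, "add", new Inst("10.0.0.2:8080", true)));
    }
}
```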
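Next, consistencyService.put. Nacos instances are ephemeral by default, and ephemeral instances go through the Distro protocol, so the implementation actually invoked by consistencyService.put is DistroConsistencyServiceImpl.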
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put
public void put(String key, Record value) throws NacosException {
// Apply the newly registered instances to the service registry
onPut(key, value);
// Sync the latest instance information to the other server nodes
taskDispatcher.addTask(key);
}
Start with onPut; its source is at com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#onPut
public void onPut(String key, Record value) {
// Check whether the key is an ephemeral instance list key
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
// Create the Datum
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// Store the Datum in dataStore
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
// Puts a change event onto a blocking queue. addTask belongs to Notifier, which implements Runnable; the handling logic lives in its run method, described below
notifier.addTask(key, ApplyAction.CHANGE);
}
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#run
public void run() {
Loggers.DISTRO.info("distro notifier started");
while (true) {
try {
// Take a task from the blocking queue
Pair pair = tasks.take();
if (pair == null) {
continue;
}
String datumKey = (String) pair.getValue0();
ApplyAction action = (ApplyAction) pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
continue;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == ApplyAction.CHANGE) {
// Change notification: updates the instances in the service registry
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == ApplyAction.DELETE) {
// Delete notification
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
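The Notifier is a classic producer/consumer arrangement: the registration thread only enqueues a key, and a single worker thread blocks on the queue and calls the listeners. A minimal sketch of that pattern with java.util.concurrent follows. `NotifierSketch` and its `Listener` interface are hypothetical, and the duplicate-task suppression done through the services map in the real Notifier is left out.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class NotifierSketch {

    public interface Listener { void onChange(String key); }

    private final BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
    private final Map<String, List<Listener>> listeners = new ConcurrentHashMap<>();

    public void listen(String key, Listener listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    /** Producer side: enqueue the changed key and return immediately. */
    public void addTask(String key) { tasks.offer(key); }

    /** Consumer side: a daemon thread that blocks on the queue and notifies listeners. */
    public void start() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String key = tasks.take();
                    List<Listener> targets = listeners.get(key);
                    if (targets == null) {
                        continue;                 // nobody listens for this key
                    }
                    for (Listener l : targets) {
                        try {
                            l.onChange(key);
                        } catch (RuntimeException e) {
                            // one failing listener must not stop the loop
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    /** Runs a small scenario and returns the keys the listener saw. */
    public static List<String> demo() {
        NotifierSketch notifier = new NotifierSketch();
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        notifier.listen("svc-a", key -> { seen.add(key); done.countDown(); });
        notifier.start();
        notifier.addTask("svc-unwatched");
        notifier.addTask("svc-a");
        try {
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) { System.out.println(demo()); }
}
```

Decoupling the two sides keeps the HTTP registration request short: it returns once the key is queued, and the registry update happens on the worker thread.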
When Notifier adds the task here, the action is CHANGE, so only the change handling is covered for now. Its source is at
com.alibaba.nacos.naming.core.Service#onChange
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
// Validate each instance's weight and clamp it into range
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
// Update the instances in the service registry
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
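The weight check in onChange has one subtlety worth spelling out: values above 10000 are capped, values strictly between 0 and 0.01 are raised to 0.01, and a weight of exactly 0 is left alone. The bounds below are copied from the listing; `WeightClampSketch.clamp` is a hypothetical helper written only to make the ranges explicit.

```java
public class WeightClampSketch {

    /** Applies the same two bounds checks as the onChange loop above. */
    public static double clamp(double weight) {
        if (weight > 10000.0D) {
            return 10000.0D;      // upper bound
        }
        if (weight < 0.01D && weight > 0.0D) {
            return 0.01D;         // tiny positive weights are raised to the minimum
        }
        return weight;            // 0 (and anything else in range) is unchanged
    }

    public static void main(String[] args) {
        System.out.println(clamp(20000.0D));
        System.out.println(clamp(0.001D));
        System.out.println(clamp(0.0D));
    }
}
```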
com.alibaba.nacos.naming.core.Service#updateIPs
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
// Create a Map sized to the service's cluster Map
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
// Use every cluster name of the service as a key, with an empty list as the value
ipMap.put(clusterName, new ArrayList<>());
}
// Iterate over all instances (instances here holds the old instances plus the newly registered ones)
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
if (StringUtils.isEmpty(instance.getClusterName())) {
// Default the cluster name to DEFAULT when it is empty
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
// If the service has no cluster with this instance's cluster name, create and initialize one
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJSON());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
// Get the instance list for this cluster name from ipMap
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
// Not present: create a linked list and put it into ipMap
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
// Add the instance to the list
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
// Update the instance set of each cluster
clusterMap.get(entry.getKey()).updateIPs(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
// Push the service's new data to clients
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}",
getNamespaceId(), getName(), stringBuilder.toString());
}
com.alibaba.nacos.naming.core.Cluster#updateIPs
public void updateIPs(List<Instance> ips, boolean ephemeral) {
// Pick the target set; for an ephemeral instance (true here) this is ephemeralInstances
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
// Create a Map sized to toUpdateInstances
HashMap<String, Instance> oldIPMap = new HashMap<>(toUpdateInstances.size());
for (Instance ip : toUpdateInstances) {
// Index the current instances by datum key
oldIPMap.put(ip.getDatumKey(), ip);
}
// Find the instances that need updating
List<Instance> updatedIPs = updatedIPs(ips, oldIPMap.values());
if (updatedIPs.size() > 0) {
for (Instance ip : updatedIPs) {
Instance oldIP = oldIPMap.get(ip.getDatumKey());
// do not update the ip validation status of updated ips
// because the checker has the most precise result
// Only when ip is not marked, don't we update the health status of IP:
if (!ip.isMarked()) {
ip.setHealthy(oldIP.isHealthy());
}
if (ip.isHealthy() != oldIP.isHealthy()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}",
getService().getName(), (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
}
if (ip.getWeight() != oldIP.getWeight()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(), ip.toString());
}
}
}
// Instances present in the new list but not in the old one (newly registered instances)
List<Instance> newIPs = subtract(ips, oldIPMap.values());
if (newIPs.size() > 0) {
Loggers.EVT_LOG.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}",
getService().getName(), getName(), newIPs.size(), newIPs.toString());
for (Instance ip : newIPs) {
// Add the instance to health-status checking
HealthCheckStatus.reset(ip);
}
}
// Instances present in the old list but not in the new one (instances to remove)
List<Instance> deadIPs = subtract(oldIPMap.values(), ips);
if (deadIPs.size() > 0) {
Loggers.EVT_LOG.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}",
getService().getName(), getName(), deadIPs.size(), deadIPs.toString());
for (Instance ip : deadIPs) {
// Remove the instance from health-status checking
HealthCheckStatus.remv(ip);
}
}
toUpdateInstances = new HashSet<>(ips);
if (ephemeral) {
// Replace the instance set
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
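The new and dead instance lists above are two set differences taken in opposite directions. A small sketch of that computation follows. It compares elements with equals/hashCode on plain strings, which is an assumption made for illustration; how the real Cluster#subtract compares instances is not shown in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class InstanceDiffSketch {

    /** Returns the elements of a that are not in b, keeping a's order. */
    public static <T> List<T> subtract(Collection<T> a, Collection<T> b) {
        Set<T> exclude = new HashSet<>(b);
        List<T> result = new ArrayList<>();
        for (T item : a) {
            if (!exclude.contains(item)) {
                result.add(item);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> oldIps = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080");
        List<String> newIps = Arrays.asList("10.0.0.2:8080", "10.0.0.3:8080");
        // In the new list only: instances that were just registered
        System.out.println("new:  " + subtract(newIps, oldIps));
        // In the old list only: instances that should be dropped
        System.out.println("dead: " + subtract(oldIps, newIps));
    }
}
```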
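A neat aspect of the instance update is its use of the copy-on-write idea. As the two updateIPs methods above show, the new data is assembled in fresh collections (ipMap in Service#updateIPs, a new HashSet in Cluster#updateIPs) and only then swapped in for ephemeralInstances. The benefit comes from separating reads from writes: clients doing service discovery read the registry from the server, and if registration modified the registry in place, a client could read it halfway through an update and see inconsistent data. Avoiding that would require locking before registration and unlocking afterwards, which costs performance. By working on a copy and replacing the reference at the end, a client may briefly read the old data but never a half-updated set, so reads stay available without blocking.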
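The swap can be reduced to a few lines. In the sketch below, writers build a complete new set off to the side and publish it with a single reference assignment, while readers take whatever reference is current and never block. `CopyOnWriteSketch` is hypothetical, and marking the field volatile is this sketch's choice for safe publication; the ephemeralInstances field in the listing earlier is declared without it.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class CopyOnWriteSketch {

    // Readers only ever see a fully built, immutable set.
    private volatile Set<String> instances = Collections.emptySet();

    /** Lock-free read: returns the current snapshot. */
    public Set<String> read() {
        return instances;
    }

    /** Builds the new set aside, then publishes it with one reference write. */
    public synchronized void replaceAll(Collection<String> latest) {
        Set<String> copy = new HashSet<>(latest);
        instances = Collections.unmodifiableSet(copy);
    }

    public static void main(String[] args) {
        CopyOnWriteSketch registry = new CopyOnWriteSketch();
        registry.replaceAll(Arrays.asList("10.0.0.1:8080"));
        Set<String> snapshot = registry.read();      // a reader holding the old view
        registry.replaceAll(Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        System.out.println("old snapshot size: " + snapshot.size());
        System.out.println("current size:      " + registry.read().size());
    }
}
```

A reader that fetched the snapshot before the swap keeps a consistent, slightly stale view, which is the trade-off the paragraph above describes.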
After registration completes, the new service data has to be pushed to clients. The push logic is at
com.alibaba.nacos.naming.push.PushService#onApplicationEvent
public void onApplicationEvent(ServiceChangeEvent event) {
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
// Schedule the push as a delayed task (udpSender is a SingleThreadScheduledExecutor, a pool with one core thread)
Future future = udpSender.schedule(new Runnable() {
@Override
public void run() {
try {
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
// Iterate over all clients
for (PushClient client : clients.values()) {
// Skip zombie clients (they are also removed from the map)
if (client.zombie()) {
Loggers.PUSH.debug("client is zombie: " + client.toString());
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client.toString());
continue;
}
Receiver.AckEntry ackEntry;
Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
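// defaultPushCacheMillis is 10s by default, so this cache branch (>= 20000 ms) is not taken unless the value is raised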
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
}
// Build the object to push
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key));
// Push
udpPush(ackEntry);
}
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
com.alibaba.nacos.naming.push.PushService#udpPush
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
if (ackEntry == null) {
Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
return null;
}
// Maximum number of send retries exceeded
if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return ackEntry;
}
try {
if (!ackMap.containsKey(ackEntry.key)) {
totalPush++;
}
ackMap.put(ackEntry.key, ackEntry);
udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
Loggers.PUSH.info("send udp packet: " + ackEntry.key);
// Send to the client (the data is received and handled by com.alibaba.nacos.client.naming.core.PushReceiver#run() on the client)
udpSocket.send(ackEntry.origin);
// Update the retry count
ackEntry.increaseRetryTime();
// Schedule a retransmission task, delayed by 10s
executorService.schedule(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS),
TimeUnit.MILLISECONDS);
return ackEntry;
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}",
ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return null;
}
}
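Back in consistencyService.put: after onPut finishes, taskDispatcher.addTask syncs the latest instance information to the other server nodes. TaskDispatcher adds the service key to be synced to the blocking queue of a TaskScheduler. TaskScheduler implements Runnable; its run method iterates over all cluster nodes and uses the DataSyncer to perform the sync.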
com.alibaba.nacos.naming.consistency.ephemeral.distro.TaskDispatcher.TaskScheduler#run
public void run() {
List<String> keys = new ArrayList<>();
while (true) {
try {
String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
TimeUnit.MILLISECONDS);
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
Loggers.DISTRO.debug("got key: {}", key);
}
if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
continue;
}
if (StringUtils.isBlank(key)) {
continue;
}
if (dataSize == 0) {
keys = new ArrayList<>();
}
keys.add(key);
dataSize++;
// Dispatch only when 1000 keys are waiting, or when (now - last dispatch time) exceeds the dispatch period
if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
(System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
// Iterate over all cluster nodes
for (Server member : dataSyncer.getServers()) {
if (NetUtils.localServer().equals(member.getKey())) {
continue;
}
// Create the sync task
SyncTask syncTask = new SyncTask();
syncTask.setKeys(keys);
syncTask.setTargetServer(member.getKey());
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
}
// Submit the sync task to the data syncer
dataSyncer.submit(syncTask, 0);
}
lastDispatchTime = System.currentTimeMillis();
dataSize = 0;
}
} catch (Exception e) {
Loggers.DISTRO.error("dispatch sync task failed.", e);
}
}
}
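The dispatch condition in run is a size-or-time batching rule: keys accumulate until either the batch is full or enough time has passed since the last dispatch. The sketch below isolates that rule. `BatchDispatchSketch` is hypothetical, the clock is passed in as a parameter so the behavior is deterministic, and the queue polling and per-node fan-out of the real TaskScheduler are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BatchDispatchSketch {

    private final int batchSize;
    private final long periodMillis;
    private List<String> keys = new ArrayList<>();
    private long lastDispatchTime;

    public BatchDispatchSketch(int batchSize, long periodMillis, long now) {
        this.batchSize = batchSize;
        this.periodMillis = periodMillis;
        this.lastDispatchTime = now;
    }

    /**
     * Adds a key. Returns the batch to dispatch when the batch is full or the
     * period has elapsed since the last dispatch; otherwise returns an empty list.
     */
    public List<String> add(String key, long now) {
        keys.add(key);
        if (keys.size() == batchSize || now - lastDispatchTime > periodMillis) {
            List<String> batch = keys;
            keys = new ArrayList<>();     // start a fresh batch
            lastDispatchTime = now;
            return batch;
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        BatchDispatchSketch dispatcher = new BatchDispatchSketch(3, 2000, 0);
        System.out.println(dispatcher.add("k1", 100));    // still accumulating
        System.out.println(dispatcher.add("k2", 200));    // still accumulating
        System.out.println(dispatcher.add("k3", 300));    // batch full
        System.out.println(dispatcher.add("k4", 400));    // still accumulating
        System.out.println(dispatcher.add("k5", 2500));   // period elapsed
    }
}
```

Note that, as in the listing, the time condition is only evaluated when a key arrives, so a lone key waits until the next key shows up after the period has passed.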
com.alibaba.nacos.naming.consistency.ephemeral.distro.DataSyncer#submit
public void submit(SyncTask task, long delay) {
// If it's a new task:
if (task.getRetryCount() == 0) {
// Iterate over all keys to sync
Iterator<String> iterator = task.getKeys().iterator();
while (iterator.hasNext()) {
String key = iterator.next();
// A task for this key already exists in taskMap
if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
// associated key already exist:
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync already in process, key: {}", key);
}
// Remove it from this task
iterator.remove();
}
}
}
if (task.getKeys().isEmpty()) {
// all keys are removed:
return;
}
// Run the sync asynchronously
GlobalExecutor.submitDataSync(() -> {
// 1. check the server
// Check the server list
if (getServers() == null || getServers().isEmpty()) {
Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
return;
}
// Keys this task needs to sync
List<String> keys = task.getKeys();
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
}
// 2. get the datums by keys and check the datum is empty or not
// Fetch all Datums for the keys from dataStore
Map<String, Datum> datumMap = dataStore.batchGet(keys);
if (datumMap == null || datumMap.isEmpty()) {
// clear all flags of this task:
for (String key : keys) {
// Nothing to sync: clear the in-flight flags for every key
taskMap.remove(buildKey(key, task.getTargetServer()));
}
return;
}
// Serialize
byte[] data = serializer.serialize(datumMap);
long timestamp = System.currentTimeMillis();
// Send the data sync request
boolean success = NamingProxy.syncData(data, task.getTargetServer());
if (!success) {
// On failure, retry
SyncTask syncTask = new SyncTask();
syncTask.setKeys(task.getKeys());
syncTask.setRetryCount(task.getRetryCount() + 1);
syncTask.setLastExecuteTime(timestamp);
syncTask.setTargetServer(task.getTargetServer());
retrySync(syncTask);
} else {
// clear all flags of this task:
for (String key : task.getKeys()) {
// On success, clear the entries from the taskMap cache
taskMap.remove(buildKey(key, task.getTargetServer()));
}
}
}, delay);
}
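The taskMap bookkeeping in submit is an in-flight deduplication: putIfAbsent claims a (key, target server) pair atomically, a pair that is already claimed is skipped, and the claim is released once the sync finishes. A minimal sketch of that idea follows. `SyncDedupSketch`, `claim`, `release` and the "@@" separator used to build the map key are hypothetical and not taken from DataSyncer#buildKey.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SyncDedupSketch {

    private final ConcurrentMap<String, String> taskMap = new ConcurrentHashMap<>();

    /** Returns only the keys that were not already being synced to targetServer. */
    public List<String> claim(List<String> keys, String targetServer) {
        List<String> claimed = new ArrayList<>();
        for (String key : keys) {
            // putIfAbsent returns null only for the caller that inserted the entry
            if (taskMap.putIfAbsent(targetServer + "@@" + key, key) == null) {
                claimed.add(key);
            }
        }
        return claimed;
    }

    /** Clears the in-flight flags once the sync has succeeded or been abandoned. */
    public void release(List<String> keys, String targetServer) {
        for (String key : keys) {
            taskMap.remove(targetServer + "@@" + key);
        }
    }

    public static void main(String[] args) {
        SyncDedupSketch syncer = new SyncDedupSketch();
        System.out.println(syncer.claim(Arrays.asList("a", "b"), "node-1")); // both claimed
        System.out.println(syncer.claim(Arrays.asList("b", "c"), "node-1")); // b is in flight
        syncer.release(Arrays.asList("b"), "node-1");
        System.out.println(syncer.claim(Arrays.asList("b"), "node-1"));      // b can be claimed again
    }
}
```

Because the claim is per target server, the same key can be in flight to several nodes at once without being treated as a duplicate.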