nacos学习的心历路程
在spring-cloud-alibaba 的全年规划中,杨翊简单介绍了一下Nacos 从1.0的架构图和2.0的架构图,以及解决的问题,一下图片出于杨翊杭州分享。


nacos 1.0 架构


nacos1.0架构设计问题

nacos 2.0架构设计


nacos2.0架构设计问题

nacos 1.0 到2.0 中一直延续的 一致性协议
Distor
Distro是阿里巴巴的私有协议,目前流行的Nacos服务管理框架就采用了Distro协议。Distro 协议被定位为 临时数据的一致性协议 :该类型协议, 不需要把数据存储到磁盘或者数据库 ,因为临时数据通常和服务器保持一个session会话, 该会话只要存在,数据就不会丢失
Raft
该论文主要包含一下模块:
- 节点间的选举
- log 的复制
- 集群配置的改变
参考文档
raft论文中文版本,
开源java raft协议实现框架
[个人文章](解读Raft(一 算法基础) (qq.com))
SOFAJRaft官网
notify
相当于一个监听
spring cloud commons 和Spring Cloud Context
在进行nacos深入分析前,有必要回过头来说一下,spring cloud commons,其为我们提供了一些规约,各个功能提供者只需要实现规约来达到提供功能的目的。详情参见官方文档。
spring cloud commons 官方文档地址
nacos 的client 服务注册
注册的流程都是使用 nameingservice 调用http 请求进行注册的,我们不需要进行区分,但是心跳的话就需要分版本进行考虑了。其实服务注册的所有服务我们关注 NamingService 接口即可。其实还是使用ServiceRegistry 进行服务注册。并使用DiscoveryClient进行服务发现能力。
public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
protected void register() {
this.serviceRegistry.register(this.getRegistration());
}
}
public class NacosServiceRegistry implements ServiceRegistry<Registration> {
private final NacosDiscoveryProperties nacosDiscoveryProperties;
private final NamingService namingService;
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
} else {
String serviceId = registration.getServiceId();
String group = this.nacosDiscoveryProperties.getGroup();
Instance instance = this.getNacosInstanceFromRegistration(registration);
try {
this.namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
} catch (Exception var6) {
log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var6});
ReflectionUtils.rethrowRuntimeException(var6);
}
}
}
private Instance getNacosInstanceFromRegistration(Registration registration) {
Instance instance = new Instance();
instance.setIp(registration.getHost());
instance.setPort(registration.getPort());
instance.setWeight((double)this.nacosDiscoveryProperties.getWeight());
instance.setClusterName(this.nacosDiscoveryProperties.getClusterName());
instance.setMetadata(registration.getMetadata());
return instance;
}
}
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
params.put("namespaceId", this.getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
throw new IllegalArgumentException("no server available");
} else {
Exception exception = new Exception();
if (servers != null && !servers.isEmpty()) {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for(int i = 0; i < servers.size(); ++i) {
String server = (String)servers.get(index);
try {
return this.callServer(api, params, server, method);
} catch (NacosException var11) {
exception = var11;
LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
} catch (Exception var12) {
exception = var12;
LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
}
index = (index + 1) % servers.size();
}
}
}
}
public String callServer(String api, Map<String, String> params, String curServer, String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0L;
this.checkSignature(params);
List<String> headers = this.builderHeaders();
String url;
if (!curServer.startsWith("https://") && !curServer.startsWith("http://")) {
if (!curServer.contains(":")) {
curServer = curServer + ":" + this.serverPort;
}
url = HttpClient.getPrefix() + curServer + api;
} else {
url = curServer + api;
}
HttpResult result = HttpClient.request(url, headers, params, "UTF-8", method);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code)).observe((double)(end - start));
if (200 == result.code) {
return result.content;
} else if (304 == result.code) {
return "";
} else {
throw new NacosException(500, "failed to req API:" + curServer + api + ". code:" + result.code + " msg: " + result.content);
}
}
nacos 提供 DiscoveryClient
使用自动配置给我们提供了一个discoveryclient供应用使用。而NacosServiceDiscovery才是真正的能力提供者,其最终还是调用NamgingService 进行能力提供。

@Configuration(proxyBeanMethods = false)
public class NacosDiscoveryClientConfiguration {
@Bean
public DiscoveryClient nacosDiscoveryClient(
NacosServiceDiscovery nacosServiceDiscovery) {
return new NacosDiscoveryClient(nacosServiceDiscovery);
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",
matchIfMissing = true)
public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties,
ObjectProvider<TaskScheduler> taskScheduler) {
return new NacosWatch(nacosDiscoveryProperties, taskScheduler);
}
@Bean
@ConditionalOnMissingBean
public NacosServiceDiscovery nacosServiceDiscovery(
NacosDiscoveryProperties discoveryProperties) {
return new NacosServiceDiscovery(discoveryProperties);
}
}
NamingService 接口

到此为止,我们基本上能通过编程的方式获取服务的地址了,但是我们没有显示的代码调用,这个是怎么做的呢?我猜测和ribbon 有关。再有我们看服务发现客户端是直接调用请求去注册中心查找的服务,但是当我们的注册中心关了之后,服务与服务之前还是可以连通的,这个能力,也就是路由缓存功能是谁提供的呢,肯定是由 nacos 提供的,这是因为subscribe 默认为true,这样的话,就会缓存,并通过udp和server 服务进行通讯,来获取最新的状态。udp的能力是有PushReceiver 提供的
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
if (subscribe) {
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
根据架构图我们了解到,会有一个udp 去和server 进行通讯,来感知服务这就是PushReceiver
public class PushReceiver implements Runnable {
@Override
public void run() {
while (true) {
try {
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
udpSocket.receive(packet);
String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
hostReactor.processServiceJSON(pushPacket.data);
ack = "{\"type\": \"push-ack\""
+ ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
} else if ("dump".equals(pushPacket.type)) {
ack = "{\"type\": \"dump-ack\""
+ ", \"lastRefTime\": \"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\""
+ StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
+ "\"}";
} else {
ack = "{\"type\": \"unknown-ack\""
+ ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
} catch (Exception e) {
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
}
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
udpSocket = new DatagramSocket();
executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}
ribbon如何使用我
直接调用我的 namingservice 方法即可
学习资料推广
我已经将springamqp 源码解析录制为视频上传到bibi,分为六个章节详细介绍了各个模块的具体内容

https://www.bilibili.com/video/BV1hN411Z7fn?share_source=copy_web
感兴趣的小伙伴可以看看 学习一下 录制不易,记得三联哦!
|