This article is my personal reading of the Nacos 2.0.3 source code. If you spot any mistakes, corrections and criticism are very welcome.
1. Overview
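Nacos is an open-source project from Alibaba that provides service discovery, configuration management, and service governance for distributed microservice architectures.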
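Nacos provides four main features:
- Service discovery and service health checking
- Dynamic configuration management
- Dynamic DNS service
- Service and metadata management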
2. How Nacos Cluster Leader Election Works
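Nacos uses the Raft algorithm to elect a cluster leader. The overall strategy resembles ZooKeeper's, but Raft is simpler and easier to follow than the election algorithm ZooKeeper uses.
During an election, a Raft node is in one of three roles:
- Leader: the node that has won the election. A cluster has exactly one leader, and it is responsible for handling client requests.
- Candidate: a node that is campaigning for leadership. While an election is in progress, nodes whose leader timeout has expired are in the Candidate state.
- Follower: once a leader has been elected, every other node switches its state to Follower.
The election in the source code: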
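RaftCore is the main implementation class of the election algorithm. Its entry point is the init method, which is annotated with @PostConstruct, so Spring calls it once the RaftCore bean has been created and its dependencies injected.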
@PostConstruct
public void init() throws Exception {
// ... (code omitted) ...
masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
// ... (code omitted) ...
}
- MasterElection: a Runnable task that GlobalExecutor runs periodically; start with its run method.
public void run() {
try {
if (stopWork) {
return;
}
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.leaderDueMs > 0) {
return;
}
local.resetLeaderDue();
local.resetHeartbeatDue();
sendVote();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while master election {}", e);
}
}
private void sendVote() {
RaftPeer local = peers.get(NetUtils.localServer());
Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
local.term);
peers.reset();
local.term.incrementAndGet();
local.voteFor = local.ip;
local.state = RaftPeer.State.CANDIDATE;
Map<String, String> params = new HashMap<>(1);
params.put("vote", JacksonUtils.toJson(local));
for (final String server : peers.allServersWithoutMySelf()) {
final String url = buildUrl(server, API_VOTE);
try {
HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
return;
}
RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
peers.decideLeader(peer);
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.RAFT.warn("error while sending vote to server: {}", server);
}
}
}
}
public RaftPeer decideLeader(RaftPeer candidate) {
peers.put(candidate.ip, candidate);
SortedBag ips = new TreeBag();
int maxApproveCount = 0;
String maxApprovePeer = null;
for (RaftPeer peer : peers.values()) {
if (StringUtils.isEmpty(peer.voteFor)) {
continue;
}
ips.add(peer.voteFor);
if (ips.getCount(peer.voteFor) > maxApproveCount) {
maxApproveCount = ips.getCount(peer.voteFor);
maxApprovePeer = peer.voteFor;
}
}
if (maxApproveCount >= majorityCount()) {
RaftPeer peer = peers.get(maxApprovePeer);
peer.state = RaftPeer.State.LEADER;
if (!Objects.equals(leader, peer)) {
leader = peer;
ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
Loggers.RAFT.info("{} has become the LEADER", leader.ip);
}
}
return leader;
}
public synchronized RaftPeer receivedVote(RaftPeer remote) {
RaftPeer local = peers.get(NetUtils.localServer());
if (remote.term.get() <= local.term.get()) {
String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
Loggers.RAFT.info(msg);
if (StringUtils.isEmpty(local.voteFor)) {
local.voteFor = local.ip;
}
return local;
}
local.resetLeaderDue();
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
local.term.set(remote.term.get());
Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
return local;
}
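To make the two rules above concrete, here is a minimal, self-contained sketch in plain Java (not Nacos code): handleVote mirrors receivedVote's term comparison, and leaderOf mirrors decideLeader's vote count, assuming majorityCount() is peers / 2 + 1 as in a standard Raft cluster. The class, the Peer stand-in, and the method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the voting rules in receivedVote and decideLeader.
final class RaftVoteSketch {

    /** Minimal stand-in for RaftPeer: only the fields the rules need. */
    static final class Peer {
        final String ip;
        long term;
        String voteFor;

        Peer(String ip, long term) {
            this.ip = ip;
            this.term = term;
        }
    }

    /** Like receivedVote: grant the vote only if the candidate's term is strictly higher. */
    static Peer handleVote(Peer local, Peer candidate) {
        if (candidate.term <= local.term) {
            if (local.voteFor == null) {
                local.voteFor = local.ip; // keep (or cast) a vote for itself
            }
            return local;
        }
        local.voteFor = candidate.ip; // follow the candidate
        local.term = candidate.term;  // and adopt its term
        return local;
    }

    /** Like decideLeader: the most-voted peer wins only if it reaches a majority. */
    static String leaderOf(Map<String, String> voteForByPeer, int clusterSize) {
        int majority = clusterSize / 2 + 1;
        Map<String, Integer> counts = new HashMap<>();
        String best = null;
        int bestCount = 0;
        for (String votee : voteForByPeer.values()) {
            if (votee == null) {
                continue;
            }
            int count = counts.merge(votee, 1, Integer::sum);
            if (count > bestCount) {
                bestCount = count;
                best = votee;
            }
        }
        return bestCount >= majority ? best : null;
    }

    public static void main(String[] args) {
        Peer a = new Peer("A", 1); // A has incremented its term and voted for itself
        a.voteFor = "A";
        Peer c = handleVote(new Peer("C", 0), a); // C's term is lower, so C votes for A
        Map<String, String> votes = new HashMap<>();
        votes.put("A", a.voteFor);
        votes.put("C", c.voteFor);
        System.out.println(leaderOf(votes, 3)); // A
    }
}
```

With three nodes the majority is two, so a split vote (each node voting for itself) returns no leader and the cluster waits for the next round.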
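Summary:
Suppose there are three nodes: A, B, and C.
- At startup:
  - When the service first starts, A, B, and C are all in the FOLLOWER state. Suppose A starts voting first (its leader timeout expires first): A votes for itself, switches from FOLLOWER to CANDIDATE, and sends vote requests to the other nodes. One of three things can happen:
  - Case 1: C handles A's vote request and finds that its own term is lower than A's. C must then vote for A, switch to FOLLOWER, and adopt A's term. A now holds two of the three votes, a majority, so A becomes the leader and starts sending heartbeats to B and C to announce that it is the leader.
  - Case 2: C handles A's vote request but finds that its own term is greater than or equal to A's, so it rejects the request and keeps its vote (voting for itself if it has not voted yet). If B does the same, the vote is split and nobody reaches a majority. The cluster must wait for the next round, which prolongs the time the Nacos cluster is unavailable.
  - Case 3: all of A's vote requests are processed, but the responses show that B or C is the leader. A must then switch back to FOLLOWER and adopt the leader's term.
- When the leader crashes:
  - Suppose A is the leader and goes down. B and C stop receiving A's heartbeats; once their leader timeouts expire they become CANDIDATEs and start electing a new leader.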
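Split votes like Case 2 are why each node's leader timeout is randomized. MasterElection.run subtracts TICK_PERIOD_MS from leaderDueMs on every tick and calls sendVote only when it reaches zero; resetLeaderDue then sets a new deadline, and heartbeats from a live leader keep pushing it back. The sketch below models that countdown; the base timeout, the jitter source, and the class name are assumptions for illustration, not Nacos's actual constants.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.LongSupplier;

// Hypothetical sketch of MasterElection's countdown: a randomized leader timeout
// that is decremented on each tick; an election starts when it runs out.
final class ElectionTimerSketch {
    private final long baseTimeoutMs;
    private final LongSupplier jitterMs; // random extra delay so nodes rarely time out together
    private long leaderDueMs;

    ElectionTimerSketch(long baseTimeoutMs, LongSupplier jitterMs) {
        this.baseTimeoutMs = baseTimeoutMs;
        this.jitterMs = jitterMs;
        resetLeaderDue();
    }

    /** Called when a heartbeat from the leader arrives, and after starting a vote. */
    void resetLeaderDue() {
        leaderDueMs = baseTimeoutMs + jitterMs.getAsLong();
    }

    /** One scheduler tick; returns true when this node should start an election. */
    boolean tick(long tickPeriodMs) {
        leaderDueMs -= tickPeriodMs;
        if (leaderDueMs > 0) {
            return false;
        }
        resetLeaderDue(); // give the new election its own deadline
        return true;
    }

    public static void main(String[] args) {
        ElectionTimerSketch timer =
                new ElectionTimerSketch(15_000, () -> ThreadLocalRandom.current().nextLong(0, 5_000));
        int ticks = 1;
        while (!timer.tick(500)) {
            ticks++;
        }
        System.out.println("election started after " + ticks + " ticks");
    }
}
```

Because each node draws a different jitter, one of B and C usually times out first and collects the other's vote before the second one starts campaigning.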
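3. Configuration Center
As a mainstream distributed configuration center, Nacos lets applications detect configuration changes and apply them dynamically.
Client-side logic:
- NacosConfigManager: loaded at startup through Spring's SPI mechanism (spring.factories), it creates the ConfigService instance.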
public NacosConfigManager(NacosConfigProperties nacosConfigProperties) {
this.nacosConfigProperties = nacosConfigProperties;
createConfigService(nacosConfigProperties);
}
static ConfigService createConfigService(NacosConfigProperties nacosConfigProperties) {
if (Objects.isNull(service)) {
synchronized(NacosConfigManager.class) {
try {
if (Objects.isNull(service)) {
service = NacosFactory.createConfigService(nacosConfigProperties.assembleConfigServiceProperties());
}
} catch (NacosException e) {
log.error(e.getMessage());
throw new NacosConnectionFailureException(nacosConfigProperties.getServerAddr(), e.getMessage(), e);
}
}
}
return service;
}
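- NacosContextRefresher: also loaded at startup through Spring's SPI mechanism. It implements ApplicationListener<ApplicationReadyEvent> and overrides onApplicationEvent, which registers a Nacos listener for every refreshable property source once the application is ready.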
public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware {
// ... (code omitted) ...
public void onApplicationEvent(ApplicationReadyEvent event) {
if (this.ready.compareAndSet(false, true)) {
this.registerNacosListenersForApplications();
}
}
private void registerNacosListenersForApplications() {
if (this.isRefreshEnabled()) {
for (NacosPropertySource propertySource : NacosPropertySourceRepository.getAll()) {
if (propertySource.isRefreshable()) {
String dataId = propertySource.getDataId();
this.registerNacosListener(propertySource.getGroup(), dataId);
}
}
}
}
private void registerNacosListener(final String groupKey, final String dataKey) {
// ... (code omitted) ...
try {
this.configService.addListener(dataKey, groupKey, listener);
} catch (NacosException e) {
log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey, groupKey), e);
}
}
// ... (code omitted) ...
}
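- ClientWorker: implements config change listening on the client. In 2.0.3, executeConfigListen does not issue HTTP long-polling requests; it sends batched ConfigBatchListenRequest calls over gRPC.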
public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,
final Properties properties) throws NacosException {
this.configFilterChainManager = configFilterChainManager;
init(properties);
agent = new ConfigRpcTransportClient(properties, serverListManager);
ScheduledExecutorService executorService = Executors
.newScheduledThreadPool(ThreadUtils.getSuitableThreadCount(1), r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker");
t.setDaemon(true);
return t;
});
agent.setExecutor(executorService);
agent.start();
}
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
throws NacosException {
group = blank2defaultGroup(group);
String tenant = agent.getTenant();
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
synchronized (cache) {
for (Listener listener : listeners) {
cache.addListener(listener);
}
cache.setSyncWithServer(false);
agent.notifyListenConfig();
}
}
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
CacheData cache = getCache(dataId, group, tenant);
if (null != cache) {
return cache;
}
String key = GroupKey.getKeyTenant(dataId, group, tenant);
synchronized (cacheMap) {
CacheData cacheFromMap = getCache(dataId, group, tenant);
if (null != cacheFromMap) {
cache = cacheFromMap;
cache.setInitializing(true);
} else {
cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
cache.setTaskId(taskId);
if (enableRemoteSyncConfig) {
ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L, false);
cache.setContent(response.getContent());
}
}
Map<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get());
copy.put(key, cache);
cacheMap.set(copy);
}
LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
return cache;
}
public void executeConfigListen() {
Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);
long now = System.currentTimeMillis();
boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
for (CacheData cache : cacheMap.get().values()) {
synchronized (cache) {
if (cache.isSyncWithServer()) {
cache.checkListenerMd5();
if (!needAllSync) {
continue;
}
}
if (!CollectionUtils.isEmpty(cache.getListeners())) {
if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
if (cacheDatas == null) {
cacheDatas = new LinkedList<CacheData>();
listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
}
cacheDatas.add(cache);
}
} else if (CollectionUtils.isEmpty(cache.getListeners())) {
if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
if (cacheDatas == null) {
cacheDatas = new LinkedList<CacheData>();
removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
}
cacheDatas.add(cache);
}
}
}
}
boolean hasChangedKeys = false;
if (!listenCachesMap.isEmpty()) {
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
String taskId = entry.getKey();
Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2);
List<CacheData> listenCaches = entry.getValue();
for (CacheData cacheData : listenCaches) {
timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant),
cacheData.getLastModifiedTs().longValue());
}
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
configChangeListenRequest.setListen(true);
try {
RpcClient rpcClient = ensureRpcClient(taskId);
ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(
rpcClient, configChangeListenRequest);
if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
Set<String> changeKeys = new HashSet<String>();
if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
hasChangedKeys = true;
for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse
.getChangedConfigs()) {
String changeKey = GroupKey
.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),
changeConfig.getTenant());
changeKeys.add(changeKey);
boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
refreshContentAndCheck(changeKey, !isInitializing);
}
}
for (CacheData cacheData : listenCaches) {
String groupKey = GroupKey
.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
if (!changeKeys.contains(groupKey)) {
synchronized (cacheData) {
if (!cacheData.getListeners().isEmpty()) {
Long previousTimesStamp = timestampMap.get(groupKey);
if (previousTimesStamp != null) {
if (!cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp,
System.currentTimeMillis())) {
continue;
}
}
cacheData.setSyncWithServer(true);
}
}
}
cacheData.setInitializing(false);
}
}
} catch (Exception e) {
LOGGER.error("Async listen config change error ", e);
try {
Thread.sleep(50L);
} catch (InterruptedException interruptedException) {
}
}
}
}
if (!removeListenCachesMap.isEmpty()) {
for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
String taskId = entry.getKey();
List<CacheData> removeListenCaches = entry.getValue();
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);
configChangeListenRequest.setListen(false);
try {
RpcClient rpcClient = ensureRpcClient(taskId);
boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
if (removeSuccess) {
for (CacheData cacheData : removeListenCaches) {
synchronized (cacheData) {
if (cacheData.getListeners().isEmpty()) {
ClientWorker.this
.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
}
}
}
}
} catch (Exception e) {
LOGGER.error("async remove listen config change error ", e);
}
try {
Thread.sleep(50L);
} catch (InterruptedException interruptedException) {
}
}
}
if (needAllSync) {
lastAllSyncTime = now;
}
if (hasChangedKeys) {
notifyListenConfig();
}
}
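Two ideas in ClientWorker are easy to miss in the full listing: addCacheDataIfAbsent never mutates cacheMap in place but swaps in a modified copy (copy-on-write), and each cache gets a taskId of size / perTaskConfigSize, which executeConfigListen then uses to batch listen requests per RPC client. The sketch below reproduces only those two steps with plain strings; the class name and the perTaskSize parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of ClientWorker's cacheMap handling: copy-on-write inserts plus taskId batching.
final class CacheMapSketch {
    private final AtomicReference<Map<String, Integer>> cacheMap =
            new AtomicReference<>(Collections.<String, Integer>emptyMap()); // groupKey -> taskId
    private final int perTaskSize;

    CacheMapSketch(int perTaskSize) {
        this.perTaskSize = perTaskSize;
    }

    /** Copy-on-write insert: readers always see a complete snapshot, writers swap in a new map. */
    int addIfAbsent(String groupKey) {
        Integer existing = cacheMap.get().get(groupKey);
        if (existing != null) {
            return existing;
        }
        synchronized (cacheMap) {
            Map<String, Integer> current = cacheMap.get();
            if (current.containsKey(groupKey)) {
                return current.get(groupKey); // another thread added it meanwhile
            }
            int taskId = current.size() / perTaskSize;
            Map<String, Integer> copy = new HashMap<>(current);
            copy.put(groupKey, taskId);
            cacheMap.set(Collections.unmodifiableMap(copy));
            return taskId;
        }
    }

    /** Like executeConfigListen: group keys by taskId so each task sends one batch request. */
    Map<Integer, List<String>> batchesByTask() {
        Map<Integer, List<String>> batches = new TreeMap<>();
        for (Map.Entry<String, Integer> e : cacheMap.get().entrySet()) {
            batches.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        for (List<String> batch : batches.values()) {
            Collections.sort(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        CacheMapSketch sketch = new CacheMapSketch(2);
        for (String key : new String[] {"a+DEFAULT_GROUP", "b+DEFAULT_GROUP", "c+DEFAULT_GROUP"}) {
            sketch.addIfAbsent(key);
        }
        System.out.println(sketch.batchesByTask());
    }
}
```

Copy-on-write suits this map because the listen loop reads it far more often than listeners are added.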
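- ConfigRpcTransportClient: an inner class of ClientWorker. Its queryConfig method fetches a config from the server and, on success, saves a local snapshot.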
public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify)
throws NacosException {
ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);
request.putHeader(NOTIFY_HEADER, String.valueOf(notify));
RpcClient rpcClient = getOneRunningClient();
if (notify) {
CacheData cacheData = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
if (cacheData != null) {
rpcClient = ensureRpcClient(String.valueOf(cacheData.getTaskId()));
}
}
ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts);
ConfigResponse configResponse = new ConfigResponse();
if (response.isSuccess()) {
LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, response.getContent());
configResponse.setContent(response.getContent());
String configType;
if (StringUtils.isNotBlank(response.getContentType())) {
configType = response.getContentType();
} else {
configType = ConfigType.TEXT.getType();
}
configResponse.setConfigType(configType);
String encryptedDataKey = response.getEncryptedDataKey();
LocalEncryptedDataKeyProcessor
.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, encryptedDataKey);
configResponse.setEncryptedDataKey(encryptedDataKey);
return configResponse;
// ... (code omitted) ...
}
Server-side logic (the HTTP long-polling path served by the /listener endpoint):
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
// ... (code omitted) ...
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
// ... (code omitted) ...
}
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
// ... (code omitted) ...
ConfigExecutor.executeLongPolling(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
- LongPollingService.ClientLongPolling: the long-polling handler.
public void run() {
asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
@Override
public void run() {
// ... (code omitted) ...
}, timeoutTime, TimeUnit.MILLISECONDS);
allSubs.add(this);
}
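Summary:
Client-side logic:
- The client requests a config through NacosConfigService.getConfigInner. It looks in local files first and fetches from the server only if no local file exists.
- Each config item on the client is wrapped in a CacheData. Its key fields are content, the config content, and an MD5 value, which is what detects changes. CacheData also keeps an array of listeners that are called back in turn when the config changes. All CacheData objects are stored in the atomic reference cacheMap in ClientWorker.
- ClientWorker runs a scheduled thread dedicated to watching config changes. Each run does the following:
  - Check local configs: skip items whose local snapshot does not exist, and find items whose listeners need to be called back.
  - For items not served from local config, ask the server, which returns the IDs of the items whose content has changed.
  - For each changed ID, fetch the latest config from the server, update the local snapshot, and fill in anything that was missing.
  - Compare MD5 values; if they differ, call back the listeners.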
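The MD5 comparison in the last step can be sketched as follows: each listener remembers the MD5 of the content it was last notified with, and is called back only when the cache's current MD5 differs. This is a simplified, hypothetical model of CacheData.checkListenerMd5, not the Nacos class itself.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of CacheData's MD5-based change detection.
final class CacheDataSketch {
    /** Wraps a user listener together with the MD5 it last saw. */
    private static final class ListenerWrap {
        final Consumer<String> listener;
        String lastCallMd5;

        ListenerWrap(Consumer<String> listener, String lastCallMd5) {
            this.listener = listener;
            this.lastCallMd5 = lastCallMd5;
        }
    }

    private final List<ListenerWrap> listeners = new ArrayList<>();
    private String content = "";
    private String md5 = md5Hex("");

    static String md5Hex(String text) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // every Java platform is required to support MD5
        }
    }

    void addListener(Consumer<String> listener) {
        listeners.add(new ListenerWrap(listener, md5)); // a new listener starts in sync
    }

    void setContent(String newContent) {
        content = newContent;
        md5 = md5Hex(newContent);
    }

    /** Call back every listener whose last-seen MD5 differs from the current one. */
    void checkListenerMd5() {
        for (ListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                wrap.listener.accept(content);
                wrap.lastCallMd5 = md5;
            }
        }
    }

    public static void main(String[] args) {
        CacheDataSketch cache = new CacheDataSketch();
        cache.addListener(c -> System.out.println("config changed: " + c));
        cache.setContent("timeout=30");
        cache.checkListenerMd5(); // MD5 differs, so the listener is called
        cache.checkListenerMd5(); // same MD5, no callback
    }
}
```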
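Server-side logic:
- At startup the server dumps data from the database to local disk and loads key metadata, such as each config's MD5, into memory, recording the time each config was last updated.
- When a config changes, one node persists it to the database and caches it on its local disk and in memory. Once the save completes, that node publishes a ConfigDataChangeEvent to notify the other nodes to sync the data.
- Client long-polling requests are handled in LongPollingService. Its inner class ClientLongPolling implements Runnable: the server wraps each client long-polling request in a ClientLongPolling and runs it as a scheduled task, and each request carries an asyncContext object.
- Client long-polling requests use a 30 s timeout by default, so the server's thread pool delays the response by 29.5 s. This leaves a margin so the client does not time out from waiting longer than 30 s.
- The server keeps all held polling requests in the queue allSubs. If a config changes while requests are held, a LocalDataChangeEvent is published and handled as a DataChangeTask.
- DataChangeTask finds the matching ClientLongPolling requests in allSubs and responds to those clients immediately.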
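The hold-and-release pattern above can be sketched with CompletableFuture and a ScheduledExecutorService: each poll is parked in a queue playing the role of allSubs, a scheduled task completes it empty after the hold time, and a change event completes matching polls immediately. The 500 ms margin mirrors the 30 s / 29.5 s figures in the text; the class and method names are hypothetical, and this is not LongPollingService's actual code.

```java
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of server-side long polling (ClientLongPolling + allSubs + DataChangeTask).
final class LongPollingSketch {
    /** Answer slightly before the client's own timeout so the client never times out first. */
    static long holdMillis(long clientTimeoutMs) {
        return Math.max(0, clientTimeoutMs - 500);
    }

    private static final class HeldPoll {
        final Set<String> watchedKeys;
        final CompletableFuture<List<String>> response = new CompletableFuture<>();

        HeldPoll(Set<String> watchedKeys) {
            this.watchedKeys = watchedKeys;
        }
    }

    private final Queue<HeldPoll> allSubs = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "long-polling-sketch");
        t.setDaemon(true);
        return t;
    });

    /** Hold a poll; it completes with the changed keys, or with an empty list on timeout. */
    CompletableFuture<List<String>> addPoll(Set<String> watchedKeys, long holdMs) {
        HeldPoll poll = new HeldPoll(watchedKeys);
        allSubs.add(poll);
        scheduler.schedule(() -> {
            allSubs.remove(poll);
            poll.response.complete(Collections.emptyList()); // no-op if a change already answered it
        }, holdMs, TimeUnit.MILLISECONDS);
        return poll.response;
    }

    /** Like DataChangeTask: release every held poll that watches the changed key. */
    void onDataChange(String groupKey) {
        for (HeldPoll poll : allSubs) {
            if (poll.watchedKeys.contains(groupKey) && allSubs.remove(poll)) {
                poll.response.complete(Collections.singletonList(groupKey));
            }
        }
    }

    public static void main(String[] args) {
        LongPollingSketch server = new LongPollingSketch();
        CompletableFuture<List<String>> poll =
                server.addPoll(Collections.singleton("app+DEFAULT_GROUP"), holdMillis(30_000));
        server.onDataChange("app+DEFAULT_GROUP");
        System.out.println("changed keys: " + poll.join());
    }
}
```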
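4. Service Registration
In Nacos 2.0.3 the registry works in AP mode by default. To use CP mode, the client must set spring.cloud.nacos.discovery.ephemeral=false; the default value, true, means AP mode.
Nacos 2.0.3 serves HTTP on port 8848 by default and adds gRPC, which is used in two places: between clients and servers, and between cluster nodes. The gRPC ports are derived from the HTTP port by an offset: client-server traffic uses an offset of 1000 (8848 + 1000 = 9848), and traffic between cluster nodes uses an offset of 1001 (8848 + 1001 = 9849). If you open ports manually, for example in a firewall, you therefore need to open 8848, 9848, and 9849.
The Nacos discovery client needs the following dependency: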
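The port arithmetic is easy to check in a few lines. This helper is hypothetical (Nacos computes the offsets internally); it only restates the offsets given above.

```java
// Hypothetical helper restating the gRPC port offsets described above.
final class NacosPortsSketch {
    static final int SDK_GRPC_OFFSET = 1000;     // client <-> server
    static final int CLUSTER_GRPC_OFFSET = 1001; // server <-> server

    static int sdkGrpcPort(int httpPort) {
        return httpPort + SDK_GRPC_OFFSET;
    }

    static int clusterGrpcPort(int httpPort) {
        return httpPort + CLUSTER_GRPC_OFFSET;
    }

    public static void main(String[] args) {
        int http = args.length > 0 ? Integer.parseInt(args[0]) : 8848;
        System.out.printf("open ports: %d, %d, %d%n", http, sdkGrpcPort(http), clusterGrpcPort(http));
    }
}
```

Because the gRPC ports follow the HTTP port, a server started on a non-default HTTP port such as 8850 needs 9850 and 9851 opened as well.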
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
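At startup, Spring finds NacosServiceRegistryAutoConfiguration in spring.factories and loads it. That class auto-configures NacosAutoServiceRegistration, which indirectly implements ApplicationListener (through AbstractAutoServiceRegistration); the WebServerInitializedEvent published when the web server starts triggers Nacos service registration.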
- AbstractAutoServiceRegistration.onApplicationEvent
public void onApplicationEvent(WebServerInitializedEvent event) {
this.bind(event);
}
- AbstractAutoServiceRegistration.register
protected void register() {
this.serviceRegistry.register(this.getRegistration());
}
- NacosServiceRegistry.register
public void register(Registration registration) {
NamingService namingService = this.namingService();
String serviceId = registration.getServiceId();
String group = this.nacosDiscoveryProperties.getGroup();
Instance instance = this.getNacosInstanceFromRegistration(registration);
namingService.registerInstance(serviceId, group, instance);
}
private Instance getNacosInstanceFromRegistration(Registration registration) {
Instance instance = new Instance();
instance.setIp(registration.getHost());
instance.setPort(registration.getPort());
instance.setWeight((double)this.nacosDiscoveryProperties.getWeight());
instance.setClusterName(this.nacosDiscoveryProperties.getClusterName());
instance.setEnabled(this.nacosDiscoveryProperties.isInstanceEnabled());
instance.setMetadata(registration.getMetadata());
instance.setEphemeral(this.nacosDiscoveryProperties.isEphemeral());
return instance;
}
- NamingClientProxyDelegate.registerService
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
private NamingClientProxy getExecuteClientProxy(Instance instance) {
return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
AP mode (spring.cloud.nacos.discovery.ephemeral=true)
In AP mode, getExecuteClientProxy(instance) returns grpcClientProxy, so the client communicates with the server over gRPC.
Client-side logic:
- NamingGrpcClientProxy.registerService: NamingGrpcClientProxy is the gRPC implementation.
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
instance);
redoService.cacheInstanceForRedo(serviceName, groupName, instance);
doRegisterService(serviceName, groupName, instance);
}
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
redoService.instanceRegistered(serviceName, groupName);
}
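The calls redoService.cacheInstanceForRedo and redoService.instanceRegistered point to a bookkeeping pattern: remember every registration before sending it, mark it registered once the server acknowledges it, and re-send anything unmarked after the gRPC connection is re-established. The sketch below models only that pattern; the class, its methods, and the disconnect hook are hypothetical and much simpler than Nacos's redo service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of redo bookkeeping for registrations over a reconnectable connection.
final class RedoSketch {
    // groupedServiceName -> has the server acknowledged the registration?
    private final Map<String, Boolean> registered = new ConcurrentHashMap<>();

    /** Called before sending the register request (like cacheInstanceForRedo). */
    void cacheForRedo(String groupedServiceName) {
        registered.put(groupedServiceName, false);
    }

    /** Called after the server acknowledged the request (like instanceRegistered). */
    void markRegistered(String groupedServiceName) {
        registered.computeIfPresent(groupedServiceName, (k, v) -> true);
    }

    /** Assumption: after a disconnect the server may have dropped this client's instances. */
    void onDisconnect() {
        registered.replaceAll((k, v) -> false);
    }

    /** What a periodic redo task would re-register once the connection is back. */
    List<String> pendingRedo() {
        List<String> pending = new ArrayList<>();
        registered.forEach((k, v) -> {
            if (!v) {
                pending.add(k);
            }
        });
        return pending;
    }

    public static void main(String[] args) {
        RedoSketch redo = new RedoSketch();
        redo.cacheForRedo("DEFAULT_GROUP@@order");
        redo.markRegistered("DEFAULT_GROUP@@order");
        redo.onDisconnect();
        System.out.println("re-register after reconnect: " + redo.pendingRedo());
    }
}
```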
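Notes:
- BeatReactor: the periodic client-server heartbeat class. It uses a ScheduledThreadPoolExecutor whose thread, com.alibaba.nacos.naming.beat.sender, periodically runs BeatTask, an inner class of BeatReactor that sends heartbeats to the server (ultimately an HTTP request to /instance/beat through the HTTP client). It keeps a map keyed by service name + group name + IP + port, with BeatInfo values. When the client registers an instance, a BeatInfo is created and added through BeatReactor.addBeatInfo(), and BeatTask is scheduled once after BeatInfo's period (5 s by default). In 2.0.3 this heartbeat is started only on the HTTP path (see NamingHttpClientProxy.registerService below), not by NamingGrpcClientProxy. For details, see the description of the heartbeat mechanism later in this article.
- HostReactor: periodically pulls service data from the server. It uses a ScheduledThreadPoolExecutor whose thread, com.alibaba.nacos.client.naming.updater, periodically runs UpdateTask, an inner class of HostReactor that refreshes the client's cached service list. When fetching the list, the client also tells the server its UDP port, so the server creates a matching PushClient and can push change notifications through it whenever that Service changes. UpdateTask refreshes per service-cluster combination, every 1 s by default (configurable), by sending an HTTP request to /instance/list via updateServiceNow().
- EventDispatcher: an event dispatcher that manages EventListeners. It uses a ScheduledThreadPoolExecutor whose thread, com.alibaba.nacos.naming.client.listener, periodically runs Notifier, which delivers NamingEvent to the registered EventListeners. This can be used for local extensions, for example implementing ApplicationListener to watch NamingEvent.
- PushReceiver: receives the service data the server pushes, compares it with and updates the local information, and finally replies to the server with an ack. On initialization it creates a UDP socket for communicating with the server, and a ScheduledThreadPoolExecutor with the thread-name prefix com.alibaba.nacos.naming.push.receiver that runs PushReceiver itself (the class implements Runnable).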
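BeatReactor's core idea, one periodic task per registered instance on a ScheduledExecutorService, kept in a map keyed by service and address, can be sketched in a few lines. The key format, the sendBeat callback standing in for the HTTP call, and the class name are assumptions for illustration; for simplicity the sketch uses a fixed-rate schedule rather than a task that reschedules itself.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of BeatReactor: one periodic heartbeat task per registered instance.
final class BeatReactorSketch {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, r -> {
        Thread t = new Thread(r, "beat-sender-sketch");
        t.setDaemon(true);
        return t;
    });
    private final Map<String, ScheduledFuture<?>> beats = new ConcurrentHashMap<>();
    private final Consumer<String> sendBeat; // stands in for the HTTP request to /instance/beat

    BeatReactorSketch(Consumer<String> sendBeat) {
        this.sendBeat = sendBeat;
    }

    static String buildKey(String groupedServiceName, String ip, int port) {
        return groupedServiceName + "#" + ip + "#" + port;
    }

    void addBeatInfo(String groupedServiceName, String ip, int port, long periodMs) {
        String key = buildKey(groupedServiceName, ip, port);
        ScheduledFuture<?> previous = beats.put(key, executor.scheduleAtFixedRate(
                () -> sendBeat.accept(key), periodMs, periodMs, TimeUnit.MILLISECONDS));
        if (previous != null) {
            previous.cancel(false); // re-registration replaces the old task
        }
    }

    void removeBeatInfo(String groupedServiceName, String ip, int port) {
        ScheduledFuture<?> task = beats.remove(buildKey(groupedServiceName, ip, port));
        if (task != null) {
            task.cancel(false);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BeatReactorSketch reactor = new BeatReactorSketch(key -> System.out.println("beat " + key));
        reactor.addBeatInfo("DEFAULT_GROUP@@order", "10.0.0.1", 8080, 200);
        Thread.sleep(700); // roughly three beats at a 200 ms period
        reactor.removeBeatInfo("DEFAULT_GROUP@@order", "10.0.0.1", 8080);
    }
}
```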
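Server-side logic:
On the server, every gRPC server extends BaseGrpcServer, which in turn extends BaseRpcServer. BaseRpcServer has a start() method annotated with @PostConstruct, and that is where RPC startup begins.
BaseGrpcServer has two implementations: GrpcSdkServer handles communication between clients and the server, and GrpcClusterServer handles communication between cluster nodes.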
@PostConstruct
public void start() throws Exception {
// ... (code omitted) ...
startServer();
// ... (code omitted) ...
}
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
// ... (code omitted) ...
final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
.asyncUnaryCall((request, responseObserver) -> {
grpcCommonRequestAcceptor.request(request, responseObserver);
});
// ... (code omitted) ...
}
- InstanceRequestHandler.handle
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
Service service = Service
.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
switch (request.getType()) {
case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(service, request, meta);
case NamingRemoteConstants.DE_REGISTER_INSTANCE:
return deregisterInstance(service, request, meta);
default:
throw new NacosException(NacosException.INVALID_PARAM,
String.format("Unsupported request type %s", request.getType()));
}
}
public void registerInstance(Service service, Instance instance, String clientId) {
Service singleton = ServiceManager.getInstance().getSingleton(service);
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
InstancePublishInfo instanceInfo = getPublishInfo(instance);
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
if (null == publishers.put(service, instancePublishInfo)) {
MetricsMonitor.incrementInstanceCount();
}
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}
- NotifyCenter.publishEvent call chain: publishEvent(final Event event) -> publishEvent(final Class<? extends Event> eventType, final Event event) -> DefaultPublisher.publish(Event event)
public boolean publish(Event event) {
checkIsStart();
boolean success = this.queue.offer(event);
if (!success) {
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
receiveEvent(event);
return true;
}
return true;
}
static {
// ... (code omitted) ...
if (iterator.hasNext()) {
clazz = iterator.next().getClass();
} else {
clazz = DefaultPublisher.class;
}
DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
try {
EventPublisher publisher = clazz.newInstance();
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
throw new NacosRuntimeException(SERVER_ERROR, ex);
}
};
// ... (code omitted) ...
}
public class DefaultPublisher extends Thread implements EventPublisher {}
public void init(Class<? extends Event> type, int bufferSize) {
setDaemon(true);
setName("nacos.publisher-" + type.getName());
this.eventType = type;
this.queueMaxSize = bufferSize;
this.queue = new ArrayBlockingQueue<>(bufferSize);
start();
}
public void run() {
openEventHandler();
}
void openEventHandler() {
try {
// ... (code omitted) ...
for (; ; ) {
if (shutdown) {
break;
}
final Event event = queue.take();
receiveEvent(event);
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : ", ex);
}
}
void receiveEvent(Event event) {
for (Subscriber subscriber : subscribers) {
notifySubscriber(subscriber, event);
}
// ... (code omitted) ...
}
public void notifySubscriber(final Subscriber subscriber, final Event event) {
final Runnable job = () -> subscriber.onEvent(event);
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job);
}
// ... (code omitted) ...
}
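The publish path above (offer to a bounded queue, a daemon thread that takes events and calls each subscriber, synchronous delivery when the queue is full) can be reproduced with plain JDK types. This is a simplified, hypothetical stand-in for DefaultPublisher: it tracks no event sequence and runs subscribers on the publisher thread instead of on each subscriber's own executor.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch of DefaultPublisher: a bounded queue drained by one daemon thread.
final class PublisherSketch<E> extends Thread {
    private final BlockingQueue<E> queue;
    private final List<Consumer<E>> subscribers = new CopyOnWriteArrayList<>();

    PublisherSketch(String eventType, int bufferSize) {
        this.queue = new ArrayBlockingQueue<>(bufferSize);
        setDaemon(true);
        setName("publisher-" + eventType);
        start();
    }

    void addSubscriber(Consumer<E> subscriber) {
        subscribers.add(subscriber);
    }

    /** Like publish(): enqueue, or deliver on the caller's thread if the queue is full. */
    boolean publish(E event) {
        if (!queue.offer(event)) {
            receiveEvent(event);
        }
        return true;
    }

    @Override
    public void run() {
        try {
            while (!isInterrupted()) {
                receiveEvent(queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit quietly on shutdown
        }
    }

    private void receiveEvent(E event) {
        for (Consumer<E> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PublisherSketch<String> publisher = new PublisherSketch<>("ClientChangedEvent", 16);
        publisher.addSubscriber(e -> System.out.println("received " + e));
        publisher.publish("client-1 changed");
        Thread.sleep(100); // give the daemon thread time to deliver before the JVM exits
    }
}
```

One publisher thread per event type keeps events of that type in order while decoupling the caller (here, the registration request) from the subscribers' work.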
- DistroClientDataProcessor.onEvent: the subscriber that handles ClientEvent.ClientChangedEvent.
public void onEvent(Event event) {
if (EnvUtil.getStandaloneMode()) {
return;
}
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
} else {
syncToAllServer((ClientEvent) event);
}
}
private void syncToAllServer(ClientEvent event) {
Client client = event.getClient();
// ... (code omitted) ...
if (event instanceof ClientEvent.ClientDisconnectEvent) {
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.DELETE);
} else if (event instanceof ClientEvent.ClientChangedEvent) {
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.CHANGE);
}
}
- DistroProtocol.syncToTarget
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
targetServer);
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
}
}
- NacosDelayTaskExecuteEngine.processTasks: executes the tasks and retries those that fail.
protected void processTasks() {
// ... (code omitted) ...
try {
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
retryFailedTask(taskKey, task);
}
}
}
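What makes a delay task engine useful for Distro sync is that tasks are stored by key: a second change to the same client before the delay elapses is merged into the pending task instead of causing another network call, and a task whose processor returns false is put back for retry, as retryFailedTask does above. Below is a hypothetical, single-threaded sketch of that behavior; the merge rule (keep the newest action, keep the original deadline) and all names are assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of a delay task engine: keyed tasks, merged, run after a delay, retried on failure.
final class DelayTaskEngineSketch {
    private static final class DelayTask {
        String action;
        final long dueAt;

        DelayTask(String action, long dueAt) {
            this.action = action;
            this.dueAt = dueAt;
        }
    }

    private final Map<String, DelayTask> tasks = new LinkedHashMap<>();

    /** Add or merge: a newer action for the same key replaces the pending one but keeps its deadline. */
    synchronized void addTask(String key, String action, long nowMs, long delayMs) {
        DelayTask existing = tasks.get(key);
        if (existing != null) {
            existing.action = action;
        } else {
            tasks.put(key, new DelayTask(action, nowMs + delayMs));
        }
    }

    /** Run due tasks; a false result from the processor re-queues the task for a later retry. */
    synchronized int processTasks(long nowMs, long retryDelayMs, Predicate<String> processor) {
        int succeeded = 0;
        for (String key : new ArrayList<>(tasks.keySet())) {
            DelayTask task = tasks.get(key);
            if (task.dueAt > nowMs) {
                continue; // not due yet
            }
            tasks.remove(key);
            if (processor.test(key + ":" + task.action)) {
                succeeded++;
            } else {
                addTask(key, task.action, nowMs, retryDelayMs); // like retryFailedTask
            }
        }
        return succeeded;
    }

    synchronized int pendingCount() {
        return tasks.size();
    }

    public static void main(String[] args) {
        DelayTaskEngineSketch engine = new DelayTaskEngineSketch();
        engine.addTask("client-1", "CHANGE", 0, 1000);
        engine.addTask("client-1", "DELETE", 10, 1000); // merged into the pending task
        engine.processTasks(1000, 1000, t -> {
            System.out.println("sync " + t);
            return true;
        });
    }
}
```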
- PushDelayTaskExecuteEngine:
public class PushDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {
// ... (code omitted) ...
private static class PushDelayTaskProcessor implements NacosTaskProcessor {
// ... (code omitted) ...
@Override
public boolean process(NacosTask task) {
PushDelayTask pushDelayTask = (PushDelayTask) task;
Service service = pushDelayTask.getService();
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
return true;
}
}
}
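- NacosExecuteTaskExecuteEngine.addTask and TaskExecuteWorker.process: the engine hands the task to a TaskExecuteWorker, which puts it on its blocking queue.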
public void addTask(Object tag, AbstractExecuteTask task) {
NacosTaskProcessor processor = getProcessor(tag);
if (null != processor) {
processor.process(task);
return;
}
TaskExecuteWorker worker = getWorker(tag);
worker.process(task);
}
public boolean process(NacosTask task) {
if (task instanceof AbstractExecuteTask) {
putTask((Runnable) task);
}
return true;
}
private void putTask(Runnable task) {
try {
queue.put(task);
} catch (InterruptedException ire) {
log.error(ire.toString(), ire);
}
}
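- TaskExecuteWorker is not a thread itself: its InnerWorker thread takes tasks off the queue and runs them, so for a push task the code that ends up running is PushExecuteTask.run: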
public void run() {
try {
PushDataWrapper wrapper = generatePushData();
for (String each : getTargetClientIds()) {
Client client = delayTaskEngine.getClientManager().getClient(each);
if (null == client) {
continue;
}
Subscriber subscriber = delayTaskEngine.getClientManager().getClient(each).getSubscriber(service);
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
}
} catch (Exception e) {
Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
}
}
- TaskExecuteWorker implements NacosTaskProcessor and contains an inner class, InnerWorker, that extends Thread. The TaskExecuteWorker constructor creates an InnerWorker instance and starts the thread.
public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
this.name = name + "_" + mod + "%" + total;
this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
this.closed = new AtomicBoolean(false);
this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
new InnerWorker(name).start();
}
private class InnerWorker extends Thread {
InnerWorker(String name) {
setDaemon(false);
setName(name);
}
@Override
public void run() {
while (!closed.get()) {
try {
Runnable task = queue.take();
long begin = System.currentTimeMillis();
task.run();
long duration = System.currentTimeMillis() - begin;
if (duration > 1000L) {
log.warn("task {} takes {}ms", task, duration);
}
} catch (Throwable e) {
log.error("[TASK-FAILED] " + e.toString(), e);
}
}
}
}
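PushDelayTaskProcessor passes the Service as the dispatch tag, and the engine picks a worker from that tag, so all push tasks for one service land on the same TaskExecuteWorker queue and run in order, while different services spread across workers. A hypothetical sketch of the tag-to-worker mapping, assuming a non-negative hash modulo the worker count:

```java
// Hypothetical sketch: map a dispatch tag to a fixed worker index.
final class WorkerShardingSketch {
    static int workerIndex(Object tag, int workerCount) {
        // Mask the sign bit so negative hash codes (even Integer.MIN_VALUE) still give a valid index.
        return (tag.hashCode() & Integer.MAX_VALUE) % workerCount;
    }

    public static void main(String[] args) {
        int workers = 4;
        for (String service : new String[] {"DEFAULT_GROUP@@order", "DEFAULT_GROUP@@user", "DEFAULT_GROUP@@order"}) {
            System.out.println(service + " -> worker " + workerIndex(service, workers));
        }
    }
}
```

Masking with Integer.MAX_VALUE is used instead of Math.abs, which returns a negative value for Integer.MIN_VALUE.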
- ClientServiceIndexesManager.addPublisherIndexes(): on the server, service and client information ultimately ends up in ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>(). To use it, ClientServiceIndexesManager maps a Service to the clientIds of all its instances, and ClientManager then resolves those clientIds to the details of every instance of the service.
private void addPublisherIndexes(Service service, String clientId) {
publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
publisherIndexes.get(service).add(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
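The two-step lookup described above (Service to clientIds through publisherIndexes, then clientId to instance data through the client manager) can be sketched with two maps. Plain strings stand in for Service, Client, and InstancePublishInfo; all names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the service -> clientIds -> instances lookup.
final class ServiceIndexSketch {
    // service -> ids of clients that publish an instance of it (like publisherIndexes)
    private final Map<String, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    // clientId -> (service -> instance address), playing the role of each Client's published instances
    private final Map<String, Map<String, String>> clients = new ConcurrentHashMap<>();

    void register(String clientId, String service, String address) {
        clients.computeIfAbsent(clientId, k -> new ConcurrentHashMap<>()).put(service, address);
        publisherIndexes.computeIfAbsent(service, k -> ConcurrentHashMap.newKeySet()).add(clientId);
    }

    /** Resolve all instances of a service through the index. */
    List<String> instancesOf(String service) {
        List<String> result = new ArrayList<>();
        for (String clientId : publisherIndexes.getOrDefault(service, Collections.emptySet())) {
            Map<String, String> published = clients.get(clientId);
            if (published != null && published.containsKey(service)) {
                result.add(published.get(service));
            }
        }
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        ServiceIndexSketch index = new ServiceIndexSketch();
        index.register("client-1", "DEFAULT_GROUP@@order", "10.0.0.1:8080");
        index.register("client-2", "DEFAULT_GROUP@@order", "10.0.0.2:8080");
        System.out.println(index.instancesOf("DEFAULT_GROUP@@order"));
    }
}
```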
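AP mode summary (several points below, such as HTTP registration with a heartbeat task, the serviceMap model, and UDP push, describe the HTTP client path carried over from 1.x; the gRPC path walked through above registers through an InstanceRequest, indexes instances in publisherIndexes, and pushes changes through PushExecuteTask):
- At startup the client wraps its service information into an Instance (IP, port, service name, cluster name, and so on), sends it to the server over HTTP to register, and starts a scheduled heartbeat task that runs every 5 s by default. If the heartbeat response shows that the server does not know this instance, the client runs the registration logic again.
- After receiving the registration request and doing some preparation, the server, being in AP mode, only needs to put the service information into its cache. The Nacos registry model is essentially a ConcurrentHashMap: Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
- In AP mode Nacos uses the Distro protocol: it first adds a task for the current instance to a blocking task queue, then updates the local service-list cache, and finally iterates over all cluster nodes, creating data-sync tasks and adding them to a blocking queue. Because replication is asynchronous, AP mode does not guarantee that all nodes hold exactly the same data at every moment.
- When updating the local service list, Nacos avoids concurrent read/write conflicts with copy-on-write: it copies the existing service list, applies additions and removals to the copy, and then replaces the old list with the new one.
- After updating the data, Nacos publishes a series of data-change events to tell clients that the service data has changed, and sends UDP packets to the clients. A client replies to each UDP packet with an ACK; if the server receives no ACK it resends the packet, and if no ACK arrives within the allowed retry period it stops sending.
- Clients also pull the service list from the server on a schedule to refresh their local cache.
- Clients report their health through heartbeats, every 5 s by default. If the server receives no heartbeat from a client for 15 s, it marks the instance unhealthy; after 30 s without a heartbeat it removes the instance, and if that service starts again later it has to go through registration again.
- The Nacos server also runs active health checks, every 20 s by default.
- Nacos and clients combine push and pull. On one side, a client queries Nacos every 10 s on a timer and receives the new list if it has changed. On the other, when an instance changes (it registers successfully or its heartbeat stops and the connection drops), Nacos pushes the change to clients over UDP, which is fast and needs no long-lived connection. Client pulls and server pushes complement each other, keeping data reasonably fresh while performing well.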
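The heartbeat thresholds in the summary (unhealthy after 15 s without a beat, removed after 30 s) translate into a simple classification. A hypothetical sketch, with the two thresholds taken from the text and everything else assumed:

```java
// Hypothetical sketch of the server-side heartbeat timeout rule described above.
final class InstanceHealthSketch {
    enum Status { HEALTHY, UNHEALTHY, REMOVED }

    static final long UNHEALTHY_AFTER_MS = 15_000;
    static final long REMOVE_AFTER_MS = 30_000;

    static Status statusOf(long lastBeatMs, long nowMs) {
        long silence = nowMs - lastBeatMs;
        if (silence > REMOVE_AFTER_MS) {
            return Status.REMOVED;   // the instance must register again
        }
        if (silence > UNHEALTHY_AFTER_MS) {
            return Status.UNHEALTHY; // marked unhealthy but not removed yet
        }
        return Status.HEALTHY;
    }

    public static void main(String[] args) {
        for (long now : new long[] {5_000, 16_000, 31_000}) {
            System.out.println(now + " ms since last beat -> " + statusOf(0, now));
        }
    }
}
```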
CP mode (spring.cloud.nacos.discovery.ephemeral=false)
In CP mode, getExecuteClientProxy(instance) returns httpClientProxy, so the client communicates with the server over HTTP.
Client-side logic:
- NamingHttpClientProxy.registerService: communicates with the server over HTTP.
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, groupedServiceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put(IP_PARAM, instance.getIp());
params.put(PORT_PARAM, String.valueOf(instance.getPort()));
params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
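public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
String method) throws NacosException {
………(some code omitted: building the request header and the full url from curServer and api)………
try {
HttpRestResult<String> restResult = nacosRestTemplate
.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
// a successful response returns its body to the caller
if (restResult.ok()) {
return restResult.getData();
}
// any other status code is turned into a NacosException
throw new NacosException(restResult.getCode(), restResult.getMessage());
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to request", e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}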
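- PushReceiver.run: the client starts a thread that keeps listening for the server's commands to sync service-registration data. It merges the pushed registration data into its local registry and returns an ACK to the server.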
public void run() {
while (!closed) {
try {
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
udpSocket.receive(packet);
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
if (PUSH_PACKAGE_TYPE_DOM.equals(pushPacket.type) || PUSH_PACKAGE_TYPE_SERVICE.equals(pushPacket.type)) {
serviceInfoHolder.processServiceInfo(pushPacket.data);
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"\"}";
} else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) {
ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
+ "\"}";
} else {
ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
packet.getSocketAddress()));
} catch (Exception e) {
if (closed) {
return;
}
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
………(some code omitted)………
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
………(some code omitted)………
}
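The ACK that PushReceiver.run sends back depends only on the push type. The branching can be isolated in a small helper. PushAck is hypothetical. The type values "dom", "service" and "dump" are assumptions about the constants PUSH_PACKAGE_TYPE_DOM, PUSH_PACKAGE_TYPE_SERVICE and PUSH_PACKAGE_TYPE_DUMP. The dump data is assumed to be already escaped, whereas the real code escapes it with StringUtils.escapeJavaScript.

```java
// Hypothetical helper mirroring the ack strings built in PushReceiver.run.
class PushAck {
    static String build(String type, long lastRefTime, String dumpData) {
        String ackType;
        String data = "";
        if ("dom".equals(type) || "service".equals(type)) {
            ackType = "push-ack";      // service data was applied to the local cache
        } else if ("dump".equals(type)) {
            ackType = "dump-ack";      // the server asked for the whole local cache
            data = dumpData;
        } else {
            ackType = "unknown-ack";
        }
        return "{\"type\": \"" + ackType + "\", \"lastRefTime\": \"" + lastRefTime
                + "\", \"data\": \"" + data + "\"}";
    }
}
```

The server matches the ACK to its pending push through lastRefTime, so the client always echoes that value back.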
Server-side logic:
- InstanceController.register: the server exposes a set of controller endpoints that clients call.
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = HttpRequestInstanceBuilder.newBuilder() .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
return "ok";
}
- InstanceOperatorClientImpl.registerInstance
public void registerInstance(String namespaceId, String serviceName, Instance instance) {
boolean ephemeral = instance.isEphemeral();
String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
createIpPortClientIfAbsent(clientId);
Service service = getService(namespaceId, serviceName, ephemeral);
clientOperationService.registerInstance(service, instance, clientId);
}
- ClientOperationServiceProxy.registerInstance
public void registerInstance(Service service, Instance instance, String clientId) {
final ClientOperationService operationService = chooseClientOperationService(instance);
operationService.registerInstance(service, instance, clientId);
}
- PersistentClientOperationServiceImpl.registerInstance
public void registerInstance(Service service, Instance instance, String clientId) {
final InstanceStoreRequest request = new InstanceStoreRequest();
request.setService(service);
request.setInstance(instance);
request.setClientId(clientId);
final WriteRequest writeRequest = WriteRequest.newBuilder().setGroup(group())
.setData(ByteString.copyFrom(serializer.serialize(request))).setOperation(DataOperation.ADD.name())
.build();
try {
protocol.write(writeRequest);
} catch (Exception e) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
}
}
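- JRaftProtocol.write: submits the write to Raft asynchronously (writeAsync hands it to JRaftServer.commit) and waits up to 10 s for the result.
public Response write(WriteRequest request) throws Exception {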
CompletableFuture<Response> future = writeAsync(request);
return future.get(10_000L, TimeUnit.MILLISECONDS);
}
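- JRaftServer.commit: if the current node is the leader, it applies the write itself. Otherwise it forwards the request to the leader.
public CompletableFuture<Response> commit(final String group, final Message data,
final CompletableFuture<Response> future) {
………(some code omitted)………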
final Node node = tuple.node;
if (node.isLeader()) {
applyOperation(node, data, closure);
} else {
invokeToLeader(group, data, rpcRequestTimeoutMs, closure);
}
return future;
}
- JRaftServer.applyOperation: wraps the data in a JRaft Task and submits it with node.apply.
public void applyOperation(Node node, Message data, FailoverClosure closure) {
final Task task = new Task();
task.setDone(new NacosClosure(data, status -> {
NacosClosure.NacosStatus nacosStatus = (NacosClosure.NacosStatus) status;
closure.setThrowable(nacosStatus.getThrowable());
closure.setResponse(nacosStatus.getResponse());
closure.run(nacosStatus);
}));
task.setData(ByteBuffer.wrap(data.toByteArray()));
node.apply(task);
}
- NacosStateMachine.onApply: once the service registration data has been written successfully to more than half of the nodes in the cluster, NacosStateMachine.onApply is triggered.
public void onApply(Iterator iter) {
try {
while (iter.hasNext()) {
Status status = Status.OK();
try {
………(some code omitted)………
if (message instanceof WriteRequest) {
Response response = processor.onApply((WriteRequest) message);
postProcessor(response, closure);
}
………(some code omitted)………
} catch (Throwable e) {
………(some code omitted)………
} finally {
Optional.ofNullable(closure).ifPresent(closure1 -> closure1.run(status));
}
………(some code omitted)………
}
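The "more than half of the nodes" rule that triggers onApply is Raft's majority quorum. The sketch below covers only the counting logic, with no log replication, terms or leader election. QuorumTracker is a hypothetical name, and this is not how JRaft implements it.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical majority-quorum tracker for a single log entry; not JRaft code.
class QuorumTracker {
    private final int clusterSize;
    private final Set<String> acked = new HashSet<>();

    QuorumTracker(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    // Record that a node (the leader counts itself) has persisted the entry.
    // Duplicate acks from the same node are counted once.
    boolean ack(String nodeId) {
        acked.add(nodeId);
        return isCommitted();
    }

    // Committed once strictly more than half of the cluster has the entry.
    boolean isCommitted() {
        return acked.size() > clusterSize / 2;
    }
}
```

In a 3-node cluster the leader plus one follower is enough. A 4-node cluster needs 3 acknowledgements, which is why odd cluster sizes are usually preferred.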
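- PersistentClientOperationServiceImpl.onApply: dispatches on the operation type. ADD registers the instance and DELETE deregisters it.
public Response onApply(WriteRequest request) {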
final InstanceStoreRequest instanceRequest = serializer.deserialize(request.getData().toByteArray());
final DataOperation operation = DataOperation.valueOf(request.getOperation());
final Lock lock = readLock;
lock.lock();
try {
switch (operation) {
case ADD:
onInstanceRegister(instanceRequest.service, instanceRequest.instance,
instanceRequest.getClientId());
break;
case DELETE:
onInstanceDeregister(instanceRequest.service, instanceRequest.getClientId());
break;
default:
return Response.newBuilder().setSuccess(false).setErrMsg("unsupport operation : " + operation)
.build();
}
return Response.newBuilder().setSuccess(true).build();
} finally {
lock.unlock();
}
}
private void onInstanceRegister(Service service, Instance instance, String clientId) {
Service singleton = ServiceManager.getInstance().getSingleton(service);
if (!clientManager.contains(clientId)) {
clientManager.clientConnected(clientId, new ClientAttributes());
}
Client client = clientManager.getClient(clientId);
InstancePublishInfo instancePublishInfo = getPublishInfo(instance);
client.addServiceInstance(singleton, instancePublishInfo);
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
}
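CP mode summary:
- On startup the client wraps its service information into an instance (IP, port, service name, cluster name and so on) and sends the registration request through HttpClientRequest.
- When the server receives a registration request, it does some preparatory work. Because this is CP mode, it then writes the instance through the Raft protocol: the leader submits the write, and the write takes effect once more than half of the nodes have persisted it. Nacos's service registry is essentially a ConcurrentHashMap: Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
- When updating the local service list, Nacos uses copy-on-write to avoid concurrent read/write conflicts. It copies the existing service list, applies additions and deletions to the copy, and then replaces the old list with the new one.
- After the update, Nacos publishes a series of data-change events to tell clients that service data has changed, and sends a UDP packet to each client. The client replies with an ACK. If the server receives no ACK, it resends the packet. Once the retry time limit passes without an ACK, it stops sending.
- The client also runs a scheduled task that keeps pulling the service list from the server to refresh its local cache.
- The Nacos server also performs active health checks, every 20 s by default.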
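The "resend until ACK, then give up" behaviour of the UDP push can be sketched as a bounded retry loop. PushRetry is hypothetical. The BooleanSupplier stands in for "send one UDP packet and wait for the ACK". A maximum attempt count replaces the time budget, because the text does not give the exact limits.

```java
import java.util.function.BooleanSupplier;

// Hypothetical push-with-retry loop; not Nacos's push service.
class PushRetry {
    // sendAndAwaitAck: sends one packet and returns true if an ACK arrived in time.
    // Returns the attempt number that got an ACK, or -1 if the push was abandoned.
    static int push(BooleanSupplier sendAndAwaitAck, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (sendAndAwaitAck.getAsBoolean()) {
                return attempt;
            }
        }
        // Give up: the client still catches up through its periodic pull.
        return -1;
    }
}
```

Abandoning a push is safe only because of the client's scheduled pull. This is the push-pull complementarity described in the summaries.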
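Summary:
In Nacos 2.0.3, the client setting spring.cloud.nacos.discovery.ephemeral decides whether the server handles service registration in AP or CP mode. The default is AP mode.
On startup the server opens both HTTP and gRPC communication, and client-side parameters decide which protocol a client uses. In both modes the server handles registration changes in largely the same way. The difference lies in how the write happens:
- CP mode uses the Raft protocol. The leader completes the write only after more than half of the nodes have written the data successfully.
- AP mode writes the instance directly on the node that received the request and then syncs it asynchronously. AP mode therefore cannot guarantee that all nodes are fully in sync at every moment.
In both modes, a successful registration also writes information into ClientServiceIndexesManager and ClientManager. To get all instances of a service, ClientServiceIndexesManager maps the Service to the clientIds of all its instances. ClientManager then maps those clientIds to the instance details.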
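The two-step lookup (Service to clientIds, then clientId to instance) can be sketched with two maps. ServiceIndexDemo and its methods are hypothetical simplifications of ClientServiceIndexesManager and ClientManager. Services, client ids and instances are plain strings here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplification of ClientServiceIndexesManager + ClientManager.
class ServiceIndexDemo {
    // service -> ids of the clients that published an instance of it
    private final Map<String, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    // client id -> (service -> instance address published by that client)
    private final Map<String, Map<String, String>> clients = new ConcurrentHashMap<>();

    void register(String service, String clientId, String instanceAddr) {
        clients.computeIfAbsent(clientId, k -> new ConcurrentHashMap<>()).put(service, instanceAddr);
        publisherIndexes.computeIfAbsent(service, k -> ConcurrentHashMap.newKeySet()).add(clientId);
    }

    // Step 1: service -> client ids. Step 2: each client id -> its published instance.
    List<String> allInstances(String service) {
        List<String> result = new ArrayList<>();
        Set<String> clientIds = publisherIndexes.getOrDefault(service, Collections.emptySet());
        for (String clientId : clientIds) {
            Map<String, String> published = clients.get(clientId);
            if (published != null && published.containsKey(service)) {
                result.add(published.get(service));
            }
        }
        return result;
    }
}
```

Keeping the index separate from the client data lets a disconnected client be removed in one place. Its entries then simply stop resolving.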
When the client starts, it launches the PushReceiver thread to listen over UDP for the server's commands to sync service-registration data.
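Instance: the smallest storage unit in Nacos's data model, uniquely identified by ip+port. One client corresponds to one instance.
Cluster: a group of Instances with the same configuration. It stores that cluster's persistentInstances and ephemeralInstances. Once a Cluster is initialized, it uses HealthCheckTask to check whether its Instances are alive.
Service: represents a service and consists of multiple Clusters. It implements the RecordListener interface. When the instance clusters under the service change (add, delete or update, decoupled through events), its onChange callback fires. The callback updates this server's instance information, and PushService then pushes the changed node information to the clients subscribed to the service.
namespace: lets one registry manage several environments (development, the various testing environments, production and so on).
DistroConsistencyServiceImpl: the AP model. All nodes are peers, an idea Nacos borrows from Eureka.
- It manages ephemeral instance data, kept in memory in a DataStore.
- At initialization it runs LoadDataTask once to pull the ephemeral instance data already stored on other servers. Periodic sync tasks then bring it into eventual agreement with the other nodes.
- Whenever instance data changes, a notification mechanism notifies the other nodes and also pushes the change to clients. Tasks are added through TaskDispatcher to sync with the other nodes.
- Internally a TaskDispatcher manages a group of TaskSchedulers. Each TaskScheduler keeps, in a BlockingQueue, the tasks that sync data changes to other server nodes.
As a result, the ephemeral instance data across the cluster becomes eventually consistent.
RaftConsistencyServiceImpl: the CP model, Nacos's implementation of the Raft algorithm.
- The leader writes first, and the write is applied once a majority of followers agree. Followers also sync data periodically through the leader's heartbeats.
- It stores persistent instance data on local disk via RaftStore.
- RaftCore also keeps an in-memory cache for reads, to avoid frequent disk access. The cache is refreshed whenever data is written to disk.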
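Under one reading of the TaskDispatcher/TaskScheduler arrangement described for DistroConsistencyServiceImpl, each changed key is hashed onto one of several blocking queues, and each queue is drained by its own worker. The sketch below assumes that reading. SyncDispatcher is hypothetical, and the "sync to other nodes" step is simply recorded in a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical dispatcher: N blocking queues, one daemon worker per queue.
class SyncDispatcher {
    private final List<BlockingQueue<String>> queues = new ArrayList<>();
    final List<String> synced = new CopyOnWriteArrayList<>(); // stands in for "sent to peers"

    SyncDispatcher(int workers) {
        for (int i = 0; i < workers; i++) {
            BlockingQueue<String> q = new LinkedBlockingQueue<>();
            queues.add(q);
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        synced.add(q.take()); // a real node would sync this key to its peers here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true);
            t.start();
        }
    }

    // The same key always lands on the same queue, so its changes stay in order.
    void dispatch(String key) {
        queues.get(Math.floorMod(key.hashCode(), queues.size())).offer(key);
    }

    // Polls until `count` keys were processed or the timeout expires.
    boolean awaitSynced(int count, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (synced.size() < count && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return synced.size() >= count;
    }
}
```

Partitioning by key keeps the updates for one service ordered while still syncing different services in parallel.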
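5. Heartbeat Checks and Health Checks
Heartbeat check
Client-side logic
In Nacos, the heartbeat usually refers to a scheduled thread that the client starts when it registers an ephemeral instance. This thread keeps sending heartbeat requests to the server to stay in contact with it.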
- NamingHttpClientProxy.registerService
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
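- BeatReactor.addBeatInfo: stores the BeatInfo in dom2Beat, stopping any earlier beat for the same key, and schedules the first BeatTask after one period.
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {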
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat;
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
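- BeatReactor.BeatTask.run: sends one beat and adopts the interval returned by the server. If the server answers RESOURCE_NOT_FOUND, it re-registers the instance. It always reschedules itself in finally.
public void run() {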
if (beatInfo.isStopped()) {
return;
}
long nextTime = beatInfo.getPeriod();
try {
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get(CLIENT_BEAT_INTERVAL_FIELD).asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
} catch (Exception unknownEx) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
} finally {
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
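- NamingHttpClientProxy.sendBeat: sends the PUT "/instance/beat" request. The full BeatInfo goes into the body only when light beats are disabled.
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {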
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
Map<String, String> params = new HashMap<String, String>(8);
Map<String, String> bodyMap = new HashMap<String, String>(2);
if (!lightBeatEnabled) {
bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
}
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
params.put(IP_PARAM, beatInfo.getIp());
params.put(PORT_PARAM, String.valueOf(beatInfo.getPort()));
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
return JacksonUtils.toObj(result);
}
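The BeatTask pattern above schedules once, then reschedules itself in finally with the interval returned by the server. It can be reduced to the following sketch. BeatLoop is hypothetical, and the LongSupplier stands in for sendBeat. It returns the server-suggested interval in milliseconds, or a value of 0 or less to keep the default period (5 s in the real client).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical reduction of BeatReactor.BeatTask; not the Nacos class.
class BeatLoop {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "beat-sketch");
        t.setDaemon(true); // do not keep the JVM alive just for heartbeats
        return t;
    });
    private final LongSupplier sendBeat; // returns the server's interval in ms (<= 0: keep default)
    private final long defaultPeriodMs;
    private volatile boolean stopped;

    BeatLoop(LongSupplier sendBeat, long defaultPeriodMs) {
        this.sendBeat = sendBeat;
        this.defaultPeriodMs = defaultPeriodMs;
    }

    // Like addBeatInfo: the first beat runs after one period, not immediately.
    void start() {
        schedule(defaultPeriodMs);
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }

    private void schedule(long delayMs) {
        try {
            executor.schedule(this::beat, delayMs, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // executor already shut down by stop(): nothing more to schedule
        }
    }

    private void beat() {
        if (stopped) {
            return;
        }
        long next = defaultPeriodMs;
        try {
            long interval = sendBeat.getAsLong();
            if (interval > 0) {
                next = interval; // the server may change the beat interval
            }
        } catch (RuntimeException e) {
            // the real BeatTask only logs a failed beat; the loop keeps running
        } finally {
            if (!stopped) {
                schedule(next);
            }
        }
    }
}
```

Rescheduling in finally, rather than using scheduleAtFixedRate, lets each response change the next delay. It also keeps the loop alive after a failed beat.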
Server-side logic
- InstanceController: the server's InstanceController exposes the "/instance/beat" endpoint for clients to call.
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat,
serviceName, namespaceId);
BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();
builder.setRequest(request);
int resultCode = getInstanceOperator()
.handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
result.put(CommonParams.CODE, resultCode);
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
- InstanceOperatorClientImpl.handleBeat
public int handleBeat(String namespaceId, String serviceName, String ip, int port, String cluster,
RsInfo clientBeat, BeatInfoInstanceBuilder builder) throws NacosException {
Service service = getService(namespaceId, serviceName, true);
String clientId = IpPortBasedClient.getClientId(ip + InternetAddressUtil.IP_PORT_SPLITER + port, true);
IpPortBasedClient client = (IpPortBasedClient) clientManager.getClient(clientId);
if (null == client || !client.getAllPublishedService().contains(service)) {
if (null == clientBeat) {
return NamingResponseCode.RESOURCE_NOT_FOUND;
}
Instance instance = builder.setBeatInfo(clientBeat).setServiceName(serviceName).build();
registerInstance(namespaceId, serviceName, instance);
client = (IpPortBasedClient) clientManager.getClient(clientId);
}
if (!ServiceManager.getInstance().containSingleton(service)) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
if (null == clientBeat) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(cluster);
clientBeat.setServiceName(serviceName);
}
ClientBeatProcessorV2 beatProcessor = new ClientBeatProcessorV2(namespaceId, clientBeat, client);
HealthCheckReactor.scheduleNow(beatProcessor);
client.setLastUpdatedTime();
return NamingResponseCode.OK;
}
- ClientBeatProcessorV2: a task class executed on a thread. Its logic is in run.
public void run() {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
String ip = rsInfo.getIp();
int port = rsInfo.getPort();
String serviceName = NamingUtils.getServiceName(rsInfo.getServiceName());
String groupName = NamingUtils.getGroupName(rsInfo.getServiceName());
Service service = Service.newService(namespace, groupName, serviceName, rsInfo.isEphemeral());
HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(service);
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo);
}
instance.setLastHeartBeatTime(System.currentTimeMillis());
if (!instance.isHealthy()) {
instance.setHealthy(true);
Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
rsInfo.getServiceName(), ip, port, rsInfo.getCluster(), UtilsAndCommons.LOCALHOST_SITE);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
}
}
}
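Summary
- When the client starts and registers an ephemeral instance, it launches an asynchronous task that keeps sending heartbeats to the server. The task first runs after a default delay of 5 s and then every 5 s, each time sending an HTTP "/instance/beat" request.
- The server receives the heartbeat and processes its parameters, then checks whether the instance exists. If it does not, there are two cases:
  - If the beat carries the full instance information, the server registers the instance first.
  - If the beat carries no such information, the server returns RESOURCE_NOT_FOUND and the client re-registers.
  The server then schedules an asynchronous task that runs immediately, implemented in ClientBeatProcessorV2 and ClientBeatProcessor.
- Both implementations follow the same logic. They find the instance whose IP and port match the beat and set its last heartbeat time to the current system time. If that instance was unhealthy, they mark it healthy and publish a service-change event.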
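The server-side decision in handleBeat plus ClientBeatProcessorV2 comes down to one rule. For an unknown instance with no beat body, the server asks the client to re-register. Otherwise it refreshes the last heartbeat time and flips an unhealthy instance back to healthy. BeatHandler is hypothetical. It uses an enum instead of NamingResponseCode and takes the current time as a parameter so the logic is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification of handleBeat + ClientBeatProcessorV2.
class BeatHandler {
    enum Result { OK, RESOURCE_NOT_FOUND }

    static final class InstanceState {
        long lastBeatMs;
        boolean healthy = true;
    }

    final Map<String, InstanceState> instances = new HashMap<>(); // key: "ip:port"
    int changeEvents; // counts "service changed" notifications

    // hasBeatBody mirrors clientBeat != null: a full beat can re-register the instance.
    synchronized Result handleBeat(String ipPort, boolean hasBeatBody, long nowMs) {
        InstanceState state = instances.get(ipPort);
        if (state == null) {
            if (!hasBeatBody) {
                return Result.RESOURCE_NOT_FOUND; // client must call registerService again
            }
            state = new InstanceState();
            instances.put(ipPort, state);
        }
        state.lastBeatMs = nowMs;
        if (!state.healthy) {
            state.healthy = true;
            changeEvents++; // the real code publishes ServiceChangedEvent / ClientChangedEvent here
        }
        return Result.OK;
    }
}
```

Passing the clock in as nowMs is only a testing convenience. The real processor reads System.currentTimeMillis().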
Health check
For each Service, the server creates a delayed health-check task. By default it first runs 5 s after startup and then every 5 s.
- Service.init: schedules clientBeatCheckTask and initializes each Cluster.
public void init() {
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
- ClientBeatCheckTask.run
public void run() {
List<Instance> instances = service.allIPs(true);
for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
getPushService().serviceChanged(service);
}
}
}
}
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
deleteIp(instance);
}
}
}
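Summary:
- After receiving a client's registration request, the server creates a Service and starts its health check by calling Service.init.
- Service.init starts an asynchronous task, implemented in ClientBeatCheckTask, that first runs after a default delay of 5 s and then every 5 s.
- The task iterates over all instances and reads each one's last heartbeat time. If more than 15 s have passed since then, it sets the instance's health status to false and publishes a service-change event.
- It then iterates over the instances again. If more than 30 s have passed since the last heartbeat, it removes the instance from the service list. In practice it does this by calling the delete-instance endpoint over HTTP.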
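The two-pass check with the 15 s and 30 s thresholds can be sketched as a pure function over a list of instances. HealthCheckSketch is hypothetical. It ignores the "marked" flag, the push notification and the HTTP delete call, and it takes the current time as a parameter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of ClientBeatCheckTask: two passes over the instances.
class HealthCheckSketch {
    static final long HEARTBEAT_TIMEOUT_MS = 15_000; // unhealthy after 15 s without a beat
    static final long DELETE_TIMEOUT_MS = 30_000;    // removed after 30 s without a beat

    static final class Inst {
        final String ipPort;
        long lastBeatMs;
        boolean healthy = true;

        Inst(String ipPort, long lastBeatMs) {
            this.ipPort = ipPort;
            this.lastBeatMs = lastBeatMs;
        }
    }

    // Marks stale instances unhealthy in place and returns the ones to delete.
    static List<Inst> check(List<Inst> instances, long nowMs) {
        for (Inst inst : instances) {
            if (nowMs - inst.lastBeatMs > HEARTBEAT_TIMEOUT_MS && inst.healthy) {
                inst.healthy = false; // real code also pushes a service-changed notification
            }
        }
        List<Inst> toDelete = new ArrayList<>();
        for (Inst inst : instances) {
            if (nowMs - inst.lastBeatMs > DELETE_TIMEOUT_MS) {
                toDelete.add(inst); // real code calls the delete-instance endpoint
            }
        }
        return toDelete;
    }
}
```

An instance silent for 20 s is therefore reported as unhealthy but kept. Only after 30 s is it removed.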