在这里,得取分两个概念,一个instance即一个实例,也就是一个Client,而ClientManager是所有实例的管理器
先看看ClientManager
ClientManager是一个接口
首先看看这个接口定义了什么规范
public interface ClientManager {
boolean clientConnected(String clientId, ClientAttributes attributes);
boolean clientConnected(Client client);
boolean syncClientConnected(String clientId, ClientAttributes attributes);
boolean clientDisconnected(String clientId);
Client getClient(String clientId);
boolean contains(final String clientId);
Collection<String> allClientId();
boolean isResponsibleClient(Client client);
boolean verifyClient(String clientId);
}
?? 客户端管理工具嘛,所以自然是客户端的CRUD ?? 接下来看看都有什么接口实现了它 本篇,就以EphemeralIpPortClientManager为例子来看,因为这个是短暂临时的客户端,也就是我们的服务上线掉线啥的就是它管理的
EphemeralIpPortClientManager
谈谈clients这个客户端的家
@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {
private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
}
?? 首先他是一个Component,那么我就理解他是一个单例的spring bean ?? 其次映入眼帘的就是一个ConcurrentMap,说明这肯定是一个重要的东西,于是debug 这个例子里,我起了两个服务,分别为 9998端口的server1 和 9999端口的server2 ,如果看了上一篇的心跳机制,可能会联想到每次客户端上报心跳都会去ClientManager的这个clients里面获取IpPortBasedClient,所以我就在这个地方打断点,看看clients里面有什么?? ?? 可以看到,这里有四个客户端!!而不是两个,奇怪的点是,明明我服务起了两个服务,但是为什么却有四个客户端呢???有一个9999和9998我能“李姐”,但是另外两个是啥? 于是翻看调用栈信息,可以看到: 从InstanceController调用来的,于是接着看看是什么接口来这里注册的? ?? 这里是调用了/list 接口,意思就是查询指定服务的所有实例,但是主要的地方还是return那个地方执行了listInstance那里,这里贴一下里面的源码
@Override
public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,
boolean healthOnly) {
if (subscriber.getPort() > 0 && pushService.canEnablePush(subscriber.getAgent())) {
String clientId = IpPortBasedClient.getClientId(subscriber.getAddrStr(), true);
createIpPortClientIfAbsent(clientId);
clientOperationService.subscribeService(service, subscriber, clientId);
}
}
com.alibaba.nacos.naming.core#InstanceOperatorClientImpl
private void createIpPortClientIfAbsent(String clientId) {
if (!clientManager.contains(clientId)) {
clientManager.clientConnected(clientId, new ClientAttributes());
}
}
?? 这里的第7行,createIpPortClientIfAbsent会在clientManager里面创建新的client对象(如果clients那个ConcurrentHashMap里面不存在的话) ?? clientInfo.version.compareTo(VersionUtil.parseVersion(switchDomain.getPushJavaVersion())) 这行代码是第5行 canEnablePush做的判断,逻辑是获取版本号的之类的,对应上面注释的逻辑,要用于适配1.x的逻辑
🌔 个人理解: 所以这里破案了,为何明明有两个服务,但是 EphemeralIpPortClientManager 却有四个客户端,其中两个就是对应的我们的客户端实例,也就是在registerInstance的时候创建的客户端,当然这两个客户端也会在上报心跳的的时候来到clients这个map里来校验,因为心跳信息里面也保存了客户端的信息。另外两个,我的理解是客户端拉取当前服务的实例的进程,即每个实例会有另外一个进程来拉取该服务集群的其他的客户端的信息(不过这里的注释写的是 For adapt 1.X subscribe logic ,本人客户端使用1.x, 源码调试使用的2.x进行的调试 在1.x的客户端代码中是有一个 HostReactor 来拉取的list的 , 不过2.x的代码中并不存在这个类,结合官方的注释,可能这个 HostReactor 已经在2.x的版本中进行了移除,不过兼容1.x的版本,所以依然是向clients里面添加了)
所以个人理解,clients这个ConcurrentHashMap里面保存的是客户端实例,对与1.x而言,这个clients保存的客户端概念不止是实例,还有实例内部的一些其他的线程(就像根据服务名拉取实例信息的/list接口一样),但是到了2.x(本人拉的2.0.3)而言,客户端的代码里面已经移除了 HostReactor 这个类了,可能对于2.x而言,客户端只是一个单单的客户端,不再算上其他的线程,当然这里做了兼容处理,所以依然是想client里面添加了。
当然上面扯了这么多,只有一点比较重要,上面那堆瞎扯的纯属个人猜测 ,就是ClientManager是保存客户端的管理者,这里就站在2.x的角度理解,其内部的clients保存的就是客户端的实例,该实例是对于IP+端口的维度而言, 被保存在 private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>(); 这里贴一下clients的entry:
过期的客户端清洁器 ExpiredClientCleaner
构造器初始化的时候提交的一个周期性为5s的客户端清理线程
@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {
public EphemeralIpPortClientManager(DistroMapper distroMapper, SwitchDomain switchDomain) {
this.distroMapper = distroMapper;
GlobalExecutor.scheduleExpiredClientCleaner(new ExpiredClientCleaner(this, switchDomain), 0,
Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
clientFactory = ClientFactoryHolder.getInstance().findClientFactory(ClientConstants.EPHEMERAL_IP_PORT);
}
private static class ExpiredClientCleaner implements Runnable {
private final EphemeralIpPortClientManager clientManager;
private final SwitchDomain switchDomain;
public ExpiredClientCleaner(EphemeralIpPortClientManager clientManager, SwitchDomain switchDomain) {
this.clientManager = clientManager;
this.switchDomain = switchDomain;
}
@Override
public void run() {
long currentTime = System.currentTimeMillis();
for (String each : clientManager.allClientId()) {
IpPortBasedClient client = (IpPortBasedClient) clientManager.getClient(each);
if (null != client && isExpireClient(currentTime, client)) {
clientManager.clientDisconnected(each);
}
}
}
}
}
这里贴了EphemeralIpPortClientManager的构造器,从构造器可以得知: ?? 第6行 传入了clientManager这个对象,同时向 ScheduledExecutorService 线程池提交了一个任务(这是一个周期性的任务,即固定了一段时间后周而复始的执行,时间是5s一次)
public static void scheduleExpiredClientCleaner(Runnable runnable, long initialDelay, long delay, TimeUnit unit) {
EXPIRED_CLIENT_CLEANER_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, delay, unit);
}
那么这个任务做了什么呢? ?? 看上面的run方法,遍历拿到所有的客户端的信息,校验这个客户端是否过期 noUpdatedTime = currentTime - client.getLastUpdatedTime() 如果过期,则通过传入的clientManager移除
DistroMapper
@Component("distroMapper")
public class DistroMapper extends MemberChangeListener {
private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
private final DistroMapper distroMapper;
private final ClientFactory<IpPortBasedClient> clientFactory;
public EphemeralIpPortClientManager(DistroMapper distroMapper, SwitchDomain switchDomain) {
this.distroMapper = distroMapper;
GlobalExecutor.scheduleExpiredClientCleaner(new ExpiredClientCleaner(this, switchDomain), 0,
Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
clientFactory = ClientFactoryHolder.getInstance().findClientFactory(ClientConstants.EPHEMERAL_IP_PORT);
}
}
这个是通过构造器注入的一个bean,里面保存了nacos服务的一些信息,和nacos集群也有点关系,之后有时间会往里面扒,这里就不细化了
Client
Client接口
public interface Client {
String getClientId();
boolean isEphemeral();
void setLastUpdatedTime();
long getLastUpdatedTime();
boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
InstancePublishInfo removeServiceInstance(Service service);
InstancePublishInfo getInstancePublishInfo(Service service);
Collection<Service> getAllPublishedService();
boolean addServiceSubscriber(Service service, Subscriber subscriber);
boolean removeServiceSubscriber(Service service);
Subscriber getSubscriber(Service service);
Collection<Service> getAllSubscribeService();
ClientSyncData generateSyncData();
boolean isExpire(long currentTime);
void release();
}
可以先看看这里有抽象了哪些接口出来
IpPortBasedClient
看过之前的文章可能会对这个类有印象,服务注册的时候,或者心跳上报的时候,都会从ClientManager里面取出它来,我认为它就是客户端的载体,可以看看下图 IpPortBasedClient是继承自AbstractClient,实现了Client接口
public abstract class AbstractClient implements Client {
protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
protected volatile long lastUpdatedTime;
}
public class IpPortBasedClient extends AbstractClient {
public class IpPortBasedClient extends AbstractClient {
public static final String ID_DELIMITER = "#";
private final String clientId;
private final boolean ephemeral;
private final String responsibleId;
private ClientBeatCheckTaskV2 beatCheckTask;
private HealthCheckTaskV2 healthCheckTaskV2;
public IpPortBasedClient(String clientId, boolean ephemeral) {
this.ephemeral = ephemeral;
this.clientId = clientId;
this.responsibleId = getResponsibleTagFromId();
}
private String getResponsibleTagFromId() {
int index = clientId.indexOf(IpPortBasedClient.ID_DELIMITER);
return clientId.substring(0, index);
}
public static String getClientId(String address, boolean ephemeral) {
return address + ID_DELIMITER + ephemeral;
}
@Override
public String getClientId() {
return clientId;
}
@Override
public boolean isEphemeral() {
return ephemeral;
}
public String getResponsibleId() {
return responsibleId;
}
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
}
@Override
public boolean isExpire(long currentTime) {
return isEphemeral() && getAllPublishedService().isEmpty() && currentTime - getLastUpdatedTime() > ClientConfig
.getInstance().getClientExpiredTime();
}
public Collection<InstancePublishInfo> getAllInstancePublishInfo() {
return publishers.values();
}
@Override
public void release() {
super.release();
if (ephemeral) {
HealthCheckReactor.cancelCheck(beatCheckTask);
} else {
healthCheckTaskV2.setCancelled(true);
}
}
private HealthCheckInstancePublishInfo parseToHealthCheckInstance(InstancePublishInfo instancePublishInfo) {
HealthCheckInstancePublishInfo result;
if (instancePublishInfo instanceof HealthCheckInstancePublishInfo) {
result = (HealthCheckInstancePublishInfo) instancePublishInfo;
} else {
result = new HealthCheckInstancePublishInfo();
result.setIp(instancePublishInfo.getIp());
result.setPort(instancePublishInfo.getPort());
result.setHealthy(instancePublishInfo.isHealthy());
result.setCluster(instancePublishInfo.getCluster());
result.setExtendDatum(instancePublishInfo.getExtendDatum());
}
if (!ephemeral) {
result.initHealthCheck();
}
return result;
}
public void init() {
if (ephemeral) {
beatCheckTask = new ClientBeatCheckTaskV2(this);
HealthCheckReactor.scheduleCheck(beatCheckTask);
} else {
healthCheckTaskV2 = new HealthCheckTaskV2(this);
HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
}
}
public void putServiceInstance(Service service, InstancePublishInfo instance) {
if (null == publishers.put(service, parseToHealthCheckInstance(instance))) {
MetricsMonitor.incrementInstanceCount();
}
}
}
}
?? IpPortBasedClient是客户端的实体,同时保存了响应的一些信息 ?? 大部分的方法都是获取客户端的信息,如clientId,是否是短暂在线等 接着我们看看IpPortBasedClient是何时被new出来的:
public class EphemeralIpPortClientManager implements ClientManager {
@Override
public boolean clientConnected(final Client client) {
clients.computeIfAbsent(client.getClientId(), s -> {
Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
ipPortBasedClient.init();
return ipPortBasedClient;
});
return true;
}
@Override
public boolean clientConnected(String clientId, ClientAttributes attributes) {
return clientConnected(clientFactory.newClient(clientId, attributes));
}
}
?? 从这块代码中可以知道,IpPortBasedClient是在客户端连接的时候被new出来,同时保存在ClientManager里面了,也就是我们的客户端管理器 ?? 同时,调用了其init()方法
public void init() {
if (ephemeral) {
beatCheckTask = new ClientBeatCheckTaskV2(this);
HealthCheckReactor.scheduleCheck(beatCheckTask);
} else {
healthCheckTaskV2 = new HealthCheckTaskV2(this);
HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
}
}
?? 向线程池提交了一个ClientBeatCheckTaskV2任务,GlobalExecutor.scheduleNamingHealth(wrapperTask, 5000, 5000, TimeUnit.MILLISECONDS) 是一个延迟5s,周期为5s的周期性的线程任务 接下来看看这个ClientBeatCheckTaskV2是个什么任务
public class ClientBeatCheckTaskV2 extends AbstractExecuteTask implements BeatCheckTask, NacosHealthCheckTask {
@Override
public void doHealthCheck() {
try {
Collection<Service> services = client.getAllPublishedService();
for (Service each : services) {
HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client
.getInstancePublishInfo(each);
interceptorChain.doInterceptor(new InstanceBeatCheckTask(client, each, instance));
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
}
public class InstanceBeatCheckTask implements Interceptable {
@Override
public void passIntercept() {
for (InstanceBeatChecker each : CHECKERS) {
each.doCheck(client, service, instancePublishInfo);
}
}
}
public class UnhealthyInstanceChecker implements InstanceBeatChecker {
@Override
public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
if (instance.isHealthy() && isUnhealthy(service, instance)) {
changeHealthyStatus(client, service, instance);
}
}
private void changeHealthyStatus(Client client, Service service, HealthCheckInstancePublishInfo instance) {
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client last beat: {}", instance.getIp(),
instance.getPort(), instance.getCluster(), service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getLastHeartBeatTime());
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
}
}
?? 上面放了三个类的三个方法 ?? 第一个就是我们的任务,执行doHealthCheck(),即健康检查,在第10行,对服务进行健康检查 ?? 这里用了一个拦截器链(玩的有点花),不过实际执行的还是InstanceBeatCheckTask的passIntercept ?? 最终实际执行的还是 UnhealthyInstanceChecker 的 docheck方法,其实际逻辑是判断实例的健康状态,也就是System.currentTimeMillis() - instance.getLastHeartBeatTime() > beatTimeout 根据最后一次心跳的间隔时间来进行的判断,如果不健康则修改实例的健康状态changeHealthyStatus,并且发送服务和客户端的变更事件
既然是检查最后一次心跳来校验健康状态,那么这里就回顾一下上一章心跳里更新实例的最后一次心跳时间的代码
public class ClientBeatProcessorV2 implements BeatProcessor {
@Override
public void run() {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
String ip = rsInfo.getIp();
int port = rsInfo.getPort();
String serviceName = NamingUtils.getServiceName(rsInfo.getServiceName());
String groupName = NamingUtils.getGroupName(rsInfo.getServiceName());
Service service = Service.newService(namespace, groupName, serviceName, rsInfo.isEphemeral());
HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(service);
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo);
}
instance.setLastHeartBeatTime(System.currentTimeMillis());
if (!instance.isHealthy()) {
instance.setHealthy(true);
Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
rsInfo.getServiceName(), ip, port, rsInfo.getCluster(), UtilsAndCommons.LOCALHOST_SITE);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
}
}
}
}
?? ClientBeatProcessorV2,也就是上一章心跳里面,发送心跳的时候,执行的异步任务,其在第18行修改的实例的心跳时间
略微总结一下
IpPortBasedClient是客户端在ClientManager保存的实体,里面有客户端信息,同时其被创建的初期(其创建是在客户端注册的时候去ClientManager找,如果没找到则通过clientConnected方法创建),会有一个周期性的异步5s的任务,根据最后一次心跳时间来检测实例的健康状态(心跳时间就是客户端上报心跳信息会进行更改) Nacos源码篇
语雀版文档
Nacos源码注释
|