注册流程
整个注册中心的注册和发现流程主要有三个方面来完成:服务的提供方(以下简称server)、服务的消费者(以下简称client)、注册中心(nacos)。首先我们来探讨server与nacos的交互过程。
server需要通过nacos官方的OpenApi提供的接口来发起服务注册请求,随后server会定时向nacos发送心跳信息来进行心跳检测,对于使用者来说这一步可以采用ScheduledExecutorService创建定时任务来完成。nacos会异步的处理注册请求任务和心跳任务。
nacos心跳机制 nacos和client之间采取推拉结合的交互方式,一方面client可以通过定时任务每隔10s向nacos发起查询请求,如果服务列表改变nacos就会返回新列表,另一方面当本地服务实例发生变化时(即server实例注册成功或者心跳停止断开链接),nacos会主动通过UDP协议推送到client,udp协议非常快,不需要保持长连接。在注册中心的场景中client数量往往多于server,如果每一次服务更新,nacos要和成千上万的服务消费者去建立Tcp的话性能肯定是不行的。而如果UDP通知失败,客户端每10秒还会主动去拉一次,客户端拉取和服务器推送是互补的,这样既能保证server实例更新的时效性,又能提高效率。
分析nacos服务注册源码
官方OpenApi服务注册 我们跟进到接口源码,可以看到他是通过registerInstance()方法完成服务实例注册的
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request);
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
}
在nacos源码中有一个类ServiceManager,它实现了RecordListener接口,我们可以看到registerInstance()方法
public class ServiceManager implements RecordListener<Service> {
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
consistencyService.put(key, instances);
}
}
}
再跟进ConsistencyService,我们找到它的一个实现类DistroConsistencyServiceImpl 跟进源码
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, DataOperation.CHANGE);
}
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.offer(Pair.with(datumKey, action));
}
}
Notifier是一个阻塞队列,我们把注册任务添加进去,然后执行,这一步的目的主要是将请求写入和执行请求分离开来,实现异步处理,再跟进Notifier处理请求的源码
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
private volatile Notifier notifier = new Notifier();
@PostConstruct
public void init() {
GlobalExecutor.submitDistroNotifyTask(notifier);
}
}
public class GlobalExecutor {
private static final ScheduledExecutorService DISTRO_NOTIFY_EXECUTOR = ExecutorFactory.Managed
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.distro.notifier"));
public static void submitDistroNotifyTask(Runnable runnable) {
DISTRO_NOTIFY_EXECUTOR.submit(runnable);
}
}
public class Notifier implements Runnable {
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
Pair<String, DataOperation> pair = tasks.take();
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
}
总结
server向nacos发起注册任务请求,并维持一个心跳检测的定时任务,naocs会通过阻塞队列异步地处理这些请求,并实时的通过UDP推送到client,为防止UDP数据丢失,client也会通过定时任务每隔10s向nacos发送拉取请求,当服务列表改变,nacos再返回。
|