前言:
在多服务提供者的情况下,如果消费者需要获取一个特定的提供者来进行消费,则需要注册中心的帮助。
在Dubbo中,服务提供者启动时向注册中心注册当前provider_url,消费者在启动时也从注册中心中拉取当前接口的所有提供者信息,缓存到当前应用。
而当服务提供者发生变动时(新的提供者上线或者已有提供者下线),注册中心也会将变动消息推送到各个消费者,以实现注册信息的及时更新。
有关于注册中心,接口为Registry,Dubbo中提供的注册中心有如下:
?
本文就来看下Zookeeper(默认)注册中心的使用。其他的基本都是类似的实现,读者可自行阅读。
需要重点关注的有两方面:服务提供者如何注册URL到注册中心去 ;服务消费者如何从注册中心获取提供者列表信息; 服务消费者如何监听服务提供者变动;
下面就从这三方面来介绍
1.服务提供者注册信息到注册中心
实际在之前的博客中有简单介绍过,本文再梳理下。
ZookeeperRegistry通过实现Registry接口的的register()方法来实现注册URL
public abstract class FailbackRegistry extends AbstractRegistry {
@Override
public void register(URL url) {
// URL格式进行判断
if (!acceptable(url)) {
logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
return;
}
// 调用父类的register()方法,父类只是将url添加到Set集合中
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// 重点在这里,交由子类来实现,具体见1.1
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 如果设置check=false,则只打印异常,否则抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}
}
1.1 ZookeeperRegistry.doRegister() 真正注册provider_url
public class ZookeeperRegistry extends FailbackRegistry {
public void doRegister(URL url) {
try {
// 到Zookeeper上创建临时节点信息,具体见1.2
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
1.2 CuratorZookeeperClient.create() 创建节点
public abstract class AbstractZookeeperClient<TargetDataListener, TargetChildListener> implements ZookeeperClient {
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
if(persistentExistNodePath.contains(path)){
return;
}
if (checkExists(path)) {
persistentExistNodePath.add(path);
return;
}
}
// 先判断父节点是否已创建,如果未创建,则先创建一个持久的父节点
int i = path.lastIndexOf('/');
if (i > 0) {
create(path.substring(0, i), false);
}
// 直到此时,才将当前url注册为一个临时节点
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
persistentExistNodePath.add(path);
}
}
}
针对provider_url的注册,先创建父节点为持久化节点,再创建子节点为临时节点。通过可视化工具,我们可以看到,其结构如下:
?
2.消费者启动时获取provider的列表信息
2.1 注册消费者url到注册中心
消费者启动时,也会像提供者一样,先将自己的URL注册到注册中心去
public class RegistryProtocol implements Protocol {
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
// 在这里将当前consumer_url注册到Zookeeper上,代码同上述provider注册是一致的
registry.register(directory.getRegisteredConsumerUrl());
}
...
}
}
2.2 消费者获取服务提供者列表信息
public class ZookeeperRegistry extends FailbackRegistry {
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
...
} else {
List<URL> urls = new ArrayList<>();
// 转换path为/dubbo/org.apache.dubbo.demo.DemoService/providers(本例中为此)
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkClient.create(path, false);
// 添加对path的监听
// 这里的Listener是谁呢?实际上一篇文章中已经有过说明了,就是RegistryDirectory对象本身
// 注意:这里返回的children中已经包含了当前provider全量url信息,后面的都是加工阶段了,将urls添加到RegistryDirectory中
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 主动调用一次notify()方法,获取provider_url列表,我们重点关注下这个方法
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
ZookeeperRegistry.doSubscribe() 方法实现了两个功能:
* 对provider路径的监听
* 获取所需provider_url的列表信息
2.2.1?ZookeeperRegistry.notify()?
直接调用父类的实现
public abstract class FailbackRegistry extends AbstractRegistry {
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
// 做完url和listener的条件判断后,交由doNotify()方法来执行
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list, retry regularly
addFailedNotified(url, listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
// 交由父类执行
super.notify(url, listener, urls);
}
}
public abstract class AbstractRegistry implements Registry {
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
...
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
// 最核心的就是这一句代码,执行Listener.notify()方法
// 那么这个Listener是谁呢?接着往下看
listener.notify(categoryList);
saveProperties(url);
}
}
}
建议结合上一篇文章RegistryDirectory的说明来看,因为这里的ChildListener实现类就是RegistryDirectory对象本身。通过RegistryDirectory对象的继承实现关系我们也看到了
// 在这里实现了NotifyListener接口
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(this::judgeCategory));
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// providers
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
}
}
// 在这里将provider添加到RegistryDirectory中,关于其详细信息,前一篇博客中已经有过说明,不再赘述
refreshOverrideAndInvoker(providerURLs);
}
}
2.3 消费者监听服务提供者列表信息
实际2.2中已经有过介绍了,就是在创建consumer_url Zookeeper节点时,创建了对providers path的监听(ChildListener接口)
最终ChildListener的实现类RegistryDirectory来实现了其notify()方法,完成了对provider变动的监听。
总结:
本文通过三个问题来总结了注册中心的作用。相比较默认的ZookeeperRegistry注册中心(笔者默认的,实际项目中可以根据各自需要来使用不同的注册中心),其他注册中心的功能也同样是这几种,代码比较简答,笔者不再赘述,读者可自行阅读。
|