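Preface:
Having analyzed Dubbo's overall architecture, we now look at each layer on its own.
When a consumer starts, it looks up all providers of the service it references, converts their URLs into Invokers held in memory, and starts listening for provider changes so that the Invoker list can be updated promptly.
So how is this implemented?
The structure of the registry layer is shown in the figure below:
Starting from RegistryProtocol, the matching Registry is obtained from RegistryFactory (this example uses ZookeeperRegistry).
A RegistryDirectory is then created. It is responsible for listening to the registry, so that provider changes are promptly propagated to the consumer.
This article starts from the RegistryProtocol.refer() method.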
1. RegistryProtocol.refer()
public class RegistryProtocol implements Protocol {
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // Resolve the registry URL; in this example it starts with zookeeper://
        url = getRegistryUrl(url);
        // So the Registry returned by registryFactory is a ZookeeperRegistry
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }
        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        // Otherwise continue with doRefer() using the default cluster
        return doRefer(cluster, registry, type, url);
    }
}
The registry URL (zookeeper://...) determines which registry implementation is used; here it is ZookeeperRegistry.
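The group check in refer() can be isolated into a few lines. The following is a minimal sketch, not Dubbo's own code: the class and method names are hypothetical, and the regular expression is assumed to match the behavior of Dubbo's COMMA_SPLIT_PATTERN (commas with optional surrounding whitespace).

```java
import java.util.regex.Pattern;

class GroupClusterChoice {
    // Assumption: equivalent to Dubbo's COMMA_SPLIT_PATTERN.
    static final Pattern COMMA_SPLIT = Pattern.compile("\\s*[,]+\\s*");

    /** True when the group value names several groups ("a,b") or all groups ("*"). */
    static boolean needsMergeableCluster(String group) {
        if (group == null || group.isEmpty()) {
            return false;
        }
        return COMMA_SPLIT.split(group).length > 1 || "*".equals(group);
    }

    public static void main(String[] args) {
        System.out.println(needsMergeableCluster("a,b"));
        System.out.println(needsMergeableCluster("*"));
        System.out.println(needsMergeableCluster("a"));
    }
}
```

Only when this check passes does refer() swap the default cluster for the mergeable one; a single named group falls through to the ordinary doRefer() call.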
2. RegistryProtocol.doRefer()
public class RegistryProtocol implements Protocol {
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            // Call ZookeeperRegistry.register() to register the consumer URL in ZooKeeper
            // (in essence, this creates an ephemeral node). See section 2.1.
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        // RegistryDirectory subscribes to URL changes. See section 2.2.
        directory.subscribe(toSubscribeUrl(subscribeUrl));
        Invoker<T> invoker = cluster.join(directory);
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }
        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }
}
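One detail in doRefer() is easy to miss: parameters.remove(REGISTER_IP_KEY) both removes the IP entry from the copied parameter map and returns its value, which then becomes the host of the consumer URL. The sketch below illustrates just that step with plain strings. It is an illustration under assumptions: the class name is hypothetical, the key is assumed to be "register.ip", and the real code builds a Dubbo URL object rather than a String.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class SubscribeUrlSketch {
    // Assumption: the value of Dubbo's REGISTER_IP_KEY constant.
    static final String REGISTER_IP_KEY = "register.ip";

    /** Builds a consumer:// URL string; the caller's map is copied, not modified. */
    static String buildSubscribeUrl(Map<String, String> consumerParams, String interfaceName) {
        // TreeMap is used only so the query string has a deterministic order.
        Map<String, String> parameters = new TreeMap<>(consumerParams);
        // remove() returns the removed value, so the IP leaves the query and becomes the host.
        String host = parameters.remove(REGISTER_IP_KEY);
        String query = parameters.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "consumer://" + host + "/" + interfaceName + (query.isEmpty() ? "" : "?" + query);
    }
}
```

Copying the map first matters: the directory's own consumer URL keeps its parameters, while the URL that gets registered and subscribed no longer carries the IP as a query parameter.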
2.1 ZookeeperRegistry.register(): registering the URL
The register() method is defined in the parent class FailbackRegistry, which ultimately calls the subclass's doRegister() method.
public class ZookeeperRegistry extends FailbackRegistry {
    public void doRegister(URL url) {
        try {
            // Create the node; it is ephemeral because the dynamic parameter defaults to true
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}
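The relationship between register() in the parent class and doRegister() in the subclass is the template method pattern. The sketch below shows the shape of that relationship only. All class and field names are hypothetical stand-ins, URLs are plain strings, and the real FailbackRegistry has more nuance (for example, when a failure is rethrown instead of being queued for retry).

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Stand-in for the parent: owns register() and remembers failures for a later retry. */
abstract class SketchFailbackRegistry {
    final Set<String> failedRegistered = new LinkedHashSet<>();

    public final void register(String url) {
        failedRegistered.remove(url);
        try {
            doRegister(url); // the subclass decides how registration actually happens
        } catch (RuntimeException e) {
            failedRegistered.add(url); // kept so a retry task could pick it up
        }
    }

    protected abstract void doRegister(String url);
}

/** Stand-in for the ZooKeeper subclass: "creates" an ephemeral node in an in-memory set. */
class SketchZookeeperRegistry extends SketchFailbackRegistry {
    final Set<String> ephemeralNodes = new LinkedHashSet<>();
    boolean connected = true;

    @Override
    protected void doRegister(String url) {
        if (!connected) {
            throw new IllegalStateException("zookeeper unavailable");
        }
        ephemeralNodes.add(url);
    }
}
```

With this split, every registry implementation gets the same failure bookkeeping for free and only has to supply the storage-specific step.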
2.2 RegistryDirectory.subscribe()
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    public void subscribe(URL url) {
        setConsumerUrl(url);
        CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
        // Call ZookeeperRegistry.subscribe(), passing this directory as the listener
        registry.subscribe(url, this);
    }
}
Note that this, the RegistryDirectory itself, is passed in as the listener, so when the listener is eventually triggered the call comes back to RegistryDirectory.
2.3 ZookeeperRegistry.subscribe(): subscribing to node changes
public class ZookeeperRegistry extends FailbackRegistry {
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // Subscription that matches all interfaces
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                ...
            } else {
                List<URL> urls = new ArrayList<>();
                // The path of interest in this example is /dubbo/org.apache.dubbo.demo.DemoService/providers,
                // that is, the provider path
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkClient.create(path, false);
                    // Watch the children of this path; events end up at the RegistryDirectory listener
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // Finally trigger one notify with the current children, which calls RegistryDirectory.notify()
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}
Summary: when the consumer starts, it creates a listener on provider_path (in this example /dubbo/org.apache.dubbo.demo.DemoService/providers), and that listener calls back into RegistryDirectory.
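The subscribe flow described above (register a child listener, read the current children, deliver one initial notification, then receive later changes) can be modeled without ZooKeeper. The following is a minimal in-memory sketch under assumptions: every type here is a hypothetical stand-in, children are plain strings, and there is no session or watch re-registration logic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

interface SketchChildListener {
    void childChanged(String parentPath, List<String> currentChildren);
}

/** In-memory stand-in for the ZooKeeper client. */
class SketchZkClient {
    private final Map<String, List<String>> children = new ConcurrentHashMap<>();
    private final Map<String, List<SketchChildListener>> listeners = new ConcurrentHashMap<>();

    /** Registers the listener and returns the children that exist right now. */
    List<String> addChildListener(String path, SketchChildListener listener) {
        listeners.computeIfAbsent(path, k -> new CopyOnWriteArrayList<>()).add(listener);
        return new ArrayList<>(children.computeIfAbsent(path, k -> new CopyOnWriteArrayList<>()));
    }

    /** Simulates a provider coming online: adds a child and fires every listener on the path. */
    void addChild(String path, String child) {
        List<String> current = children.computeIfAbsent(path, k -> new CopyOnWriteArrayList<>());
        current.add(child);
        List<SketchChildListener> registered =
                listeners.getOrDefault(path, Collections.<SketchChildListener>emptyList());
        for (SketchChildListener l : registered) {
            l.childChanged(path, new ArrayList<>(current));
        }
    }
}

/** Stand-in for RegistryDirectory: it passes itself as the listener. */
class SketchDirectory implements SketchChildListener {
    volatile List<String> providers = Collections.emptyList();
    int notifications = 0;

    void subscribe(SketchZkClient zk, String path) {
        List<String> initial = zk.addChildListener(path, this);
        childChanged(path, initial); // the one-off initial notify
    }

    @Override
    public void childChanged(String parentPath, List<String> currentChildren) {
        notifications++;
        providers = Collections.unmodifiableList(new ArrayList<>(currentChildren));
    }
}
```

The initial notify is what lets a consumer that starts after its providers still see them: without it, the directory would stay empty until the next change under the path.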
2.4 RegistryDirectory.notify(): the listener callback
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    public synchronized void notify(List<URL> urls) {
        // Group the incoming URLs by category: configurators, routers, providers
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(this::judgeCategory));
        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
        // Router handling, not the focus of this article
        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        toRouters(routerURLs).ifPresent(this::addRouters);
        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
        ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
        List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
        if (supportedListeners != null && !supportedListeners.isEmpty()) {
            for (AddressListener addressListener : supportedListeners) {
                providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);
            }
        }
        // Convert the provider URLs into Invokers and store them in RegistryDirectory.urlInvokerMap
        refreshOverrideAndInvoker(providerURLs);
    }
}
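The first step of notify() is a plain Collectors.groupingBy over the incoming URLs, followed by getOrDefault lookups so that a missing category yields an empty list instead of null. A minimal sketch of that pattern follows. It is a simplification under assumptions: SimpleUrl and the class name are hypothetical, and the category is read directly from a field, whereas Dubbo's judgeCategory inspects the real URL.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

class SketchNotifyGrouping {
    /** Hypothetical minimal URL: an address plus the category it was published under. */
    static final class SimpleUrl {
        final String address;
        final String category;

        SimpleUrl(String address, String category) {
            this.address = address;
            this.category = category;
        }
    }

    /** Maps a URL to one of the three known categories, or "" when it is none of them. */
    static String judgeCategory(SimpleUrl url) {
        List<String> known = Arrays.asList("configurators", "routers", "providers");
        return known.contains(url.category) ? url.category : "";
    }

    static Map<String, List<SimpleUrl>> group(List<SimpleUrl> urls) {
        return urls.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.groupingBy(SketchNotifyGrouping::judgeCategory));
    }

    /** Returns the URLs of one category, or an empty list when none were notified. */
    static List<SimpleUrl> of(Map<String, List<SimpleUrl>> grouped, String category) {
        return grouped.getOrDefault(category, Collections.<SimpleUrl>emptyList());
    }
}
```

Because one notification can mix categories, grouping first lets each category be handled by its own branch, and only the providers list goes on to be converted into Invokers.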
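Summary:
RegistryDirectory can be viewed as the consumer's operation layer for the registry: all the provider information the consumer needs is stored in it.
The concrete registry operations are delegated to ZookeeperRegistry.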