一、前言
本系列为个人Dubbo学习笔记衍生篇,是正文篇之外的衍生内容,内容来源于《深度剖析Apache Dubbo 核心技术内幕》, 过程参考官方源码分析文章。仅用于个人笔记记录。本文分析基于Dubbo2.7.0版本,由于个人理解的局限性,若文中不免出现错误,感谢指正。
1. 前提
Dubbo 在 2.7 版本之后提供了新的监听方式。本文来分析 Dubbo 2.6 和 Dubbo 2.7 版本之间对于配置更新的差异。
首先我们需要知道 Dubbo 2.7提供了 应用级别配置 和 服务级别配置
- 应用级别配置:配置会作用于指定的应用名的应用。Dubbo 2.7 及以上新增的配置方式,会在配置中心上创建节点。
- 服务级别配置:配置只会作用于指定的接口服务。
综述,如下图(下图中的zk充当注册中心和配置中心,因此会创建下面三个节点):
- 节点 ① :Dubbo 2.6 版本使用的配置节点。当我们使用 dubbo-admin 进行配置时会创建该节点由于Dubbo 2.6 版本不存在配置中心和元数据中心,所以节点会在注册中心上创建,路径规则为
/dubbo/{服务接口全路径}/configurations - 节点 ② :Dubbo 2.7 版本使用的配置节点,应用级别配置,该节点创建在配置中心上。路径规则为
/dubbo/config/{应用名}/configurations - 节点 ③ :Dubbo 2.7 版本使用的配置节点,服务级别配置,该节点创建在配置中心上。路径规则为
/dubbo/config/{分组}/{服务接口全路径}:{版本号}/configurations 。该节点的内容等价于节点 ①的内容(实际节点内容由于生成规则不同有所不同,但其所包含的配置内容是等价的)。
本文使用的 dubbo-admin-0.2.0 来进行动态配置,该版本会兼容 2.6 和 2.7 版本,因此如果进行服务级别动态配置,则上图中的 ① 和 ③ 两个配置节点都会创建。如果进行应用级别动态配置,则会创建 ② 的配置节点,因为 Dubbo2.6版本不支持 应用级别配置。
关于上述内容,在 Dubbo笔记 ? :配置中心 中有较为完整的描述,如有需要可阅读。
基于上述前提,我们来看下面的内容。
二、提供者
我们知道提供者在服务暴露时的核心方法是 RegistryProtocol#export,这里我们忽略其他代码,仅看如下代码:
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
....
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
....
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
....
return new DestroyableExporter<>(exporter);
}
这里我们主要关注两个方法 :
- RegistryProtocol#overrideUrlWithConfig :Dubbo 2.7 及以上提供的新的订阅方式。通过 ProviderConfigurationListener 和 ServiceConfigurationListener 监听器来监听动态配置,处理应用级别和服务级别的配置。
- RegistryService#subscribe :Dubbo 2.7 以下版本使用的订阅方式,可以用于处理服务级别的配置。
下面我们详细来看这两个方法的具体实现:
1. RegistryProtocol#overrideUrlWithConfig
该函数是 Dubbo2.7 版本提供的,Dubbo 2.6 版本所有数据都存在注册中心上,Dubbo 2.7版本分成了注册中心,配置中心,和元数据中心。
private final ProviderConfigurationListener providerConfigurationListener = new ProviderConfigurationListener();
private final Map<String, ServiceConfigurationListener> serviceConfigurationListeners = new ConcurrentHashMap<>();
private URL overrideUrlWithConfig(URL providerUrl, OverrideListener listener) {
providerUrl = providerConfigurationListener.overrideUrl(providerUrl);
ServiceConfigurationListener serviceConfigurationListener = new ServiceConfigurationListener(providerUrl, listener);
serviceConfigurationListeners.put(providerUrl.getServiceKey(), serviceConfigurationListener);
return serviceConfigurationListener.overrideUrl(providerUrl);
}
这里我们的关键点在于两个监听器 :
- ProviderConfigurationListener : 应用级别监听器,因为一个程序只能有一个应用名,所以这里之需要一个 ProviderConfigurationListener 实例即可。
- ServiceConfigurationListener :服务级别监听器。因为一个程序可以提供多个 Dubbo 接口服务,所以这使用
serviceConfigurationListeners 来保存不同接口的监听器。
我们这里先来看 ServiceConfigurationListener 之后再来看 ProviderConfigurationListener 。
1.1 ServiceConfigurationListener
ServiceConfigurationListener 用于 Dubbo 2.7 及以后 来处理 服务级别的配置,在 ServiceConfigurationListener 的构造函数中会通过 this.initWith(providerUrl.getEncodedServiceKey() + CONFIGURATORS_SUFFIX) 来监听当前服务的配置。其实现如下:
private class ServiceConfigurationListener extends AbstractConfiguratorListener {
private URL providerUrl;
private OverrideListener notifyListener;
public ServiceConfigurationListener(URL providerUrl, OverrideListener notifyListener) {
this.providerUrl = providerUrl;
this.notifyListener = notifyListener;
this.initWith(providerUrl.getEncodedServiceKey() + CONFIGURATORS_SUFFIX);
}
private <T> URL overrideUrl(URL providerUrl) {
return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
}
@Override
protected void notifyOverrides() {
notifyListener.doOverrideIfNecessary();
}
}
这里的关键在于 ServiceConfigurationListener#initWith ,其实现在其父类中完成,如下:
protected final void initWith(String key) {
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
dynamicConfiguration.addListener(key, this);
String rawConfig = dynamicConfiguration.getConfig(key);
if (!StringUtils.isEmpty(rawConfig)) {
process(new ConfigChangeEvent(key, rawConfig));
}
}
下面我们按照注释的顺序一一来看:
1.1.1 DynamicConfiguration#getDynamicConfiguration
DynamicConfiguration#getDynamicConfiguration 实现如下:
static DynamicConfiguration getDynamicConfiguration() {
Optional<Configuration> optional = Environment.getInstance().getDynamicConfiguration();
return (DynamicConfiguration) optional.orElseGet(() -> getExtensionLoader(DynamicConfigurationFactory.class)
.getDefaultExtension()
.getDynamicConfiguration(null));
}
我们这里以 zk 为配置中心,所以获取的是 ZookeeperDynamicConfiguration 实例。ZookeeperDynamicConfiguration 会在构造函数中建立与 ZK 的连接并获取节点内容缓存到本地。
1.1.2 DynamicConfiguration#addListener
DynamicConfiguration#addListener 是实现监听服务级别配置的的核心方法,这里调用的是 DynamicConfiguration 接口的默认方法如下
default void addListener(String key, ConfigurationListener listener) {
addListener(key, DEFAULT_GROUP, listener);
}
随后通过如下调用顺序:
ZookeeperDynamicConfiguration#addListener -> CacheListener#addListener
CacheListener#addListener 实现如下:
private Map<String, Set<ConfigurationListener>> keyListeners = new ConcurrentHashMap<>();
public void addListener(String key, ConfigurationListener configurationListener) {
Set<ConfigurationListener> listeners = this.keyListeners.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>());
listeners.add(configurationListener);
}
这里可以看到,这里会将当前接口服务的key 和监听器保存到 CacheListener#keyListeners 中。而当我们进行动态配置时会修改配置中心的节点,而当 zk 节点变化时会触发 CacheListener#childEvent 方法,其实现如下:
@Override
public void childEvent(CuratorFramework aClient, TreeCacheEvent event) throws Exception {
TreeCacheEvent.Type type = event.getType();
ChildData data = event.getData();
if (type == TreeCacheEvent.Type.INITIALIZED) {
initializedLatch.countDown();
return;
}
if (data == null) {
return;
}
if (data.getPath().split("/").length >= 5) {
byte[] value = data.getData();
String key = pathToKey(data.getPath());
ConfigChangeType changeType;
switch (type) {
case NODE_ADDED:
changeType = ConfigChangeType.ADDED;
break;
case NODE_REMOVED:
changeType = ConfigChangeType.DELETED;
break;
case NODE_UPDATED:
changeType = ConfigChangeType.MODIFIED;
break;
default:
return;
}
ConfigChangeEvent configChangeEvent = new ConfigChangeEvent(key, new String(value, StandardCharsets.UTF_8), changeType);
Set<ConfigurationListener> listeners = keyListeners.get(key);
if (CollectionUtils.isNotEmpty(listeners)) {
listeners.forEach(listener -> listener.process(configChangeEvent));
}
}
}
那么这里我们就可以得知:当配置中心上有节点变动时会触发 CacheListener#childEvent 方法,而在 CacheListener#childEvent 方法中会通过 pathToKey(data.getPath()); 将path 转为 key,随后依据 key 从 keyListeners 找到对应节点的监听器,并执行 listener.process(configChangeEvent) 来触发监听器操作。
上述理论上如此,但需要注意的是:
当我们使用zk 作为配置中心时 Dubbo的服务级别配置是通过 RegistryService#subscribe 完成更新,而不是 通过 ServiceConfigurationListener 完成,因为当配置节点更新时并不会触发监听器操作, 原因在于keyListeners 中的key 和 pathToKey(data.getPath()); 转化的key 并不一致。
并且
- 该种情况仅针对于 服务级别配置,应用级别配置并不会出现该种情况。
- 个人测试,如果使用 Nacos 来作为配置中心则可以执行 ServiceConfigurationListener 的监听器逻辑,所以推测可能是 Dubbo对 zk 作为配置中心时的一个缺陷 ?
上述问题的原因如下 :
在 Dubbo 2.7.0 的版本下,ServiceConfigurationListener 在初始化时会传入监听器的 key,而这个key是通过 providerUrl.getEncodedServiceKey() + CONFIGURATORS_SUFFIX 生成。
其中 URL#getEncodedServiceKey 实现如下,可以看到仅仅是将 serviceKey 的第一个 / 替换为了 *
public String getEncodedServiceKey() {
String serviceKey = this.getServiceKey();
serviceKey = serviceKey.replaceFirst("/", "*");
return serviceKey;
}
所以实际上 CacheListener#keyListeners 中监听的 key 为dubbo*com.kingfish.service.impl.SimpleDemoService:1.0.0.configurators 。也即是说对于 com.kingfish.service.impl.SimpleDemoService 这个服务接口,我们对应的监听器的key 为 dubbo*com.kingfish.service.impl.SimpleDemoService:1.0.0.configurators 。
如下图:
而当我们修改动态配置时,即zk节点发生变化,会触发 CacheListener#childEvent 方法。其中会根据节点路径转换为对应的key,如下:
1. 修改的节点路径如下:
data.getPath() = /dubbo/config/dubbo/com.kingfish.service.impl.SimpleDemoService:1.0.0/configurators
2. key 的生成规则如下:
key = pathToKey(data.getPath())
= dubbo.com.kingfish.service.impl.SimpleDemoService:1.0.0.configurators
也即是说,当我们修改 com.kingfish.service.impl.SimpleDemoService 这个服务接口的动态配置时,Dubbo 根据 zk 节点路径转化的Key 为 dubbo.com.kingfish.service.impl.SimpleDemoService:1.0.0.configurators 。而我们实际的key为 dubbo*com.kingfish.service.impl.SimpleDemoService:1.0.0.configurators 。从而导致我们无法通过path转化的key找到对应服务的监听器。
如下图,由于变更节点转化的key 与 keyListeners中的key 不匹配导致 无法根据变更节点找到对应的监听器来进行处理:
1.1.3 AbstractConfiguratorListener#process
AbstractConfiguratorListener#process
-> RegistryProtocol.ServiceConfigurationListener#notifyOverrides
-> RegistryProtocol.OverrideListener#doOverrideIfNecessary
在 RegistryProtocol.OverrideListener#doOverrideIfNecessary 中 完成了本地的配置更新,如下:
public synchronized void doOverrideIfNecessary() {
final Invoker<?> invoker;
if (originInvoker instanceof InvokerDelegate) {
invoker = ((InvokerDelegate<?>) originInvoker).getInvoker();
} else {
invoker = originInvoker;
}
URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<?> exporter = bounds.get(key);
if (exporter == null) {
logger.warn(new IllegalStateException("error state, exporter should not be null"));
return;
}
URL currentUrl = exporter.getInvoker().getUrl();
URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey())
.getConfigurators(), newUrl);
newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl);
if (!currentUrl.equals(newUrl)) {
RegistryProtocol.this.reExport(originInvoker, newUrl);
...
}
}
1.2 ProviderConfigurationListener
ProviderConfigurationListener 的实现与 ServiceConfigurationListener 基本相同,不同的是,ServiceConfigurationListener 监听的服务级别, ProviderConfigurationListener监听的是应用级别的配置,所以这里监听的节点会有所不同。
private class ProviderConfigurationListener extends AbstractConfiguratorListener {
public ProviderConfigurationListener() {
this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
}
private <T> URL overrideUrl(URL providerUrl) {
return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
}
@Override
protected void notifyOverrides() {
overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary());
}
}
对于应用级别的动态配置,即是是 ZK 作为配置中心也不会出现上面 服务级别动态配置无法触发监听器的情况。
2. RegistryService#subscribe
关于这部分的具体代码,详阅:Dubbo笔记 ⑤ : 服务发布流程 - Protocol#export 中 服务订阅 部分的内容。
我们这里直接说结果 :由于 Dubbo 2.7之前的版本只有注册中心,并没有配置中心和元数据中心,所以动态配置的内容会保存在注册中心上。以 zk 为例,如下图: RegistryService#subscribe 会监听 图 ① 标注的节点,而上面我们提到,当我们创建服务级别的配置时,dubbo-admin-0.2.0 为了 兼容,会同时创建 图① 和 图 ② 节点。
而我们上面提到 当配置中心是zk 时ServiceConfigurationListener 并不起作用,所以此时的服务级别的动态配置更新就是通过 RegistryService#subscribe 监听 图① 节点的变化完成。
3. 总结
如下表
序号 | Dubbo版本 | 配置中心 | 服务级别配置 | 应用级别配置 |
---|
1 | 2.6 及以下 | 不支持 | 通过 RegistryService#subscribe 完成 | 不支持 | 2 | 2.7 及以上 | ZK | 通过 RegistryService#subscribe 完成 | 通过 ProviderConfigurationListener 完成 | 3 | 2.7 及以上 | 非ZK | 通过 ServiceConfigurationListener 完成 | 通过 ProviderConfigurationListener 完成 |
对于 Dubbo 2.6 及以下,通过 ServiceConfigurationListener 和 ProviderConfigurationListener 监听器完成更新。但是对于 zk作为配置中心的场景,ServiceConfigurationListener 失效,通过 Dubbo 2.6 版本的旧逻辑来完成更新。
对于 Dubbo 2.7及以上,通过 RegistryService#subscribe 方法中的逻辑完成更新。
应用配置和服务级别配置的优先级为: 应用级别 > 服务级别
因为配置的刷新是通过 RegistryProtocol.OverrideListener#notify 方法完成,在这个方法中会先 加载 ServiceConfigurationListeners 的配置,后加载 ProviderConfigurationListener 的配置。即后加载的配置会覆盖之前加载的配置,故应用级别 > 服务级别。
三、消费者
消费者的流程和提供者相似,下面来简单介绍。
当消费者启动时,Dubbo 会按照如下流程执行:
RegistryProtocol#refer -> RegistryProtocol#doRefer -> RegistryDirectory#subscribe
其中 RegistryDirectory#subscribe 的实现如下:
private static final ConsumerConfigurationListener consumerConfigurationListener = new ConsumerConfigurationListener();
private ReferenceConfigurationListener serviceConfigurationListener;
public void subscribe(URL url) {
setConsumerUrl(url);
consumerConfigurationListener.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}
这里我们又看到了两个监听器,我们直接说明:
- RegistryDirectory.ConsumerConfigurationListener#addNotifyListener : Dubbo 2.7 版本新增,订阅当前消费者应用级别配置.
- RegistryDirectory.ReferenceConfigurationListener :Dubbo 2.7 版本新增,订阅当前引用的接口服务的配置。这里需要对于提供者只需要关注自身的动态配置即可,但是对于消费者则需要订阅包括提供者列表 providers,动态配置信息 configurators,路由配置 routers 的信息。
- RegistryService#subscribe : Dubbo 2.6 版本旧逻辑,订阅当前引用的服务的配置,包括提供者列表 providers,动态配置信息 configurators,路由配置 routers 的信息。
注: 对于 路由节点,消费者会在 ListenableRouter#init 和 TagRouter#notify 方法中也会调用 DynamicConfiguration#addListener 来增加监听器进行监听。
RegistryService#subscribe 我们在上面已经提到过,需要注意的是 对于消费者来说RegistryDirectory#subscribe 不仅仅只订阅了configurators,而是订阅了 providers、configurators、routers 节点,当providers、configurators、routers 子节点变化时会通过回调通知RegistryDirectory。
下面我们来看 ConsumerConfigurationListener 和 ReferenceConfigurationListener 的实现。
1. ConsumerConfigurationListener & ReferenceConfigurationListener
ConsumerConfigurationListener和 ReferenceConfigurationListener 都是 RegistryDirectory 的内部静态类,其实现基本相同,如下:
private static class ConsumerConfigurationListener extends AbstractConfiguratorListener {
List<RegistryDirectory> listeners = new ArrayList<>();
ConsumerConfigurationListener() {
this.initWith(ApplicationModel.getApplication() + Constants.CONFIGURATORS_SUFFIX);
}
void addNotifyListener(RegistryDirectory listener) {
this.listeners.add(listener);
}
@Override
protected void notifyOverrides() {
listeners.forEach(listener -> listener.refreshInvoker(Collections.emptyList()));
}
}
private static class ReferenceConfigurationListener extends AbstractConfiguratorListener {
private RegistryDirectory directory;
private URL url;
ReferenceConfigurationListener(RegistryDirectory directory, URL url) {
this.directory = directory;
this.url = url;
this.initWith(url.getEncodedServiceKey() + Constants.CONFIGURATORS_SUFFIX);
}
@Override
protected void notifyOverrides() {
directory.refreshInvoker(Collections.emptyList());
}
}
当动态配置更新后,会触发 RegistryDirectory#refreshInvoker 来刷新本地配置,按照如下流程调用后 来到了RegistryDirectory#mergeUr 方法中。
RegistryDirectory#refreshInvoker -> RegistryDirectory#toInvokers -> RegistryDirectory#mergeUrl
1.1 RegistryDirectory#mergeUrl
在 RegistryDirectory#mergeUrl 方法中 Dubbo 完成了配置合并,合并的优先级是
动态配置 > Jvm 本地配置 > 消费者端配置(xml > Properties) > 提供者端配合
下面我们来看具体实现:
private URL mergeUrl(URL providerUrl) {
providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap);
providerUrl = overrideWithConfigurator(providerUrl);
providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false));
this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters());
if ((providerUrl.getPath() == null || providerUrl.getPath()
.length() == 0) && "dubbo".equals(providerUrl.getProtocol())) {
String path = directoryUrl.getParameter(Constants.INTERFACE_KEY);
if (path != null) {
int i = path.indexOf('/');
if (i >= 0) {
path = path.substring(i + 1);
}
i = path.lastIndexOf(':');
if (i >= 0) {
path = path.substring(0, i);
}
providerUrl = providerUrl.setPath(path);
}
}
return providerUrl;
}
private URL overrideWithConfigurator(URL providerUrl) {
providerUrl = overrideWithConfigurators(this.configurators, providerUrl);
providerUrl = overrideWithConfigurators(consumerConfigurationListener.getConfigurators(), providerUrl);
if (serviceConfigurationListener != null) {
providerUrl = overrideWithConfigurators(serviceConfigurationListener.getConfigurators(), providerUrl);
}
return providerUrl;
}
我们这里假设 消费者端 本地的原始配置为 URL0 ,则 :
- 在消费者启动时,会加载 JVM 配置,并覆盖本地配置
URL0 。RegistryDirectory 在初始化时会将合并后的消费者配置保存到 RegistryDirectory#queryMap 中。这一步完成了 Jvm 配置 覆盖 消费者端本地配置 ,诞生了合并后的新配置 URL1 。 - 在上面的 RegistryDirectory#mergeUrl 的代码中,
ClusterUtils.mergeUrl(providerUrl, queryMap); 会使用 queryMap 中的配置覆盖providerUrl 中的配置,而 queryMap 中保存的是URL1 ,providerUrl 为从注册中心拉取的提供者的配置。这一步完成了 URL1 覆盖提供者的配置,诞生了合并后的新配置 URL2 . - 在随后的 RegistryDirectory#overrideWithConfigurator 方法中,会拉取动态配置并覆盖
URL2 的配置,这一步完成了动态配置 覆盖 URL2 ,诞生了合并后的最终配合 URL3 。
综合上面的过程得到优先级的合并规则 :
动态配置 > Jvm 本地配置 > 消费者端配置(xml > Properties) > 提供者端配合
以上:内容部分参考 《深度剖析Apache Dubbo 核心技术内幕》 https://dubbo.apache.org/zh/docs/v2.7/dev/source/ 如有侵扰,联系删除。 内容仅用于自我记录学习使用。如有错误,欢迎指正
|