一、前言
本系列为个人Dubbo学习笔记,内容基于《深度剖析Apache Dubbo 核心技术内幕》, 过程参考官方源码分析文章,仅用于个人笔记记录。本文分析基于Dubbo2.7.5版本,由于个人理解的局限性,若文中不免出现错误,感谢指正。
系列文章地址:Dubbo源码分析:全集整理
本文基于 Dubbo 2.7.5 版本。关于该部分逻辑,如有需要可参考:
- Dubbo笔记 ? : Spring 执行流程概述
- Dubbo笔记 ? : DubboBootstrap 的服务暴露
- Dubbo笔记 ? : 服务自省-提供者
- Dubbo笔记 ? : 服务自省-消费者
二、ReferenceAnnotationBeanPostProcessor#doGetInjectedBean
在 Dubbo笔记 ? : Spring 执行流程概述中我们就介绍了 Spring 框架下 Dubbo的启动流程。对于消费者我们提到,在Spring 容器启动过程中,ReferenceAnnotationBeanPostProcessor会扫描 被@Reference 注解修饰的属性,并进行属性填充,而填充的逻辑则在于 ReferenceAnnotationBeanPostProcessor#doGetInjectedBean 中,在这个方法中会创建 Dubbo Service的 代理对象。其实现具体如下:
@Override
protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
InjectionMetadata.InjectedElement injectedElement) throws Exception {
String referencedBeanName = buildReferencedBeanName(attributes, injectedType);
String referenceBeanName = getReferenceBeanName(attributes, injectedType);
ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);
registerReferenceBean(referencedBeanName, referenceBean, attributes, injectedType);
cacheInjectedReferenceBean(referenceBean, injectedElement);
return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
}
这里我们注重来看 ReferenceAnnotationBeanPostProcessor#getOrCreateProxy 的实现,如下:
private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) {
if (existsServiceBean(referencedBeanName)) {
return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
wrapInvocationHandler(referenceBeanName, referenceBean));
} else {
return referenceBean.get();
}
}
这里我们看到l了两个分支:
- 如果容器中存在该 dubbo service 的 ReferenceBean,则直接创建代理对象。
- 如果容器中不存在 该 dubbo service 的 ReferenceBean,则会通过 ReferenceBean#get 来获取代理对象。
下面我们一一来看:
1. newProxyInstance
当我们通过XML 或 @Bean 注入了 ReferenceBean 时可能会出现这种情况,即容器中存在 referencedBean。这里可以看到, 如果容器中已经存在当前接口的 referencedBean。则会直接创建代理类.
return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
wrapInvocationHandler(referenceBeanName, referenceBean));
private InvocationHandler wrapInvocationHandler(String referenceBeanName, ReferenceBean referenceBean) {
return localReferenceBeanInvocationHandlerCache.computeIfAbsent(referenceBeanName, name ->
new ReferenceBeanInvocationHandler(referenceBean));
}
ReferenceAnnotationBeanPostProcessor.ReferenceBeanInvocationHandler 实现如下:
private static class ReferenceBeanInvocationHandler implements InvocationHandler {
private final ReferenceBean referenceBean;
private Object bean;
private ReferenceBeanInvocationHandler(ReferenceBean referenceBean) {
this.referenceBean = referenceBean;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object result;
try {
if (bean == null) {
init();
}
result = method.invoke(bean, args);
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
return result;
}
private void init() {
this.bean = referenceBean.get();
}
}
2. ReferenceBean#get
我们在 Dubbo源码分析:全集整理 中分析过 2.7.0 版本的 Dubbo消费者引用过程,这里基本逻辑类似,所以这并不会分析过于详细。
Dubbo 在第一次创建 Dubbo Service的 代理对象时会执行 ReferenceConfig#init 方法,如下:
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
public synchronized void init() {
if (initialized) {
return;
}
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
checkAndUpdateSubConfigs();
serviceMetadata.setVersion(version);
serviceMetadata.setGroup(group);
serviceMetadata.setDefaultGroup(group);
serviceMetadata.setServiceType(getActualInterface());
serviceMetadata.setServiceInterfaceName(interfaceName);
serviceMetadata.setServiceKey(URL.buildKey(interfaceName, group, version));
checkStubAndLocal(interfaceClass);
ConfigValidationUtils.checkMock(interfaceClass, this);
Map<String, String> map = new HashMap<String, String>();
... 服务参数解析,保存到 map 中
serviceMetadata.getAttachments().putAll(map);
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(interfaceClass);
repository.registerConsumer(
serviceMetadata.getServiceKey(),
attributes,
serviceDescriptor,
this,
null,
serviceMetadata);
ref = createProxy(map);
serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
repository.lookupReferredService(serviceMetadata.getServiceKey()).setProxyObject(ref);
initialized = true;
dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}
这里对 ReferenceConfig#init 方法进行了简化,不过我们可以看到核心逻辑在 ReferenceConfig#createProxy 中。下面我们具体来看:
3. ReferenceConfig#createProxy
ReferenceConfig#createProxy 创建了 Dubbo service的代理对象,是 消费者端的核心方法,其实现如下:
private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) {
... 本地服务调用
} else {
urls.clear();
if (url != null && url.length() > 0) {
... 对服务直连的处理,将指定的url解析出注册中心地址,保存到urls 中
} else {
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
checkRegistry();
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
if (urls.size() == 1) {
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
... 多个注册中心或多个服务提供者,或者两者混合 调用,与单一差不多
}
}
...
String metadata = map.get(METADATA_KEY);
WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
if (metadataService != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataService.publishServiceDefinition(consumerURL);
}
return (T) PROXY_FACTORY.getProxy(invoker);
}
上面的进行了代码省略,我们主要关注下面四点:
ConfigValidationUtils.loadRegistries(this, false); :解析获取注册中心地址,这里会对服务自省的模式进行判断。REF_PROTOCOL.refer(interfaceClass, urls.get(0)); :在服务自省模式下,这里会调用 ServiceDiscoveryRegistryProtocol#refer 来获取 Invoker。该方法在第三部分进行了详细分析。metadataService.publishServiceDefinition(consumerURL) :发布 service definition ,这里默认会将 service definition 发布到本地元数据中心。 service definition 中包括 :接口名、接口方法、方法入参等信息。PROXY_FACTORY.getProxy(invoker); : 获取最终的代理对象。
下面我们对这四个部分进行详细分析 :
3.1 ConfigValidationUtils.loadRegistries(this, false);
在上面的代码中会通过 ConfigValidationUtils.loadRegistries(this, true) 是获取到当前的注册中心地址,这点逻辑和提供者相同。 ConfigValidationUtils.loadRegistries(this, true) ,内部会执行下面代码:
url = URLBuilder.from(url)
.addParameter(REGISTRY_KEY, url.getProtocol())
.setProtocol(extractRegistryType(url))
.build();
也就是说,如果我们设置了 dubbo.registry.parameters.registry-type = service ,在这则会将注册中心协议修改 为 service-discovery-registry ,如下:
service-discovery-registry://localhost:2181/org.apache.dubbo.registry.RegistryService?application=simple-provider&dubbo=2.0.2&logger=slf4j&metadata.type=remote&pid=25236&qos.enable=false®istry=zookeeper®istry-type=service®istry.type=service&release=2.7.5×tamp=1634545621271
否则协议类型会为设置为 registry,如下:
registry://localhost:2181/org.apache.dubbo.registry.RegistryService?application=simple-provider&dubbo=2.0.2&logger=slf4j&metadata.type=local&pid=38688&qos.enable=false®istry=zookeeper&release=2.7.5&simplified=false×tamp=1636683500716
3.2 REF_PROTOCOL.refer(interfaceClass, urls.get(0));
这里 REF_PROTOCOL 是 Protocol$Adaptive 类型,即Protocol 的 SPI 适配器,会根据入参中的 url.getProtocol() 获取的值来选择调用具体的实现类,而我们上面提到,如果我们设置了 dubbo.registry.parameters.registry-type = service ,URL的协议类型为 service-discovery-registry,因此这里会调用 ServiceDiscoveryRegistryProtocol#refer 来完成后续引用操作。关于 ServiceDiscoveryRegistryProtocol#refer 的内容,我们在第三部分再详细分析。
3.3 metadataService.publishServiceDefinition(consumerURL)
该部分是将当前服务的 ServiceDefinition 发布到 WritableMetadataService 中,需要注意的是,当前这个 WritableMetadataService 的类型并非是 dubbo.application.metadata-type = local 来控制,而是通过参数中的 metadata 属性,如 dubbo.application.parameters.metadata ,默认情况下为 local 。
String metadata = map.get(METADATA_KEY);
WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
if (metadataService != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataService.publishServiceDefinition(consumerURL);
}
需要注意:
如果服务自省模式下使用远程元数据中心 (dubbo.application.metadata-type=remote ) 的情况下 并且,这里将元数据设置为了远程( dubbo.application.parameters.metadata = remote )会产生冲突,因为其需要在元数据中心上创建的节点路径相同。
下面以ZK 作为元数据中心说明:
-
dubbo.application.parameters.metadata = remote 时会在元数据中心上创建 ServiceDefinition 的相关节点,如对于ProviderService其节点路径为 /dubbo/metadata/com.kingfish.service.ProviderService/2.0.0/spring , 节点值如下: {
"canonicalName": "com.kingfish.service.ProviderService",
"codeSource": "file:/E:/HKingFish/dubbo-demo/simple-spring-duboo/simple-api/target/classes/",
"methods": [
{
"name": "sayHelloWorld",
"parameterTypes": [],
"returnType": "java.lang.String"
},
{
"name": "sayHello",
"parameterTypes": [
"java.lang.String"
],
"returnType": "void"
},
{
"name": "sayHello",
"parameterTypes": [],
"returnType": "void"
}
],
"types": [
{
"type": "int",
"typeBuilderName": "org.apache.dubbo.metadata.definition.builder.DefaultTypeBuilder"
},
{
"type": "char",
"typeBuilderName": "org.apache.dubbo.metadata.definition.builder.DefaultTypeBuilder"
},
{
"type": "java.lang.String",
"typeBuilderName": "org.apache.dubbo.metadata.definition.builder.DefaultTypeBuilder"
},
{
"type": "void",
"typeBuilderName": "org.apache.dubbo.metadata.definition.builder.DefaultTypeBuilder"
}
]
}
-
dubbo.application.metadata-type=remote 时会在元数据中心上创建 dubbo service的相关节点,如 /dubbo/metadata/com.kingfish.service.ProviderService/2.0.0/spring/provider/dubbo/revision3124311290257149701 。节点值为如下 : dubbo%3A%2F%2F192.168.110.57%3A20880%2Forg.apache.dubbo.metadata.MetadataService%3Fanyhost%3Dtrue%26application%3Dsimple-provider%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26group%3Dsimple-provider%26interface%3Dorg.apache.dubbo.metadata.MetadataService%26logger%3Dslf4j%26metadata.type%3Dremote%26methods%3DgetExportedURLs%2CtoURLs%2CserviceName%2CisMetadataServiceURL%2Cversion%2CgetSubscribedURLs%2CtoSortedStrings%2CgetServiceDefinition%26pid%3D35544%26release%3D2.7.5%26revision%3D2.7.5%26side%3Dprovider%26timestamp%3D1637305980358%26version%3D1.0.0
可以看到,两种模式都需要对 /dubbo/metadata/com.kingfish.service.ProviderService/2.0.0 节点处理,因此会产生冲突。
3.4 PROXY_FACTORY.getProxy(invoker);
这里默认会通过 JavassistProxyFactory#getProxy 来创建代理对象。该内容在 Dubbo笔记 ⑧ : 消费者启动流程 - ReferenceConfig#get 中有过详细介绍,这里不再赘述。
三、ServiceDiscoveryRegistryProtocol#refer
下面我们来看一下 ServiceDiscoveryRegistryProtocol#refer 的具体实现,其实现在父类 RegistryProtocol 中,如下:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = getRegistryUrl(url);
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}
这里我们看 RegistryProtocol#doRefer 实现如下:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
return invoker;
}
RegistryProtocol#refer 中短短几行代码,却囊括了 Dubbo 集群容错的逻辑。下面我们按照注释的顺序进行分析。
1. RegistryService#register
这里的实现是 ServiceDiscoveryRegistry#register,其逻辑同提供者,由于当前是消费者,所以并不会进行注册。如下:
@Override
public final void register(URL url) {
if (!shouldRegister(url)) {
return;
}
super.register(url);
}
2. RegistryDirectory#buildRouterChain
保存路由链。关于 Dubbo 的 Router 相关内容,如有需要详参:Dubbo笔记 ? :Dubbo集群组件 之 Router
public void buildRouterChain(URL url) {
this.setRouterChain(RouterChain.buildChain(url));
}
public void setRouterChain(RouterChain<T> routerChain) {
this.routerChain = routerChain;
}
3. RegistryDirectory#subscribe
RegistryDirectory#subscribe 是对节点的订阅,也是我们需要分析的核心逻辑。其实现如下:
public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}
其中 ServiceDiscoveryRegistry#subscribe 实现如下:
@Override
public final void subscribe(URL url, NotifyListener listener) {
if (!shouldSubscribe(url)) {
return;
}
super.subscribe(url, listener);
}
@Override
public void doSubscribe(URL url, NotifyListener listener) {
subscribeURLs(url, listener);
}
ServiceDiscoveryRegistry#subscribeURLs 在 ServiceDiscoveryRegistry中有多个重载方法,该处调用的代码如下:
protected void subscribeURLs(URL url, NotifyListener listener) {
writableMetadataService.subscribeURL(url);
Set<String> serviceNames = getServices(url);
if (CollectionUtils.isEmpty(serviceNames)) {
throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + url);
}
serviceNames.forEach(serviceName -> subscribeURLs(url, listener, serviceName));
}
上面我们可以看到其逻辑大致如下:
- 订阅当前 URL : 实际上是将当前 URL 保存到本地元数据中心中。
- 获取提供当前需要的接口服务的 应用名
- 对提供服务的应用进行遍历订阅,获取应用提供的服务,并进行订阅。
下面我们按照注释顺序来一步一步分析:
3.1 writableMetadataService.subscribeURL(url)
这里的 writableMetadataService 默认为 InMemoryWritableMetadataService,受 dubbo.registry.parameters.dubbo.metadata.storage-type 参数控制。
下面我们来看InMemoryWritableMetadataService 和 RemoteWritableMetadataService 的实现
-
InMemoryWritableMetadataService#subscribeURL
@Override
public boolean subscribeURL(URL url) {
return addURL(subscribedServiceURLs, url);
}
boolean addURL(Map<String, SortedSet<URL>> serviceURLs, URL url) {
return executeMutually(() -> {
SortedSet<URL> urls = serviceURLs.computeIfAbsent(url.getServiceKey(), this::newSortedURLs);
return urls.add(url);
});
}
-
RemoteWritableMetadataService#subscribeURL @Override
public boolean subscribeURL(URL url) {
return true;
}
3.2 getServices(url)
参数中的 url 中保存了要调用的 Dubbo Service 信息,此时需要通过ServiceDiscoveryRegistry#getServices 来获取提供该服务的应用名。我们来看其实现过程:
protected Set<String> getServices(URL subscribedURL) {
Set<String> subscribedServices = new LinkedHashSet<>();
String serviceNames = subscribedURL.getParameter(PROVIDED_BY);
if (StringUtils.isNotEmpty(serviceNames)) {
subscribedServices = parseServices(serviceNames);
}
if (isEmpty(subscribedServices)) {
subscribedServices = findMappedServices(subscribedURL);
if (isEmpty(subscribedServices)) {
subscribedServices = getSubscribedServices();
}
}
return subscribedServices;
}
public Set<String> getSubscribedServices() {
return subscribedServices;
}
这里可以看到,ServiceName 的查找有优先级:
- 首先查找消费者通过 provided-by 参数指定的应用。
- 其次查找映射的应用。我们在提供者的文章中提到,提供者启动后,dubbo会在配置中心创建映射节点(dubbo/config/mapping 节点下的映射关系),这里便是通过其映射节点找到对应服务。
- 最后使用注册表 url 中指定的服务。
下面我们来看一下 ServiceDiscoveryRegistry#findMappedServices 的实现:
protected Set<String> findMappedServices(URL subscribedURL) {
String serviceInterface = subscribedURL.getServiceInterface();
String group = subscribedURL.getParameter(GROUP_KEY);
String version = subscribedURL.getParameter(VERSION_KEY);
String protocol = subscribedURL.getParameter(PROTOCOL_KEY, DUBBO_PROTOCOL);
return serviceNameMapping.get(serviceInterface, group, version, protocol);
}
其中 ServiceNameMapping#get 的实现类为 DynamicConfigurationServiceNameMapping,其实现如下:
@Override
public Set<String> get(String serviceInterface, String group, String version, String protocol) {
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
Set<String> serviceNames = new LinkedHashSet<>();
execute(() -> {
Set<String> keys = dynamicConfiguration.getConfigKeys(buildGroup(serviceInterface, group, version, protocol));
serviceNames.addAll(keys);
});
return Collections.unmodifiableSet(serviceNames);
}
综上这里会查找 /dubbo/config/mapping 下面的节点,根据接口名获取对应的应用名(该节点是提供者在启动时监听 ServiceConfigExportedEvent 事件注册的,如有需要详参 Dubbo笔记 ? : DubboBootstrap 的服务暴露)。如下图:
3.3 subscribeURLs(url, listener, serviceName)
经过上面两步,现在已经知道了可以提供当前服务的应用名称,这里会对所有的应用名称进行遍历订阅,以获取应用的信息。如下:
serviceNames.forEach(serviceName -> subscribeURLs(url, listener, serviceName));
ServiceDiscoveryRegistry#subscribeURLs 实现如下,关于该方法的分析再第四部分会进行介绍。
protected void subscribeURLs(URL url, NotifyListener listener, String serviceName) {
List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
subscribeURLs(url, listener, serviceName, serviceInstances);
registerServiceInstancesChangedListener(url, new ServiceInstancesChangedListener(serviceName) {
@Override
public void onEvent(ServiceInstancesChangedEvent event) {
subscribeURLs(url, listener, event.getServiceName(), new ArrayList<>(event.getServiceInstances()));
}
});
}
4. Cluster#join
关于 Dubbo 集群容错的内容,如有需要,详参: Dubbo笔记 ? :Dubbo集群组件 之 Router
四、ServiceDiscoveryRegistry#subscribeURLsURL, NotifyListener, String)
上面我们提到消费者通过配置中心的映射节点找到了提供服务的应用。这里便对应用进行订阅获取信息,其实现如下:
protected void subscribeURLs(URL url, NotifyListener listener, String serviceName) {
List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
subscribeURLs(url, listener, serviceName, serviceInstances);
registerServiceInstancesChangedListener(url, new ServiceInstancesChangedListener(serviceName) {
@Override
public void onEvent(ServiceInstancesChangedEvent event) {
subscribeURLs(url, listener, event.getServiceName(), new ArrayList<>(event.getServiceInstances()));
}
});
}
这里按照注释顺序为:
- 根据 serviceName 从元数据中心获取到应用的元数据信息并包装成 ServiceInstance
- 根据 ServiceInstance 的信息,获取元数据中心中发布的 接口服务信息,并进行订阅
- 注册 ServiceInstance 的监听器,当 ServiceInstance 发生变化时可以感知并同步。
下面我们来具体看:
1. serviceDiscovery.getInstances(serviceName);
serviceDiscovery.getInstances(serviceName); 的作用是根据 serviceName 从 元数据中心上获取应用的元数据信息。这里的调用链如下
EventPublishingServiceDiscovery#getInstances -> ZookeeperServiceDiscovery#getInstances
我们这里直接来看 ZookeeperServiceDiscovery#getInstances :
@Override
public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
return doInServiceDiscovery(s -> build(s.queryForInstances(serviceName)));
}
其中 s.queryForInstances(serviceName) 调用的是 ServiceDiscoveryImpl#queryForInstances(java.lang.String),其实现如下:
@Override
public Collection<ServiceInstance<T>> queryForInstances(String name) throws Exception
{
return queryForInstances(name, null);
}
List<ServiceInstance<T>> queryForInstances(String name, Watcher watcher) throws Exception
{
ImmutableList.Builder<ServiceInstance<T>> builder = ImmutableList.builder();
String path = pathForName(name);
List<String> instanceIds;
if ( watcher != null )
{
instanceIds = getChildrenWatched(path, watcher, true);
}
else
{
try
{
instanceIds = client.getChildren().forPath(path);
}
catch ( KeeperException.NoNodeException e )
{
instanceIds = Lists.newArrayList();
}
}
for ( String id : instanceIds )
{
ServiceInstance<T> instance = queryForInstance(name, id);
if ( instance != null )
{
builder.add(instance);
}
}
return builder.build();
}
可以看到 : 这一步的目的就是获取提供当前接口服务的所有应用的元数据信息。在上一步获取到了提供当前接口服务的应用名称后,这一步去获取 service/{应用名} 节点下所有的应用,并获取其元数据信息。
注: 如下图(存在一台机器 192.168.100.57:9999 提供了 simple-provider 的应用服务) : 其中 service/simple-provider/192.168.100.57:9999 节点的值如下:
{
"name": "simple-provider",
"id": "192.168.110.57:9999",
"address": "192.168.110.57",
"port": 9999,
"sslPort": null,
"payload": {
"@class": "org.apache.dubbo.registry.zookeeper.ZookeeperInstance",
"id": null,
"name": "simple-provider",
"metadata": {
"dubbo.metadata-service.url-params": "{\"dubbo\":{\"version\":\"1.0.0\",\"dubbo\":\"2.0.2\",\"release\":\"2.7.5\",\"port\":\"20880\"}}",
"dubbo.subscribed-services.revision": "N/A",
"dubbo.endpoints": "[{\"port\":9999,\"protocol\":\"dubbo\"}]",
"dubbo.metadata.storage-type": "remote",
"dubbo.exported-services.revision": "3124311290257149701"
}
},
"registrationTimeUTC": 1637310981486,
"serviceType": "DYNAMIC",
"uriSpec": null
}
2. subscribeURLs(url, listener, serviceName, serviceInstances);
这里调用的方法是:
ServiceDiscoveryRegistry#subscribeURLs(org.apache.dubbo.common.URL, org.apache.dubbo.registry.NotifyListener, java.lang.String, java.util.Collection<org.apache.dubbo.registry.client.ServiceInstance>)
其实现如下:
protected void subscribeURLs(URL subscribedURL, NotifyListener listener, String serviceName,
Collection<ServiceInstance> serviceInstances) {
if (isEmpty(serviceInstances)) {
return;
}
List<URL> subscribedURLs = new LinkedList<>();
subscribedURLs.addAll(getExportedURLs(subscribedURL, serviceInstances));
if (subscribedURLs.isEmpty()) {
subscribedURLs.addAll(synthesizeSubscribedURLs(subscribedURL, serviceInstances));
}
listener.notify(subscribedURLs);
}
这里我们关注两点:
- getExportedURLs(subscribedURL, serviceInstances) : 获取应用实例暴露的服务接口的URL 信息。
- listener.notify(subscribedURLs) :该方法会对 subscribedURLs 分类并解析,以更新本地的 Invoker 列表。如有需要详参: Dubbo笔记 ⑨ : 消费者启动流程 - RegistryProtocol#refer
这里我们关注 getExportedURLs(subscribedURL, serviceInstances) 实现如下:
private List<URL> getExportedURLs(URL subscribedURL, Collection<ServiceInstance> instances) {
List<ServiceInstance> serviceInstances = instances.stream()
.filter(ServiceInstance::isEnabled)
.filter(ServiceInstance::isHealthy)
.filter(ServiceInstanceMetadataUtils::isDubboServiceInstance)
.collect(Collectors.toList());
int size = serviceInstances.size();
if (size == 0) {
return emptyList();
}
prepareServiceRevisionExportedURLs(serviceInstances);
List<URL> subscribedURLs = cloneExportedURLs(subscribedURL, serviceInstances);
serviceInstances.clear();
return subscribedURLs;
}
我们按照注释继续往下看:
2.1 prepareServiceRevisionExportedURLs(serviceInstances);
ServiceDiscoveryRegistry#prepareServiceRevisionExportedURLs 其实现如下:
private void prepareServiceRevisionExportedURLs(List<ServiceInstance> serviceInstances) {
executeExclusively(() -> {
expungeStaleRevisionExportedURLs(serviceInstances);
initializeRevisionExportedURLs(serviceInstances);
});
}
当应用的配置不同时,其 revision 也不相同。同样的,如果应用有配置修改,其revision 也会变更,原先的revision 就已经过期。这一步的目的就是清除过期的应用revision,初始化当前 serviceInstances 的 url。
关于 Dubbo 服务修订版本,以下内容部分于引用 Apache dubbo 服务自省架构设计 :
当业务出现变化时,Dubbo Service 的 Dubbo 服务也会随之升级。通常,Provider 先行升级,Consumer 随后跟进。服务的升级可以包括
-
Dubbo 服务 interface 升级 :由于 Dubbo 基于 Java 接口来暴露服务,同时 Java 接口通常在 Dubbo 微服务中又是唯一的。如果 interface 的全类名调整的话,那么,相当于 com.acme.Interface1 做下线处理,Consumer 将无法消费到该 Dubbo 服务,这种情况不予考虑。如果是 Provider 新增服务接口的话,那么 com.acme.Interface1 则并没有变化,也无需考虑。所以,有且仅有一种情况考虑,即“Dubbo interface 方法声明升级”,包括:增加服务方法、删除服务方法、修改方法签名 -
Dubbo 服务 group、version 和 protocol 升级 :假设 P1 在升级过程中,新的服务实例部署仅存在调整 group 后的 Dubbo 服务,如 dubbo:com.acme.Interface1:v1:test ,那么这种升级就是不兼容升级,在新老交替过程中,Consumer 仅能消费到老版本的 Dubbo 服务。当新版本完全部署完成后,Consumer 将无法正常服务调用。如果,新版本中 P1 同时部署了 dubbo:com.acme.Interface1:v1:default 和 dubbo:com.acme.Interface1:v1:test 的话,相当于 group 并无变化。同理,version 和 protocol 变化,相当于 Dubbo 服务 ID 变化,这类情况无需处理。 -
Dubbo 服务元数据升级 : 这是一种比较特殊的升级方法,即 Provider 所有服务实例 Dubbo 服务 ID 相同,然而 Dubbo 服务的参数在不同版本服务实例存在差异,假设 Dubbo Service P1 部署 5 台服务,其中 3 台服务实例设置 timeout 为 1000 ms,其余 2 台 timeout 为 3000 ms。换言之,P1 拥有两个版本(状态)的 MetadataService 。
综上所述,无论是 Dubbo interface 方法声明升级,还是 Dubbo 服务元数据升级,均可认为是 Dubbo 服务升级的因子,这些因子所计算出来的数值称之为“Dubbo 服务修订版本”,服务自省架构将其命名为“revision”。架构设设计上,当 Dubbo Service 增加或删除服务方法、修改方法签名以及调整 Dubbo 服务元数据,revision 也会随之变化,revision 数据将存放在其 Dubbo 服务实例的 metadata 中。当 Consumer 订阅 Provider Dubbo 服务元信息时,MetadataService 远程调用的次数取决于服务实例列表中出现 revision 的个数,整体执行流程如下图所示:
2.1.1 ServiceDiscoveryRegistry#expungeStaleRevisionExportedURLs
ServiceDiscoveryRegistry#expungeStaleRevisionExportedURLs 用于清除过期的版本信息。其实现如下:
private final Map<String, Map<String, List<URL>>> serviceRevisionExportedURLsCache = new LinkedHashMap<>();
private void expungeStaleRevisionExportedURLs(List<ServiceInstance> serviceInstances) {
String serviceName = serviceInstances.get(0).getServiceName();
Map<String, List<URL>> revisionExportedURLsMap = getRevisionExportedURLsMap(serviceName);
if (revisionExportedURLsMap.isEmpty()) {
return;
}
Set<String> existedRevisions = revisionExportedURLsMap.keySet();
Set<String> currentRevisions = serviceInstances.stream()
.map(ServiceInstanceMetadataUtils::getExportedServicesRevision)
.collect(Collectors.toSet());
Set<String> staleRevisions = new HashSet<>(existedRevisions);
staleRevisions.removeAll(currentRevisions);
staleRevisions.forEach(revisionExportedURLsMap::remove);
}
2.1.2 ServiceDiscoveryRegistry#initializeRevisionExportedURLs(List)
ServiceDiscoveryRegistry#initializeRevisionExportedURLs(List) 在这里会查找所有serviceInstances 暴露的服务 URL 并缓存到 ServiceDiscoveryRegistry#serviceRevisionExportedURLsCache中。 其中 ServiceDiscoveryRegistry#serviceRevisionExportedURLsCache 结构如下:
private final Map<String, Map<String, List<URL>>> serviceRevisionExportedURLsCache = new LinkedHashMap<>();
ServiceDiscoveryRegistry#initializeRevisionExportedURLs(List) 实现如下:
private void initializeRevisionExportedURLs(List<ServiceInstance> serviceInstances) {
initializeSelectedRevisionExportedURLs(serviceInstances);
serviceInstances.forEach(this::initializeRevisionExportedURLs);
}
-
ServiceDiscoveryRegistry#initializeSelectedRevisionExportedURLs private void initializeSelectedRevisionExportedURLs(List<ServiceInstance> serviceInstances) {
for (int i = 0; i < serviceInstances.size(); i++) {
ServiceInstance selectedInstance = selectServiceInstance(serviceInstances);
List<URL> revisionExportedURLs = initializeRevisionExportedURLs(selectedInstance);
if (isNotEmpty(revisionExportedURLs)) {
break;
}
}
}
private ServiceInstance selectServiceInstance(List<ServiceInstance> serviceInstances) {
int size = serviceInstances.size();
if (size == 0) {
return null;
} else if (size == 1) {
return serviceInstances.get(0);
}
ServiceInstanceSelector selector = getExtensionLoader(ServiceInstanceSelector.class).getAdaptiveExtension();
return selector.select(getUrl(), serviceInstances);
}
-
ServiceDiscoveryRegistry#initializeRevisionExportedURLs(ServiceInstance) private List<URL> initializeRevisionExportedURLs(ServiceInstance serviceInstance) {
if (serviceInstance == null) {
return emptyList();
}
String serviceName = serviceInstance.getServiceName();
String revision = getExportedServicesRevision(serviceInstance);
Map<String, List<URL>> revisionExportedURLsMap = getRevisionExportedURLsMap(serviceName);
List<URL> revisionExportedURLs = revisionExportedURLsMap.get(revision);
boolean firstGet = false;
if (revisionExportedURLs == null) {
if (!revisionExportedURLsMap.isEmpty()) {
... 日志打印
} else {
firstGet = true;
}
revisionExportedURLs = getExportedURLs(serviceInstance);
if (revisionExportedURLs != null) {
revisionExportedURLsMap.put(revision, revisionExportedURLs);
}
}
} else {
}
return revisionExportedURLs;
}
2.2 cloneExportedURLs(subscribedURL, serviceInstances);
上一步已经获取到了 ServiceInstance 所暴露的 url。这一步则是 ServiceDiscoveryRegistry#cloneExportedURLs 会从 ServiceInstance 所包括的 URL 中筛选出满足当前服务接口的 URL,并进行克隆后返回。具体实现如下:
private List<URL> cloneExportedURLs(URL subscribedURL, Collection<ServiceInstance> serviceInstances) {
if (isEmpty(serviceInstances)) {
return emptyList();
}
List<URL> clonedExportedURLs = new LinkedList<>();
serviceInstances.forEach(serviceInstance -> {
String host = serviceInstance.getHost();
getTemplateExportedURLs(subscribedURL, serviceInstance)
.stream()
.map(templateURL -> templateURL.removeParameter(TIMESTAMP_KEY))
.map(templateURL -> templateURL.removeParameter(PID_KEY))
.map(templateURL -> {
String protocol = templateURL.getProtocol();
int port = getProtocolPort(serviceInstance, protocol);
if (Objects.equals(templateURL.getHost(), host)
&& Objects.equals(templateURL.getPort(), port)) {
return templateURL;
}
URLBuilder clonedURLBuilder = from(templateURL)
.setHost(host)
.setPort(port);
return clonedURLBuilder.build();
})
.forEach(clonedExportedURLs::add);
});
return clonedExportedURLs;
}
3. registerServiceInstancesChangedListener
registerServiceInstancesChangedListener 会订阅 ServiceInstance 节点信息,当 ServiceInstance 实例有变化时可以感知并更新本地的缓存信息。
registerServiceInstancesChangedListener(url, new ServiceInstancesChangedListener(serviceName) {
@Override
public void onEvent(ServiceInstancesChangedEvent event) {
subscribeURLs(url, listener, event.getServiceName(), new ArrayList<>(event.getServiceInstances()));
}
});
其中 ServiceDiscoveryRegistry#registerServiceInstancesChangedListener 的实现如下:
private void registerServiceInstancesChangedListener(URL url, ServiceInstancesChangedListener listener) {
String listenerId = createListenerId(url, listener);
if (registeredListeners.add(listenerId)) {
serviceDiscovery.addServiceInstancesChangedListener(listener);
}
}
private String createListenerId(URL url, ServiceInstancesChangedListener listener) {
return listener.getServiceName() + ":" + url.toString(VERSION_KEY, GROUP_KEY, PROTOCOL_KEY);
}
这里可以看到, 消费者通过 EventPublishingServiceDiscovery#addServiceInstancesChangedListener -> ZookeeperServiceDiscovery#addServiceInstancesChangedListener 监听注册中心上的应用节点(/service/{应用名称}),当节点发生变化时调用 ServiceDiscoveryRegistry#subscribeURLs(org.apache.dubbo.common.URL, org.apache.dubbo.registry.NotifyListener, java.lang.String, java.util.Collection<org.apache.dubbo.registry.client.ServiceInstance>) 来更新本地 URL 列表。
五、总结
注册中心、元数据中心、配置中心的三者的配合逻辑,如下:
- 首先获取 配置中心 上的映射节点,来获取提供该接口服务的应用名称。
- 根据获取到的应用名称,获取注册中心上的应用元数据。
- 根据应用元数据,确定应用的元数据中心地址,并从元数据中心获取接口服务的URL信息。
其中关于配置中心,在 Apache-Dubbo-服务自省架构设计 中提到过本地化映射配置,但【该特性并未在最新 Dubbo 2.7.6 全面发布,部分特性已在 Dubbo Spring Cloud 中发布】。
整个消费者逻辑的总结如下:
- 消费者服务启用后,要引用 Dubbo 接口服务 A。则程序会通过 ReferenceBean#get 来获取A的代理对象 AProxy,该代理对象会建立与提供者的服务的网络连接,所有的调用都会通过该代理对象发送给提供者并接收结果。
- ReferenceBean#get 通过 ReferenceConfig#createProxy 来创建代理对象。
- 在 ReferenceConfig#createProxy 中首先会获取 注册中心地址。此时由于我们设置了
dubbo.registry.parameters.registry-type = service ,所以注册中心的协议类型为 service-discovery-registry 。所以会通过 ServiceDiscoveryRegistryProtocol#refer 来进行下一步处理。 - ServiceDiscoveryRegistryProtocol#refer 中首先注册自身、构建路由链。随后通过 ServiceDiscoveryRegistryProtocol#subscribe 来订阅服务。
- 在 ServiceDiscoveryRegistryProtocol#subscribe 中会通过 ServiceDiscoveryRegistry#getServices 从配置中心节点获取到服务接口和应用的映射关系,这一步结束后便获取到提供当前接口服务的应用名集合。
- 随后通过 ServiceDiscoveryRegistry#subscribeURLs 来进行订阅。
- ServiceDiscoveryRegistry#subscribeURLs 中首先会通过应用名从注册中心上获取应用信息构建出来应用实例对象。
- 之后会清除自身过期的缓存接口服务信息,并通过应用实例来获取到元数据中心地址,从元数据中心获取到当前应用提供的接口服务URL并克隆到本地。至此,消费者获取到了应用实例提供的所有接口服务URL。
- 之后通过 ServiceDiscoveryRegistry#registerServiceInstancesChangedListener 对应用实例进行订阅,当应用配置改变时,消费者可以感知并更新本地缓存配置。
以上:内容部分参考 Apache dubbo 服务自省架构设计 如有侵扰,联系删除。 内容仅用于自我记录学习使用。如有错误,欢迎指正
|