"2021SC@SDUSC"
一、前言
如果没有Rpc框架,我们需要网络编程代码调用远程服务,同时对于请求参数和响应结果需要手动进行序列化和反序列化。这些过程对于没有接触过网络编程的程序员来说有一定的难度。Dubbo服务引用的目的就是将远程调用的网络编程隐藏在框架当中,让程序员像调用本地服务一样调用远程服务。Dubbo提供的功能不仅如此,同时提供了服务发现,集群容错,负载均衡,服务降级等功能。
如下图是dubbo服务引用的整体流程,在引用服务时,dubbo首先会从注册中心获取注册服务的url信息,并利用protocol将url信息转换为Invoker,以此屏蔽网络调用的流程。经过第一个步骤之后,我们获取到了invoker集合,dubbo使用集群策略和负载均衡将invoker集合转换为一个invoker对外服务。现在的Invoker已经能够实现远程调用,但是使用起来不方便,Dubbo使用ProxyFactory将Invoker转换为用户所需要的接口。
二、Dubbo服务引用
在Dubbo中提供者负责服务的导出和发布,而消费着负责订阅服务和服务的导入。在 Dubbo 中,我们可以通过两种方式引用远程服务。第一种是使用服务直连的方式引用服务,第二种方式是基于注册中心进行引用。 Dubbo 服务引用的时机有两个,第一个是在 Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务,第二个是在 ReferenceBean 对应的服务被注入到其他类中时引用。这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。默认情况下,Dubbo 使用懒汉式引用服务。 下边是官网的一个服务引用的时序图:
ReferenceConfig:通过get方法其实是进入到ReferenceConfig类中执行init()方法。在这个方法里主要做了下面几件事情:
-
1、对@Reference标注的接口查看是否合法,检查该接口是不是存在泛型。 -
2、在系统中拿到dubbo.resolve.file这个文件,这个文件是进行配置consumer的接口的。将配置好的consumer信息存到URL中。 -
3、将配置好的ApplicationConfig、ConsumerConfig、ReferenceConfig、MethodConfig,以及消费者的IP地址存到系统的上下文中。 -
4、接下来开始创建代理对象进入到ReferenceConfig的createProxy 。这里还是在ReferenceConfig类中。上面的那些配置统统传入该方法中。上面有提到resolve解析consumer为URL,现在就根据这个URL首先判断是否远程调用还是本地调用。
- 4.1、若是本地调用,则调用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 实例。
- 4.2、若是远程调用,则读取直连配置项,或注册中心 url,并将读取到的 url 存储到 urls 中。然后根据 urls 元素数量进行后续操作。若 urls 元素数量为1,则直接通过 Protocol 自适应拓展类即RegistryProtocol类或者DubboProtocol构建 Invoker 实例接口,这得看URL前面的是registry://开头,还是以dubbo://。若 urls 元素数量大于1,即存在多个注册中心或服务直连 url,此时先根据 url 构建 Invoker。然后再通过 Cluster 合并即merge多个 Invoker,最后调用 ProxyFactory 生成代理类。
RegistryProtocol:在refer方法中首先为 url 设置协议头,然后根据 url 参数加载注册中心实例。然后获取 group 配置,根据 group 配置决定 doRefer 第一个参数的类型。doRefer 方法创建一个 RegistryDirectory 实例,然后生成服务消费者链接,通过registry.register方法向注册中心注册消费者的链接,然后通过directory.subscribe向注册中心订阅 providers、configurators、routers 等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。
ProxyFactory:Invoker 创建完毕后,接下来要做的事情是为服务接口生成代理对象。有了代理对象,即可进行远程调用。代理对象生成的入口方法为的getProxy。获取需要创建的接口列表,组合成数组。而后将该接口数组传入 Proxy 的 getProxy 方法获取 Proxy 子类,然后创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例。InvokerInvocationHandler 实现 JDK 的 InvocationHandler 接口,具体的用途是拦截接口类调用。可以理解为AOP或拦截器。也就是在获取该对象之前会调用到Proxy实例而不会调用到服务提供者对应的类。
三、Dubbo源码
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean,
ApplicationContextAware, InitializingBean, DisposableBean {
@Override
public void afterPropertiesSet() throws Exception {
// 初始化dubbo的配置
prepareDubboConfigBeans();
// 默认使用懒汉加载
if (init == null) {
init = false;
}
// 饿汉加载,即时引入服务
if (shouldInit()) {
getObject();
}
}
@Override
public Object getObject() {
return get();
}
}
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
private transient volatile T ref;
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
// 检测 ref 是否为空,为空则通过 init 方法创建
if (ref == null) {
// 启动初始化操作 init 方法主要用于处理配置,以及调用 createProxy 生成代理类
init();
}
return ref;
}
}
在进行具体工作之前,需先进行配置检查与收集工作。接着根据收集到的信息决定服务用的方式,有三种,
- 1、第一种是引用本地 (JVM) 服务。
- 2、第二是通过直连方式引用远程服务。
- 3、第三是通过注册中心引用远程服务。
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
private transient volatile T ref;
private transient volatile boolean initialized;
private DubboBootstrap bootstrap;
public synchronized void init() {
//避免重复加载
if (initialized) {
return;
}
//获取Dubbo核心容器
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
//进行Dubbo核心配置的加载和检查
bootstrap.initialize();
}
//在对象创建后在使用其他配置模块配置对象之前检查对象配置并重写默认配置
checkAndUpdateSubConfigs();
//检查并生成sub配置和Local配置是否合法
checkStubAndLocal(interfaceClass);
//判断对象是否有mock并生成mock信息
ConfigValidationUtils.checkMock(interfaceClass, this);
//保存对象属性map信息
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, CONSUMER_SIDE);
//添加版本信息,包含dubbo版本,release版本,timestamp运行时间戳和sid_key等信息
ReferenceConfigBase.appendRuntimeParameters(map);
//添加泛型 revision信息
if (!ProtocolUtils.isGeneric(generic)) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
//生成服务的代理对象,跟服务导出是一样,通过代理对象来代理,返回代理方法
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
//添加需要代理的方法
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
}
}
//添加interface名
map.put(INTERFACE_KEY, interfaceName);
//添加重试信息
AbstractConfig.appendParameters(map, getMetrics());
//检查获取并添加Application信息
AbstractConfig.appendParameters(map, getApplication());
//检查获取并添加Module信息
AbstractConfig.appendParameters(map, getModule());
// remove 'default.' prefix for configs from ConsumerConfig
// appendParameters(map, consumer, Constants.DEFAULT_KEY);
//检查获取并添加consumer信息
AbstractConfig.appendParameters(map, consumer);
AbstractConfig.appendParameters(map, this);
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}
//设置方法重试信息并收集方法异步调用信息
Map<String, AsyncMethodInfo> attributes = null;
if (CollectionUtils.isNotEmpty(getMethods())) {
attributes = new HashMap<>();
for (MethodConfig methodConfig : getMethods()) {
AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
String retryKey = methodConfig.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(methodConfig.getName() + ".retries", "0");
}
}
AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
if (asyncMethodInfo != null) {
// consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
attributes.put(methodConfig.getName(), asyncMethodInfo);
}
}
}
//获取服务消费者 ip 地址
String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
if (StringUtils.isEmpty(hostToRegistry)) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
//添加服务注册信息
map.put(REGISTER_IP_KEY, hostToRegistry);
//将配置保存如服务元信息中
serviceMetadata.getAttachments().putAll(map);
//创建代理
ref = createProxy(map);
serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
// 根据服务名,ReferenceConfig,代理类构建 ConsumerModel,
// 并将 ConsumerModel 存入到 ApplicationModel 中
ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
consumerModel.setProxyObject(ref);
consumerModel.init(attributes);
initialized = true;
//检查引入的服务是否可用
checkInvokerAvailable();
// dispatch a ReferenceConfigInitializedEvent since 2.7.4
dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}
}
在服务对象创建后会进行很多对象属性配置的合法性检查,我们看一下checkAndUpdateSubConfigs中都做了哪些配置的检查
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
public void checkAndUpdateSubConfigs() {
if (StringUtils.isEmpty(interfaceName)) {
throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
}
//完成服务接口上 Application、Module、Registries和Monitor的设置
completeCompoundConfigs(consumer);
if (consumer != null) {
if (StringUtils.isEmpty(registryIds)) {
setRegistryIds(consumer.getRegistryIds());
}
}
// get consumer's global configuration
// 获取 consumer 的全局默认配置
checkDefault();
// init some null configuration.
List<ConfigInitializer> configInitializers = ExtensionLoader.getExtensionLoader(ConfigInitializer.class)
.getActivateExtension(URL.valueOf("configInitializer://"), (String[]) null);
configInitializers.forEach(e -> e.initReferConfig(this));
//检查配置方法方法参数
this.refresh();
if (getGeneric() == null && getConsumer() != null) {
setGeneric(getConsumer().getGeneric());
}
//泛型配置
if (ProtocolUtils.isGeneric(generic)) {
interfaceClass = GenericService.class;
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, getMethods());
}
//init serivceMetadata
//初始化元务原数据
serviceMetadata.setVersion(getVersion());
serviceMetadata.setGroup(getGroup());
serviceMetadata.setDefaultGroup(getGroup());
serviceMetadata.setServiceType(getActualInterface());
serviceMetadata.setServiceInterfaceName(interfaceName);
// 创建服务URL对象
serviceMetadata.setServiceKey(URL.buildKey(interfaceName, group, version));
//配置中心服务订阅
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(interfaceClass);
repository.registerConsumer(
serviceMetadata.getServiceKey(),
serviceDescriptor,
this,
null,
serviceMetadata);
//dubbo.resolve.file文件中获取服务url配置信息
resolveFile();
//再次验证检查配置
ConfigValidationUtils.validateReferenceConfig(this);
//postProcessConfig配置调用
postProcessConfig();
}
}
再消费者服务配置信息检查并通过默认设置完整后,接下来就是创建代理对象了,即ref = createProxy(map);
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
private T createProxy(Map<String, String> map) {
//jvm本地引入
if (shouldJvmRefer(map)) {
//使用本地地址构建url信息
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
//本地引用invoker生成
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
urls.clear();
// 用户配置url信息,表明用户可能想进行点对点调用
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
// 当需要配置多个 url 时,可用分号进行分割,这里会进行切分
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
// 设置接口全限定名为 url 路径
url = url.setPath(interfaceName);
}
// 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心
if (UrlUtils.isRegistry(url)) {
// 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
// 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
// 最后将合并后的配置设置为 url 查询字符串中。
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// 从注册中心的配置中组装url信息
// if protocols not injvm checkRegistry
// 如果协议不是在jvm本地中
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
//检查注册中心是否存在(如果当前配置不存在则获取服务默认配置),然后将他们转换到RegistryConfig中
checkRegistry();
//通过注册中心配置信息组装URL
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
//添加monitor监控信息
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
//单个注册中心或服务提供者(服务直连,下同)
if (urls.size() == 1) {
// 调用 RegistryProtocol 的 refer 构建 Invoker 实例
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
//多个注册中心或多个服务提供者,或者两者混合
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 获取所有的 Invoker
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
// 保存使用注册中心的最新的URL信息
registryURL = url; // use last registry url
}
}
// 注册中心URL存在
if (registryURL != null) { // registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
// 对于对区域订阅方案,默认使用"zone-aware"区域
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
// invoker 包装顺序: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
} else { // not a registry url, must be direct invoke.
// 如果不存在注册中心连接,只能使用直连
//如果订阅区域未设置,则设置为默认区域"zone-aware"
String cluster = CollectionUtils.isNotEmpty(invokers)
? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT)
: Cluster.DEFAULT;
// 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并
invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
}
}
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
MetadataUtils.publishServiceDefinition(consumerURL);
// 创建服务代理
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
}
Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。Protocol 实现类有很多,我们看看一下最常用的两个,分别是 RegistryProtocol 和 DubboProtocol是如何创建invoker,即REF_PROTOCOL.refer(interfaceClass, url)方法如何创建Invoker的。
public class RegistryProtocol implements Protocol {
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//将 registry 参数值,并将其设置为协议头并移除registry 参数值
url = getRegistryUrl(url);
//获取注册中心实例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
//如果当前类型是RegistryService,则通过代理工厂获取invoker
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
// 将 url 查询字符串转为 Map 并获取group信息:group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
// 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑
return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
}
}
Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
// 调用 doRefer 继续执行服务引用逻辑
return doRefer(cluster, registry, type, url);
}
protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
return interceptInvoker(getInvoker(cluster, registry, type, url), url);
}
protected <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建 RegistryDirectory 实例
DynamicDirectory<T> directory = createDirectory(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
// 获取REFER_KEY的所有设置信息
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
// 生成服务消费者连接
URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(urlToRegistry);
registry.register(directory.getRegisteredConsumerUrl());
}
//更新路由调用链路
directory.buildRouterChain(urlToRegistry);
// 订阅 providers、configurators、routers 等节点数据
directory.subscribe(toSubscribeUrl(urlToRegistry));
// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
return (ClusterInvoker<T>) cluster.join(directory);
}
protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url) {
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
//引入了RegistryProtocol侦听器,以使用户有机会自定义或更改导出并引用RegistryProtocol的行为。
// 例如:在满足某些条件时立即重新导出或重新引用。
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, invoker);
}
return invoker;
}
}
以上就是RegistryProtocol的服务引用,接下来我们看看DubboProtocol的服务引入,DubboProtocol继承于AbstractProtocol,所以他的入口在AbstractProtocol中
public abstract class AbstractProtocol implements Protocol {
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
}
public class DubboProtocol extends AbstractProtocol {
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
// 创建 rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
// 通过url获取client
private ExchangeClient[] getClients(URL url) {
// whether to share connection
// 是否共享连接
boolean useShareConnect = false;
int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// if not configured, connection is shared, otherwise, one connection for one service
// 如果未配置 connections,则共享连接, 否则, 一个服务使用一个连接
if (connections == 0) {
useShareConnect = true;
/*
* The xml configuration should have a higher priority than properties.
*/
//xml配置高于其他方式的配置,获取共享连接数
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
//获取共享客户端
shareClients = getSharedClient(url, connections);
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
//使用共享的客户端
clients[i] = shareClients.get(i);
} else {
// 初始化新的客户端
clients[i] = initClient(url);
}
}
return clients;
}
//获取共享客户端
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
String key = url.getAddress();
//从缓存中获取
List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
//如果缓存的客户端还能使用
if (checkClientCanUse(clients)) {
//增加引用计数
batchClientRefIncr(clients);
return clients;
}
//如果从缓存中没有获取到可用的客户端,则初始化客户端
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
clients = referenceClientMap.get(key);
// double check
if (checkClientCanUse(clients)) {
batchClientRefIncr(clients);
return clients;
}
// connectNum must be greater than or equal to 1
// 连接是必须大于等于1
connectNum = Math.max(connectNum, 1);
// If the clients is empty, then the first initialization is
// 如果共享客户端为空,则第一次初始化
if (CollectionUtils.isEmpty(clients)) {
clients = buildReferenceCountExchangeClientList(url, connectNum);
//放入共享客户端缓存
referenceClientMap.put(key, clients);
} else {
for (int i = 0; i < clients.size(); i++) {
ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
// If there is a client in the list that is no longer available, create a new one to replace him.
// 如果列表中有一个不再可用的客户端,请创建一个新客户端以替换该客户端。
if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
clients.set(i, buildReferenceCountExchangeClient(url));
continue;
}
//增加引用计数
referenceCountExchangeClient.incrementAndGetCount();
}
}
/*
* I understand that the purpose of the remove operation here is to avoid the expired url key
* always occupying this memory space.
* But "locks.remove(key);" can lead to "synchronized (locks.get(key)) {" NPE, considering that the key of locks is "IP + port",
* it will not lead to the expansion of "locks" in theory, so I will annotate it here.
*/
// locks.remove(key);
return clients;
}
}
//创建单个引用客户端
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
//创建客户端
ExchangeClient exchangeClient = initClient(url);
return new ReferenceCountExchangeClient(exchangeClient);
}
private ExchangeClient initClient(URL url) {
// 设置client类型
// client type setting.
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
//添加编码格式
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
// 开启心跳检测
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
// 检测客户端类型是否存在,不存在则抛出异常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// connection should be lazy
// 是否是懒加载
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
// 创建懒加载 ExchangeClient 实例
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 创建普通 ExchangeClient 实例
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
}
public class Exchangers {
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
// url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
}
public class HeaderExchanger implements Exchanger {
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
}
public class Transporters {
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handler 数量大于1,则创建一个 ChannelHandler 分发器
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取 Transporter 自适应拓展类(如NettyTransporter),并调用 connect 方法生成 Client 实例
return getTransporter().connect(url, handler);
}
}
public class NettyTransporter implements Transporter {
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
//创建netty客户端
return new NettyClient(url, handler);
}
}
Invoker 创建完毕后,接下来要做的事情是为服务接口生成代理对象。有了代理对象,即可进行远程调用。代理对象生成的入口方法为 ProxyFactory 的 getProxy,接下来进行分析。
public abstract class AbstractProxyFactory implements ProxyFactory {
@Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Set<Class<?>> interfaces = new HashSet<>();
String config = invoker.getUrl().getParameter(INTERFACES);
if (config != null && config.length() > 0) {
String[] types = COMMA_SPLIT_PATTERN.split(config);
for (String type : types) {
// TODO can we load successfully for a different classloader?.
interfaces.add(ReflectUtils.forName(type));
}
}
//泛型服务
if (generic) {
if (!GenericService.class.isAssignableFrom(invoker.getInterface())) {
interfaces.add(com.alibaba.dubbo.rpc.service.GenericService.class);
}
try {
// find the real interface from url
// 通过url找寻真正的inteface
String realInterface = invoker.getUrl().getParameter(Constants.INTERFACE);
interfaces.add(ReflectUtils.forName(realInterface));
} catch (Throwable e) {
// ignore
}
}
interfaces.add(invoker.getInterface());
interfaces.addAll(Arrays.asList(INTERNAL_INTERFACES));
//调用重载方法
return getProxy(invoker, interfaces.toArray(new Class<?>[0]));
}
public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
}
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
//使用Dubbo自己的Proxy获取代理类,感兴趣的可以自己了解下
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
}
到这里整个引用服务就分析完了,这样我们引用服务就拿到代理的Invoker,可以进行服务的调用了,让调用者像调用本地服务一样调用远程服务。
?
|