上一次我们讲解了Dubbo的服务暴露, 这次我们来看一下Dubbo是如何调用服务的
本文会根据dubbo的架构图进行解析
目录
客户端启动流程
Dubbo服务调用
客户端启动流程
大家都知道,客户端在调用服务时只有接口没有相对应的实现类,所以呢,我们需要在调用服务时需要先生成一个代理,再通过代理去执行服务。
????????
?可以看到我们通过从ioc中获取DemoService的Bean,那其实本身客户端中并没有DemoService的bean,这个时候我们可以看到配置文件中进行了这样的一个配置使用了reference标签,在dubbo中reference标签配置的服务会被另一个Bean生成代理,那就是ReferenceBean,它会通过getObject()方法将生成的代理对象放入ioc中,所以点进去
<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService"/>
?这里会对接口的代理对象进行检查,如果没有则会初始化一个
public synchronized T get() {
checkAndUpdateSubConfigs();
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
// 检测 ref 是否为空,为空则通过 init 方法创建
if (ref == null) {
// init 方法主要用于处理配置,以及调用 createProxy 生成代理类
init();
}
return ref;
}
点进init()方法中,可以看到核心的方法就是创建代理对象的(能看到我们本身配置的一些信息,接口路径等等)
接着就是对提供者的数量的一个判断,是有多个还是单个,然后生成invoker,最后会根据invoker返回生成的代理,重点则在于如何生成invoker,这里的protocol是自适应的,所以会根据url中的信息选择对应的实现类,可以看到协议中使用的是registry,于是来到RegistryProtocol
// 单个注册中心或服务提供者(服务直连,下同)
if (urls.size() == 1) {
// 调用 RegistryProtocol 的 refer 构建 Invoker 实例
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); // registry://192.168.200.129:2181/org.apache.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&pid=14276&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.demo.DemoService%26lazy%3Dfalse%26methods%3DsayHello%26pid%3D14276%26qos.port%3D33333%26register.ip%3D192.168.200.10%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1622603489641®istry=zookeeper×tamp=1622603494381
} else {
// 多个注册中心或多个服务提供者,或者两者混合
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 获取所有的 Invoker 到 invokers中
for (URL url : urls) {
// 通过 ref_protocol 调用 refer 构建 Invoker,refprotocol 会在运行时
// 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use RegistryAwareCluster only when register's CLUSTER is available
// 如果注册中心链接不为空,则将使用 AvailableCluster
URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
// 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并 Cluster扩展点默认找的是FailoverCluster
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
// invoker 可用性检查
if (shouldCheck() && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataReportService.publishConsumer(consumerURL);
}
// create service proxy 根据Invoker 真正生成代理 PROXY_FACTORY=ProxyFactory$Adaptive@xxxx
return (T) PROXY_FACTORY.getProxy(invoker);
来到RegistryProtocol中的refer(),这里根据配置的信息获取了注册中心并连接上了zk,随后将包含集群容错以及负载均衡的cluster、注册中心,接口类型以及url传入doRefer()
在doRefer中,先是将自己注册到注册中心去,随后进行了服务的订阅(重点)在里面为每个提供者生成一个invoker,最后将多个invoker包装成一个
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = URLBuilder.from(url)
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
.removeParameter(REGISTRY_KEY)
.build();
//获取注册中心实例 ZookeeperRegistry url=zookeeper://192.168.200.129:2181/org.apache.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=14276&qos.port=33333×tamp=1622603494381
Registry registry = registryFactory.getRegistry(url);// 获取实例并使用curator连接上了zk
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*" 将 url 查询字符串转为 Map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY); // // 获取 group 配置
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
// 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建 RegistryDirectory 实例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 设置注册中心和协议
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 生成服务消费者链接 consumer://192.168.200.10/org.apache.dubbo.demo.DemoService?application=demo-consumer&check=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=12468&qos.port=33333&side=consumer&sticky=false×tamp=1622604057385
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
// 注册服务消费者,在 consumers 目录下创建新节点
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
/**
* 订阅 providers、configurators、routers 等节点数据
* 拿到 providers 下的提供者信息后,会创建客户端(nettyclient),并连接服务端,重点 DubboProtocol 中的 protocolBindingRefer
* ********所以重点跟这个方法的调用****
*/
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory); // 一个注册中心下可能有多个服务提供者,每个提供者都对应一个invoker,这里将多个invoker合并为一个
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
进入到订阅这个部分,订阅本身就是一个拓展点,由于我们使用的是zk,所以最终会到ZookeeperRegistry中的doSubscribe,在进行了一系列的zk操作之后,会通知所有的客户端拉取的提供者节点将它们封装成invoker
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// providers 拿到 providers 对应的URL
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
/**
* 将 /providers 下每个 provider url 封装成 invoker ******重点*****
*/
refreshOverrideAndInvoker(providerURLs);
}
这里会将提供者封装成invoker并与其建立连接
这里根据url进行创建,协议本身是拓展点,默认使用的是DubboProtocol去生辰invoker,在这个invoker的外层套了一层InvokerDelegate,我们重点看DubboProtocol中的内容
?在创建DubboInvoker时,可以看到构造函数中还需要一个客户端,这个会先去看看这个提供者本地是否已有连接,有的话直接拿来用,没有则需要去创建一个
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
/**
* create rpc invoker. 创建 DubboInvoker
* 重点看 getClients(url) 根据provider url 获取客户端,
*
* url=dubbo://192.168.200.10:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&bean.name=org.apache.dubbo.demo.DemoService&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=20192&qos.port=33333®ister=true®ister.ip=192.168.200.10&release=&remote.application=demo-provider&side=consumer&sticky=false×tamp=1622964228064
*/
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
private ExchangeClient[] getClients(URL url) {
// whether to share connection 是否共享连接
boolean useShareConnect = false;
int connections = url.getParameter(CONNECTIONS_KEY, 0);// 获取连接数,默认为0,表示未配置
List<ReferenceCountExchangeClient> shareClients = null;
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
useShareConnect = true;
/**
* The xml configuration should have a higher priority than properties. xml配置比properties优先级更高
* 获取<dubbo:consumer/>中的shareconnections属性值,表示共享连接的数量
*/
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
shareClients = getSharedClient(url, connections); // 获取指定数量的共享连接(第一次获取还没有会创建)
}
// 此时的connections代表的是连接数,已经不区分是共享还是新创建的连接了
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
// 若是共享的,直接从shareClients 里取
if (useShareConnect) {
// 获取共享客户端
clients[i] = shareClients.get(i);
} else {
// 若不是共享的,则新建连接 初始化新的客户端 看这块的实现即可 重点关注的地方
clients[i] = initClient(url);
}
}
return clients;
}
简单看一下先判断是否有配置懒加载,没有则创建普通的,看到Exchanger可能有朋友会记得,在服务端暴露时也是有个Exchanger,这个本身就是一个拓展点,默认会使用HeaderExchanger
private ExchangeClient initClient(URL url) {
// client type setting. 获取客户端类型,默认为 netty
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
// 添加编解码和到 url 中
url = url.addParameter(CODEC_KEY, DubboCodec.NAME); // dubbo编解码器,重要
// enable heartbeat by default 添加心跳包参数到 url 中
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// connection should be lazy 获取 lazy 配置,并根据配置值决定创建的客户端类型
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
// 创建懒加载 ExchangeClient 实例
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
/**
* 创建普通 ExchangeClient 实例 分析这块 创建exchangeClient,会绑定一个 Netty Client
*
* ExchangeHandler requestHandler = new ExchangeHandlerAdapter 在 DubboProtocol 中作了内部实现
*/
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
在HeaderExchanger中会进行连接,默认会采用netty进行连接,后面netty之前讲过,就不在重复了
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
/**
* 这里包含了多个调用,分别如下:
* 1. 创建 HeaderExchangeHandler 对象 参数handler= DubboProtocol.requestHandler 很重要,涉及到发送请求时的处理
* 2. 创建 DecodeHandler 对象
* 3. 通过 Transporters 构建 Client 实例
* 4. 创建 HeaderExchangeClient 对象
*
* 重点看:Transporters.connect
*/
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分别如下:
// 1. new HeaderExchangeHandler(handler) 负载处理请求回写结果
// 2. new DecodeHandler(new HeaderExchangeHandler(handler)) 对请求数据和响应结果进行解码操作,如何交由后续流程继续处理
// 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))) 重点看这个bind方法,
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
到这里呢,我们就能拿到封装完的invoker,那么再回到根据invoker生成代理的地方(上面是生成invoker,下面是根据invoker生成代理),proxy_factory本身是拓展点,默认会使用JavassistProxyFactory
?这里值得注意点的是new 了一个InvokerInvocationHandler并将invoker注入进去,这个InvokerInvocationHandler又实现了jdk的InvocationHandler,所以在执行相对应的方法时会进行拦截
?
?
到这里服务端的连接以及注册就讲完了,接下来就是服务的调用了
Dubbo服务调用
上面刚刚讲了客户端的代理的生成,那么接下来生成的代理的方法进行调用时,不就是Dubbo的调用链嘛,我们直接来看代码
?可以看到demoService就是刚刚生成的InvokerInvocationHandler,当执行方法时,就是调用了其中的invoke方法
先是排除了Object类中的方法,再是派出了toString等方法,然后将我们的方法和参数封装到RpcInvocation中
在MockClusterInvoker判断了是否为mock调用,不是的话会直接传到AbstractClusterInvoker中,这里主要进行了三个操作,先是从注册目录中获取服务端的Invoker的list进行路由,然后初始化了负载均衡策略,随后传入了子类的FailoverClusterInvoker中
?FailoverClusterInvoker中会有一个容错机制,它的重试次数会是默认次数(默认为2)+1,随后开始循环,先是通过负载均衡选择出相应的Invoker,然后开始调用
?接着会走到DubboInvoker中,这里值得注意的是会创建一个Future去订阅调用的结果
?然后会在HeaderExchangeChannel中调用客户端的netty进行发送,其实看到HeaderExchangeChannel这个名字就会觉得很眼熟了,因为无论是在服务端还是客户端都会有HeaderExchange
?随后就会在dubbo封装的NettyChannel中将消息发送出去,不过在真正发送消息前,还得进行二次编码以及一次编码?,编码完成之后请求就会通过channel传输到服务端
接着我们来看服务端的代码,服务端接收到请求的第一件事就是解码,所以我们直接看到NettyServer中的解码,关于解码先放一张Dubbo协议的设计,协议头会有16个字节的位置,存放的信息就不一一讲解了
?
可以看到采用了一个do while的形式,先会读取协议头的16个字节,如果没有读到16个,会设置一个NEED_MORE_INPUT的状态并抛出,等待下一次读取
?在对协议头读取完成后会对其进行一些解析以及判断,如拿到请求id,判断是否为心跳
?最后会判断为请求,并返回给业务处理器
?在业务处理中能看到我们请求中的一些信息已经被解析成对象,比如我们的方法名
?在调用会经过HeartbeatHandler中判断是否为心跳
?接着就到了重点,dubbo是可以设置请求派发的,什么意思呢,就是在处理任务时可以根据任务类型选择在netty的io线程中执行还是在工作线程池中执行,我这里没有配置,所以默认是在工作线程中执行,所以来到了AllChannelHandler
?
?可以看到在异步线程中各种事件相对应的处理
????????
?看到received方法中又进行了解码,因为我们之前没有对传入的参数进行解码,所以这里得解码一下?
?在这里先获取了请求id,毕竟得一一对应,调用之后会得到一个future,当future有结果时,就会通过信道将响应结果写回客户端
?具体的调用流程先是根据信息获取invoker,然后通过invoker进行调用,由于invoker就是代理,所以在调用时会执行服务端的实现类,随后将结果返回
?
以上就是对dubbo的调用流程的简介
|