1. 前言
之前的文章对Dubbo服务暴露和引用流程进行了分析,再然后了解了Dubbo网络通讯协议的设计细节,现在终于到了最激动人心的时刻,我们将分析Dubbo是如何实现服务调用的。篇幅原因,文章会拆分成两篇,本篇从Consumer的视角,分析服务的调用过程和对响应结果的处理,下一篇会从Provider的视角分析Dubbo是如何处理RPC请求并响应结果的。 这是官方文档给的图,从中就可以看出大概的处理流程。代理对象通过客户端发送网络请求,请求/响应对象会经过Codec编解码、序列化发送到对端,对端接收到数据后进行解码和反序列化,通过Dispatcher将请求派发到具体的线程处理,最终响应结果。
2. 源码分析
通过一个简单的Demo开始分析,如下:
public interface HelloService {
R<String> say(String name);
}
public class HelloServiceImpl implements HelloService {
@Override
public R<String> say(String name) {
return R.ok("hello " + name);
}
}
Consumer在只有接口的情况下是不能实例化的,接口对象是Dubbo帮我们生成的代理对象,这个代理对象的实现细节是怎样的呢?能否窥探一下实现类的源码呢?我这里借助Arthas工具,反编译代理类的源码,如下:
public class proxy0 implements ClassGenerator.DC, Destroyable,EchoService,HelloService {
public static Method[] methods;
private InvocationHandler handler;
public R say(String string) {
Object[] objectArray = new Object[]{string};
Object object = this.handler.invoke(this, methods[0], objectArray);
return (R)object;
}
public void $destroy() {
Object[] objectArray = new Object[]{};
Object object = this.handler.invoke(this, methods[1], objectArray);
}
public Object $echo(Object object) {
Object[] objectArray = new Object[]{object};
Object object2 = this.handler.invoke(this, methods[2], objectArray);
return object2;
}
public proxy0() {
}
public proxy0(InvocationHandler invocationHandler) {
this.handler = invocationHandler;
}
}
Dubbo生成的代理类自动帮我们实现了很多接口:DC 接口,它是一个标记接口,仅代表它是动态生成的类。还实现了Destroyable 接口,代表它是可被销毁的,它的$destroy() 方法会调用对应Invoker的destroy() 方法。EchoService 接口用于回声测试,用来测试服务的可用性。
通过查看代理类的源码,我们发现,调用say() 方法它会帮我们转交给InvocationHandler处理,这个InvocationHandler对象就是在生成代理对象时指定的,代码如下:
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
}
综上所述,当我们调用接口的自定义方法时,代理对象会触发InvokerInvocationHandler#invoke() 方法,我们以此为入口分析Dubbo服务调用的奥秘。
2.1 发送请求
RPC调用最终就是要发送网络请求,但是在那之前,Dubbo需要做大量的前置处理,例如服务降级、拦截器、过滤器、集群容错、异步转同步等等。Dubbo在设计上,遵循了单一职责,这些功能全都通过一个个Invoker类来实现,采用装饰者模式,将最基础的DubboInvoker经过一层层的包装,最终实现这一整套复杂的功能。
2.1.1 InvokerInvocationHandler
在InvokerInvocationHandler#invoke() 方法里,逻辑很简单,如果调用的方法来自Object,则直接调用Invoker本身,无需触发RPC调用。如果是自定义的方法,则需要创建RpcInvocation对象,交给Invoker处理。
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
String serviceKey = invoker.getUrl().getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);
if (consumerModel != null) {
rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
}
return invoker.invoke(rpcInvocation).recreate();
2.1.2 MockClusterInvoker
该类的职责是服务降级、数据Mock。当接口调用失败时,Dubbo会尝试Mock数据并返回,适用于非关键流程。 需要注意的是,对于业务异常,Dubbo是不会Mock的,只针对非业务异常,例如超时。 具体实现就是对invoke 逻辑进行try catch,如果捕获到非业务异常,则执行Mock逻辑,返回Mock数据。
2.1.3 InterceptorInvokerNode
该类的职责是实现Cluster层的拦截器,通过实现ClusterInterceptor接口,来对invoke调用做拦截。 Dubbo目前提供了两个实现类:ConsumerContextClusterInterceptor和ZoneAwareClusterInterceptor。前者用于设置和清理RpcContext,后者猜测是让Invoker具备区域感知的能力,可以优先调用同机房的服务。
Result asyncResult;
try {
interceptor.before(next, invocation);
asyncResult = interceptor.intercept(next, invocation);
} catch (Exception e) {
if (interceptor instanceof ClusterInterceptor.Listener) {
ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
listener.onError(e, clusterInvoker, invocation);
}
throw e;
} finally {
interceptor.after(next, invocation);
}
2.1.4 AbstractClusterInvoker
该类的职责是实现ClusterInvoker的基础逻辑,采用模板方法模式,实现一套算法骨架,子类只需要实现自己特有的逻辑。 AbstractClusterInvoker#invoke() 方法首先会将RpcContext的attachments写入到RpcInvocation,然后通过Directory过滤可调用的服务列表得到一组Invoker,再初试化LoadBalance,后续负载均衡将在这一组Invoker里面选出一个最终的Invoker进行调用。
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
2.1.5 FailoverClusterInvoker
Cluster是集群容错接口,默认的集群容错方案是FailoverCluster,我们这里只拿它举例。 该类的职责是实现服务调用失败重试的逻辑,doInvoke() 方法首先会获取重试次数,然后利用Loadbalance进行负载均衡,选择一个最终的Invoker并对它进行调用,如果捕获到非业务异常,则利用其它Invoker做重试。
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
RpcException le = null;
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
}
2.1.6 ProtocolFilterWrapper
该类的职责是执行过滤器链,它是一个包装类,Dubbo SPI机制会自动把Protocol对象进行包装。 buildInvokerChain() 方法用于构建FilterChain,利用SPI加载激活的的Filter,将它们编排成一个单向链表,Invoker在链表的末尾,确保在执行invoke方法前,先经过所有的Filter。
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
throw e;
}
}
2.1.7 AsyncToSyncInvoker
该类的职责是实现异步转同步的功能,Dubbo在发送网络请求后,服务端什么时候响应结果,客户端是不知道的,所以invoke调用是异步的,返回的是AsyncRpcResult,什么时候服务端返回数据了,Dubbo才会将结果写入AsyncRpcResult。 ?
这是Dubbo底层的实现细节,但是作为开发者而言,RPC调用就应该和调用本地方法一样,必须拿到响应结果了程序才能往下走,所以才有了AsyncToSyncInvoker。它的逻辑很简单,就是调用AsyncRpcResult#get() 方法阻塞当前线程,直到服务端响应结果。
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = invoker.invoke(invocation);
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
return asyncResult;
}
2.1.8 DubboInvoker
该类的职责是实现dubbo协议的远程调用,它是底层的Invoker,没有再包装其它Invoker了。它的逻辑也很简单,利用ExchangeClient将RpcInvocation作为参数发送到Provider,得到CompletableFuture结果,将其包装为AsyncRpcResult并返回。
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
2.1.9 HeaderExchangeChannel
该类的职责是负责发送网络请求,Dubbo所有的网络请求最终都会封装为Request对象,它记录了RequestID,协议版本,请求体等信息。 它底层又会依赖Channel,这就涉及到网络传输层了,以Netty为例,最终会调用channel.writeAndFlush() 将数据发送到对端。
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
至此,服务请求就结束了,再往后就是网络传输层的逻辑,对Request对象进行编码和序列化,这里就不细说了。
2.2 处理响应
请求发送以后,客户端线程会阻塞,直到服务端响应结果。客户端是如何处理响应结果的呢?又是如何将响应结果和每次请求匹配上的呢?
2.2.1 DefaultFuture
前面已经说过,Dubbo所有的请求都会封装成Request对象,Request的构造函数里,会自动生成RequestID,这个ID是全局自增的,利用原子类AtomicLong,这就保证了同一个客户端发出的所有请求,RequestID是唯一的。服务端在响应结果时,会将这个RequestID原样写回,客户端根据服务端响应的RequestID就知道具体是哪个请求的响应结果了。 ?
另外,客户端每次发送请求前,都会创建一个DefaultFuture对象,它继承自CompletableFuture,没有结果时调用get() 方法线程会阻塞。DefaultFuture在构造函数中,会将自身放入一个全局Map中,然后客户端阻塞等待结果。服务端响应数据后,客户端根据RequestID取出对应的DefaultFuture并写入结果,客户端线程停止阻塞,程序正常运行。 ?
当然,客户端接收到的依然是字节序列,需要解码成Response对象,Response里的data就是方法的返回值,客户端需要对其进行反序列化才能得到最终的结果,这里不细说。
3. 总结
Dubbo自动为接口生成代理对象Proxy,当触发的是自定义方法时,Proxy会转交给InvokerInvocationHandler执行,它会创建RpcInvocation对象,然后交给后续Invoker执行,这些Invoker包括:服务降级、拦截器、集群容错、Filter、异步转同步等等,最后才到具体协议对应的Invoker,例如DubboInvoker,最终构建Request对象发送网络请求。 ?
在Dubbo设计体系里,大部分逻辑都是在客户端实现的,因此Consumer服务调用比Provider处理请求要复杂的多。Dubbo遵循了单一原则,你会看到有各种各样的Invoker类,但是每个Invoker类的职责都很简单清晰,采用装饰者模式将Invoker一层一层的包装,最终实现了一整套复杂的功能。
|