前言:
在Dubbo中,为provider和consumer提供了一种被称为隐式参数传递的策略,可用于在两者之间传递参数。
本文先通过一个示例来展示下其使用过程,后续通过源码来分析下其传递过程。
1.示例分析
1.1 consumer示例
public class Application {
// 服务提供者代码有所精简,本质上还是与之前的示例一样
public static void main(String[] args) throws Exception {
ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(DemoService.class);
// 设置参数
RpcContext.getContext().setAttachment("address", "beijing");
RpcContext.getContext().setAttachment("age", "12");
reference.setScope("remote");
DemoService service = reference.get();
String message = service.sayHello("dubbo");
System.out.println(message);
}
}
相比之前的示例中,多了设置参数的代码,这个需要传递到provider。
1.2 provider示例
public class ProviderApplication {
public static void main(String[] args) {
// 服务实现(自定义DemoService接口)
DemoService demoService = new DemoServiceImpl();
// 当前应用配置
ApplicationConfig application = new ApplicationConfig();
application.setName("provider");
// 连接注册中心配置
RegistryConfig registry = new RegistryConfig();
// 本地zookeeper作为配置中心
registry.setAddress("zookeeper://localhost:2181");
// 服务提供者协议配置
ProtocolConfig protocol = new ProtocolConfig();
// dubbo协议,并以20881端口暴露
protocol.setName("dubbo");
protocol.setPort(20881);
// 服务提供者暴露服务配置
ServiceConfig<DemoService> service = new ServiceConfig<DemoService>();
service.setApplication(application);
service.setRegistry(registry);
service.setProtocol(protocol);
service.setInterface(DemoService.class);
service.setRef(demoService);
service.setVersion("1.0.0");
// 暴露及注册服务
service.export();
}
}
// 接口
public interface DemoService {
String sayHello(String name);
}
public class DemoServiceImpl implements DemoService {
private static final Logger logger = LoggerFactory.getLogger(org.apache.dubbo.demo.provider.DemoServiceImpl.class);
@Override
public String sayHello(String name) {
// 在实现类中获取consumer传递过来的参数并打印出来
String address = RpcContext.getContext().getAttachment("address");
String age = RpcContext.getContext().getAttachment("age");
System.out.println("address:" + address);
System.out.println("age:" +age);
return "Hi " + name;
}
}
相比较之前的示例而言,多了在DemoService接口的实现类DemoServiceImpl中获取consumer传递过来的参数的代码。
1.3 测试结果
address:beijing
age=12
所以,从consumer端传递过来的参数,在provider端被接收到。
下面我们就从源码的角度来分析下整个参数传递的过程。
2.源码分析
2.1 RpcContext解析
在分析传递过程之前,先来看下RpcContext的作用
/**
* Thread local context. (API, ThreadLocal, ThreadSafe)
* <p>
* Note: RpcContext is a temporary state holder. States in RpcContext changes every time when request is sent or received.
* For example: A invokes B, then B invokes C. On service B, RpcContext saves invocation info from A to B before B
* starts invoking C, and saves invocation info from B to C after B invokes C.
*
* @export
* @see org.apache.dubbo.rpc.filter.ContextFilter
*/
public class RpcContext {
// 参数存放位置
protected final Map<String, Object> attachments = new HashMap<>();
// 有关于RpcContext,其是线程安全的,每一个线程独享一个RpcContext
private static final InternalThreadLocal<RpcContext> LOCAL = new InternalThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
// 获取RpcContext对象
public static RpcContext getContext() {
return LOCAL.get();
}
// 设置attachment参数
public RpcContext setAttachment(String key, String value) {
return setObjectAttachment(key, (Object) value);
}
public RpcContext setAttachment(String key, Object value) {
return setObjectAttachment(key, value);
}
@Experimental("Experiment api for supporting Object transmission")
public RpcContext setObjectAttachment(String key, Object value) {
// 实际就是将参数设置到上面的attachments
if (value == null) {
attachments.remove(key);
} else {
attachments.put(key, value);
}
return this;
}
}
通过对RpcContext的分析,我们知道:RpcContext是线程安全的,每一个线程独享一个RpcContext对象,我们在调用setAttachment()方法时将参数设置到该map中。
设置到RpcContext有什么用呢?我们继续分析consumer传递参数的过程
2.1 consumer传递参数
通过前面的分析,我们知道consumer方法的调用,会通过ClusterInvoker,默认是FailoverClusterInvoker,调用其invoke()方法。我们就从其invoke()方法分析起。
2.1.1 AbstractClusterInvoker.invoke() 获取consumer设置的参数
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// 这里就从当前线程的ThreadLocal中获取RpcContext对象中存放的attachments参数信息
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
// 并将参数绑定到RpcInvocation.attachments属性中
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// FailoverClusterInvoker实现
return doInvoke(invocation, invokers, loadbalance);
}
}
2.1.2 DubboInvoker.doInvoke() 发送数据
public class DubboInvoker<T> extends AbstractInvoker<T> {
protected Result doInvoke(final Invocation invocation) throws Throwable {
// 获取设置好的Invocation对象,里面有上一步骤获取的参数
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
// 直接通过client传递出去
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
传递对象就比较简单了,就是通过Netty channel将invocation对象(包含attachments)传递到服务端
2.2 provider接收参数
依旧我们之前对provider接收请求的过程分析,其会先经过一些Filter的处理,最后才交由接口实现类处理。
我们从 看起
2.2.1 ContextFilter 解析attachments
public class ContextFilter implements Filter, Filter.Listener {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 这个invocation就是从消费者端传递过来的那个invocation对象
// 我们从这里获取其attachments
Map<String, Object> attachments = invocation.getObjectAttachments();
if (attachments != null) {
Map<String, Object> newAttach = new HashMap<>(attachments.size());
for (Map.Entry<String, Object> entry : attachments.entrySet()) {
String key = entry.getKey();
if (!UNLOADING_KEYS.contains(key)) {
newAttach.put(key, entry.getValue());
}
}
attachments = newAttach;
}
RpcContext context = RpcContext.getContext();
...
if (attachments != null) {
// 将invocation中获取到的attachments重新放置到当前RpcContext中
if (context.getObjectAttachments() != null) {
context.getObjectAttachments().putAll(attachments);
} else {
context.setObjectAttachments(attachments);
}
}
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
context.clearAfterEachInvoke(false);
return invoker.invoke(invocation);
} finally {
context.clearAfterEachInvoke(true);
// IMPORTANT! For async scenario, we must remove context from current thread, so we always create a new RpcContext for the next invoke for the same thread.
RpcContext.removeContext(true);
RpcContext.removeServerContext();
}
}
}
通过ContextFilter?的分析我们知道:就是在当前Filter中,ContextFilter将从消费者端获取到的attachments,重新添加到provider端的RpcContext.attachments中。
这样,后续在我们的DemoService实现类中,就可以通过RpcContext获取到attachments了。
总结:
对隐式参数传递整个分析过程并不算困难,只要我们之前分析consumer、provider的过程足够坚实。
基本上Dubbo的这些扩展功能,都是通过这些Filter来实现的。
所以在分析其源码的时候,先分析主干信息,千万不要陷入无穷无尽的细节中了。
等主干分析清楚之后,再来对细节各个突破即可。
|