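Wrapping of the Consumer-Side Invoker
On the consumer side, the Invoker is built during the service reference flow: ReferenceConfig#get() creates the Invoker and returns a proxy instance that wraps it. The goal here is simply to trace the whole wrapping chain from end to end.
ReferenceConfig#get()
All of the reference logic lives in the init method.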
public synchronized T get() {
checkAndUpdateSubConfigs();
if (ref == null) {
init();
}
return ref;
}
private void init() {
Map<String, String> map = new HashMap<String, String>();
ref = createProxy(map);
}
private T createProxy(Map<String, String> map) {
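if (shouldJvmRefer(map)) {
// local (in-JVM) reference; body elided in this excerpt
} else {
if (url != null && url.length() > 0) {
// user-specified URL(s); body elided in this excerpt
} else {
// URLs assembled from the registry configuration; body elided in this excerpt
}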
if (urls.size() == 1) {
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url;
}
}
if (registryURL != null) {
URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else {
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
return (T) PROXY_FACTORY.getProxy(invoker);
}
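- First, the configured urls are checked to see whether there is only one URL.
- If there is only one, REF_PROTOCOL.refer(interfaceClass, urls.get(0)) is called directly to create the Invoker, which is then handed to the proxy factory to produce the proxy instance. The cluster invoker created on this path is a FailoverClusterInvoker.
- If there are several URLs, REF_PROTOCOL.refer(interfaceClass, url) is called for each URL to create one Invoker per URL. The code then checks whether a registry URL is present:
3.1 If one is present, CLUSTER_KEY on that URL is set to registryaware and CLUSTER.join(new StaticDirectory(u, invokers)) is called; through the SPI mechanism this creates a RegistryAwareClusterInvoker instance.
3.2 If none is present, CLUSTER.join(new StaticDirectory(invokers)) is called directly; through the SPI mechanism this goes to FailoverCluster and creates a FailoverClusterInvoker instance.
Because CLUSTER.join(new StaticDirectory(invokers)) goes through the SPI mechanism, the Wrapper class is invoked first.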
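The branching above can be summarised in a small sketch. It is not Dubbo code: `ReferBranchSketch` and `clusterFor` are hypothetical names, each URL is reduced to its protocol string, and the "failover" result for the no-registry case simply follows the description in this text.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that mirrors the branches of createProxy described above.
class ReferBranchSketch {
    static final String REGISTRY_PROTOCOL = "registry";

    // Returns the name of the Cluster that merges the per-URL invokers,
    // or "none" when a single refer call is enough.
    static String clusterFor(List<String> urlProtocols) {
        if (urlProtocols.size() == 1) {
            return "none"; // single URL: REF_PROTOCOL.refer is called once, no merge
        }
        boolean hasRegistryUrl = false;
        for (String protocol : urlProtocols) {
            if (REGISTRY_PROTOCOL.equals(protocol)) {
                hasRegistryUrl = true;
            }
        }
        // Step 3.1 vs. step 3.2 in the list above.
        return hasRegistryUrl ? "registryaware" : "failover";
    }

    public static void main(String[] args) {
        System.out.println(clusterFor(Arrays.asList("registry")));
        System.out.println(clusterFor(Arrays.asList("registry", "registry")));
        System.out.println(clusterFor(Arrays.asList("dubbo", "dubbo")));
    }
}
```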
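Multiple URLs with a registry, layer 1: MockClusterWrapper#join
This class handles mock requests. Its join method works as follows:
- It calls this.cluster.join(directory) to create an Invoker, which becomes a constructor argument of MockClusterInvoker.
- It creates a MockClusterInvoker instance.
- The outermost Invoker is therefore a MockClusterInvoker.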
public class MockClusterWrapper implements Cluster {
private Cluster cluster;
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}
}
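To make the wrapping order concrete, here is a minimal self-contained sketch of the wrapper idea. It does not use Dubbo's classes: `SketchInvoker`, `SketchCluster`, and the other `Sketch*` names are hypothetical stand-ins, and the wrapper is applied by hand here rather than by the SPI extension loader.

```java
// Hypothetical stand-ins for Dubbo's Invoker and Cluster; not the real API.
interface SketchInvoker {
    String describe();
}

interface SketchCluster {
    SketchInvoker join(String directory);
}

// Plays the role of FailoverCluster: creates the "real" cluster invoker.
class SketchFailoverCluster implements SketchCluster {
    @Override
    public SketchInvoker join(String directory) {
        return () -> "FailoverClusterInvoker(" + directory + ")";
    }
}

// Plays the role of MockClusterWrapper: holds another Cluster and wraps its result.
class SketchMockClusterWrapper implements SketchCluster {
    private final SketchCluster cluster;

    SketchMockClusterWrapper(SketchCluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public SketchInvoker join(String directory) {
        SketchInvoker inner = cluster.join(directory); // delegate first
        return () -> "MockClusterInvoker -> " + inner.describe(); // then wrap
    }
}

class ClusterWrapperSketch {
    static String outermost(String directory) {
        SketchCluster cluster = new SketchMockClusterWrapper(new SketchFailoverCluster());
        return cluster.join(directory).describe();
    }

    public static void main(String[] args) {
        System.out.println(outermost("StaticDirectory"));
    }
}
```

The caller only ever sees the wrapper's result, which is why the mock invoker ends up as the outermost layer.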
Multiple URLs with a registry, layer 2: RegistryAwareCluster#join
Calling join creates a RegistryAwareClusterInvoker instance.
public class RegistryAwareCluster implements Cluster {
public final static String NAME = "registryaware";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new RegistryAwareClusterInvoker<T>(directory);
}
}
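Multiple URLs with a registry, layer 3: FailoverCluster#join
Whether ReferenceConfig#get() takes the single-URL branch (step 1) or the registry branch (step 3.1), a FailoverClusterInvoker is created in the end; that is, the cluster invoker produced by REF_PROTOCOL.refer(interfaceClass, url) is a FailoverClusterInvoker.
REF_PROTOCOL.refer(interfaceClass, url)
Through the SPI adaptive extension mechanism, the first refer method called is RegistryProtocol#refer, which calls doRefer to create the invoker. The arguments to doRefer are:
- cluster: the cluster fault-tolerance extension (a Cluster); the default is FailoverCluster, which creates a FailoverClusterInvoker.
- registry: the Registry for the registry address.
- type: the service interface type.
- url: the URL carrying the service parameters.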
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = URLBuilder.from(url)
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
.removeParameter(REGISTRY_KEY)
.build();
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
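As before, cluster#join is called to create the instance, and as before the call first goes through the wrapper class's join method. The wrapper is MockClusterWrapper, so another MockClusterInvoker is created. FailoverCluster#join is then called to create a FailoverClusterInvoker, which receives the RegistryDirectory instance; RegistryDirectory in turn holds invokers of its inner class InvokerDelegate.
The layering relationship is as follows: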
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
return invoker;
}
RegistryDirectory#subscribe
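Subscribes the directory to changes in the registry.
- It calls registry#subscribe to complete the subscription.
- With a ZooKeeper registry, registry is a ZookeeperRegistry. That class does not define subscribe itself; the method is defined in its parent class FailbackRegistry.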
public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}
FailbackRegistry#subscribe
- FailbackRegistry#subscribe calls doSubscribe, which subclasses implement (the template method pattern).
- ZookeeperRegistry#doSubscribe calls notify, which is defined in the parent class.
- FailbackRegistry#notify calls doNotify, which in turn calls notify on the parent class AbstractRegistry.
- AbstractRegistry#notify then calls listener#notify; the listener here is the RegistryDirectory.
- RegistryDirectory#notify calls refreshOverrideAndInvoker.
- RegistryDirectory#refreshOverrideAndInvoker calls refreshInvoker.
public class FailbackRegistry extends AbstractRegistry {
@Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
try {
doSubscribe(url, listener);
} catch (Exception e) {
}
}
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
try {
doNotify(url, listener, urls);
} catch (Exception t) {
}
}
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
super.notify(url, listener, urls);
}
}
public class ZookeeperRegistry extends FailbackRegistry {
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
} else {
notify(url, listener, urls);
}
} catch (Throwable e) {
}
}
}
public abstract class AbstractRegistry implements Registry {
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
listener.notify(categoryList);
}
}
}
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
@Override
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
refreshOverrideAndInvoker(providerURLs);
}
private void refreshOverrideAndInvoker(List<URL> urls) {
refreshInvoker(urls);
}
}
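The call chain from subscribe down to the listener can be sketched without any Dubbo dependency. All `Sketch*` types below are hypothetical stand-ins for the classes named above, the provider addresses are made up for illustration, and retry and error handling are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical listener; RegistryDirectory plays this role in the text.
interface SketchNotifyListener {
    void notify(List<String> urls);
}

// Plays the role of AbstractRegistry: notify() finally calls the listener.
abstract class SketchAbstractRegistry {
    protected void notify(String url, SketchNotifyListener listener, List<String> urls) {
        listener.notify(urls);
    }
}

// Plays the role of FailbackRegistry: subscribe() is the template method.
abstract class SketchFailbackRegistry extends SketchAbstractRegistry {
    public void subscribe(String url, SketchNotifyListener listener) {
        doSubscribe(url, listener); // hook implemented by the concrete registry
    }

    protected abstract void doSubscribe(String url, SketchNotifyListener listener);
}

// Plays the role of ZookeeperRegistry: finds the providers, then notifies.
class SketchZookeeperRegistry extends SketchFailbackRegistry {
    @Override
    protected void doSubscribe(String url, SketchNotifyListener listener) {
        // Made-up provider addresses, for illustration only.
        List<String> providers = Arrays.asList("dubbo://10.0.0.1:20880", "dubbo://10.0.0.2:20880");
        notify(url, listener, providers);
    }
}

class SubscribeChainSketch {
    static List<String> subscribeAndCollect() {
        List<String> received = new ArrayList<>();
        // The lambda stands in for RegistryDirectory#notify.
        new SketchZookeeperRegistry().subscribe("consumer://demo", urls -> received.addAll(urls));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(subscribeAndCollect());
    }
}
```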
RegistryDirectory#refreshInvoker
- Calls toInvokers to create an Invoker for each URL.
- Assigns the result to the directory's own invokers field.
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
} else {
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
}
}
}
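Layer 4: RegistryDirectory$InvokerDelegate
In toInvokers:
- An instance of the inner class InvokerDelegate is created.
- protocol#refer is called, which is ultimately handled by DubboProtocol#refer; because of the SPI mechanism, the Protocol wrapper classes are invoked first.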
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
for (URL providerUrl : urls) {
if (invoker == null) {
try {
if (enabled) {
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {}
} else {
}
}
return newUrlInvokerMap;
}
}
Layer 5: ListenerInvokerWrapper
protocol.refer(type, url) returns an Invoker, which is passed as a constructor argument to create a ListenerInvokerWrapper instance.
public class ProtocolListenerWrapper implements Protocol {
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
Collections.unmodifiableList(
ExtensionLoader.getExtensionLoader(InvokerListener.class)
.getActivateExtension(url, INVOKER_LISTENER_KEY)));
}
}
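The protocol.refer(type, url) call above goes one level down to ProtocolFilterWrapper#refer.
Layer 6: CallbackRegistrationInvoker
- buildInvokerChain finishes by returning a CallbackRegistrationInvoker instance, so layer 6 is a CallbackRegistrationInvoker.
- Inside buildInvokerChain, a filter processing chain is built. Its arguments are:
- protocol.refer(type, url): returns the Invoker instance to wrap.
- REFERENCE_FILTER_KEY: the value is reference.filter.
- CommonConstants.CONSUMER: the value is consumer.
What it does:
- Using the key and the group, it looks up all Filters in the consumer group.
- It creates a corresponding Invoker for each Filter.
- Finally, the Invoker for the outermost Filter is passed as an argument to create the CallbackRegistrationInvoker instance.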
public class ProtocolFilterWrapper implements Protocol {
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
}
return asyncResult;
}
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
}
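The reverse loop in buildInvokerChain is what makes the first Filter in the list the outermost one. The sketch below reproduces only that loop with hypothetical types (`SketchCall`, `SketchFilter`); it is not the Dubbo Filter API, and the filter names are used purely as labels.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for Invoker and Filter; not the real Dubbo API.
interface SketchCall {
    String invoke(String request);
}

interface SketchFilter {
    String invoke(SketchCall next, String request);
}

class FilterChainSketch {
    // Same shape as buildInvokerChain: iterate backwards so filters.get(0) ends up outermost.
    static SketchCall buildChain(SketchCall target, List<SketchFilter> filters) {
        SketchCall last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final SketchFilter filter = filters.get(i);
            final SketchCall next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    // A filter that records its name and passes the call on.
    static SketchFilter named(String name, List<String> trace) {
        return (next, request) -> {
            trace.add(name);
            return next.invoke(request);
        };
    }

    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        SketchCall target = request -> {
            trace.add("target");
            return "ok:" + request;
        };
        SketchCall chain = buildChain(target, Arrays.asList(
                named("ConsumerContextFilter", trace),
                named("FutureFilter", trace),
                named("MonitorFilter", trace)));
        chain.invoke("sayHello");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running `demo()` records the filters in list order, followed by the wrapped target.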
Consumer-side Filter invocation order
dubbo-rpc-api module
genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter
activelimit=org.apache.dubbo.rpc.filter.ActiveLimitFilter
consumercontext=org.apache.dubbo.rpc.filter.ConsumerContextFilter
dubbo-monitor-api module
monitor=org.apache.dubbo.monitor.support.MonitorFilter
dubbo-rpc-dubbo module
future=org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter
The invocation order is:
- ConsumerContextFilter#invoke
- FutureFilter#invoke
- MonitorFilter#invoke
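MonitorFilter is the last Filter in the chain; with it, the filter-chain Invoker is complete.
In ProtocolFilterWrapper#refer, the first argument passed to buildInvokerChain is protocol.refer(type, url); the next Protocol in line is DubboProtocol, so this calls DubboProtocol#refer.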
AbstractProtocol#refer
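DubboProtocol does not define refer itself; it extends AbstractProtocol, which defines the refer method.
Layer 7: AsyncToSyncInvoker
This class turns an asynchronous request into a synchronous one, which makes it a particularly important Invoker.
- protocolBindingRefer(type, url) returns an Invoker, which is passed as an argument to create an AsyncToSyncInvoker instance.
- protocolBindingRefer is an abstract method implemented by subclasses; here the subclass is DubboProtocol.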
public abstract class AbstractProtocol implements Protocol {
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
}
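The async-to-sync idea can be illustrated with a plain CompletableFuture. This is a sketch under stated assumptions, not AsyncToSyncInvoker's actual code: `SketchAsyncInvoker` and `AsyncToSyncSketch` are hypothetical names, the one-second timeout is arbitrary, and the assumption is that a synchronous call simply blocks on the future before returning it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical async invoker: always answers with a future.
interface SketchAsyncInvoker {
    CompletableFuture<String> invoke(String request);
}

// Hypothetical wrapper: blocks on the future when the caller wants a sync call.
class AsyncToSyncSketch {
    private final SketchAsyncInvoker delegate;

    AsyncToSyncSketch(SketchAsyncInvoker delegate) {
        this.delegate = delegate;
    }

    CompletableFuture<String> invoke(String request, boolean sync) {
        CompletableFuture<String> future = delegate.invoke(request);
        if (sync) {
            try {
                future.get(1, TimeUnit.SECONDS); // wait here so the caller sees a finished result
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new IllegalStateException("call failed: " + request, e);
            }
        }
        return future;
    }

    static String demo() {
        SketchAsyncInvoker async =
                request -> CompletableFuture.supplyAsync(() -> "reply to " + request);
        CompletableFuture<String> done = new AsyncToSyncSketch(async).invoke("sayHello", true);
        return done.isDone() + ":" + done.join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With `sync` set to false the same wrapper returns immediately and leaves waiting to the caller, which is the point of keeping the inner invoker asynchronous.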
DubboProtocol#protocolBindingRefer
Layer 9: DubboInvoker
protocolBindingRefer creates a DubboInvoker instance and returns it.
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
At this point, the Invoker is wrapped in nine layers at a coarse level; a finer-grained count would also include the Invoker created for each Filter.
The final picture is as follows:
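As a textual stand-in for the layering, the sketch below lists the Invoker types in the order this walkthrough encounters them, from outermost to innermost. Counting the second MockClusterInvoker created inside RegistryProtocol#doRefer is one reading that yields nine entries; that reading is an assumption, and `LayerChainSketch` is a hypothetical helper, not part of Dubbo.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that only prints names; it does not create real Invokers.
class LayerChainSketch {
    // Outermost first, in the order the walkthrough meets them (multiple URLs, registry present).
    static final List<String> LAYERS = Arrays.asList(
            "MockClusterInvoker",
            "RegistryAwareClusterInvoker",
            "MockClusterInvoker",
            "FailoverClusterInvoker",
            "RegistryDirectory$InvokerDelegate",
            "ListenerInvokerWrapper",
            "CallbackRegistrationInvoker",
            "AsyncToSyncInvoker",
            "DubboInvoker");

    static String render() {
        return String.join(" -> ", LAYERS);
    }

    public static void main(String[] args) {
        System.out.println(render());
    }
}
```

The per-Filter Invokers sit between CallbackRegistrationInvoker and AsyncToSyncInvoker and are left out of this coarse list.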