服务引入
前面瞎说了服务引入中的构造路由,这次瞎说服务目录监听配置
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
directory#subscribe逻辑异常的复杂;
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
给subscribeUrl订阅路径 添加目录参数, 参数值为:“providers , configurators , routers”;
RegistryDirectory#subscribe(URL url)
- 设置服务目录的consumerUrl属性;
- 监听consumer应用 : CONSUMER_CONFIGURATION_LISTENER实例中,有一个服务目录List容器, 调用addNotifyListener,就是将当前的服务目录放进List容器中;
- 监听引入的服务的动态配置;
- 调用ZookeeperRegistry#subscriibe 订阅服务;
public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}
ReferenceConfigurationListener
引用服务的动态配置监听器;
- 设置服务目录;
- 设置消费者订阅的URL;
- 获取消费者的动态配置规则Key, 格式为:‘{interfaceName}:[version]:[group]’ + “.configurators”;
不设置group和version : 则为dubbo-demo-consumer-application.configurators, 在与Zookeeper交互的时候, path前面会 增加 rootPath, 值为/dubbo/config ,增加group, 值为dubbo;最终发送给zookeeper的path值为/dubbo/config/dubbo/dubbo-demo-consumer-application.configurators - 调用initWith方法, 监听 3中生成的key;
private static class ReferenceConfigurationListener extends AbstractConfiguratorListener {
private RegistryDirectory directory;
private URL url;
ReferenceConfigurationListener(RegistryDirectory directory, URL url) {
this.directory = directory;
this.url = url;
this.initWith(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX);
}
}
- 获取动态配置;
- 添加监听器, 监听的key为/dubbo/config/dubbo/dubbo-demo-consumer-application.configurators
- 从zookeeper中获取path为/dubbo/config/dubbo/dubbo-demo-consumer-application.configurators的数据;
- 如果存在配置信息, 则生成confurators;
public abstract class AbstractConfiguratorListener implements ConfigurationListener {
protected List<Configurator> configurators = Collections.emptyList();
protected final void initWith(String key) {
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
dynamicConfiguration.addListener(key, this);
String rawConfig = dynamicConfiguration.getRule(key, DynamicConfiguration.DEFAULT_GROUP);
if (!StringUtils.isEmpty(rawConfig)) {
genConfiguratorsFromRawRule(rawConfig);
}
}
}
public class ZookeeperDynamicConfiguration implements DynamicConfiguration {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperDynamicConfiguration.class);
@Override
public Object getInternalProperty(String key) {
return zkClient.getContent(key);
}
}
registry.subscribe(url, this)
void subscribe(URL url, NotifyListener listener);
我们使用的是ZookeeperRegistry, ZookeeperRegistry的父类FallbackRegistry中实现了subscribe方法; FallbackRegistry#subscribe
@Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
doSubscribe(url, listener);
} catch (Exception e) {
}
}
public abstract void doSubscribe(URL url, NotifyListener listener);
ZookeeperRegistry#doSubscribe(url, listener) 工作:
- 调用toCategoriesPath(url)获取所有需要监听的目录, 遍历
- /dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.configurators
- /dubbo/org.apache.dubbo.demo.DemoService/providers
- /dubbo/org.apache.dubbo.demo.DemoService/configurators
- /dubbo/org.apache.dubbo.demo.DemoService/routers
- 根据消费订阅URL从zkListeners监听器中获取监听器, 结果为map;
- 如果为空, 则放入key为消费订阅URL, 值为空的ConcurrentHashMap, 然后再拿出来listeners;
- 判断当前的listeners 是否已经存在, 不存在就新创建一个
- 存在, 则调用zookeeper客户端 发送 创建 非临时节点的path结点的命令,如果结点已经存在,客户端就不创建了。
- List children = zkClient.addChildListener(path, zkListener) : 这个步骤, 会获取path的所有子节点的值, 并且添加监听器监听path的所有子节点, 最终返回的是path的所有路径子节点值的String集合, 因此, 通过这个步骤, 如果是providers, 就可以获取到 /dubbo/org.apache.dubbo.demo.DemoService/providers 路径下的所有子节点值了, 即服务提供了多少个Provider;
- notify(url, listener, urls)调用此方法生成Invoker;如果有多个Provider, 就生成多个Invoker;
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
} else {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
} catch (Throwable e) {
}
}
1. zkClient.create(path, false)
调用zookeeper客户端创建path结点信息;
- ZookeeperClient#create(path, false)
public interface ZookeeperClient {
void create(String path, boolean ephemeral);
}
- 如果不是临时结点, 则会判断持久化结点集合中是否包含了当前path,包含则返回;
- 判断path是否存在, 如果存在,则把path加入持久化结点集合中;
- 如果不是临时结点, 且不存在持久化结点, 则判断path是否以"/"结尾, 是则截取前面的path部分,递归执行同样的流程;
- 如果不是临时结点, 则创建一个持久化结点, 并将path加入到持久化节点集合中;
public abstract class AbstractZookeeperClient<TargetDataListener, TargetChildListener> implements ZookeeperClient {
@Override
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
if(persistentExistNodePath.contains(path)){
return;
}
if (checkExists(path)) {
persistentExistNodePath.add(path);
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
create(path.substring(0, i), false);
}
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
persistentExistNodePath.add(path);
}
}
@Override
public void createPersistent(String path) {
try {
client.create().forPath(path);
} catch (NodeExistsException e) {
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
2. zkClient.addChildListener(path, zkListener)
ZookeeperClient 是一个接口, 存放所有与zk交互的操作
public interface ZookeeperClient {
List<String> addChildListener(String path, ChildListener listener);
}
AbstractZookeeperClient#addChildListener(String path, final ChildListener listener) 工作: 获取path下的所有子节点值, 并设置监听器, 返回所有子节点的值;
@Override
public List<String> addChildListener(String path, final ChildListener listener) {
return addTargetChildListener(path, targetListener);
}
@Override
public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
try {
return client.getChildren().usingWatcher(listener).forPath(path);
} catch (NoNodeException e) {
return null;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
3. urls.addAll(toUrlsWithEmpty(url, path, children))
private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
if (urls == null || urls.isEmpty()) {
int i = path.lastIndexOf(PATH_SEPARATOR);
String category = i < 0 ? path : path.substring(i + 1);
URL empty = URLBuilder.from(consumer)
.setProtocol(EMPTY_PROTOCOL)
.addParameter(CATEGORY_KEY, category)
.build();
urls.add(empty);
}
return urls;
}
遍历providers, 判断字符串中是否包含了"😕/” ,是则创建一个URL放入urls,最后返回urls;
private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
List<URL> urls = new ArrayList<>();
if (CollectionUtils.isNotEmpty(providers)) {
for (String provider : providers) {
provider = URL.decode(provider);
if (provider.contains(PROTOCOL_SEPARATOR)) {
URL url = URL.valueOf(provider);
if (UrlUtils.isMatch(consumer, url)) {
urls.add(url);
}
}
}
}
return urls;
}
4. FallbackRegistry#notify(URL url, NotifyListener listener, List urls)
主要做了一些校验,异常处理, 以及调用了doNotify(url, listener, urls)
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
try {
doNotify(url, listener, urls);
} catch (Exception t) {
}
}
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
super.notify(url, listener, urls);
}
AbstractRegistry#notify(URL url, NotifyListener listener, List urls) 参数urls : 包含了providers, configurators, routers, configurators的所有值;
- 创建一个Map result, 往map中放入key为providder,值为空的ArrayList, 再把ArrayList拿出, 将urls加入categoryList
- 遍历result
- 获取URL集合,调用listener主动通知,参数为服务URL集合-categoryList
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
listener.notify(categoryList);
saveProperties(url);
}
}
- RegistryDirectory#notify(List urls)
将URL进行分组,urls 传入的是providers, configurators, routers, configurators的一种;
- 对urls进行过滤分组操作,根据URL的协议是否为为providers, configurators, routers, configurators继续分组操作,转换为一个Map;
- categoryUrls获取key为“configurators”的值,再根据动态配置的URL集合,生成configurators
- categoryUrls获取key为“routers”的值 , 再将URL集合生成Router,加入路由链中;
- categoryUrls获取key为"providers"的值;
- 调用refreshOverrideAndInvoker方法, 刷新参数, 因为服务URL的有些参数, 消费者也配置了,所以需要确定参数优先级, 再将URL转换为Invoker;
@Override
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
refreshOverrideAndInvoker(providerURLs);
}
RegistryDirectory#refreshOverrideAndInvoker(List urls) 9. 参数覆盖; 10.生成Invoker;
private void refreshOverrideAndInvoker(List<URL> urls) {
overrideDirectoryUrl();
refreshInvoker(urls);
}
RegistryDirectory#verrideDirectoryUrl() 利用动态配置重写服务目录地址
private void overrideDirectoryUrl() {
this.overrideDirectoryUrl = directoryUrl;
List<Configurator> localConfigurators = this.configurators;
doOverrideUrl(localConfigurators);
List<Configurator> localAppDynamicConfigurators = CONSUMER_CONFIGURATION_LISTENER.getConfigurators();
doOverrideUrl(localAppDynamicConfigurators);
if (serviceConfigurationListener != null) {
List<Configurator> localDynamicConfigurators = serviceConfigurationListener.getConfigurators();
doOverrideUrl(localDynamicConfigurators);
}
}
RegistryDirectory#refreshInvoker(List invokerUrls) 10. 获取invokerMap; 11. 将invokerUrls加入cachedInvokerUrls缓存执行器的URL集合中, 方便其他URL的比较; 12. toInvokers(invokerUrls) 生成Map<String, Invoker> invokers; 13. 将invokers设置给路由链;
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
} else {
this.forbidden = false;
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap;
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<>();
this.cachedInvokerUrls.addAll(invokerUrls);
}
if (invokerUrls.isEmpty()) {
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
}
}
RegistryDirectory#toInvokers(List urls) @Reference中是可以i配置 protocol = http /dubbo的, 因此有多个provider, 则需要根据协议进行过滤;
- 创建存储invoker的集合, key为url, value为invokers;
- 参数协议是否支持 provider的协议, 不支持,则跳过当前provider的URL, 处理下一个Provider的URL;
- SPI校验是否支持Provider的Protocol;
- 合并覆盖参数
- 获取本地的Invoke的map, 以url的值为key, 获取Invoker;
- 如果当前服务提供者URL没有生产过Invoker代理实例, 有 则以url值为key, 查出来的invoker为value放入返回结果;
- 没有 则 创建一个新的Invoker代理实例, 放入newUrlInvokerMap返回结果中;多个Provider就创建多个Invoekers, 放入返回结果中;
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
continue;
}
URL url = mergeUrl(providerUrl);
String key = url.toFullString();
if (keys.contains(key)) {
continue;
}
keys.add(key);
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) {
try {
boolean enabled = true;
if (enabled) {
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
}
if (invoker != null) {
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
- routerChain.setInvokers(newInvokers)
- 路由链实例设置 invokers执行器属性的值;
- 遍历路由对象, 调用notify方法;
public void setInvokers(List<Invoker<T>> invokers) {
this.invokers = (invokers == null ? Collections.emptyList() : invokers);
routers.forEach(router -> router.notify(this.invokers));
}
TagRouter#notify(List<Invoker> invokers) 10. 获取一个服务执行器; 11. 获取其URL; 12. 获取URL 的 "remote.application"参数的值; 13. 移除原有的监听器; 14. 以12结果的值 + ".tag-router"为key, 添加一个标签路由监听器; 15. 获取zookeeper中的key的配置信息 16. 处理配置信息,转换为TagRouterRule实例, 设置TagRouter的tagRouterRule属性;
@Override
public <T> void notify(List<Invoker<T>> invokers) {
if (CollectionUtils.isEmpty(invokers)) {
return;
}
Invoker<T> invoker = invokers.get(0);
URL url = invoker.getUrl();
String providerApplication = url.getParameter(CommonConstants.REMOTE_APPLICATION_KEY);
synchronized (this) {
if (!providerApplication.equals(application)) {
if (!StringUtils.isEmpty(application)) {
configuration.removeListener(application + RULE_SUFFIX, this);
}
String key = providerApplication + RULE_SUFFIX;
configuration.addListener(key, this);
application = providerApplication;
String rawRule = configuration.getRule(key, DynamicConfiguration.DEFAULT_GROUP);
if (StringUtils.isNotEmpty(rawRule)) {
this.process(new ConfigChangeEvent(key, rawRule));
}
}
}
}
总结
get()方法
- 调用checkAndUpdateSubConfigs(), 检查更新参数, 和服务提供者类似, 把ReferenceBean里面的属性值替换为优先级别最高的参数值;
- 调用init()方法生成代理对象ref, get()方法返回ref;
- 生成代理对象前, 先把消费者引入服务设置的参数添加到一个map中, 会根据这个map中的参数从注册中心查找服务;
- 把消费者配置的所有注册中心获取出来;
4.1 如果只有一个注册中心, 那么直接调用Protocol#refer(interfaceClass, urls.get(0))得到一个Invoker对象; 4.2 如果有多个注册中心,则遍历每个注册中心, 调用Protocol#refer(interfaceClass, url)生成一个Invoker添加到invokers中, 然后调用ClUSTER.join(new StaticDirectory(u, invokers)),封装所有invokers得到一个invoker; - 把最终的invoker对象调用PROXY_FACTORY.getProxy(invoker)得到一个代理对象实例,返回,这个代理对象实例就是ref;
Protocol#refer(interfaceClass, url)生成代理对象实例
- class表示引入的服务接口, url是注册中心的url (registry://) ,该URL包含了一个refer参数, 参数值为当前所要引入服务的参数;
- 调用dorefer(cluster, registry, type, url)
- 在doRefer方法中生成一个RegistryDirectory;
- 获取新版本的路由器链, 添加到RegistryDirectory中;
5.RegistryDirectory监听几个目录 , 在完成监听器的订阅绑定后, 会自动触发一次去获取这些目录上的数据; 5.1 当前所引入服务的动态配置目录 5.2 当前所引入服务的提供者目录; 5.3 当前所引入服务的老版本的动态配置目录; 5.4 当前所引入的老版本路由器目录; - 调用cluster#join(directory)得到一个invoker;
- 返回invoker(消费者引入了多个group的服务, 返回的是MergeableClusterinvoker(directory)), 否则返回的是new FailoverClusterInvoker(directory);
- 上面返回的Invoker最终被MockClusterInvoker包装,最终返回MockClusterInvoker;
服务目录
消费端每个服务对应一个服务目录RegistryDirectory。
- serviceType :服务接口
- serviceKey: 表示引入的服务key, serviceclass+version+group
- queryMap : 引入服务的配置参数;
- configurators :动态配置;
- routerChain: 路由链
每个Router自己本身也是一个监听器,负责监听对应的路径;
- AppRouter:应用路由,监听的路径为"/dubbo/config/dubbo/dubbo-demo-consumer-application.condition-router;
- ServiceRouter: 服务路由,监听的路径为"/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.condition-router"
- TagRouter: 标签路由,标签路由和应用路由、服务路由有所区别,应用路由和服务路由都是在消费者启动,在构造路由链时会进行监听器的绑定,但是标签路由不是消费者启动的时候绑定监听器的,是在引入服务时,获取到服务的提供者URL之后,才会去监听.tag-router节点中的内容,监听的路径为"/dubbo/config/dubbo/dubbo-demo-provider-application.tag-router"
-
invokers : 服务目录当前缓存的服务提供者Invoker; -
ConsumerConfigurationListener: 监听本应用的动态配置; 监听本应用的动态配置,当应用的动态配置发生了修改后,会调用RegistryDirectory的refreshInvoker()方法,对应的路径为:“/dubbo/config/dubbo/dubbo-demo-consumer-application.configurators” -
ReferenceConfigurationlistener:监听所引入服务的动态配置; 监听所引入的服务的动态配置,当服务的动态配置发生了修改后,会调用RegistryDirectory的refreshInvoker()方法,对应的路径为:“/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.configurators” -
RegistryDirectory:本身也是一个监听器, 会监听所引入的服务提供者, 老版本的动态配置, 服务路由,路径分别为:
-
“/dubbo/org.apache.dubbo.demo.DemoService/providers” -
“/dubbo/org.apache.dubbo.demo.DemoService/configurators” -
“/dubbo/org.apache.dubbo.demo.DemoService/routers” -
当ConsumerConfigurationListener接收到了消费者应用的动态配置数据变化后,会调用当前消费者应用中的所有RegistryDirectory的refreshInvoker()方法,表示刷新消费者应用中引入的每个服务对应的Invoker -
当ReferenceConfigurationListener接收到了某个服务的动态配置数据变化后,会调用该服务对应的RegistryDirectory的refreshInvoker()方法,表示刷新该服务对应的Invoker; -
当AppRouter和ServiceRouter接收到条件路由的数据变化后,就会更新Router内部的routerRule和conditionRouters属性。这两个属性在服务调用过程中会用到。 -
当TagRouter接收到标签路由的数据变化后,就会更新TagRouter内部的tagRouterRule的属性,这个属性在服务调用过程中会用到。 -
当RegistryDirectory接收到"/dubbo/org.apache.dubbo.demo.DemoService/configurators"节点数据变化后,会生成configurators -
当RegistryDirectory接收到"/dubbo/org.apache.dubbo.demo.DemoService/routers"节点数据变化后,会生成Router并添加到routerChain中 -
当RegistryDirectory接收到"/dubbo/org.apache.dubbo.demo.DemoService/providers"节点数据变化后,会调用refreshOverrideAndInvoker()方法。这个方法就是用来针对每个服务提供者来生成Invoker的。
Invoker总结
- MockclusterInvoker:完成Mock共嗯那个, 由MockClusterWrapper生成, MockClusterWrapper是Cluster的包装类, 通过Cluster#join()方法得到MockClusterInvoker;
- FailoverClusterInvoker: 完成集群容错功能, 是MockClusterInvoker的下级;
- RegistryAwareClusterInvoker:如果制定了多个注册中心,那么RegistryAwareCluterInvoker完成选择默认注册中心进行调用,如果没有默认的, 则会遍历所有注册中心进行调用,如果该注册中心没有对应的服务则跳过;
- Dubboinvoker:完成Dubbo协议的底层数据发送;
- ProtocolFilterWrapper$CallbackRegistrationInvoker:完成对Filter调用,是Protocol的包装类, 通过Protocol#refer得到CallbackRegistrationInvoker;
服务引入总结
- 调用服务时, 首先需要进行Mock判断,再进行路由过滤后,会一个或者多个服务可以调用, 这时候, 就是负载均衡了,根据特定策略选出一个进行调用, 调用前需要启动Netty或者tomcat作为发起请求的客户端,如果调用不成功,还需要进行集群容错;
- 服务存在于服务注册目录RegistryDirectory中的invokers;
- 集群容错离存在于FailoverClusterInvoker
- Mock模拟逻辑存在于MockClusterInvoker;
- 启动服务客户端存在于DubboInvoker中;
- 选择服务存在于RegistryDirectory中;
|