服务导出之注册到zookeeper
- ServiceConfig#export导出, 层层调用到最后, 就是调用RegistryProtocol#export来进行服务导出,上一篇解释了,也解释了容器的启动;
- 由于我们使用的zookeeper注册中心, 因此dubbo会将服务信息注册到注册中心中;
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
URL providerUrl = getProviderUrl(originInvoker);
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
return new DestroyableExporter<>(exporter);
}
Registry#getRegistry(originInvoker)
目的:获取一个Registry实例对象; 工作:
- 获取registryUrl;
- 通过工厂模式,创建一个Registry对象;
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
return registryFactory.getRegistry(registryUrl);
}
private URL getRegistryUrl(Invoker<?> originInvoker) {
URL registryUrl = originInvoker.getUrl();
if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);
}
return registryUrl;
}
RegistryFactory#getRegistry(registryUrl)
是一个接口;
@SPI("dubbo")
public interface RegistryFactory {
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
- ZookeeperRegistryFactory类实现了AbstractRegistryFactory抽象类, 而AbstractRegistryFactory实现了RegistryFactory接口;
- 调用getRegistry方法调用的是ZookeeperRegistryFactory父类AbstractRegistryFactory的getRegistry方法
- AbstractRegistryFactory#getRegistry定义了创建注册中心的一些公共逻辑,而由于注册中心有好几种,因此将注册中心的创建createRegistry逻辑交由子类去实现;这是工厂模式的通常做法;、
- 使用ReentrantLock可重入锁, 保证线程安全, 避免重复创建Registry实例;
@Override
public Registry getRegistry(URL url) {
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = url.toServiceStringWithoutResolving();
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
LOCK.unlock();
}
}
protected abstract Registry createRegistry(URL url);
ZookeeperRegistryFactory#createRegistry
创建了一个ZookeeperRegistry实例;
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
创建ZookeeperRegistry实例;
- 调用super(url)去加载一些公共的配置;
- 获取group组名 ,默认值为“dubbo”
- 判断组名是否以“/”开头, 没有则拼接"/"
- Zookeeper注册中心的根节点设置为组名 group;
- 连接注册中心;
- 监听是否连接成功,连接失败,则进行重连;
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
创建完ZookeeperRegistry实例后,层层返回, 返回到获取Registry的地方;
RegistryProtocol#getRegisteredProviderUrl
目的: 简化删除URL中的一些无用数据;
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
工作:
- 判断RegistryProtocol是否配置简化简述Simplified, 默认时false;
- 过滤URL中以“.”开头的key, 以及monitor, bind.ip, bind.port, qos.enable, qos_host, qos_port, qos.accept.foreign.ip, validation,interfaces的key ; 过滤后,剩下的key就是待删除的key;
- 删除URL中 待删除的key;
- 返回
private URL getRegisteredProviderUrl(final URL providerUrl, final URL registryUrl) {
if (!registryUrl.getParameter(SIMPLIFIED_KEY, false)) {
return providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameters(
MONITOR_KEY, BIND_IP_KEY, BIND_PORT_KEY, QOS_ENABLE, QOS_HOST, QOS_PORT, ACCEPT_FOREIGN_IP, VALIDATION_KEY,
INTERFACES);
} else {
String extraKeys = registryUrl.getParameter(EXTRA_KEYS_KEY, "");
if (!providerUrl.getPath().equals(providerUrl.getParameter(INTERFACE_KEY))) {
if (StringUtils.isNotEmpty(extraKeys)) {
extraKeys += ",";
}
extraKeys += INTERFACE_KEY;
}
String[] paramsToRegistry = getParamsToRegistry(DEFAULT_REGISTER_PROVIDER_KEYS
, COMMA_SPLIT_PATTERN.split(extraKeys));
return URL.valueOf(providerUrl, paramsToRegistry, providerUrl.getParameter(METHODS_KEY, (String[]) null));
}
}
private static String[] getFilteredKeys(URL url) {
Map<String, String> params = url.getParameters();
if (CollectionUtils.isNotEmptyMap(params)) {
return params.keySet().stream()
.filter(k -> k.startsWith(HIDE_KEY_PREFIX))
.toArray(String[]::new);
} else {
return new String[0];
}
}
ProviderConsumerRegTable.registerProvider(originInvoker,registryUrl, registeredProviderUrl)
目的: 当前服务提供者Invoker,以及该服务对应的注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable 工作:
- 创建一个ProviderInvokerWrapper实例,包装了注册中URL,服务执行器, 以及简化后的URL;
- 获取providerUrl中的serviceKey;
- 从缓存providerInvokers根据key获取map;
- 如果为空,则放入一个空的ConcurrentHashMap, 再根据key将这个ConcurrentHashMap拿出来;
- 以invoker为key, wrapperInvoker包装实例为value, 放入invokers,即刚创建的ConcurrenthashMap;
public static <T> ProviderInvokerWrapper<T> registerProvider(Invoker<T> invoker, URL registryUrl, URL providerUrl) {
ProviderInvokerWrapper<T> wrapperInvoker = new ProviderInvokerWrapper<>(invoker, registryUrl, providerUrl);
String serviceUniqueName = providerUrl.getServiceKey();
ConcurrentMap<Invoker, ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
if (invokers == null) {
providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashMap<>());
invokers = providerInvokers.get(serviceUniqueName);
}
invokers.put(invoker, wrapperInvoker);
return wrapperInvoker;
}
register(registryUrl, registeredProviderUrl)
注册到Zookeeper注册中心;
- 获取REGISTER_KEY的值,判断是否要注册;
- 需要注册则将简化后的服务URL放入注册中心;
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
- 首先调用registryFactory.getRegistry获取注册中心实例, 由于前面已经创建一个注册中心实例,并且放入了缓存,因此,这个步骤中,直接从map中获取就可以了;
- 调用的ZookeeperRegistry#register方法;
public void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
ZookeeperRegistry#register(registeredProviderUrl)
- ZookeeperRegistry本身并没有register方法,但继承了FailbackRegistry父类, 父类中已经实现了register方法
- 因此,调用ZookeeperRegistry实际上调用了父类FailbackRegistry的register方法;
- 调用了doRegister(url)方法, 由于注册中心有好几种,不可能是通用的操作,因此实现逻辑由子类去实现;即调用了ZookeeperRegistry#doRegister(url)方法
- 个人看法:普通工厂模式也是这种设计理念;
@Override
public void register(URL url) {
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
doRegister(url);
} catch (Exception e) {
Throwable t = e;
addFailedRegistered(url);
}
}
ZookeeperRegistry#doRegister(url)
目的:给Zookeeper发送请求,创建一个值toUrlPath(url)为动态的配置结点;
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
toUrlPath(url)
获取结点的值;
private String toUrlPath(URL url) {
return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
}
private String toCategoryPath(URL url) {
return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
}
private String toServicePath(URL url) {
String name = url.getServiceInterface();
if (ANY_VALUE.equals(name)) {
return toRootPath();
}
return toRootDir() + URL.encode(name);
}
private String toRootDir() {
if (root.equals(PATH_SEPARATOR)) {
return root;
}
return root + PATH_SEPARATOR;
}
服务导出源码流程
- ServiceBean.export()方法是导出的入口方法,会执行ServiceConfig.export()方法完成服务导出,导出完了之后会发布一个Spring事件ServiceBeanExportedEvent
- 在ServiceConfig.export()方法中会先调用checkAndUpdateSubConfigs(),这个方法主要完成AbstractConfig的参数刷新(从配置中心获取参数等等),AbstractConfig是指ApplicationConfig、ProtocolConfig、ServiceConfig等等,刷新完后会检查stub、local、mock等参数是否配置正确
- 参数刷新和检查完成了之后,就会开始导出服务,如果配置了延迟导出,那么则按指定的时间利用ScheduledExecutorService来进行延迟导出
- 否则调用doExport()进行服务导出
- 继续调用doExportUrls()进行服务导出
- 首先通过loadRegistries()方法获得所配置的注册中心的URL,可能配了多个配置中心,那么当前所导出的服务需要注册到每个配置中心去,这里,注册中心的是以URL的方式来表示的,使用的是什么注册中心、注册中心的地址和端口,给注册中心所配置的参数等等,都会存在在URL上,此URL以registry://开始
- 获得到注册中心的registryURLs之后,就会遍历当前服务所有的ProtocolConfig,调用doExportUrlsFor1Protocol(protocolConfig, registryURLs);方法把当前服务按每个协议每个注册中心分别进行导出
- 在doExportUrlsFor1Protocol()方法中,会先构造一个服务URL,包括
a. 服务的协议dubbo://, b. 服务的IP和PORT,如果指定了就取指定的,没有指定IP就获取服务器上网卡的IP, c. 以及服务的PATH,如果没有指定PATH参数,则取接口名 d. 以及服务的参数,参数包括服务的参数,服务中某个方法的参数 e. 最终得到的URL类似: dubbo://192.168.1.110:20880/com.tuling.DemoService?timeout=3000&&sayHello.loadbalance=random - 得到服务的URL之后,会把服务URL作为一个参数添加到registryURL中去,然后把registryURL、服务的接口、当前服务实现类ref生成一个Invoker代理对象,再把这个代理对象和当前ServiceConfig对象包装成一个DelegateProviderMetaDataInvoker对象,DelegateProviderMetaDataInvoker就表示了完整的一个服务
- 接下来就会使用Protocol去export导出服务了,导出之后将得到一个Exporter对象(该Exporter对象,可以理解为主要可以用来卸载(unexport)服务,什么时候会卸载服务?在优雅关闭Dubbo应用的时候)
- 接下来我们来详细看看Protocol是怎么导出服务的?
- 但调用protocol.export(wrapperInvoker)方法时,因为protocol是Protocol接口的一个Adaptive对象,所以此时会根据wrapperInvoker的genUrl方法得到一个url,根据此url的协议找到对应的扩展点,此时扩展点就是RegistryProtocol,但是,因为Protocol接口有两个包装类,一个是ProtocolFilterWrapper、ProtocolListenerWrapper,所以实际上在调用export方法时,会经过这两个包装类的export方法,但是在这两个包装类的export方法中都会Registry协议进行了判断,不会做过多处理,所以最终会直接调用到RegistryProtocol的export(Invoker originInvoker)方法
- 在RegistryProtocol的export(Invoker originInvoker)方法中,主要完成了以下几件事情:
a. 生成监听器,监听动态配置中心此服务的参数数据的变化,一旦监听到变化,则重写服务URL,并且在服务导出时先重写一次服务URL b. 拿到重写之后的URL之后,调用doLocalExport()进行服务导出,在这个方法中就会调用DubboProtocol的export方法去导出服务了,导出成功后将得到一个ExporterChangeableWrapper ⅰ. 在DubboProtocol的export方法中主要要做的事情就是启动NettyServer,并且设置一系列的RequestHandler,以便在接收到请求时能依次被这些RequestHandler所处理 ⅱ. 这些RequestHandler在上文已经整理过了 c. 从originInvoker中获取注册中心的实现类,比如ZookeeperRegistry d. 将重写后的服务URL进行简化,把不用存到注册中心去的参数去除 e. 把简化后的服务URL调用ZookeeperRegistry.registry()方法注册到注册中心去 f. 最后将ExporterChangeableWrapper封装为DestroyableExporter对象返回,完成服务导出
Exporter架构
一个服务导出成功后,会生成对应的Exporter:
- DestroyableExporter:Exporter的最外层包装类,这个类的主要作用是可以用来unexporter对应的服务
- ExporterChangeableWrapper:这个类主要负责在unexport对应服务之前,把服务URL从注册中心中移除,把该服务对应的动态配置监听器移除
- ListenerExporterWrapper:这个类主要负责在unexport对应服务之后,把服务导出监听器移除
- DubboExporter:这个类中保存了对应服务的Invoker对象,和当前服务的唯一标志,当NettyServer接收到请求后,会根据请求中的服务信息,找到服务对应的DubboExporter对象,然后从对象中得到Invoker对象
总结
- 服务导出中使用构造器模式,反射, 工厂模式,代码通用性也很强;
- 大概了解服务导出的过程, 至于Dubbo配置的动态变化,这涉及到监听机制,Zookeeper的一大特性吧。 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener) 订阅注册中心配置, 同时有一个监听器,如果配置发生变动,则触发overrideSubscribeListener监听器,重新导入配置;
|