写在前面
服务提供者提供的服务标记了@Service注解的类 ,想要被服务消费者使用,必须将服务暴露出去,即让服务消费者拿到封装服务信息的com.alibaba.dubbo.common.URL 对象字符串,当前服务暴露的方式有如下三种:
远程暴露:即将服务信息注册到远端的注册中心,如配置<dubbo:service scope="remote" />。
本地暴露:JVM内部调用,因为信息已经在内存中,通过内存可以直接获取调用信息,因此叫做本地暴露,如配置<dubbo:service scope="local">。
不暴露:不暴露服务,可以忽略这种方式,如配置<dubbo:service scope="none">。
本文来分享的是本地暴露 ,相关的源码在模块dubbo-rpc-injmv 中,如下图:
在dubbo之服务提供者配置 一文中,我们其实分析了部分服务暴露的内容,大家可以看下,本文为了承接,会有部分内容的重叠,我们就从方法com.alibaba.dubbo.config.ServiceConfig.doExportUrls 来开始分析。
1:doExportUrls
源码如下:
class FakeCls {
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
}
2022-01-21 18:25:43 处获取所有配置的注册中心的地址,具体参考1.1:loadRegistries 。2022-01-21 18:34:22 处是将服务按照指定的协议注册到注册中心,具体参考1.2:doExportUrlsFor1Protocol 。
1.1:loadRegistries
本文讲解的时本地服务暴漏,不会使用到这里的信息,但是为了完整性,放在这里,对这部分感兴趣的朋友可以参考dubbo之服务远程暴露 文章分析。
1.2:doExportUrlsFor1Protocol
将服务按照指定的协议注册到注册中心,分为远程暴漏和本地暴漏,其中本地暴漏不会注册服务到注册中心,因为是同一个JVM,信息可以直接从JVM中获取到,因为本文重点分析的是本地服务暴漏,所以关于远程暴漏的相关源码会选择性忽略,关于这部分的分析,可以才参考dubbo之服务远程暴露 文章分析。源码如下:
class FakeCls {
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
String scope = url.getParameter(Constants.SCOPE_KEY);
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
}
}
this.urls.add(url);
}
}
2022-01-22 12:25:51 处是本地服务暴漏,具体参考1.3:exportLocal 。
1.3:exportLocal
源码如下:
class FakeCls {
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST)
.setPort(0);
StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
}
该小节以下部分稍微有点绕,大家吃耐心,不懂的话,多看几遍!!!
2022-01-22 17:21:22 处protocol定义为private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); ,可以看到是获取自适应扩展类 ,其中从Protocol接口也可以看出来,源码如下:
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
可以看到export 方法是标注了@Adaptive 注解的,此处protocol ’ 是Protocol#Adaptive ,这个很好理解,因为获取就是动态生成的自适应子类,通过其进行最终调用真正的扩展类实现,那么想要知道调用的到底是谁就需要知道生成的代码到是什么样子的,我们可以通过如下的步骤来获取其内容:
在com.alibaba.dubbo.common.extension.ExtensionLoader.createAdaptiveExtensionClass中的代码ClassLoader classLoader = findClassLoader();添加条件变量"code.contains("Protocol$Adaptive")",然后再次运行程序,就可以停止在这里,获取code的内容了。
如下是我获取的内容:
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
我们重点关注其中的export 方法,可以看到是调用url.getProtocol() 作为目标扩展类的名称,那么是什么值呢?我们的url为injvm://127.0.0.1/... 可以看到协议是injvm ,那么对应的扩展类是谁呢,可以从文件META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 中找到答案,其中key为injvm 的的配置项内容是injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol ,因此最终调用的扩展类类是com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol ,但是真的是这样吗?我们来debug看一下,如下图:
从图中可以看出,还分别调用了QosProtocolWrapper ,ProtocolListenerWrapper ,ProtocolFilterWrapper ,这是Protocol的Wrapper类,我们从META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 中可以看出来,如下图:
关于Wrapper详细可以参考dubbo之SPI Wrapper分析 。
最终调用过程为Protocol$Adaptive->QosProtocolWrapper->ProtocolListenerWrapper->ProtocolFilterWrapper->InjvmProtocol 。具体的我们在2:Protocol 分析。
2:Protocol
源码如下:
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
2.1:Protocol$Adaptive
如何获取该类信息可以参考1.3:exportLocal 。
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
2.2:ProtocolListenerWrapper
源码如下:
class FakeCls {
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}
}
2022-01-23 19:29:28 处protocol.export(invoker) 继续调用装饰的protocol类,这里调用的就是ProtocolFilterWrapper ,关于该类参考2.3:ProtocolFilterWrapper 。Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)) 处是使用keyexporter.listener ,从url获取值从而获取要激活的ExporterListener 扩展类。ListenerExporterWrapper 构造函数源码如下:
class FakeCls {
public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {}
}
该监听器的作用是用来监听Exporter暴漏完毕和取消暴漏完毕。
2.3:ProtocolFilterWrapper
主要用于给Invoker增加Filter过滤器链,在调用真正的服务方法之前会调用过滤器Filter的逻辑,具体参考2.3.1:export 。
2.3.1:export
源码如下:
class FakeCls {
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
}
2022-01-24 16:02:53 处buildInvokerChain 添加Filter链,具体参考2.3.2:buildInvokerChain 。
2.3.2:buildInvokerChain
源码如下:
class FakeCls {
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
}
越靠后的Filter越先执行先执行,执行顺序如filter1->filte2->filter3->...->服务类方法 。
2.4:InjvmProtocol
该类是Injvm协议的实现类,我们还是从入口方法export开始。
2.4.1:export
源码如下:
class FakeCls {
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
}
主要是创建了InjvmExporter 对象。关于该对象,具体参考3:Exporter 。
3:Exporter
该接口用于基于相关协议来暴露服务,接口源码如下:
public interface Exporter<T> {
Invoker<T> getInvoker();
void unexport();
}
主要类图如下:
接下来我们从类AbstractExporter 开始来看以下。
3.1:AbstractExporter
源码如下:
public abstract class AbstractExporter<T> implements Exporter<T> {
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final Invoker<T> invoker;
private volatile boolean unexported = false;
public AbstractExporter(Invoker<T> invoker) {
if (invoker == null)
throw new IllegalStateException("service invoker == null");
if (invoker.getInterface() == null)
throw new IllegalStateException("service type == null");
if (invoker.getUrl() == null)
throw new IllegalStateException("service url == null");
this.invoker = invoker;
}
@Override
public Invoker<T> getInvoker() {
return invoker;
}
@Override
public void unexport() {
if (unexported) {
return;
}
unexported = true;
getInvoker().destroy();
}
@Override
public String toString() {
return getInvoker().toString();
}
}
3.2:InjvmExporter
AbstractExporter 的子类,源码如下:
class InjvmExporter<T> extends AbstractExporter<T> {
private final String key;
private final Map<String, Exporter<?>> exporterMap;
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
exporterMap.put(key, this);
}
@Override
public void unexport() {
super.unexport();
exporterMap.remove(key);
}
}
3.3 ListenerExporterWrapper
具有监听功能的Exporter的Wrapper类,源码如下:
public class ListenerExporterWrapper<T> implements Exporter<T> {
private static final Logger logger = LoggerFactory.getLogger(ListenerExporterWrapper.class);
private final Exporter<T> exporter;
private final List<ExporterListener> listeners;
public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
if (exporter == null) {
throw new IllegalArgumentException("exporter == null");
}
this.exporter = exporter;
this.listeners = listeners;
if (listeners != null && !listeners.isEmpty()) {
RuntimeException exception = null;
for (ExporterListener listener : listeners) {
if (listener != null) {
try {
listener.exported(this);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
@Override
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
@Override
public void unexport() {
try {
exporter.unexport();
} finally {
if (listeners != null && !listeners.isEmpty()) {
RuntimeException exception = null;
for (ExporterListener listener : listeners) {
if (listener != null) {
try {
listener.unexported(this);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}
}
ExporterListener参考4:ExporterListener 。
4:ExporterListener
源码如下:
@SPI
public interface ExporterListener {
void exported(Exporter<?> exporter) throws RpcException;
void unexported(Exporter<?> exporter);
}
类图如下:
接下来看下这个唯一的实现类ExporterListenerAdapter ,如下:
public abstract class ExporterListenerAdapter implements ExporterListener {
@Override
public void exported(Exporter<?> exporter) throws RpcException {
}
@Override
public void unexported(Exporter<?> exporter) throws RpcException {
}
}
也仅仅是个空实现,没有实际的逻辑。
|