Dubbo约定如下
-
SPI文件存储路径在META-INF/dubbo/internal目录下,并且文件名为接口名的全路径名接口包名+接口名 -
每个SPI文件里面格式定义为:扩展名=具体类名,例如dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
实现路径:
-
getExtensionLoader(Class<T> type) 就是为该接口new一个ExtensionLoader ,然后缓存起来。 -
getAdaptiveExtension() 获取一个拓展装饰类的对象,如果@Adaptive注解在类上就是一个装饰类,如果注释在方法上就是一个动态代理类,例如Protocol$Adaptive对象(如果是动态代理类,与dubbo中URL配合可以在运行时动态决定使用的实现类)。 -
getExtension(String name) 获取一个对象
这些方法都在ExtensionLoader 中,ExtensionLoader 是最核心的类,负责扩展点的加载和生命周期管理。
getExtensionLoader(Class type)
getExtensionLoader方法 这是一个静态工厂方法,入参是一个可扩展的接口,返回一个该接口的ExtensionLoader实体类。通过这个实体类,可以根据name获得具体的扩展,也可以获得一个自适应扩展。
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null) {
throw new IllegalArgumentException("Extension type == null");
}
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type +
") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
如果缓存EXTENSION_LOADERS 中没有获取到对应的ExtensionLoader 则新建一个ExtensionLoader
在ExtensionLoader 结构体中有两个成员变量type 和objectFactory 。
private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory =
(type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
type是我们传入的class类型,objectFactory 是通过ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension() 获取的一个被@Adaptive 标注的ExtensionFactory 的实现类AdaptiveExtensionFactory
最后生成的ExtensionLoader 会被放入EXTENSION_LOADERS 中。
关于objectFactory的一些细节
- objectFactory就是ExtensionFactory,它就是通过
ExtensionLoader.getExtensionLoader(ExtensionFactory.class) 来实现的,但是它的objectFactory=null -objectFactory 作用,它就是为dubbo的IOC提供所有对象。 objectFactory 的IOC对象具体由ExtensionFactory 的子类SpiExtensionFactory 和SpringExtensionFactory 提供。
getAdaptiveExtension()
在对自适应拓展生成过程进行深入分析之前,我们先来看一下与自适应拓展息息相关的一个注解,即 @Adaptive 注解。该注解的定义如下:
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
String[] value() default {};
}
@Adaptive可以标注在类或者接口方法上
- 标注在类上
getAdaptiveExtension 会直接返回被@Adaptive 注解实现类。
如上述所说的ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()返回的就是AdaptiveExtensionFactory
@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {
private final List<ExtensionFactory> factories;
public AdaptiveExtensionFactory() {
ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
for (String name : loader.getSupportedExtensions()) {
list.add(loader.getExtension(name));
}
factories = Collections.unmodifiableList(list);
}
@Override
public <T> T getExtension(Class<T> type, String name) {
for (ExtensionFactory factory : factories) {
T extension = factory.getExtension(type, name);
if (extension != null) {
return extension;
}
}
return null;
}
}
- 标注在接口的方法上,方法参数必定有URL或者传入的方法入参对象中有
getUrl 方法,dubbo动态编译生成如Protocol$Adaptive 类,运行时会根据URL中的参数,决定使用哪个实现类。
从Protocol 获取自适应类的过程来解析getAdaptiveExtension 执行过程
@SPI("dubbo")
public interface Protocol {
............. 删除多余代码
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
............删除多余代码
}
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
getAdaptiveExtension 方法首先会检查缓存,缓存未命中,则调用 createAdaptiveExtension 方法创建自适应拓展。
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;
}
重要的方法是createAdaptiveExtension
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
createAdaptiveExtension 主要执行三个动作:
- 调用
getAdaptiveExtensionClass 方法获取自适应拓展 Class 对象 - 通过反射进行实例化
- 调用
injectExtension 方法向拓展实例中注入依赖
接下来,分析 getAdaptiveExtensionClass 方法的逻辑。
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
getAdaptiveExtensionClass 方法同样包含了三个逻辑,如下:
- 调用
getExtensionClasses 获取所有的拓展类 - 检查缓存
cachedAdaptiveClass ,若缓存不为空,则直接返回缓存(cachedAdaptiveClass缓存的是被@Adaptive注解的类) - 若缓存为空,则调用
createAdaptiveExtensionClass 创建自适应拓展类
首先从第一个逻辑说起,getExtensionClasses 这个方法用于获取某个接口的所有实现类其实就是解析META-INF/dubbo/internal/ ,META-INF/dubbo/ ,META-INF/services/ 里面放置的文件。比如该方法可以获取 Protocol 接口的DubboProtocol、HttpProtocol、InjvmProtocol 等实现类。在获取实现类的过程中,如果某个某个实现类被 Adaptive 注解修饰了,如AdaptiveExtensionFactory,那么该类就会被赋值给 cachedAdaptiveClass 变量。此时,上面步骤中的第二步条件成立(缓存不为空),直接返回 cachedAdaptiveClass 。如果所有的实现类均未被 Adaptive 注解修饰,那么执行第三步逻辑,创建自适应拓展类:
getExtensionClasses
先分析第一个逻辑getExtensionClasses 获取所有的拓展类,代码如下:
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
getExtensionClasses 主要执行两个动作:
- 检查缓存
cachedClasses 中是否有值。 - 如果没有则
loadExtensionClasses() 获取所有的拓展类。 - 最后将
classes 放入cachedClasses 中。
接着看loadExtensionClasses() 是如何获取所有的扩展类的。
private Map<String, Class<?>> loadExtensionClasses() {
cacheDefaultExtensionName();
Map<String, Class<?>> extensionClasses = new HashMap<>();
for (LoadingStrategy strategy : strategies) {
loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(),
strategy.overridden(), strategy.excludedPackages());
loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"),
strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
}
return extensionClasses;
}
loadExtensionClasses 主要做两件事情。
- 从
META-INF/dubbo/internal/,从META-INF/dubbo/(DubboSPI路径)和META-INF/services/ (JDK SPI路径)获取所有扩展。
继续看loadDirectory方法
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type) {
String fileName = dir + type;
try {
Enumeration<java.net.URL> urls;
ClassLoader classLoader = findClassLoader();
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL resourceURL = urls.nextElement();
loadResource(extensionClasses, classLoader, resourceURL);
}
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", description file: " + fileName + ").", t);
}
}
loadDirectory 主要包含三个逻辑:
- 获取一个类加载器。
- 通过加载器,加载文件名为
META-INF/dubbo/internal/ ,META-INF/dubbo/,META-INF/services/ 下的资源,如org.apache.dubbo.rpc.Protocol 。 - 遍历urls获取META-INF
/dubbo/internal/org.apache.dubbo.rpc.Protocol 中的SPI实现。
解析的逻辑在loadResource 代码如下:
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), "utf-8"));
try {
String line;
while ((line = reader.readLine()) != null) {
final int ci = line.indexOf('#');
if (ci >= 0) {
line = line.substring(0, ci);
}
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
if (line.length() > 0) {
loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
}
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", class file: " + resourceURL + ") in " + resourceURL, t);
}
}
loadResource 逻辑比较简单就是一行行读取,然后获取扩展名和扩展实现类,然后调用loadClass 方法
继续看loadClass方法
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
boolean overridden) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error occurred when loading extension class (interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + " is not subtype of interface.");
}
if (clazz.isAnnotationPresent(Adaptive.class)) {
cacheAdaptiveClass(clazz, overridden);
} else if (isWrapperClass(clazz)) {
cacheWrapperClass(clazz);
} else {
clazz.getConstructor();
if (StringUtils.isEmpty(name)) {
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException(
"No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
}
}
String[] names = NAME_SEPARATOR.split(name);
if (ArrayUtils.isNotEmpty(names)) {
cacheActivateClass(clazz, names[0]);
for (String n : names) {
cacheName(clazz, n);
saveInExtensionClass(extensionClasses, clazz, n, overridden);
}
}
}
执行loadClass 方法主要是为以下几个属性赋值
-
cachedAdaptiveClass 如果这个类class含有adaptive注解就赋值,例如 AdaptiveExtensionFactory ,而例如Protocol在这个环节是没有的。 2.cachedWrapperClasses 只有当该class无adaptive 注解,并且构造函数包含目标接口(type)类型 例如Protocol里面的spi就只有ProtocolFilterWapper 和ProtocolListenerWrapper 能命中 -
cachedActivates 被Activate注解的类 -
剩下的类就存储在cachedNames 和extensionClasses ,extensionClasses 是从外部传入的,之后它的值会放入cachedClasses 缓存中。
至此加载SPI扩展的方法就执行完了。
createAdaptiveExtensionClass
回到getAdaptiveExtensionClass 方法,像Protocol 所有的实现类均未被 Adaptive 注解修饰,就会调用createAdaptiveExtensionClass 生成一个动态类。相关代码如下
private Class<?> createAdaptiveExtensionClass() {
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
createAdaptiveExtensionClass 方法用于生成自适应拓展类,该方法首先会生成自适应拓展类的源码,然后通过 Compiler 实例(Dubbo 默认使用 javassist 作为编译器)编译源码,得到代理类 Class 实例。生成动态类的代码比较复杂,但是所有的动态类都是根据一个模板来生成的。
package <扩展点接口所在包>
public class <扩展点接口名>$Adpative implements <扩展点接口>{
public <有@Adaptive注解的接口方法>(<方法参数>){
if(是否有URL类型参数?) 使用该URL参数
else if(是否方法类型上有URL属性) 使用该URL属性
#<else 在加载扩展点生成自适应扩展点类时抛异常,即加载扩展点失败!>
if(获取的URL == null){
throw new IllegalArgumentException("url = null")
}
根据@Adaptive注解上声明的Key的顺序,从URL获取Value,作为实际扩展点名。
如URL没有Value,则使用缺省扩展点实现。如没有扩展点,throw new IllegalArgumentException("Fail to get extension");
扩展点实现调用方法,并返回结果
}
public <没有@Adaptive注解的接口方法>(<方法参数>){
throw new UnsupportedOperationException("is not adaptive method!")
}
}
下面是动态生成Protocol$Adpative的代码
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
private static final org.apache.dubbo.common.logger.Logger logger = org.apache.dubbo.common.logger.LoggerFactory
.getLogger(ExtensionLoader.class);
private java.util.concurrent.atomic.AtomicInteger count = new java.util.concurrent.atomic.AtomicInteger(0);
public void destroy() {
throw new UnsupportedOperationException(
"method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException(
"method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1)
throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null)
throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url("
+ url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = null;
try {
extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader
.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
} catch (Exception e) {
if (count.incrementAndGet() == 1) {
logger.warn(
"Failed to find extension named " + extName
+ " for type org.apache.dubbo.rpc.Protocol, will use default extension dubbo instead.",
e);
}
extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader
.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension("dubbo");
}
return extension.refer(arg0, arg1);
}
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0)
throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url("
+ url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = null;
try {
extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader
.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
} catch (Exception e) {
if (count.incrementAndGet() == 1) {
logger.warn(
"Failed to find extension named " + extName
+ " for type org.apache.dubbo.rpc.Protocol, will use default extension dubbo instead.",
e);
}
extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader
.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class)
.getExtension("dubbo");
}
return extension.export(arg0);
}
}
生成动态类之后,在createAdaptiveExtension 方法会通过反射生成实例,并调用injectExtension 方法进入IOC反转控制模式,实现依赖注入。
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
到此,关于getAdaptiveExtension 自适应拓展的原理,实现就分析完了。有关动态编译和依赖注入的原理下一章再分析。
getExtension(String name)
上一节我们分析了Dubbo自适应扩展机制,下面我们从 ExtensionLoader 的 getExtension 方法作为入口,对拓展类对象的获取过程进行详细的分析。
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
return getDefaultExtension();
}
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
上面了逻辑比较简单首先判断是否使用默认扩展,如果不是查找缓存,若未命中则调用createExtension创建拓展对象。
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
createExtension 方法的逻辑稍复杂一下,主要执行以下四个步骤:
- 通过
getExtensionClasses 获取所有的拓展类(在上一节中我们已经讲解过这个方法)。 - 查找缓存
EXTENSION_INSTANCES 中,是否有扩展实例,没有则通过反射创建拓展对象。 - 将拓展对象包裹在相应的
Wrapper 对象中,以Protocol 为例,它最终被getExtensionClasses 解析出的wrapper类ProtocolFilterWrapper和 ProtocolListenerWrapper包裹, - 调用injectExtension向拓展对象中注入依赖。
第一个步骤是加载拓展类的关键,第三和第四个步骤是 Dubbo IOC 与 AOP 的具体实现。
有关getExtensionClasses 分析,请看上一节,下面着重分析第三和第四步骤,先分析
Dubbo IOC
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
if (method.getAnnotation(DisableInject.class) != null) {
continue;
}
Class<?> pt = method.getParameterTypes()[0];
if (ReflectUtils.isPrimitives(pt)) {
continue;
}
try {
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
在上面代码中,objectFactory 变量的类型为 AdaptiveExtensionFactory ,AdaptiveExtensionFactory 内部维护了一个 ExtensionFactory 列表,用于存储其他类型的 ExtensionFactory 。Dubbo 目前提供了两种 ExtensionFactory ,分别是 SpiExtensionFactory 和 SpringExtensionFactory 。前者用于创建自适应的拓展,后者是用于从 Spring 的 IOC 容器中获取所需的拓展。Dubbo的IOC反转控制,就是从spi和spring里面提取对象赋值。
Dubbo AOP
下面只截取了createExtension 部分代码
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
在Protoco l中wrapperClass类 是ProtocolFilterWrapper 或ProtocolListenerWrapper 这两个类中的构造函数都有Protocol参数,
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
}
Dubbo AOP 获取wrapperClass 然通过反射生成对象,进行AOP,然后调用injectExtension 进行IOC.最后将值重新赋给instance ,所以getExtension 方法最后返回的是一个wrapper类。
好了Dubbo SPI实现源码解析讲完了,本篇文章就先到这里了。
|