dubbo 扩展机制
??????????
????????????????
???????????????????????? ? ? ? ? ? ?????????
dubbo 扩展原理
?????????
spi(service provider interface)
使用策略模式,一个接口有多个实现类;
接口使用的实现类不在程序中直接指定,由外部配置
?????????????
java spi加载
一次性加载所有实现类并初始化(有些初始化可能比较耗时,也会造成资源浪费);
java spi加载抛出的异常信息异常可能会被吞掉,导致无法排查出真实出错原因
????????????
dubbo spi加载:对java spi做了一些改进
加载所有实现类的类名并缓存,但是没有初始化;
dubbo扩展加载失败会抛出真是异常并打印日志,部分扩展加载失败不会影响其他扩展的加载;
dubbo扩展增加了ioc(通过setter注入其他扩展)、aop(wrapper包装类包含通用逻辑);
????????????
*************
相关注解
????????????
@SPI:扩展接口标识
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE}) //标注在类/接口上,表明该接口是个扩展接口
public @interface SPI {
/**
* default extension name
*/
String value() default "";
/**
* scope of SPI, default value is application scope.
*/
ExtensionScope scope() default ExtensionScope.APPLICATION;
}
???????????
@Adaptive:从url中提取参数,创建动态扩展类
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
/**
* Decide which target extension to be injected. The name of the target extension is decided by the parameter passed
* in the URL, and the parameter names are given by this method.
* <p>
* If the specified parameters are not found from {@link URL}, then the default extension will be used for
* dependency injection (specified in its interface's {@link SPI}).
* <p>
* For example, given <code>String[] {"key1", "key2"}</code>:
* <ol>
* <li>find parameter 'key1' in URL, use its value as the extension's name</li>
* <li>try 'key2' for extension's name if 'key1' is not found (or its value is empty) in URL</li>
* <li>use default extension if 'key2' doesn't exist either</li>
* <li>otherwise, throw {@link IllegalStateException}</li>
* </ol>
* If the parameter names are empty, then a default parameter name is generated from interface's
* class name with the rule: divide classname from capital char into several parts, and separate the parts with
* dot '.', for example, for {@code org.apache.dubbo.xxx.YyyInvokerWrapper}, the generated name is
* <code>String[] {"yyy.invoker.wrapper"}</code>.
*
* @return parameter names in URL
*/
String[] value() default {};
}
??????????
@Aativate:默认激活的扩展实现类
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
/**
* Activate the current extension when one of the groups matches. The group passed into
* {@link ExtensionLoader#getActivateExtension(URL, String, String)} will be used for matching.
*
* @return group names to match
* @see ExtensionLoader#getActivateExtension(URL, String, String)
*/
String[] group() default {};
/**
* Activate the current extension when the specified keys appear in the URL's parameters.
* <p>
* For example, given <code>@Activate("cache, validation")</code>, the current extension will be return only when
* there's either <code>cache</code> or <code>validation</code> key appeared in the URL's parameters.
* </p>
*
* @return URL parameter keys
* @see ExtensionLoader#getActivateExtension(URL, String)
* @see ExtensionLoader#getActivateExtension(URL, String, String)
*/
String[] value() default {};
/**
* Relative ordering info, optional
* Deprecated since 2.7.0
*
* @return extension list which should be put before the current one
*/
@Deprecated
String[] before() default {};
/**
* Relative ordering info, optional
* Deprecated since 2.7.0
*
* @return extension list which should be put after the current one
*/
@Deprecated
String[] after() default {};
/**
* Absolute ordering info, optional
*
* Ascending order, smaller values will be in the front o the list.
*
* @return absolute ordering info
*/
int order() default 0;
}
???????????
*************
相关类与接口
????
ExtensionLoader
/**
* {@link org.apache.dubbo.rpc.model.ApplicationModel}, {@code DubboBootstrap} and this class are
* at present designed to be singleton or static (by itself totally static or uses some static fields).
* So the instances returned from them are of process or classloader scope. If you want to support
* multiple dubbo servers in a single process, you may need to refactor these three classes.
* <p>
* Load dubbo extensions
* <ul>
* <li>auto inject dependency extension </li>
* <li>auto wrap extension in wrapper </li>
* <li>default extension is an adaptive instance</li>
* </ul>
*
* @see <a href="http://java.sun.com/j2se/1.5.0/docs/guide/jar/jar.html#Service%20Provider">Service Provider in Java 5</a>
* @see org.apache.dubbo.common.extension.SPI
* @see org.apache.dubbo.common.extension.Adaptive
* @see org.apache.dubbo.common.extension.Activate
*/
public class ExtensionLoader<T> {
private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);
private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");
private final ConcurrentMap<Class<?>, Object> extensionInstances = new ConcurrentHashMap<>(64);
private final Class<?> type;
private final ExtensionInjector injector;
private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<>();
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();
private final Map<String, Object> cachedActivates = Collections.synchronizedMap(new LinkedHashMap<>());
private final Map<String, Set<String>> cachedActivateGroups = Collections.synchronizedMap(new LinkedHashMap<>());
private final Map<String, String[][]> cachedActivateValues = Collections.synchronizedMap(new LinkedHashMap<>());
private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();
private final Holder<Object> cachedAdaptiveInstance = new Holder<>();
private volatile Class<?> cachedAdaptiveClass = null;
private String cachedDefaultName;
private volatile Throwable createAdaptiveInstanceError;
private Set<Class<?>> cachedWrapperClasses;
private Map<String, IllegalStateException> exceptions = new ConcurrentHashMap<>();
private static volatile LoadingStrategy[] strategies = loadLoadingStrategies();
/**
* Record all unacceptable exceptions when using SPI
*/
private Set<String> unacceptableExceptions = new ConcurrentHashSet<>();
private ExtensionDirector extensionDirector;
private List<ExtensionPostProcessor> extensionPostProcessors;
private InstantiationStrategy instantiationStrategy;
private Environment environment;
private ActivateComparator activateComparator;
private ScopeModel scopeModel;
private AtomicBoolean destroyed = new AtomicBoolean();
*********
获取扩展类
public T getExtension(String name) {
T extension = getExtension(name, true);
if (extension == null) {
throw new IllegalArgumentException("Not find extension: " + name);
}
return extension;
}
public T getExtension(String name, boolean wrap) {
checkDestroyed();
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
return getDefaultExtension();
}
String cacheKey = name;
if (!wrap) {
cacheKey += "_origin";
}
final Holder<Object> holder = getOrCreateHolder(cacheKey);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name, wrap);
holder.set(instance);
}
}
}
return (T) instance;
}
*********
获取自适应扩展类
public T getAdaptiveExtension() {
checkDestroyed();
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError != null) {
throw new IllegalStateException("Failed to create adaptive instance: " +
createAdaptiveInstanceError.toString(),
createAdaptiveInstanceError);
}
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
}
return (T) instance;
}
*********
获取自动激活的扩展
public List<T> getActivateExtension(URL url, String key) {
return getActivateExtension(url, key, null);
}
/**
* This is equivalent to {@code getActivateExtension(url, values, null)}
*
* @param url url
* @param values extension point names
* @return extension list which are activated
* @see #getActivateExtension(org.apache.dubbo.common.URL, String[], String)
*/
public List<T> getActivateExtension(URL url, String[] values) {
return getActivateExtension(url, values, null);
}
/**
* This is equivalent to {@code getActivateExtension(url, url.getParameter(key).split(","), null)}
*
* @param url url
* @param key url parameter key which used to get extension point names
* @param group group
* @return extension list which are activated.
* @see #getActivateExtension(org.apache.dubbo.common.URL, String[], String)
*/
public List<T> getActivateExtension(URL url, String key, String group) {
String value = url.getParameter(key);
return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group);
}
/**
* Get activate extensions.
*
* @param url url
* @param values extension point names
* @param group group
* @return extension list which are activated
* @see org.apache.dubbo.common.extension.Activate
*/
public List<T> getActivateExtension(URL url, String[] values, String group) {
checkDestroyed();
// solve the bug of using @SPI's wrapper method to report a null pointer exception.
Map<Class<?>, T> activateExtensionsMap = new TreeMap<>(activateComparator);
List<String> names = values == null ? new ArrayList<>(0) : asList(values);
Set<String> namesSet = new HashSet<>(names);
if (!namesSet.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {
if (cachedActivateGroups.size() == 0) {
synchronized (cachedActivateGroups) {
// cache all extensions
if (cachedActivateGroups.size() == 0) {
getExtensionClasses();
for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Object activate = entry.getValue();
String[] activateGroup, activateValue;
if (activate instanceof Activate) {
activateGroup = ((Activate) activate).group();
activateValue = ((Activate) activate).value();
} else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
} else {
continue;
}
cachedActivateGroups.put(name, new HashSet<>(Arrays.asList(activateGroup)));
String[][] keyPairs = new String[activateValue.length][];
for (int i = 0; i < activateValue.length; i++) {
if (activateValue[i].contains(":")) {
keyPairs[i] = new String[2];
String[] arr = activateValue[i].split(":");
keyPairs[i][0] = arr[0];
keyPairs[i][1] = arr[1];
} else {
keyPairs[i] = new String[1];
keyPairs[i][0] = activateValue[i];
}
}
cachedActivateValues.put(name, keyPairs);
}
}
}
}
// traverse all cached extensions
cachedActivateGroups.forEach((name, activateGroup) -> {
if (isMatchGroup(group, activateGroup)
&& !namesSet.contains(name)
&& !namesSet.contains(REMOVE_VALUE_PREFIX + name)
&& isActive(cachedActivateValues.get(name), url)) {
activateExtensionsMap.put(getExtensionClass(name), getExtension(name));
}
});
}
if (namesSet.contains(DEFAULT_KEY)) {
// will affect order
// `ext1,default,ext2` means ext1 will happens before all of the default extensions while ext2 will after them
ArrayList<T> extensionsResult = new ArrayList<>(activateExtensionsMap.size() + names.size());
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
if (!name.startsWith(REMOVE_VALUE_PREFIX)
&& !namesSet.contains(REMOVE_VALUE_PREFIX + name)) {
if (!DEFAULT_KEY.equals(name)) {
if (containsExtension(name)) {
extensionsResult.add(getExtension(name));
}
} else {
extensionsResult.addAll(activateExtensionsMap.values());
}
}
}
return extensionsResult;
} else {
// add extensions, will be sorted by its order
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
if (!name.startsWith(REMOVE_VALUE_PREFIX)
&& !namesSet.contains(REMOVE_VALUE_PREFIX + name)) {
if (!DEFAULT_KEY.equals(name)) {
if (containsExtension(name)) {
activateExtensionsMap.put(getExtensionClass(name), getExtension(name));
}
}
}
}
return new ArrayList<>(activateExtensionsMap.values());
}
}
public List<T> getActivateExtensions() {
checkDestroyed();
List<T> activateExtensions = new ArrayList<>();
TreeMap<Class<?>, T> activateExtensionsMap = new TreeMap<>(activateComparator);
getExtensionClasses();
for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Object activate = entry.getValue();
if (!(activate instanceof Activate)) {
continue;
}
activateExtensionsMap.put(getExtensionClass(name), getExtension(name));
}
if (!activateExtensionsMap.isEmpty()) {
activateExtensions.addAll(activateExtensionsMap.values());
}
return activateExtensions;
}
?????????????
???????????
ExtensionInjector:扩展注入接口
@SPI(scope = ExtensionScope.SELF)
public interface ExtensionInjector extends ExtensionAccessorAware {
/**
* Get instance of specify type and name.
*
* @param type object type.
* @param name object name.
* @return object instance.
*/
<T> T getInstance(Class<T> type, String name);
@Override
default void setExtensionAccessor(ExtensionAccessor extensionAccessor) {
}
}
???????????????????????
???????????
AdaptiveExtensionInjector:默认使用的注入实现类
@Adaptive //有注解@Adaptive,默认使用该编译器
public class AdaptiveExtensionInjector implements ExtensionInjector, Lifecycle {
private List<ExtensionInjector> injectors = Collections.emptyList();
private ExtensionAccessor extensionAccessor;
public AdaptiveExtensionInjector() {
}
@Override
public void setExtensionAccessor(ExtensionAccessor extensionAccessor) {
this.extensionAccessor = extensionAccessor;
}
@Override
public void initialize() throws IllegalStateException {
ExtensionLoader<ExtensionInjector> loader = extensionAccessor.getExtensionLoader(ExtensionInjector.class);
List<ExtensionInjector> list = new ArrayList<ExtensionInjector>();
for (String name : loader.getSupportedExtensions()) {
list.add(loader.getExtension(name));
}
injectors = Collections.unmodifiableList(list);
}
@Override
public <T> T getInstance(Class<T> type, String name) {
for (ExtensionInjector injector : injectors) {
T extension = injector.getInstance(type, name);
if (extension != null) {
return extension;
}
}
return null;
}
@Override
public void start() throws IllegalStateException {
}
@Override
public void destroy() throws IllegalStateException {
}
}
??????????
????????????
Compiler:代码编译器接口
@SPI(value = "javassist", scope = ExtensionScope.FRAMEWORK) //默认javassist
public interface Compiler {
/**
* Compile java source code.
*
* @param code Java source code
* @param classLoader classloader
* @return Compiled class
*/
Class<?> compile(String code, ClassLoader classLoader);
}
???????????????????????
??????????????
AdaptiveCompiler
@Adaptive //有注解@Adaptive,默认使用该编译器
public class AdaptiveCompiler implements Compiler, ScopeModelAware {
private FrameworkModel frameworkModel;
@Override
public void setFrameworkModel(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
}
private static volatile String DEFAULT_COMPILER;
public static void setDefaultCompiler(String compiler) {
DEFAULT_COMPILER = compiler;
}
@Override
public Class<?> compile(String code, ClassLoader classLoader) {
Compiler compiler;
ExtensionLoader<Compiler> loader = frameworkModel.getExtensionLoader(Compiler.class);
String name = DEFAULT_COMPILER; // copy reference
if (name != null && name.length() > 0) {
compiler = loader.getExtension(name);
} else {
compiler = loader.getDefaultExtension();
}
return compiler.compile(code, classLoader);
}
}
??????????
?? ????????????
???????????????????????? ? ? ? ? ? ?????????
java spi 示例
?????????
???????????????????????
?????????
HelloService
public interface HelloService {
String hello();
}
???????????
HelloServiceImpl
public class HelloServiceImpl implements HelloService {
@Override
public String hello() {
return "hello";
}
}
???????????
HelloServiceImpl2
public class HelloServiceImpl2 implements HelloService {
@Override
public String hello() {
return "hello 2";
}
}
????????????
META-INF/services/com.example.demo.service.HelloService文件
com.example.demo.service.impl.HelloServiceImpl
com.example.demo.service.impl.HelloServiceImpl2
????????????
Test:测试类
public class Test {
public static void main(String[] args) {
ServiceLoader<HelloService> helloServices = ServiceLoader.load(HelloService.class);
for (HelloService helloService : helloServices){
System.out.println(helloService.hello());
}
}
}
??????????
运行测试类,控制台输出:
hello
hello 2
? ? ????????
?????????????????
???????????????????????? ? ? ? ? ? ?????????
dubbo spi 示例
?????????
???????????????????????
????????????
HelloService
@SPI //此处添加注解,标识该接口是个扩展接口
public interface HelloService {
String hello();
}
???????????
HelloServiceImpl
public class HelloServiceImpl implements HelloService {
@Override
public String hello() {
return "hello";
}
}
??????????
HelloServiceImpl2
public class HelloServiceImpl2 implements HelloService {
@Override
public String hello() {
return "hello 2";
}
}
???????????
META-INF/dubbo/com.example.demo.service.HelloService文件
impl=com.example.demo.service.impl.HelloServiceImpl
impl2=com.example.demo.service.impl.HelloServiceImpl2
??????????
DubboTest:测试类
public class DubboTest {
public static void main(String[] args) {
HelloService helloService = ExtensionLoader.getExtensionLoader(HelloService.class).getExtension("impl");
System.out.println(helloService.hello());
HelloService helloService2 = ExtensionLoader.getExtensionLoader(HelloService.class).getExtension("impl2");
System.out.println(helloService2.hello());
}
}
????????
点击运行,控制台输出:
log4j:WARN No appenders could be found for logger (org.apache.dubbo.rpc.model.FrameworkModel).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
hello
hello 2
????????
?????????????
|