启动流程
@EnableDubbo 注解组合了 @EnableDubboConfig 与 @DubboComponentScan,二者分别通过 @Import(DubboConfigConfigurationRegistrar.class) 和 @Import(DubboComponentScanRegistrar.class) 导入对应的注册器。
DubboConfigConfigurationRegistrar
/**
 * {@code ImportBeanDefinitionRegistrar} that registers the Dubbo configuration-binding
 * classes based on the attributes of {@code EnableDubboConfig}.
 */
public class DubboConfigConfigurationRegistrar implements ImportBeanDefinitionRegistrar {

    /**
     * Always registers {@code DubboConfigConfiguration.Single}, additionally registers
     * {@code DubboConfigConfiguration.Multiple} when the annotation's {@code multiple}
     * attribute is {@code true}, and finally registers the common beans.
     *
     * @param importingClassMetadata metadata of the class carrying the annotation
     * @param registry               registry receiving the bean definitions
     */
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        AnnotationAttributes enableDubboConfig = AnnotationAttributes.fromMap(
                importingClassMetadata.getAnnotationAttributes(EnableDubboConfig.class.getName()));
        // The flag is read before any registration takes place.
        boolean bindMultiple = enableDubboConfig.getBoolean("multiple");

        registerBeans(registry, DubboConfigConfiguration.Single.class);
        if (bindMultiple) {
            registerBeans(registry, DubboConfigConfiguration.Multiple.class);
        }

        registerCommonBeans(registry);
    }
}
注册DubboConfigConfiguration
/**
 * Configuration holder for the "multiple" binding mode. Each
 * {@code @EnableConfigurationBeanBinding} entry pairs a plural property prefix
 * (for example {@code dubbo.registries}) with a Dubbo config type and sets
 * {@code multiple = true}. The class body is empty; only the annotations carry information.
 */
@EnableConfigurationBeanBindings({
@EnableConfigurationBeanBinding(prefix = "dubbo.applications", type = ApplicationConfig.class, multiple = true),
@EnableConfigurationBeanBinding(prefix = "dubbo.modules", type = ModuleConfig.class, multiple = true),
@EnableConfigurationBeanBinding(prefix = "dubbo.registries", type = RegistryConfig.class, multiple = true),
@EnableConfigurationBeanBinding(prefix = "dubbo.protocols", type = ProtocolConfig.class, multiple = true),
@EnableConfigurationBeanBinding(prefix = "dubbo.monitors", type = MonitorConfig.class, multiple = true),
@EnableConfigurationBeanBinding(prefix = "dubbo.providers", type = ProviderConfig.class, multiple = true),
@EnableConfigurationBeanBinding(prefix = "dubbo.consumers", type = ConsumerConfig.class, multiple = true),
@EnableConfigurationBeanBinding(prefix = "dubbo.config-centers", type = ConfigCenterBean.class, multiple = true),
@EnableConfigurationBeanBinding(prefix = "dubbo.metadata-reports", type = MetadataReportConfig.class, multiple = true),
@EnableConfigurationBeanBinding(prefix = "dubbo.metricses", type = MetricsConfig.class, multiple = true)
})
public static class Multiple {
}
注册各种配置类,并且指定prefix
DubboComponentScanRegistrar
/**
 * {@code ImportBeanDefinitionRegistrar} behind {@code DubboComponentScan}: works out which
 * packages to scan and registers a {@code ServiceAnnotationBeanPostProcessor} for them,
 * followed by the common beans.
 */
public class DubboComponentScanRegistrar implements ImportBeanDefinitionRegistrar {

    /**
     * Registers the service-scanning post processor for the resolved packages, then the
     * common beans.
     */
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        Set<String> scanPackages = getPackagesToScan(importingClassMetadata);
        registerServiceAnnotationBeanPostProcessor(scanPackages, registry);
        registerCommonBeans(registry);
    }

    /**
     * Registers a {@code ServiceAnnotationBeanPostProcessor} definition under a generated
     * name, passing the packages as its constructor argument and marking it as an
     * infrastructure bean.
     */
    private void registerServiceAnnotationBeanPostProcessor(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
        BeanDefinitionBuilder postProcessorBuilder = rootBeanDefinition(ServiceAnnotationBeanPostProcessor.class);
        postProcessorBuilder.addConstructorArgValue(packagesToScan);
        postProcessorBuilder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
        BeanDefinitionReaderUtils.registerWithGeneratedName(postProcessorBuilder.getBeanDefinition(), registry);
    }

    /**
     * Collects the packages named by the {@code value}, {@code basePackages} and
     * {@code basePackageClasses} attributes of {@code DubboComponentScan}, in that order.
     * When none are given, falls back to the package of the annotated class.
     *
     * @param metadata metadata of the class carrying the annotation
     * @return the packages to scan, never empty
     */
    private Set<String> getPackagesToScan(AnnotationMetadata metadata) {
        AnnotationAttributes scanAttributes = AnnotationAttributes.fromMap(
                metadata.getAnnotationAttributes(DubboComponentScan.class.getName()));
        String[] basePackages = scanAttributes.getStringArray("basePackages");
        Class<?>[] basePackageClasses = scanAttributes.getClassArray("basePackageClasses");
        String[] value = scanAttributes.getStringArray("value");

        // Insertion order is kept: value first, then basePackages, then class-derived packages.
        Set<String> collected = new LinkedHashSet<String>();
        Collections.addAll(collected, value);
        Collections.addAll(collected, basePackages);
        for (Class<?> markerClass : basePackageClasses) {
            collected.add(ClassUtils.getPackageName(markerClass));
        }

        if (!collected.isEmpty()) {
            return collected;
        }
        return Collections.singleton(ClassUtils.getPackageName(metadata.getClassName()));
    }
}
DubboComponentScanRegistrar 的功能是注册 Bean:它注册了用于扫描 Dubbo 服务的 ServiceAnnotationBeanPostProcessor,并指定了扫描路径。除此之外,还通过 registerCommonBeans 注册了以下 Bean:
/**
 * Registers the shared Dubbo beans through {@code registerInfrastructureBean}, each under
 * the {@code BEAN_NAME} constant declared on its own class: the reference-annotation post
 * processor, the config alias post processor, the lifecycle and bootstrap application
 * listeners, and the default-property-value post processor.
 *
 * @param registry registry receiving the bean definitions
 */
static void registerCommonBeans(BeanDefinitionRegistry registry) {
registerInfrastructureBean(registry, ReferenceAnnotationBeanPostProcessor.BEAN_NAME,
ReferenceAnnotationBeanPostProcessor.class);
registerInfrastructureBean(registry, DubboConfigAliasPostProcessor.BEAN_NAME,
DubboConfigAliasPostProcessor.class);
registerInfrastructureBean(registry, DubboLifecycleComponentApplicationListener.BEAN_NAME,
DubboLifecycleComponentApplicationListener.class);
registerInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME,
DubboBootstrapApplicationListener.class);
registerInfrastructureBean(registry, DubboConfigDefaultPropertyValueBeanPostProcessor.BEAN_NAME,
DubboConfigDefaultPropertyValueBeanPostProcessor.class);
}
ServiceAnnotationBeanPostProcessor
public class ServiceAnnotationBeanPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware,
ResourceLoaderAware, BeanClassLoaderAware {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Set<String> packagesToScan;
private Environment environment;
private ResourceLoader resourceLoader;
private ClassLoader classLoader;
public ServiceAnnotationBeanPostProcessor(String... packagesToScan) {
this(Arrays.asList(packagesToScan));
}
public ServiceAnnotationBeanPostProcessor(Collection<String> packagesToScan) {
this(new LinkedHashSet<>(packagesToScan));
}
public ServiceAnnotationBeanPostProcessor(Set<String> packagesToScan) {
this.packagesToScan = packagesToScan;
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
registerBeans(registry, DubboBootstrapApplicationListener.class);
Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);
if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
registerServiceBeans(resolvedPackagesToScan, registry);
} else {
if (logger.isWarnEnabled()) {
logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
}
}
}
private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
DubboClassPathBeanDefinitionScanner scanner =
new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);
BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);
scanner.setBeanNameGenerator(beanNameGenerator);
scanner.addIncludeFilter(new AnnotationTypeFilter(Service.class));
scanner.addIncludeFilter(new AnnotationTypeFilter(com.alibaba.dubbo.config.annotation.Service.class));
for (String packageToScan : packagesToScan) {
scanner.scan(packageToScan);
Set<BeanDefinitionHolder> beanDefinitionHolders =
findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);
if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {
for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
registerServiceBean(beanDefinitionHolder, registry, scanner);
}
if (logger.isInfoEnabled()) {
logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +
beanDefinitionHolders +
" } were scanned under package[" + packageToScan + "]");
}
} else {
if (logger.isWarnEnabled()) {
logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["
+ packageToScan + "]");
}
}
}
}
private BeanNameGenerator resolveBeanNameGenerator(BeanDefinitionRegistry registry) {
BeanNameGenerator beanNameGenerator = null;
if (registry instanceof SingletonBeanRegistry) {
SingletonBeanRegistry singletonBeanRegistry = SingletonBeanRegistry.class.cast(registry);
beanNameGenerator = (BeanNameGenerator) singletonBeanRegistry.getSingleton(CONFIGURATION_BEAN_NAME_GENERATOR);
}
if (beanNameGenerator == null) {
if (logger.isInfoEnabled()) {
logger.info("BeanNameGenerator bean can't be found in BeanFactory with name ["
+ CONFIGURATION_BEAN_NAME_GENERATOR + "]");
logger.info("BeanNameGenerator will be a instance of " +
AnnotationBeanNameGenerator.class.getName() +
" , it maybe a potential problem on bean name generation.");
}
beanNameGenerator = new AnnotationBeanNameGenerator();
}
return beanNameGenerator;
}
private Set<BeanDefinitionHolder> findServiceBeanDefinitionHolders(
ClassPathBeanDefinitionScanner scanner, String packageToScan, BeanDefinitionRegistry registry,
BeanNameGenerator beanNameGenerator) {
Set<BeanDefinition> beanDefinitions = scanner.findCandidateComponents(packageToScan);
Set<BeanDefinitionHolder> beanDefinitionHolders = new LinkedHashSet<>(beanDefinitions.size());
for (BeanDefinition beanDefinition : beanDefinitions) {
String beanName = beanNameGenerator.generateBeanName(beanDefinition, registry);
BeanDefinitionHolder beanDefinitionHolder = new BeanDefinitionHolder(beanDefinition, beanName);
beanDefinitionHolders.add(beanDefinitionHolder);
}
return beanDefinitionHolders;
}
private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,
DubboClassPathBeanDefinitionScanner scanner) {
Class<?> beanClass = resolveClass(beanDefinitionHolder);
Annotation service = findServiceAnnotation(beanClass);
AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false);
Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);
String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();
AbstractBeanDefinition serviceBeanDefinition =
buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName);
String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass);
if (scanner.checkCandidate(beanName, serviceBeanDefinition)) {
registry.registerBeanDefinition(beanName, serviceBeanDefinition);
if (logger.isInfoEnabled()) {
logger.info("The BeanDefinition[" + serviceBeanDefinition +
"] of ServiceBean has been registered with name : " + beanName);
}
} else {
if (logger.isWarnEnabled()) {
logger.warn("The Duplicated BeanDefinition[" + serviceBeanDefinition +
"] of ServiceBean[ bean name : " + beanName +
"] was be found , Did @DubboComponentScan scan to same package in many times?");
}
}
}
private Annotation findServiceAnnotation(Class<?> beanClass) {
Annotation service = findMergedAnnotation(beanClass, Service.class);
if (service == null) {
service = findMergedAnnotation(beanClass, com.alibaba.dubbo.config.annotation.Service.class);
}
return service;
}
private String generateServiceBeanName(AnnotationAttributes serviceAnnotationAttributes, Class<?> interfaceClass) {
ServiceBeanNameBuilder builder = create(interfaceClass, environment)
.group(serviceAnnotationAttributes.getString("group"))
.version(serviceAnnotationAttributes.getString("version"));
return builder.build();
}
private Class<?> resolveClass(BeanDefinitionHolder beanDefinitionHolder) {
BeanDefinition beanDefinition = beanDefinitionHolder.getBeanDefinition();
return resolveClass(beanDefinition);
}
private Class<?> resolveClass(BeanDefinition beanDefinition) {
String beanClassName = beanDefinition.getBeanClassName();
return resolveClassName(beanClassName, classLoader);
}
private Set<String> resolvePackagesToScan(Set<String> packagesToScan) {
Set<String> resolvedPackagesToScan = new LinkedHashSet<String>(packagesToScan.size());
for (String packageToScan : packagesToScan) {
if (StringUtils.hasText(packageToScan)) {
String resolvedPackageToScan = environment.resolvePlaceholders(packageToScan.trim());
resolvedPackagesToScan.add(resolvedPackageToScan);
}
}
return resolvedPackagesToScan;
}
private AbstractBeanDefinition buildServiceBeanDefinition(Annotation serviceAnnotation,
AnnotationAttributes serviceAnnotationAttributes,
Class<?> interfaceClass,
String annotatedServiceBeanName) {
BeanDefinitionBuilder builder = rootBeanDefinition(ServiceBean.class);
AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
MutablePropertyValues propertyValues = beanDefinition.getPropertyValues();
String[] ignoreAttributeNames = of("provider", "monitor", "application", "module", "registry", "protocol",
"interface", "interfaceName", "parameters");
propertyValues.addPropertyValues(new AnnotationPropertyValuesAdapter(serviceAnnotation, environment, ignoreAttributeNames));
addPropertyReference(builder, "ref", annotatedServiceBeanName);
builder.addPropertyValue("interface", interfaceClass.getName());
builder.addPropertyValue("parameters", convertParameters(serviceAnnotationAttributes.getStringArray("parameters")));
List<MethodConfig> methodConfigs = convertMethodConfigs(serviceAnnotationAttributes.get("methods"));
if (!methodConfigs.isEmpty()) {
builder.addPropertyValue("methods", methodConfigs);
}
String providerConfigBeanName = serviceAnnotationAttributes.getString("provider");
if (StringUtils.hasText(providerConfigBeanName)) {
addPropertyReference(builder, "provider", providerConfigBeanName);
}
String monitorConfigBeanName = serviceAnnotationAttributes.getString("monitor");
if (StringUtils.hasText(monitorConfigBeanName)) {
addPropertyReference(builder, "monitor", monitorConfigBeanName);
}
String applicationConfigBeanName = serviceAnnotationAttributes.getString("application");
if (StringUtils.hasText(applicationConfigBeanName)) {
addPropertyReference(builder, "application", applicationConfigBeanName);
}
String moduleConfigBeanName = serviceAnnotationAttributes.getString("module");
if (StringUtils.hasText(moduleConfigBeanName)) {
addPropertyReference(builder, "module", moduleConfigBeanName);
}
String[] registryConfigBeanNames = serviceAnnotationAttributes.getStringArray("registry");
List<RuntimeBeanReference> registryRuntimeBeanReferences = toRuntimeBeanReferences(registryConfigBeanNames);
if (!registryRuntimeBeanReferences.isEmpty()) {
builder.addPropertyValue("registries", registryRuntimeBeanReferences);
}
String[] protocolConfigBeanNames = serviceAnnotationAttributes.getStringArray("protocol");
List<RuntimeBeanReference> protocolRuntimeBeanReferences = toRuntimeBeanReferences(protocolConfigBeanNames);
if (!protocolRuntimeBeanReferences.isEmpty()) {
builder.addPropertyValue("protocols", protocolRuntimeBeanReferences);
}
return builder.getBeanDefinition();
}
private List convertMethodConfigs(Object methodsAnnotation) {
if (methodsAnnotation == null) {
return Collections.EMPTY_LIST;
}
return MethodConfig.constructMethodConfig((Method[]) methodsAnnotation);
}
private ManagedList<RuntimeBeanReference> toRuntimeBeanReferences(String... beanNames) {
ManagedList<RuntimeBeanReference> runtimeBeanReferences = new ManagedList<>();
if (!ObjectUtils.isEmpty(beanNames)) {
for (String beanName : beanNames) {
String resolvedBeanName = environment.resolvePlaceholders(beanName);
runtimeBeanReferences.add(new RuntimeBeanReference(resolvedBeanName));
}
}
return runtimeBeanReferences;
}
private void addPropertyReference(BeanDefinitionBuilder builder, String propertyName, String beanName) {
String resolvedBeanName = environment.resolvePlaceholders(beanName);
builder.addPropertyReference(propertyName, resolvedBeanName);
}
private Map<String, String> convertParameters(String[] parameters) {
if (ArrayUtils.isEmpty(parameters)) {
return null;
}
if (parameters.length % 2 != 0) {
throw new IllegalArgumentException("parameter attribute must be paired with key followed by value");
}
Map<String, String> map = new HashMap<>();
for (int i = 0; i < parameters.length; i += 2) {
map.put(parameters[i], parameters[i + 1]);
}
return map;
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
@Override
public void setResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
}
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}
}
postProcessBeanDefinitionRegistry 首先注册了 DubboBootstrapApplicationListener,然后调用 org.apache.dubbo.config.spring.beans.factory.annotation.ServiceAnnotationBeanPostProcessor#registerServiceBeans:
/**
 * Scans every given package for classes carrying {@code Service} or
 * {@code com.alibaba.dubbo.config.annotation.Service} and registers a ServiceBean for each
 * hit; logs a warning for packages without any hit.
 *
 * @param packagesToScan resolved package names
 * @param registry       registry receiving the bean definitions
 */
private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
    DubboClassPathBeanDefinitionScanner scanner =
            new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);
    BeanNameGenerator nameGenerator = resolveBeanNameGenerator(registry);
    scanner.setBeanNameGenerator(nameGenerator);
    scanner.addIncludeFilter(new AnnotationTypeFilter(Service.class));
    scanner.addIncludeFilter(new AnnotationTypeFilter(com.alibaba.dubbo.config.annotation.Service.class));

    for (String basePackage : packagesToScan) {
        scanner.scan(basePackage);
        Set<BeanDefinitionHolder> holders =
                findServiceBeanDefinitionHolders(scanner, basePackage, registry, nameGenerator);

        // Nothing annotated in this package: warn and move on to the next one.
        if (CollectionUtils.isEmpty(holders)) {
            if (logger.isWarnEnabled()) {
                logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["
                        + basePackage + "]");
            }
            continue;
        }

        for (BeanDefinitionHolder holder : holders) {
            registerServiceBean(holder, registry, scanner);
        }
        if (logger.isInfoEnabled()) {
            logger.info(holders.size() + " annotated Dubbo's @Service Components { " +
                    holders +
                    " } were scanned under package[" + basePackage + "]");
        }
    }
}
创建 DubboClassPathBeanDefinitionScanner,并利用 scanner 扫描带有 Service 以及 com.alibaba.dubbo.config.annotation.Service 这两种注解的类:
// Excerpt of the per-package loop body in registerServiceBeans.
// Step 1: run the scanner over the package.
scanner.scan(packageToScan);
// Step 2: collect the candidate components again as named holders.
Set<BeanDefinitionHolder> beanDefinitionHolders =
findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);
if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {
// Step 3: register one ServiceBean definition per holder.
for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
registerServiceBean(beanDefinitionHolder, registry, scanner);
}
if (logger.isInfoEnabled()) {
logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +
beanDefinitionHolders +
" } were scanned under package[" + packageToScan + "]");
}
}
首先将带有 Dubbo @Service 注解的类注册成 Spring Bean,然后再为其注册对应的 ServiceBean:
/**
 * Builds the ServiceBean definition for one annotated class and registers it under the
 * generated service bean name, unless the scanner's {@code checkCandidate} rejects that
 * name, in which case only a warning is logged.
 *
 * @param beanDefinitionHolder holder of the annotated implementation's definition
 * @param registry             registry receiving the ServiceBean definition
 * @param scanner              scanner used for the candidate check
 */
private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,
                                 DubboClassPathBeanDefinitionScanner scanner) {
    Class<?> implementationClass = resolveClass(beanDefinitionHolder);
    Annotation serviceAnnotation = findServiceAnnotation(implementationClass);
    AnnotationAttributes attributes = getAnnotationAttributes(serviceAnnotation, false, false);
    Class<?> serviceInterface = resolveServiceInterfaceClass(attributes, implementationClass);
    String implementationBeanName = beanDefinitionHolder.getBeanName();

    AbstractBeanDefinition serviceBeanDefinition =
            buildServiceBeanDefinition(serviceAnnotation, attributes, serviceInterface, implementationBeanName);
    String serviceBeanName = generateServiceBeanName(attributes, serviceInterface);

    if (!scanner.checkCandidate(serviceBeanName, serviceBeanDefinition)) {
        if (logger.isWarnEnabled()) {
            logger.warn("The Duplicated BeanDefinition[" + serviceBeanDefinition +
                    "] of ServiceBean[ bean name : " + serviceBeanName +
                    "] was be found , Did @DubboComponentScan scan to same package in many times?");
        }
        return;
    }

    registry.registerBeanDefinition(serviceBeanName, serviceBeanDefinition);
    if (logger.isInfoEnabled()) {
        logger.info("The BeanDefinition[" + serviceBeanDefinition +
                "] of ServiceBean has been registered with name : " + serviceBeanName);
    }
}
注册的 ServiceBean 定义中,beanClass 为 org.apache.dubbo.config.spring.ServiceBean,属性中有:
interface就是要暴露的接口
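注意两个名称的区别:ServiceBean 的 beanName 由 generateServiceBeanName 根据接口、group、version 生成;而被 @Service 标注的实现类自身的 bean 名称则作为 ref 属性被 ServiceBean 引用。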
dubbo SPI
ExtensionLoader
例如 type 为 FrameworkExt 时,调用
org.apache.dubbo.common.extension.ExtensionLoader#getExtensionLoader
/**
 * Returns the {@code ExtensionLoader} for the given extension type, creating one and caching
 * it in {@code EXTENSION_LOADERS} on first use.
 *
 * @param type the extension type; must be non-null, an interface, and accepted by
 *             {@code withExtensionAnnotation} (the error message names {@code @SPI})
 * @return the loader cached for {@code type}
 * @throws IllegalArgumentException if any of the three checks fails
 */
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null) {
throw new IllegalArgumentException("Extension type == null");
}
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
}
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type (" + type +
") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
}
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
// putIfAbsent followed by a fresh get: whichever loader ended up in the map is returned.
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
EXTENSION_LOADERS 中缓存的是每个扩展接口类型(Class)对应的 ExtensionLoader 实例:
// Cache of ExtensionLoader instances keyed by extension type; created with an initial capacity of 64.
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>(64);
第一次进入时,EXTENSION_LOADERS 中还没有存储指定类型的 ExtensionLoader,因此执行:
// Stores a newly constructed loader only when no loader is mapped to this type yet.
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
创建一个ExtensionLoader放进去,构造函数
/**
 * Creates the loader for one extension type. The loader for {@code ExtensionFactory} itself
 * gets no object factory; every other loader uses the adaptive {@code ExtensionFactory}.
 *
 * @param type the extension type this loader serves
 */
private ExtensionLoader(Class<?> type) {
    this.type = type;
    if (type == ExtensionFactory.class) {
        objectFactory = null;
    } else {
        objectFactory = ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension();
    }
}
注意:如果 type 为 ExtensionFactory,那么 objectFactory 为 null;否则会用同样的方式先获取 type 为 ExtensionFactory 的 ExtensionLoader,再调用 org.apache.dubbo.common.extension.ExtensionLoader#getAdaptiveExtension 得到 objectFactory:
/**
 * Returns the adaptive extension instance, creating it on first use and caching it in
 * {@code cachedAdaptiveInstance}.
 *
 * @return the cached or newly created adaptive instance
 * @throws IllegalStateException if creation fails now, or failed on an earlier call
 */
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
// A failure recorded by an earlier attempt is rethrown instead of retrying.
if (createAdaptiveInstanceError != null) {
throw new IllegalStateException("Failed to create adaptive instance: " +
createAdaptiveInstanceError.toString(),
createAdaptiveInstanceError);
}
synchronized (cachedAdaptiveInstance) {
// Re-read under the lock: the holder may have been filled while waiting for it.
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
// Remember the failure so later calls fail fast.
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
}
return (T) instance;
}
如果之前没有创建过,那就创建一个
// Nothing cached yet: build the adaptive instance.
instance = createAdaptiveExtension();
/**
 * Instantiates the adaptive extension class and passes the new instance through
 * {@code injectExtension}.
 *
 * @return the injected adaptive instance
 * @throws IllegalStateException wrapping any exception raised along the way
 */
private T createAdaptiveExtension() {
    try {
        Class<?> adaptiveClass = getAdaptiveExtensionClass();
        T adaptiveInstance = (T) adaptiveClass.newInstance();
        return injectExtension(adaptiveInstance);
    } catch (Exception e) {
        throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
    }
}
org.apache.dubbo.common.extension.ExtensionLoader#getAdaptiveExtensionClass
/**
 * Returns the adaptive class for this extension type. Loads the extension classes first;
 * when that left {@code cachedAdaptiveClass} unset, creates the adaptive class via
 * {@code createAdaptiveExtensionClass} and caches it.
 */
private Class<?> getAdaptiveExtensionClass() {
    getExtensionClasses();
    Class<?> adaptiveClass = cachedAdaptiveClass;
    if (adaptiveClass == null) {
        adaptiveClass = createAdaptiveExtensionClass();
        cachedAdaptiveClass = adaptiveClass;
    }
    return adaptiveClass;
}
getExtensionClasses 在缓存未命中时,最终会调用 org.apache.dubbo.common.extension.ExtensionLoader#loadExtensionClasses:
/**
 * Caches the default extension name, then loads the extension classes from every loading
 * strategy's directory. Each directory is read twice: once for the type's own name and once
 * for that name with {@code org.apache} replaced by {@code com.alibaba}.
 *
 * @return the classes gathered by {@code loadDirectory}
 */
private Map<String, Class<?>> loadExtensionClasses() {
    cacheDefaultExtensionName();

    String typeName = type.getName();
    String alibabaTypeName = typeName.replace("org.apache", "com.alibaba");

    Map<String, Class<?>> discovered = new HashMap<>();
    for (LoadingStrategy strategy : strategies) {
        loadDirectory(discovered, strategy.directory(), typeName,
                strategy.preferExtensionClassLoader(), strategy.excludedPackages());
        loadDirectory(discovered, strategy.directory(), alibabaTypeName,
                strategy.preferExtensionClassLoader(), strategy.excludedPackages());
    }
    return discovered;
}
strategies ,从三个路径查找
// Classpath directory prefixes; all three end with a slash so a file name can be appended directly.
private static final String SERVICES_DIRECTORY = "META-INF/services/";
private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
// Evaluates to "META-INF/dubbo/internal/".
private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
进入 org.apache.dubbo.common.extension.ExtensionLoader#loadDirectory(java.util.Map<java.lang.String,java.lang.Class<?>>, java.lang.String, java.lang.String, boolean, java.lang.String...)
/**
 * Finds every resource named {@code dir + type} and feeds each one to {@code loadResource}.
 * When {@code extensionLoaderClassLoaderFirst} is set and ExtensionLoader's own class loader
 * is not the system class loader, that loader is asked first. If it yields nothing, the
 * loader from {@code findClassLoader()} is used, or the system resources when that is null.
 * Any {@code Throwable} is logged and not propagated.
 *
 * @param extensionClasses                map passed on to {@code loadResource}
 * @param dir                             directory prefix of the descriptor file
 * @param type                            type name used as the descriptor file name
 * @param extensionLoaderClassLoaderFirst whether to try ExtensionLoader's class loader first
 * @param excludedPackages                packages passed on to {@code loadResource}
 */
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type,
                           boolean extensionLoaderClassLoaderFirst, String... excludedPackages) {
    String fileName = dir + type;
    try {
        Enumeration<java.net.URL> resources = null;
        ClassLoader classLoader = findClassLoader();

        if (extensionLoaderClassLoaderFirst) {
            ClassLoader ownLoader = ExtensionLoader.class.getClassLoader();
            if (ClassLoader.getSystemClassLoader() != ownLoader) {
                resources = ownLoader.getResources(fileName);
            }
        }

        boolean nothingFoundYet = resources == null || !resources.hasMoreElements();
        if (nothingFoundYet) {
            resources = classLoader != null
                    ? classLoader.getResources(fileName)
                    : ClassLoader.getSystemResources(fileName);
        }

        while (resources != null && resources.hasMoreElements()) {
            java.net.URL resourceURL = resources.nextElement();
            loadResource(extensionClasses, classLoader, resourceURL, excludedPackages);
        }
    } catch (Throwable t) {
        logger.error("Exception occurred when loading extension class (interface: " +
                type + ", description file: " + fileName + ").", t);
    }
}
例如 ExtensionFactory 配置了三个 SPI 实现,其中 AdaptiveExtensionFactory 因为使用了 @Adaptive 注解,会被存入 cachedAdaptiveClass:
AdaptiveExtensionFactory
getAdaptiveExtensionClass().newInstance()
这里创建出 AdaptiveExtensionFactory 实例。随后创建 FrameworkExt 的 ExtensionLoader 时,其 objectFactory 即为该 AdaptiveExtensionFactory:
/**
 * {@code ExtensionFactory} that delegates to every {@code ExtensionFactory} extension
 * reported by the loader and returns the first non-null result.
 */
@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {

    /** Delegates in the order the loader reported their names; unmodifiable. */
    private final List<ExtensionFactory> factories;

    /** Instantiates one delegate per supported {@code ExtensionFactory} extension name. */
    public AdaptiveExtensionFactory() {
        ExtensionLoader<ExtensionFactory> factoryLoader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
        List<ExtensionFactory> delegates = new ArrayList<ExtensionFactory>();
        for (String extensionName : factoryLoader.getSupportedExtensions()) {
            ExtensionFactory delegate = factoryLoader.getExtension(extensionName);
            delegates.add(delegate);
        }
        factories = Collections.unmodifiableList(delegates);
    }

    /**
     * Asks each delegate in turn for the extension.
     *
     * @return the first non-null result, or {@code null} when every delegate returns null
     */
    @Override
    public <T> T getExtension(Class<T> type, String name) {
        for (ExtensionFactory delegate : factories) {
            T candidate = delegate.getExtension(type, name);
            if (candidate == null) {
                continue;
            }
            return candidate;
        }
        return null;
    }
}
SPI用法一getExtension
通过 ExtensionLoader.getExtensionLoader(FrameworkExt.class) 创建完对应类型的 ExtensionLoader 之后,直接调用 org.apache.dubbo.common.extension.ExtensionLoader#getExtension 方法:
/**
 * Returns the extension registered under {@code name}, creating it on first request and
 * caching it in the per-name holder.
 *
 * @param name extension name; the literal {@code "true"} selects the default extension
 * @return the cached or newly created extension instance
 * @throws IllegalArgumentException if {@code name} is empty according to {@code StringUtils.isEmpty}
 */
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
return getDefaultExtension();
}
final Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
// Lock on the per-name holder and re-read before creating the instance.
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
org.apache.dubbo.common.extension.ExtensionLoader#createExtension
/**
 * Creates the extension for {@code name}: looks up its class, obtains the instance cached
 * per class in {@code EXTENSION_INSTANCES} (instantiating it when absent), injects it, wraps
 * it in every class from {@code cachedWrapperClasses}, and finally calls {@code initExtension}.
 *
 * @param name the extension name
 * @return the (possibly wrapped) extension instance
 * @throws IllegalStateException if instantiation, injection, wrapping or initialization fails;
 *         when no class is registered for {@code name}, the exception built by
 *         {@code findException} is thrown instead
 */
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// Runs on every call, also when the instance came from the cache.
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
// Each wrapper is built through its constructor taking the extension type, then injected itself.
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
initExtension(instance);
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}
创建实例并为其注入属性:org.apache.dubbo.common.extension.ExtensionLoader#injectExtension
/**
 * Injects dependencies through the public setters of {@code instance}. A setter is skipped
 * when it carries {@code DisableInject} or when its parameter type is reported as primitive
 * by {@code ReflectUtils.isPrimitives}. The value comes from {@code objectFactory}, and the
 * setter is only invoked for non-null values. Failures are logged and never propagated.
 *
 * @param instance the object to inject into
 * @return the same {@code instance}
 */
private T injectExtension(T instance) {
    // Without an object factory there is nothing to inject from.
    if (objectFactory == null) {
        return instance;
    }

    try {
        for (Method candidate : instance.getClass().getMethods()) {
            if (!isSetter(candidate) || candidate.getAnnotation(DisableInject.class) != null) {
                continue;
            }

            Class<?> parameterType = candidate.getParameterTypes()[0];
            if (ReflectUtils.isPrimitives(parameterType)) {
                continue;
            }

            try {
                String property = getSetterProperty(candidate);
                Object dependency = objectFactory.getExtension(parameterType, property);
                if (dependency != null) {
                    candidate.invoke(instance, dependency);
                }
            } catch (Exception e) {
                logger.error("Failed to inject via method " + candidate.getName()
                        + " of interface " + type.getName() + ": " + e.getMessage(), e);
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}
如果是 set 方法,则利用 objectFactory.getExtension(pt, property) 获取要注入的对象;这里的 objectFactory 就是 AdaptiveExtensionFactory:
/**
 * Asks each factory in {@code factories} in turn for the extension.
 *
 * @param type the requested type
 * @param name the requested name
 * @return the first non-null result, or {@code null} when no factory provides one
 */
@Override
public <T> T getExtension(Class<T> type, String name) {
for (ExtensionFactory factory : factories) {
T extension = factory.getExtension(type, name);
if (extension != null) {
return extension;
}
}
return null;
}
AdaptiveExtensionFactory 会依次调用各个 ExtensionFactory,其中 SpiExtensionFactory 的处理如下:
/**
 * {@code ExtensionFactory} that serves extension interfaces annotated with {@code SPI} by
 * returning their adaptive extension.
 */
public class SpiExtensionFactory implements ExtensionFactory {

    /**
     * Returns the adaptive extension of {@code type} when it is an interface annotated with
     * {@code SPI} and its loader reports at least one supported extension. The {@code name}
     * argument is not used.
     *
     * @return the adaptive extension, or {@code null} when the conditions are not met
     */
    @Override
    public <T> T getExtension(Class<T> type, String name) {
        boolean isSpiInterface = type.isInterface() && type.isAnnotationPresent(SPI.class);
        if (!isSpiInterface) {
            return null;
        }

        ExtensionLoader<T> extensionLoader = ExtensionLoader.getExtensionLoader(type);
        if (extensionLoader.getSupportedExtensions().isEmpty()) {
            return null;
        }
        return extensionLoader.getAdaptiveExtension();
    }
}
如果set的接口是spi接口,创建对应的AdaptiveExtension注入到set方法中
SPI用法二getAdaptiveExtension
以Protocol为例 org.apache.dubbo.common.extension.ExtensionLoader#getAdaptiveExtension
/**
 * Returns the adaptive extension instance for this loader's type, building it through
 * {@code createAdaptiveExtension} on first use and caching it afterwards.
 *
 * @return the adaptive instance
 * @throws IllegalStateException if building the instance fails, or failed on a previous call
 */
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
// An error stored by a previous attempt short-circuits this call.
if (createAdaptiveInstanceError != null) {
throw new IllegalStateException("Failed to create adaptive instance: " +
createAdaptiveInstanceError.toString(),
createAdaptiveInstanceError);
}
synchronized (cachedAdaptiveInstance) {
// Second read, now under the lock on the holder.
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
// Stored so that subsequent calls rethrow without retrying.
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
}
return (T) instance;
}
org.apache.dubbo.common.extension.ExtensionLoader#createAdaptiveExtension
/**
 * Creates a new instance of the adaptive extension class and runs {@code injectExtension}
 * on it.
 *
 * @return the injected adaptive instance
 * @throws IllegalStateException wrapping any exception thrown while resolving, instantiating
 *         or injecting the adaptive class
 */
private T createAdaptiveExtension() {
    try {
        Class<?> adaptiveType = getAdaptiveExtensionClass();
        T created = (T) adaptiveType.newInstance();
        return injectExtension(created);
    } catch (Exception e) {
        throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
    }
}
与之前 ExtensionFactory 的流程类似,首先获取 SPI 配置文件中配置的 class:org.apache.dubbo.common.extension.ExtensionLoader#getExtensionClasses
/**
 * Returns the name-to-class map of this loader's extensions, loading it through
 * {@code loadExtensionClasses} on first use and caching it in {@code cachedClasses}.
 *
 * @return the cached extension classes
 */
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
// Re-read under the lock before loading.
classes = cachedClasses.get();
if (classes == null) {
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
org.apache.dubbo.common.extension.ExtensionLoader#loadExtensionClasses
/**
 * Records the default extension name and then collects extension classes from the directory
 * of every loading strategy. Each directory is queried under the type's own name and again
 * under that name with {@code org.apache} replaced by {@code com.alibaba}.
 *
 * @return the map filled by {@code loadDirectory}
 */
private Map<String, Class<?>> loadExtensionClasses() {
    cacheDefaultExtensionName();

    final String apacheName = type.getName();
    final String alibabaName = apacheName.replace("org.apache", "com.alibaba");

    Map<String, Class<?>> result = new HashMap<>();
    for (LoadingStrategy strategy : strategies) {
        loadDirectory(result, strategy.directory(), apacheName,
                strategy.preferExtensionClassLoader(), strategy.excludedPackages());
        loadDirectory(result, strategy.directory(), alibabaName,
                strategy.preferExtensionClassLoader(), strategy.excludedPackages());
    }
    return result;
}
/**
 * Reads the {@code SPI} annotation of the extension type and stores its trimmed value in
 * {@code cachedDefaultName}. Does nothing when the annotation is missing or its value is
 * blank after trimming.
 *
 * @throws IllegalStateException if the value splits into more than one name
 */
private void cacheDefaultExtensionName() {
    final SPI spi = type.getAnnotation(SPI.class);
    if (spi == null) {
        return;
    }

    String declared = spi.value().trim();
    if (declared.isEmpty()) {
        return;
    }

    String[] names = NAME_SEPARATOR.split(declared);
    if (names.length > 1) {
        throw new IllegalStateException("More than 1 default extension name on extension " + type.getName()
                + ": " + Arrays.toString(names));
    }
    // The split can also yield no names at all, in which case nothing is cached.
    if (names.length == 1) {
        cachedDefaultName = names[0];
    }
}
org.apache.dubbo.common.extension.ExtensionLoader#loadClass:加载类时先判断类上是否有 @Adaptive 注解,如果有则通过 cacheAdaptiveClass 缓存为自适应类(cachedAdaptiveClass);如果是包装类则通过 cacheWrapperClass 缓存;其余的类按名称加入 extensionClasses map 中。
/**
 * Classifies one loaded class. A class annotated with {@code Adaptive} is cached as the
 * adaptive class, a wrapper class is cached as a wrapper, and anything else is registered
 * under each of its names in {@code extensionClasses}.
 *
 * @param extensionClasses map receiving the named extension classes
 * @param resourceURL      descriptor the class came from; only used in the error message
 * @param clazz            the loaded class
 * @param name             configured name(s); when empty, the name is taken from
 *                         {@code findAnnotationName}
 * @throws IllegalStateException if {@code clazz} is not a subtype of the extension type, or
 *                               no name can be determined for it
 * @throws NoSuchMethodException if a plain extension class has no public no-arg constructor
 */
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
    if (!type.isAssignableFrom(clazz)) {
        throw new IllegalStateException("Error occurred when loading extension class (interface: " +
                type + ", class line: " + clazz.getName() + "), class "
                + clazz.getName() + " is not subtype of interface.");
    }

    if (clazz.isAnnotationPresent(Adaptive.class)) {
        cacheAdaptiveClass(clazz);
        return;
    }
    if (isWrapperClass(clazz)) {
        cacheWrapperClass(clazz);
        return;
    }

    // Fails with NoSuchMethodException when there is no public no-arg constructor.
    clazz.getConstructor();

    String resolvedName = name;
    if (StringUtils.isEmpty(resolvedName)) {
        resolvedName = findAnnotationName(clazz);
        if (resolvedName.length() == 0) {
            throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
        }
    }

    String[] names = NAME_SEPARATOR.split(resolvedName);
    if (ArrayUtils.isNotEmpty(names)) {
        // The activate cache is keyed by the first name only.
        cacheActivateClass(clazz, names[0]);
        for (String alias : names) {
            cacheName(clazz, alias);
            saveInExtensionClass(extensionClasses, clazz, alias);
        }
    }
}
回到getAdaptiveExtensionClass,如果对应的配置中配置了带有@Adaptive注解的spi,那么直接返回,如果没有,那么就创建
/**
 * Resolves the adaptive class. After the extension classes are loaded, an already cached
 * adaptive class is returned as is; otherwise one is created through
 * {@code createAdaptiveExtensionClass} and stored in {@code cachedAdaptiveClass}.
 */
private Class<?> getAdaptiveExtensionClass() {
    getExtensionClasses();
    Class<?> resolved = cachedAdaptiveClass;
    if (resolved == null) {
        resolved = createAdaptiveExtensionClass();
        cachedAdaptiveClass = resolved;
    }
    return resolved;
}
org.apache.dubbo.common.extension.ExtensionLoader#createAdaptiveExtensionClass
/**
 * Generates the source of an adaptive implementation for this extension point and compiles it.
 *
 * @return the compiled adaptive class
 */
private Class<?> createAdaptiveExtensionClass() {
    final String source = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
    final ClassLoader loader = findClassLoader();
    // The compiler itself is obtained through the same extension mechanism.
    final ExtensionLoader<org.apache.dubbo.common.compiler.Compiler> compilerLoader =
            ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class);
    return compilerLoader.getAdaptiveExtension().compile(source, loader);
}
根据 接口的注解方法生成对应的子类方法
protocol生成子类 Protocol$Adaptive
package com.xishan.store.usercenter.userweb;
import org.apache.dubbo.common.extension.ExtensionLoader;
/**
 * Example of a generated adaptive implementation of {@code org.apache.dubbo.rpc.Protocol}.
 *
 * <p>{@code export} and {@code refer} pick the concrete extension at call time from the
 * protocol of the URL; the remaining methods always throw {@link UnsupportedOperationException}.
 */
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
// Always throws: this method is reported as not adaptive.
public void destroy() {
throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
// Always throws: this method is reported as not adaptive.
public int getDefaultPort() {
throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
/**
 * Delegates to the Protocol extension named by the invoker URL's protocol ("dubbo" when it is null).
 *
 * @throws IllegalArgumentException if the invoker or its URL is null
 */
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
// Falls back to "dubbo" when the URL carries no protocol.
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
// With the fallback above, extName is only null if getProtocol() changes between the two calls.
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
/**
 * Delegates to the Protocol extension named by the URL's protocol ("dubbo" when it is null).
 *
 * @throws IllegalArgumentException if the URL argument is null
 */
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
// Same name resolution as in export(): URL protocol with "dubbo" as the fallback.
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
// Always throws: this method is reported as not adaptive.
public java.util.List getServers() {
throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
}
生成的逻辑也很简单,就是从url中通过getProtocol()获取扩展名
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
根据protocol ,使用getExtension接口获取实例,调用对应方法
服务暴露流程
DubboBootstrapApplicationListener
/**
 * Dispatches Spring context events: a refresh event goes to
 * {@code onContextRefreshedEvent}, a close event to {@code onContextClosedEvent};
 * any other event type is ignored.
 *
 * @param event the application context event received
 */
@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
    if (event instanceof ContextRefreshedEvent) {
        onContextRefreshedEvent((ContextRefreshedEvent) event);
        return;
    }
    if (event instanceof ContextClosedEvent) {
        onContextClosedEvent((ContextClosedEvent) event);
    }
}
当applicationContext刷新完毕事件触发 进入org.apache.dubbo.config.bootstrap.DubboBootstrap#start
/**
 * Starts the bootstrap once: initializes it, exports services, optionally exports the
 * metadata service and registers the service instance, then refers services.
 *
 * <p>Calls after the first successful flip of the {@code started} flag do nothing.
 *
 * @return this bootstrap, for chaining
 */
public DubboBootstrap start() {
    if (!started.compareAndSet(false, true)) {
        return this;
    }

    initialize();
    if (logger.isInfoEnabled()) {
        logger.info(NAME + " is starting...");
    }

    exportServices();

    // Metadata export and instance registration are skipped only when the bootstrap is
    // "only register provider" and nothing has been exported.
    if (!isOnlyRegisterProvider() || hasExportedServices()) {
        exportMetadataService();
        registerServiceInstance();
    }

    referServices();

    if (logger.isInfoEnabled()) {
        logger.info(NAME + " has started.");
    }
    return this;
}
进入
/**
 * Runs the one-time initialization sequence: framework extensions, config center
 * (falling back to the registry if necessary), remote configs, global config checks,
 * metadata service and event listener. Subsequent calls are no-ops.
 */
private void initialize() {
    if (initialized.compareAndSet(false, true)) {
        ApplicationModel.initFrameworkExts();

        startConfigCenter();
        useRegistryAsConfigCenterIfNecessary();
        loadRemoteConfigs();
        checkGlobalConfigs();

        initMetadataService();
        initEventListener();

        if (logger.isInfoEnabled()) {
            logger.info(NAME + " has been initialized!");
        }
    }
}
org.apache.dubbo.rpc.model.ApplicationModel#initFrameworkExts
/**
 * Instantiates every supported {@code FrameworkExt} extension and calls
 * {@code initialize()} on each, in the order returned by the extension loader.
 */
public static void initFrameworkExts() {
    final Set<FrameworkExt> frameworkExts =
            ExtensionLoader.getExtensionLoader(FrameworkExt.class).getSupportedExtensionInstances();
    frameworkExts.forEach(FrameworkExt::initialize);
}
/**
 * Instantiates all supported extensions of this extension point.
 *
 * @return the instances sorted with {@code Prioritized.COMPARATOR}, in an
 *         insertion-ordered set (so duplicates collapse while the sort order is kept)
 */
public Set<T> getSupportedExtensionInstances() {
    final List<T> loaded = new LinkedList<>();
    final Set<String> names = getSupportedExtensions();
    if (CollectionUtils.isNotEmpty(names)) {
        names.forEach(extensionName -> loaded.add(getExtension(extensionName)));
    }
    sort(loaded, Prioritized.COMPARATOR);
    return new LinkedHashSet<>(loaded);
}
getSupportedExtensionInstances把spi配置所有的class都实例化并返回 FrameworkExt配置了三个
ConfigManager
生产者,消费者,注册中心的,配置 还记得启动时的那几个配置类吗 DubboConfigConfiguration
分别看这些配置 在这些配置类的父类中
/**
 * Registers this config object with the application-wide {@code ConfigManager}.
 * Annotated with {@code @PostConstruct}, so the container invokes it after construction.
 */
@PostConstruct
public void addIntoConfigManager() {
    final ConfigManager manager = ApplicationModel.getConfigManager();
    manager.addConfig(this);
}
在加载完毕bean之后,都会执行这个方法,把配置加入到ConfigManager中 org.apache.dubbo.config.bootstrap.DubboBootstrap#startConfigCenter
/**
 * Prepares the config center(s) and refreshes all registered configs.
 *
 * <p>When no config center is registered, a default {@code ConfigCenterConfig} is
 * created and refreshed; it is only registered if it turns out to be valid.
 * Already-registered centers are refreshed and validated. For every center then
 * available, its environment is prepared and the resulting dynamic configuration is
 * added to a composite that becomes the environment's dynamic configuration.
 * {@code configManager.refreshAll()} is always called at the end.
 */
private void startConfigCenter() {
    Collection<ConfigCenterConfig> configCenters = configManager.getConfigCenters();

    if (CollectionUtils.isEmpty(configCenters)) {
        final ConfigCenterConfig fallback = new ConfigCenterConfig();
        fallback.refresh();
        if (fallback.isValid()) {
            configManager.addConfigCenter(fallback);
            configCenters = configManager.getConfigCenters();
        }
    } else {
        for (ConfigCenterConfig existing : configCenters) {
            existing.refresh();
            ConfigValidationUtils.validateConfigCenterConfig(existing);
        }
    }

    if (CollectionUtils.isNotEmpty(configCenters)) {
        final CompositeDynamicConfiguration composite = new CompositeDynamicConfiguration();
        configCenters.forEach(center -> composite.addConfiguration(prepareEnvironment(center)));
        environment.setDynamicConfiguration(composite);
    }

    configManager.refreshAll();
}
如果没有配置,新建一个configCenterConfig,存进去 org.apache.dubbo.config.context.ConfigManager#refreshAll
/**
 * Calls {@code refresh()} on every config held by this manager, inside a single
 * {@code write(...)} action: application, monitor and module when present, then all
 * protocols, registries, providers and consumers.
 */
public void refreshAll() {
    write(() -> {
        // Optional, single-instance configs.
        getApplication().ifPresent(application -> application.refresh());
        getMonitor().ifPresent(monitor -> monitor.refresh());
        getModule().ifPresent(module -> module.refresh());

        // Multi-instance configs.
        getProtocols().forEach(protocol -> protocol.refresh());
        getRegistries().forEach(registry -> registry.refresh());
        getProviders().forEach(provider -> provider.refresh());
        getConsumers().forEach(consumer -> consumer.refresh());
    });
}
分别调用配置类的refresh方法
Environment
三大中心指的:注册中心,元数据中心,配置中心。 在 2.7 之前的版本,Dubbo 只配备了注册中心,主流使用的注册中心为 zookeeper。新增加了元数据中心和配置中心,自然是为了解决对应的痛点,下面我们来详细阐释三大中心改造的原因。 详细信息可以看这篇博客。 https://blog.csdn.net/u012881904/article/details/95891448
Environment 类的定义和构造方法
/**
 * Framework extension that aggregates the configuration sources used when resolving
 * config values (excerpt: only the fields and the constructor are shown here).
 */
public class Environment extends LifecycleAdapter implements FrameworkExt {
public static final String NAME = "environment";
// The five configuration sources below are created eagerly by the constructor.
private final PropertiesConfiguration propertiesConfiguration;
private final SystemConfiguration systemConfiguration;
private final EnvironmentConfiguration environmentConfiguration;
private final InmemoryConfiguration externalConfiguration;
private final InmemoryConfiguration appExternalConfiguration;
private CompositeConfiguration globalConfiguration;
// Backing maps for the two in-memory configurations; both start empty.
private Map<String, String> externalConfigurationMap = new HashMap<>();
private Map<String, String> appExternalConfigurationMap = new HashMap<>();
// Defaults to true; selects the source ordering used in getPrefixedConfiguration.
private boolean configCenterFirst = true;
private DynamicConfiguration dynamicConfiguration;
/**
 * Creates the environment with a fresh instance of each of the five configuration sources.
 */
public Environment() {
this.propertiesConfiguration = new PropertiesConfiguration();
this.systemConfiguration = new SystemConfiguration();
this.environmentConfiguration = new EnvironmentConfiguration();
this.externalConfiguration = new InmemoryConfiguration();
this.appExternalConfiguration = new InmemoryConfiguration();
}
- 实现了FrameworkExt接口,
- 成员变量中包含五种配置类。
- 构造方法直接就创建了五种配置类的默认实现,下面我们逐个查看每个配置类
PropertiesConfiguration
PropertiesConfiguration是Environment类的成员变量, 主要加载的是dubbo.properties配置文件中的配置信息。
/**
 * {@code Configuration} whose values come from the properties managed by {@code ConfigUtils}.
 *
 * <p>On construction, every {@code OrderedPropertiesProvider} extension (if any) is given
 * the chance to contribute properties, which are merged into the {@code ConfigUtils} properties.
 */
public class PropertiesConfiguration implements Configuration {

    /**
     * Loads all {@code OrderedPropertiesProvider} extensions, orders them by descending
     * {@code priority()} and merges their properties into {@code ConfigUtils}.
     *
     * <p>Providers are applied in that order with {@code putAll}, so when two providers
     * define the same key the one with the smaller {@code priority()} value wins.
     * When no provider extension is available the constructor returns without touching
     * {@code ConfigUtils}.
     */
    public PropertiesConfiguration() {
        ExtensionLoader<OrderedPropertiesProvider> propertiesProviderExtensionLoader = ExtensionLoader.getExtensionLoader(OrderedPropertiesProvider.class);
        Set<String> propertiesProviderNames = propertiesProviderExtensionLoader.getSupportedExtensions();
        if (propertiesProviderNames == null || propertiesProviderNames.isEmpty()) {
            return;
        }

        List<OrderedPropertiesProvider> orderedPropertiesProviders = new ArrayList<>();
        for (String propertiesProviderName : propertiesProviderNames) {
            orderedPropertiesProviders.add(propertiesProviderExtensionLoader.getExtension(propertiesProviderName));
        }

        // Descending by priority(). Integer.compare is used instead of subtraction because
        // "b - a" overflows for extreme values (e.g. Integer.MIN_VALUE) and then sorts wrongly.
        orderedPropertiesProviders.sort((a, b) -> Integer.compare(b.priority(), a.priority()));

        Properties properties = ConfigUtils.getProperties();
        for (OrderedPropertiesProvider orderedPropertiesProvider : orderedPropertiesProviders) {
            properties.putAll(orderedPropertiesProvider.initProperties());
        }
        ConfigUtils.setProperties(properties);
    }

    /**
     * Looks the key up through {@code ConfigUtils.getProperty}.
     *
     * @param key the property key
     * @return whatever {@code ConfigUtils.getProperty(key)} returns
     */
    @Override
    public Object getInternalProperty(String key) {
        return ConfigUtils.getProperty(key);
    }
}
实现了Configuration接口, 这个是所有配置类都实现的接口
构造方法的流程: 1. 通过SPI加载实现了OrderedPropertiesProvider接口的配置类, 如果加载到了, 会根据配置的优先级排序; 2. 使用ConfigUtils.getProperties()方法加载classPath下的dubbo.properties(当然这个路径可以在系统参数中修改); 3. 最后将第1步加载的配置合并到第2步的配置中。 但是我运行demo程序,发现这个方法不会走到第2步,在第1步中没有加载到任何实现后就直接返回了。 这是因为 ExtensionLoader propertiesProviderExtensionLoader = ExtensionLoader.getExtensionLoader(OrderedPropertiesProvider.class); 对应的spi没有指定实现
SystemConfiguration
Environment类的成员变量, 加载系统的一些基本信息。 这个代码中注释中
/**
 * {@code Configuration} backed by JVM system properties.
 */
public class SystemConfiguration implements Configuration {

    /**
     * @param key the system property name
     * @return the value of {@link System#getProperty(String)} for {@code key}, or {@code null} if unset
     */
    @Override
    public Object getInternalProperty(String key) {
        final String systemValue = System.getProperty(key);
        return systemValue;
    }
}
逻辑很简单就是从系统中获取配置信息
EnvironmentConfiguration
Environment类的成员变量, 加载系统环境配置信息 。 功能几乎和SystemConfiguration一致,代码也是超级简单。
/**
 * {@code Configuration} backed by operating-system environment variables.
 */
public class EnvironmentConfiguration implements Configuration {

    /**
     * Looks the key up as an environment variable; when that yields nothing, retries
     * with the key converted by {@code StringUtils.toOSStyleKey}.
     *
     * @param key the configuration key
     * @return the environment value, or the result of the second lookup (possibly {@code null})
     */
    @Override
    public Object getInternalProperty(String key) {
        final String direct = System.getenv(key);
        if (!StringUtils.isEmpty(direct)) {
            return direct;
        }
        return System.getenv(StringUtils.toOSStyleKey(key));
    }
}
VM options 需要以 -D 或 -X 或 -XX 开头,每个参数最好使用空格隔开。 program arguments 每个参数需要以空格隔开。否则将会被识别成一个参数,自己用的时候还得手动处理。 Environment variable 没有前缀,优先级低于 VM options ,即如果VM options 有一个变量和 Environment variable中的变量的key相同,则以VM options 中为准,(如果用命令行启动,这个参数需要在运行java类以前使用 set JAVA_HOME=D:\jdk1.8.0_05 这种方式进行临时修改,这种方式只在当前cmd窗口有效,点击看详情 设置临时的java环境变量)。 java提供了System类的静态方法getenv()和getProperty()用于返回系统相关的变量与属性,**getenv()**方法返回的变量大多与系统相关,
getProperty() 方法返回的变量大多与java程序有关。
System.getenv() 方法是获取指定的环境变量的值。
System.getenv(String name) 接收参数为任意字符串,当存在指定环境变量时即返回环境变量的值,否则返回null。
System.getProperty() 是获取系统的相关属性,包括文件编码、操作系统名称、区域、用户名等,此属性一般由jvm自动获取,不能设置。
System.getProperty(String key) 接收参数为任意字符串,当存在指定属性时即返回属性的值,否则返回null。 具体应用如下:
externalConfiguration和appExternalConfiguration
这两个成员变量的对象是同一种类InmemoryConfiguration, 我们看下InmemoryConfiguration
/**
 * {@code Configuration} that keeps its entries in an in-memory, insertion-ordered map.
 */
public class InmemoryConfiguration implements Configuration {

    // Starts as an empty LinkedHashMap; may later be swapped for a caller-supplied map.
    private Map<String, String> store = new LinkedHashMap<>();

    /**
     * @param key the configuration key
     * @return the stored value, or {@code null} when the key is absent
     */
    @Override
    public Object getInternalProperty(String key) {
        return store.get(key);
    }

    /**
     * Stores a single entry, replacing any previous value for the key.
     */
    public void addProperty(String key, String value) {
        store.put(key, value);
    }

    /**
     * Copies all entries of {@code properties} into the store; a {@code null} argument is ignored.
     */
    public void addProperties(Map<String, String> properties) {
        if (properties == null) {
            return;
        }
        this.store.putAll(properties);
    }

    /**
     * Replaces the backing map with {@code properties}; a {@code null} argument is ignored.
     *
     * <p>The map is adopted by reference, not copied, so later changes the caller makes
     * to it are visible through this configuration.
     */
    public void setProperties(Map<String, String> properties) {
        if (properties == null) {
            return;
        }
        this.store = properties;
    }

    /**
     * Removes every entry from the current backing map.
     */
    public void clear() {
        this.store.clear();
    }
}
从源码中可以看到,这new出来的就是一个空白的配置类, 他提供了往里面设置配置信息的方法。 到这里构造方法结束, 构造方法就是创建了五个成员变量, 其中PropertiesConfiguration变量会从配置文件中加载配置信息,但是demo程序中并没有加载, 我们接着往下看。
ServiceRepository
// Registered service descriptors, keyed by a String (presumably the interface name or service key — confirm against registerService).
private ConcurrentMap<String, ServiceDescriptor> services = new ConcurrentHashMap<>();
// Consumer-side models, keyed by a String.
private ConcurrentMap<String, ConsumerModel> consumers = new ConcurrentHashMap<>();
// Provider-side models, keyed by a String.
private ConcurrentMap<String, ProviderModel> providers = new ConcurrentHashMap<>();
对服务,生产者,消费者进行管理
initialize初始化方法
这个方法在SPI加载到Environment类后,紧接着就会被调用
/**
 * Pulls the external and app-external configuration maps from the default config
 * center(s), if any, and installs the environment's two backing maps into the
 * corresponding in-memory configurations.
 *
 * <p>When several default config centers exist, each one is passed to the setters in
 * iteration order.
 */
@Override
public void initialize() throws IllegalStateException {
    final ConfigManager manager = ApplicationModel.getConfigManager();
    final Optional<Collection<ConfigCenterConfig>> defaultCenters = manager.getDefaultConfigCenter();
    defaultCenters.ifPresent(centers -> centers.forEach(center -> {
        this.setExternalConfigMap(center.getExternalConfiguration());
        this.setAppExternalConfigMap(center.getAppExternalConfiguration());
    }));

    // The in-memory configurations adopt the environment's maps.
    this.externalConfiguration.setProperties(externalConfigurationMap);
    this.appExternalConfiguration.setProperties(appExternalConfigurationMap);
}
这个方法就是获取配置中心的信息 , 并将配置中心的配置放入到externalConfiguration和appExternalConfiguration 两个成员变量中 在demo示例中,因为没有引入配置中心的包, 获取到的配置中心的值是空的。
getPrefixedConfiguration方法
Dubbo会从API、XML、annotation、启动参数、config center等很多地方加载到配置信息。 这个方法会根据配置的优先级过滤出有用的配置信息。
/**
 * Builds a composite configuration for {@code config}, created with the config's prefix and id.
 *
 * <p>Sources are added in this order:
 * <ol>
 *   <li>system configuration</li>
 *   <li>environment configuration</li>
 *   <li>when {@code isConfigCenterFirst()} is true: app-external, external, then the config
 *       object itself; otherwise: the config object itself, then app-external, external</li>
 *   <li>properties configuration</li>
 * </ol>
 *
 * @param config the config object being resolved
 * @return the newly built composite configuration
 */
public synchronized CompositeConfiguration getPrefixedConfiguration(AbstractConfig config) {
    final CompositeConfiguration composite = new CompositeConfiguration(config.getPrefix(), config.getId());
    final Configuration ownValues = new ConfigConfigurationAdapter(config);
    final boolean centerFirst = this.isConfigCenterFirst();

    composite.addConfiguration(systemConfiguration);
    composite.addConfiguration(environmentConfiguration);

    if (centerFirst) {
        composite.addConfiguration(appExternalConfiguration);
        composite.addConfiguration(externalConfiguration);
        composite.addConfiguration(ownValues);
    } else {
        composite.addConfiguration(ownValues);
        composite.addConfiguration(appExternalConfiguration);
        composite.addConfiguration(externalConfiguration);
    }

    composite.addConfiguration(propertiesConfiguration);
    return composite;
}
CompositeConfiguration 对象构造参数中如果传入了prefix和id字段, 则通过这个对象获取配置信息时候会自动在传入的配置key前面加上prefix和id前缀。
先构造个CompositeConfiguration 类对象 逐步根据优先级将配置信息加入到CompositeConfiguration类对象中 返回CompositeConfiguration类对象, org.apache.dubbo.config.AbstractConfig#refresh
/**
 * Overrides this config's properties with values resolved from the environment.
 *
 * <p>For every public setter, the matching property is looked up in the prefixed
 * composite configuration and, when a non-empty value of a compatible type is found,
 * the setter is invoked with the converted value. The parameters setter is handled
 * separately by merging the parsed value into the existing parameters map.
 * Any exception raised during the process is logged and not propagated.
 */
public void refresh() {
Environment env = ApplicationModel.getEnvironment();
try {
CompositeConfiguration compositeConfiguration = env.getPrefixedConfiguration(this);
// getMethods() returns public methods, including inherited ones.
Method[] methods = getClass().getMethods();
for (Method method : methods) {
if (MethodUtils.isSetter(method)) {
try {
String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method)));
// Only invoke the setter when a value exists and it matches the setter's parameter type.
if (StringUtils.isNotEmpty(value) && ClassUtils.isTypeMatch(method.getParameterTypes()[0], value)) {
method.invoke(this, ClassUtils.convertPrimitive(method.getParameterTypes()[0], value));
}
} catch (NoSuchMethodException e) {
// A failure for one property is logged at info level and does not stop the loop.
logger.info("Failed to override the property " + method.getName() + " in " +
this.getClass().getSimpleName() +
", please make sure every property has getter/setter method provided.");
}
} else if (isParametersSetter(method)) {
String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method)));
if (StringUtils.isNotEmpty(value)) {
// Merge into the existing parameters rather than replacing them.
Map<String, String> map = invokeGetParameters(getClass(), this);
map = map == null ? new HashMap<>() : map;
map.putAll(convert(StringUtils.parseParameters(value), ""));
invokeSetParameters(getClass(), this, map);
}
}
}
} catch (Exception e) {
// Deliberately swallowed after logging: the first unexpected failure ends the refresh.
logger.error("Failed to override ", e);
}
}
在refresh方法中,使用environment中的变量,调用对应的set方法覆盖当前Config中的变量。 然后来到initialize的
useRegistryAsConfigCenterIfNecessary方法
/**
 * Derives config centers from the default registries when none is available yet.
 *
 * <p>Does nothing if the environment already has a dynamic configuration or if any
 * config center is registered. Otherwise a {@code ConfigCenterConfig} is created for each
 * default registry whose {@code useAsConfigCenter} is unset or true, and
 * {@code startConfigCenter()} is run again.
 */
private void useRegistryAsConfigCenterIfNecessary() {
if (environment.getDynamicConfiguration().isPresent()) {
return;
}
if (CollectionUtils.isNotEmpty(configManager.getConfigCenters())) {
return;
}
configManager.getDefaultRegistries().stream()
// A null flag is treated the same as true.
.filter(registryConfig -> registryConfig.getUseAsConfigCenter() == null || registryConfig.getUseAsConfigCenter())
.forEach(registryConfig -> {
String protocol = registryConfig.getProtocol();
// The id is built from the registry's protocol and port.
String id = "config-center-" + protocol + "-" + registryConfig.getPort();
ConfigCenterConfig cc = new ConfigCenterConfig();
cc.setId(id);
if (cc.getParameters() == null) {
cc.setParameters(new HashMap<>());
}
if (registryConfig.getParameters() != null) {
cc.getParameters().putAll(registryConfig.getParameters());
}
cc.getParameters().put(CLIENT_KEY, registryConfig.getClient());
// Connection settings are copied from the registry; its group becomes the namespace.
cc.setProtocol(registryConfig.getProtocol());
cc.setPort(registryConfig.getPort());
cc.setAddress(registryConfig.getAddress());
cc.setNamespace(registryConfig.getGroup());
cc.setUsername(registryConfig.getUsername());
cc.setPassword(registryConfig.getPassword());
if (registryConfig.getTimeout() != null) {
cc.setTimeout(registryConfig.getTimeout().longValue());
}
// A registry-derived config center is never marked as highest priority.
cc.setHighestPriority(false);
configManager.addConfigCenter(cc);
});
// Runs even when no registry passed the filter above.
startConfigCenter();
}
如果配置中心为空, 用注册中心作为配置中心 org.apache.dubbo.config.bootstrap.DubboBootstrap#startConfigCenter 开启配置中心
compositeDynamicConfiguration.addConfiguration(prepareEnvironment(configCenter));
org.apache.dubbo.config.bootstrap.DubboBootstrap#prepareEnvironment
/**
 * Connects to the given config center and loads its global and application-level
 * configuration content into the environment.
 *
 * @param configCenter the config center settings
 * @return the dynamic configuration for the config center, or {@code null} when the
 *         config is not valid or {@code checkOrUpdateInited()} returns false
 * @throws IllegalStateException if the fetched content cannot be parsed
 */
private DynamicConfiguration prepareEnvironment(ConfigCenterConfig configCenter) {
if (configCenter.isValid()) {
if (!configCenter.checkOrUpdateInited()) {
return null;
}
DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl());
// Global content: the config file within the config center's group.
String configContent = dynamicConfiguration.getProperties(configCenter.getConfigFile(), configCenter.getGroup());
// Application-level content uses the application name as the group.
String appGroup = getApplication().getName();
String appConfigContent = null;
if (isNotEmpty(appGroup)) {
// Uses the app config file when one is set, otherwise the global config file.
appConfigContent = dynamicConfiguration.getProperties
(isNotEmpty(configCenter.getAppConfigFile()) ? configCenter.getAppConfigFile() : configCenter.getConfigFile(),
appGroup
);
}
try {
environment.setConfigCenterFirst(configCenter.isHighestPriority());
environment.updateExternalConfigurationMap(parseProperties(configContent));
environment.updateAppExternalConfigurationMap(parseProperties(appConfigContent));
} catch (IOException e) {
throw new IllegalStateException("Failed to parse configurations from Config Center.", e);
}
return dynamicConfiguration;
}
return null;
}
/**
 * Resolves the {@code DynamicConfigurationFactory} for the URL's protocol and asks it
 * for the dynamic configuration of that URL.
 *
 * @param connectionURL the config center connection URL
 * @return the dynamic configuration produced by the factory
 */
static DynamicConfiguration getDynamicConfiguration(URL connectionURL) {
    return getDynamicConfigurationFactory(connectionURL.getProtocol())
            .getDynamicConfiguration(connectionURL);
}
通过spi获取对应的config对应的是NacosDynamicConfiguration 通过这个获取nacos上的配置文件
environment.setConfigCenterFirst(configCenter.isHighestPriority());
environment.updateExternalConfigurationMap(parseProperties(configContent));
environment.updateAppExternalConfigurationMap(parseProperties(appConfigContent));
设置配置中心配置优先级 就算设置优先级,系统变量和jvm变量都更高 但是有了配置中心的配置,会覆盖掉项目中的配置文件配置 来到 org.apache.dubbo.config.bootstrap.DubboBootstrap#loadRemoteConfigs
/**
 * Collects the sub-properties found under {@code REGISTRIES_SUFFIX} in both the
 * external and the app-external configuration maps of the environment.
 *
 * @return an unmodifiable set with the combined ids
 */
public Set<String> getRegistryIds() {
    final Set<String> collected = new HashSet<>();

    final Map<String, String> external = ApplicationModel.getEnvironment().getExternalConfigurationMap();
    collected.addAll(getSubProperties(external, REGISTRIES_SUFFIX));

    final Map<String, String> appExternal = ApplicationModel.getEnvironment().getAppExternalConfigurationMap();
    collected.addAll(getSubProperties(appExternal, REGISTRIES_SUFFIX));

    return unmodifiableSet(collected);
}
String REGISTRIES_SUFFIX = "dubbo.registries."; 配置了的话就加载这个配置。 接着校验配置合法性:
checkGlobalConfigs();
initMetadataService
元数据,具体什么是元数据,可以看看这篇文章 https://blog.csdn.net/weixin_38308374/article/details/105984050
/**
 * Starts the metadata report, then resolves the metadata service for the configured
 * metadata type and wraps it in a {@code ConfigurableMetadataServiceExporter}.
 */
private void initMetadataService() {
    startMetadataReport();

    metadataService = getExtension(getMetadataType());
    metadataServiceExporter = new ConfigurableMetadataServiceExporter(metadataService);
}
org.apache.dubbo.config.bootstrap.DubboBootstrap#startMetadataReport
/**
 * Initializes the metadata report from the first configured {@code MetadataReportConfig}.
 *
 * <p>When no metadata report config exists, the method returns quietly unless the
 * application's metadata type equals {@code REMOTE_METADATA_STORAGE_TYPE}. A config that
 * fails {@code isValid()} is skipped.
 *
 * @throws IllegalStateException if the remote metadata type is requested but no
 *                               metadata report config is available
 */
private void startMetadataReport() {
    final String metadataType = getApplication().getMetadataType();
    final Collection<MetadataReportConfig> reportConfigs = configManager.getMetadataConfigs();

    if (CollectionUtils.isEmpty(reportConfigs)) {
        if (REMOTE_METADATA_STORAGE_TYPE.equals(metadataType)) {
            throw new IllegalStateException("No MetadataConfig found, you must specify the remote Metadata Center address when 'metadata=remote' is enabled.");
        }
        return;
    }

    // Only the first config is used.
    final MetadataReportConfig firstConfig = reportConfigs.iterator().next();
    ConfigValidationUtils.validateMetadataConfig(firstConfig);
    if (firstConfig.isValid()) {
        MetadataReportInstance.init(firstConfig.toUrl());
    }
}
通过配置创建元数据中心
metadataReport = metadataReportFactory.getMetadataReport(metadataReportURL);
在启动时 org.apache.dubbo.metadata.report.support.AbstractMetadataReport#storeProviderMetadataTask 会在元数据中心创建配置
/**
 * Stores one provider's service definition in the metadata center.
 *
 * <p>The definition is recorded in {@code allMetadataReports}, cleared from
 * {@code failedReports}, serialized to JSON, stored and saved to the local properties.
 * On any exception the definition is put into {@code failedReports}, the retry task
 * is started and the error is logged; the exception is not rethrown.
 *
 * @param providerMetadataIdentifier identifies the provider metadata entry
 * @param serviceDefinition          the definition to store
 */
private void storeProviderMetadataTask(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
    try {
        if (logger.isInfoEnabled()) {
            logger.info("store provider metadata. Identifier : " + providerMetadataIdentifier + "; definition: " + serviceDefinition);
        }

        allMetadataReports.put(providerMetadataIdentifier, serviceDefinition);
        failedReports.remove(providerMetadataIdentifier);

        final String json = new Gson().toJson(serviceDefinition);
        doStoreProviderMetadata(providerMetadataIdentifier, json);
        saveProperties(providerMetadataIdentifier, json, true, !syncReport);
    } catch (Exception e) {
        failedReports.put(providerMetadataIdentifier, serviceDefinition);
        metadataReportRetry.startRetryTask();
        logger.error("Failed to put provider metadata " + providerMetadataIdentifier + " in " + serviceDefinition + ", cause: " + e.getMessage(), e);
    }
}
创建元数据配置,具体怎么使用后续再看
initEventListener
/**
 * Registers this bootstrap itself as an event listener.
 */
private void initEventListener() {
    this.addEventListener(this);
}
/**
 * Adds a listener to the event dispatcher.
 *
 * @param listener the listener to register
 * @return this bootstrap, for chaining
 */
public DubboBootstrap addEventListener(EventListener<?> listener) {
    this.eventDispatcher.addEventListener(listener);
    return this;
}
eventDispatcher中加入一个事件监听器 至此,initialize方法执行完毕
exportServices
org.apache.dubbo.config.bootstrap.DubboBootstrap#exportServices
/**
 * Exports every service known to the config manager.
 *
 * <p>Each service gets this bootstrap assigned first. In synchronous mode the service
 * is exported on the calling thread and recorded in {@code exportedServices}. In
 * asynchronous mode the export is submitted to the service-exporter executor and only
 * the resulting future is recorded (in {@code asyncExportingFutures}).
 */
private void exportServices() {
    configManager.getServices().forEach(sc -> {
        ((ServiceConfig) sc).setBootstrap(this);

        if (!exportAsync) {
            sc.export();
            exportedServices.add(sc);
            return;
        }

        ExecutorService exporterPool = executorRepository.getServiceExporterExecutor();
        Future<?> pending = exporterPool.submit(() -> {
            sc.export();
        });
        asyncExportingFutures.add(pending);
    });
}
认识ServiceConfig 我们知道 在扫描dubbo 服务时,不仅会创建真正的实例,还会创建一个ServiceBean 的BeanDefinition, 也会实例化这个ServiceBean
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, BeanNameAware, ApplicationEventPublisherAware {
因为其继承自AbstractConfig
也会在bean加载完毕之后将自己加入到ConfigManager
也因此,这里遍历ServiceConfig进行暴露
org.apache.dubbo.config.ServiceConfig#export
/**
 * Exports this service.
 *
 * <p>Returns immediately when {@code shouldExport()} is false. Otherwise a bootstrap is
 * obtained and initialized if none was set, the sub-configs are checked and updated, the
 * service metadata is filled in, and the actual export either runs right away or is
 * scheduled after {@code getDelay()} milliseconds. {@code exported()} is invoked at the
 * end in both cases, i.e. without waiting for a scheduled export to run.
 */
public synchronized void export() {
    if (!shouldExport()) {
        return;
    }

    if (bootstrap == null) {
        bootstrap = DubboBootstrap.getInstance();
        bootstrap.init();
    }

    checkAndUpdateSubConfigs();

    // The same group value is used for both group and default group.
    serviceMetadata.setVersion(version);
    serviceMetadata.setGroup(group);
    serviceMetadata.setDefaultGroup(group);
    serviceMetadata.setServiceType(getInterfaceClass());
    serviceMetadata.setServiceInterfaceName(getInterface());
    serviceMetadata.setTarget(getRef());

    if (!shouldDelay()) {
        doExport();
    } else {
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    }

    exported();
}
org.apache.dubbo.config.ServiceConfig#checkAndUpdateSubConfigs
private void checkAndUpdateSubConfigs() {
completeCompoundConfigs();
checkDefault();
checkProtocol();
List<ConfigInitializer> configInitializers = ExtensionLoader.getExtensionLoader(ConfigInitializer.class)
.getActivateExtension(URL.valueOf("configInitializer://"), (String[]) null);
configInitializers.forEach(e -> e.initServiceConfig(this));
if (!isOnlyInJvm()) {
checkRegistry();
}
this.refresh();
if (StringUtils.isEmpty(interfaceName)) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, getMethods());
checkRef();
generic = Boolean.FALSE.toString();
}
if (local != null) {
if ("true".equals(local)) {
local = interfaceName + "Local";
}
Class<?> localClass;
try {
localClass = ClassUtils.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
if (stub != null) {
if ("true".equals(stub)) {
stub = interfaceName + "Stub";
}
Class<?> stubClass;
try {
stubClass = ClassUtils.forNameWithThreadContextClassLoader(stub);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
}
}
checkStubAndLocal(interfaceClass);
ConfigValidationUtils.checkMock(interfaceClass, this);
ConfigValidationUtils.validateServiceConfig(this);
postProcessConfig();
总体就是校验配置的正确性以及更新配置
serviceMetadata.setVersion(version);
serviceMetadata.setGroup(group);
serviceMetadata.setDefaultGroup(group);
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());
version和group是在暴露服务的注解上指定的
服务分组与多版本简介
在前面章节《Dubbo 动态配置中心》中,有的小伙伴可能会疑惑到我们暴露的服务使用了版本号0.0.1,那这个版本号代表什么意思呢?其实在 Dubbo 中默认是没有版本号,我们在《Dubbo 服务注册与发现》章节中介绍了服务提供者向注册中心注册的元数据中就包含了:接口全限定名、方法名称、版本号、分组等,这里我们的一个服务的唯一标识就是通过这四个方面来限定的。假设同一个服务和同一个方法有不同的版本号或分组那么这个就代表是不同的服务。
如上图,我们定义了一个queryAll方法但是使用了版本和分组分别是:(0.0.1+g1)、(0.0.2+g2)的两种组合,那么在客户端就存在着这两种服务的调用。我们只要记住:Dubbo 服务调用唯一标识一个服务的四要素。
使用场景
前面我们已经了解了多版本和分组简单使用,那么在我们的应用中有什么使用场景呢?对应版本来说我们通常维护多版本的意义其实就是功能升级、bug修复等。而分组的概念主要用于同一个接口用于不同的逻辑形成以组进行业务逻辑的划分。
-
当我们提供一个Dubbo服务给上游系统使用,过了一段时间我们需要对此接口新增业务逻辑,那么我们在测试完新增业务功能后上线时我们需要对新增处理逻辑的接口维护一个新版本。这样在新版本上线后逐渐让老版本用户切换新版本,等切换完成关闭老版本服务实现业务上线版本切换。 -
我们对接第三方支付系统中常常接收异步通知,那么网关系统在接受异步通知后会分发消息到各个系统。我们的做法是网关提供一个接口让各个需要通知的系统自行实现接口并且使用不同的分组来标识,这样网关系统就可以进行对不同分组进行通知处理。
进入org.apache.dubbo.config.ServiceConfig#doExportUrls
/**
 * Registers this service and its provider in the service repository, then exports it
 * once per configured protocol.
 *
 * <p>For each protocol a path key is built from the (optionally context-prefixed) path,
 * the group and the version; the key is registered in the repository and set as the
 * service key on the metadata before the protocol-specific export runs.
 */
private void doExportUrls() {
    final ServiceRepository repository = ApplicationModel.getServiceRepository();
    final ServiceDescriptor descriptor = repository.registerService(getInterfaceClass());
    repository.registerProvider(getUniqueServiceName(), ref, descriptor, this, serviceMetadata);

    final List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

    for (ProtocolConfig protocolConfig : protocols) {
        // Prefix the path with the protocol's context path when one is present.
        final String effectivePath = getContextPath(protocolConfig)
                .map(p -> p + "/" + path)
                .orElse(path);
        final String pathKey = URL.buildKey(effectivePath, group, version);

        repository.registerService(pathKey, interfaceClass);
        serviceMetadata.setServiceKey(pathKey);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
/**
 * Creates a {@code ProviderModel} for the service and records it, unless an entry for
 * the same key already exists.
 *
 * <p>The model is stored under {@code serviceKey} and, separately, under the key with
 * the group removed; both insertions keep any existing entry.
 *
 * @param serviceKey      unique key of the service
 * @param serviceInstance the service implementation
 * @param serviceModel    descriptor of the service
 * @param serviceConfig   the service's config
 * @param serviceMetadata metadata of the service
 */
public void registerProvider(String serviceKey,
                             Object serviceInstance,
                             ServiceDescriptor serviceModel,
                             ServiceConfigBase<?> serviceConfig,
                             ServiceMetadata serviceMetadata) {
    final ProviderModel model =
            new ProviderModel(serviceKey, serviceInstance, serviceModel, serviceConfig, serviceMetadata);
    providers.putIfAbsent(serviceKey, model);

    final String groupLessKey = keyWithoutGroup(serviceKey);
    providersWithoutGroup.putIfAbsent(groupLessKey, model);
}
遍历protocolsConfig,暴露到注册中心 doExportUrlsFor1Protocol(protocolConfig, registryURLs);
/**
 * Exports this service for a single protocol.
 *
 * <p>Steps, in order: collect all URL parameters into a map, resolve host and port,
 * build the provider URL, let a matching {@code ConfiguratorFactory} extension adjust it,
 * then export locally and/or remotely depending on the URL's scope parameter. The final
 * URL is recorded in {@code urls}.
 *
 * @param protocolConfig the protocol to export with
 * @param registryURLs   registry URLs to export through; may be empty
 * @throws IllegalArgumentException if an argument config has neither a usable index nor type,
 *                                  or its index and type do not match
 */
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// The protocol name defaults to DUBBO when none is configured.
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
// --- 1. Collect URL parameters. Later appendParameters calls can overwrite earlier entries. ---
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
ServiceConfig.appendRuntimeParameters(map);
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
AbstractConfig.appendParameters(map, provider);
AbstractConfig.appendParameters(map, protocolConfig);
AbstractConfig.appendParameters(map, this);
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}
// --- 2. Method-level and argument-level parameters. ---
if (CollectionUtils.isNotEmpty(getMethods())) {
for (MethodConfig method : getMethods()) {
AbstractConfig.appendParameters(map, method, method.getName());
// "<method>.retry" is removed from the map; the value "false" becomes "<method>.retries" = "0".
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
for (ArgumentConfig argument : arguments) {
// Case A: the argument declares a type, so match it against the interface's method signatures.
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
if (methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// Matches by name only, so every overload of the method is visited.
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
if (argument.getIndex() != -1) {
// NOTE(review): no bounds check on the index; an overload with fewer parameters would make this throw ArrayIndexOutOfBoundsException — confirm.
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// No index given: apply the config to every parameter whose type name matches.
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
// NOTE(review): this branch is only reached when getIndex() == -1, so the condition below looks unreachable.
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
// Case B: no type, but an explicit index.
} else if (argument.getIndex() != -1) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
}
}
// --- 3. Generic flag, revision and method names. ---
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
// Going through a HashSet removes duplicate names (e.g. overloads); the resulting order is unspecified.
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// --- 4. Token: fall back to the provider's token; a "default" token is replaced by a random UUID. ---
if(ConfigUtils.isEmpty(token) && provider != null) {
token = provider.getToken();
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
serviceMetadata.getAttachments().putAll(map);
// --- 5. Build the provider URL. ---
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// Let a ConfiguratorFactory registered for this protocol rewrite the URL, if one exists.
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
// --- 6. Export according to scope: none = skip, remote = remote only, local = local only, otherwise both. ---
String scope = url.getParameter(SCOPE_KEY);
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
// One export per registry; url is reassigned inside the loop, so parameters accumulate across iterations.
for (URL registryURL : registryURLs) {
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// The provider URL travels inside the registry URL as the encoded EXPORT_KEY parameter.
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// No registry: export the provider URL directly.
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
// Publish the service definition through the metadata service chosen by the URL's metadata type.
WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
if (metadataService != null) {
metadataService.publishServiceDefinition(url);
}
}
}
this.urls.add(url);
}
首先根据配置生成provider的url参数,参数含义可以参照下面的URL参数记录,URL中参数的查询优先级为: name desc mock 配置此method在客户端调用时是直接调用,还是强制使用mock,还是调用失败后使用mock retries 调用Provider失败后的重试其他Provider的次数 forks 并发调用时Provider的个数 timeout 调用的Provider的超时时间,毫秒 merger MergeableClusterInvoker调用时,指定的merge方式或者merge方法 reference.filter 用于Consumer端控制、指定过滤器名称 service.filter 用于Provider端控制、指定过滤器名称 accesslog 用于控制AccessLogFilter是否加载进FilterChain中 actives 用于控制ActiveLimitFilter是否加载进FilterChain中 cache 用于控制CacheFilter是否加载进FilterChain中 deprecated 用于控制DeprecatedFilter是否加载进FilterChain中 executes 用于控制ExecuteLimitFilter是否加载进FilterChain中,以及指定信号量初始值单个Provider最大可并发数 generic 用于控制GenericImplFilter是否加载进FilterChain中 token 用于控制TokenFilter是否加载进FilterChain中 tps 用于控制TpsLimitFilter是否加载进FilterChain中 validation 用于控制ValidationFilter是否加载进FilterChain中 invoker.listener 用于控制invoker过程中的监听器名称 exporter.listener 用于控制export过程中的监听器名称 async Consumer端控制本地调用是否是异步调用 return Consumer端控制本地调用是否不需要调用返回值 connections 用于控制Consumer在调用多个Provider时,自身的connection是否共享或最大个数 codec 指定编码器 heartbeat 指定是否发送心跳,以及心跳的发送间隔 exchanger 指定exchanger扩展点的实现类 heartbeat.timeout 心跳超时时间 sent Consumer端发送时,Consumer的等待超时时间 codec 指定编码器 dispatcher 指定分发器 channel.handler 控制、指定handler处理器 threadname Provider端指定的线程池扩展点中线程名称 threads Provider端指定的线程池扩展点中核心线程数和最大线程数的数量 queues Provider端指定的线程池扩展点中队列的大小,0为使用SynchronousQueue,小于0为无限 dump.directory Provider端线程池线程越界时,jstack命令dump线程文件快照存储的路径 corethreads Provider端指定的线程池扩展点中cache/limited线程池核心线程数 alive Provider端指定的线程池扩展点中cache线程池线程空闲时间,毫秒 connect.queue.capacity Provider端dispatcher为connection时,指定线程池队列容量 connect.queue.warning.size Provider端dispatcher为connection时,指定线程池队列容量告警阈值
还会把接口方法放进参数
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
dubbo.provider.host可以配置生产者暴露的地址。为什么暴露方还能配置ip地址?因为一台电脑可以有多个ip地址:一个网卡在同一时间内只有一个IP地址,一台电脑可以同时有多个IP地址,每增加一个网络设备即可多增加一个IP地址。如常见的笔记本电脑配置有有线网卡和无线网卡,则有线网卡有一个IP地址,无线网卡有一个IP地址。IP地址是指互联网协议地址,又译为网际协议地址,是IP协议提供的一种统一的地址格式。最后,经过组装,暴露的url大概是这个样子
dubbo://172.31.96.1:20880/com.xishan.store.usercenter.userapi.facade.UserReadFacade?anyhost=true&application=user-server&bind.ip=172.31.96.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=dubbo&interface=com.xishan.store.usercenter.userapi.facade.UserReadFacade&metadata-type=remote&methods=findById,findByAccount,register&pid=25324&qos.enable=false&release=2.7.6&side=provider&timeout=4000&timestamp=1650857544628
本地暴露org.apache.dubbo.config.ServiceConfig#exportLocal
/**
 * Exports this service locally: derives a URL from {@code url} with the protocol,
 * host and port overridden, exports an invoker for {@code ref} on that URL and
 * records the resulting exporter in {@code exporters}.
 *
 * @param url the provider URL the local URL is derived from
 */
private void exportLocal(URL url) {
// Copy of the provider URL with protocol LOCAL_PROTOCOL, host LOCALHOST_VALUE and port 0;
// all query parameters are carried over unchanged.
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
// Wrap the service implementation (ref) in an Invoker and export it through the adaptive PROTOCOL.
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
改变protocol和ip
injvm://127.0.0.1/com.xishan.store.usercenter.userapi.facade.UserReadFacade?anyhost=true&application=user-server&bind.ip=172.31.96.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=dubbo&interface=com.xishan.store.usercenter.userapi.facade.UserReadFacade&metadata-type=remote&methods=findById,findByAccount,register&pid=25324&qos.enable=false&release=2.7.6&side=provider&timeout=4000&timestamp=1650857544628
先看这个PROTOCOL和PROXY_FACTORY
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
在ServiceConfig类加载时,会加载静态属性,执行这两个方法。看下生成的ProxyFactory$Adaptive的代码内容
package com.xishan.store.usercenter.userweb;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0, arg1);
}
public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
getInvoker是根据url.getParameter("proxy", "javassist")选择扩展的;如果url中没有proxy参数,默认使用javassist,对应JavassistProxyFactory。继续看org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker
/**
 * Builds an {@link Invoker} that dispatches calls to {@code proxy} through a {@link Wrapper}.
 *
 * @param proxy the target object calls are dispatched to
 * @param type  the service interface
 * @param url   the URL passed to the created invoker
 * @return an {@code AbstractProxyInvoker} whose {@code doInvoke} delegates to the wrapper
 */
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// Wrap the concrete class of proxy, unless its class name contains '$',
// in which case the wrapper is built for the interface type instead.
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// Dispatch by method name / parameter types via the wrapper.
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
这里可以看出来,服务提供者的实例被使用Wrapper进行了包装,看下wrapper生成的代码
package com.xx.xx.samples.loader;
import com.alibaba.dubbo.common.bytecode.NoSuchPropertyException;
import com.xx.xx.samples.loader.ClassGenerator2.DC;
import com.xx.xx.samples.loader.service.impl.UserServiceImpl;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
public class Wrapper20 extends Wrapper2 implements DC {
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts0;
public static Class[] mts1;
public static Class[] mts2;
public String[] getPropertyNames() {
return pns;
}
public boolean hasProperty(String var1) {
return pts.containsKey(var1);
}
public Class getPropertyType(String var1) {
return (Class)pts.get(var1);
}
public String[] getMethodNames() {
return mns;
}
public String[] getDeclaredMethodNames() {
return dmns;
}
public void setPropertyValue(Object var1, String var2, Object var3) {
try {
UserServiceImpl var4 = (UserServiceImpl)var1;
} catch (Throwable var6) {
throw new IllegalArgumentException(var6);
}
throw new NoSuchPropertyException("Not found property \"" + var2 + "\" filed or setter method in class com.xx.xx.samples.loader.service.impl.UserServiceImpl.");
}
public Object getPropertyValue(Object var1, String var2) {
try {
UserServiceImpl var3 = (UserServiceImpl)var1;
} catch (Throwable var5) {
throw new IllegalArgumentException(var5);
}
throw new NoSuchPropertyException("Not found property \"" + var2 + "\" filed or setter method in class com.xx.xx.samples.loader.service.impl.UserServiceImpl.");
}
public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {
UserServiceImpl var5;
try {
var5 = (UserServiceImpl)var1;
} catch (Throwable var8) {
throw new IllegalArgumentException(var8);
}
try {
if ("findUserList".equals(var2) && var3.length == 0) {
return var5.findUserList();
}
if ("findById".equals(var2) && var3.length == 1) {
return var5.findById((Integer)var4[0]);
}
if ("updateById".equals(var2) && var3.length == 1) {
var5.updateById((Integer)var4[0]);
return null;
}
} catch (Throwable var9) {
throw new InvocationTargetException(var9);
}
throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class com.xx.xx.samples.loader.service.impl.UserServiceImpl.");
}
public Wrapper20() {
}
}
为什么要生成wrapper这个东西,通过启动过程 生成Wrapper20类,这样在invokeMethod时,不需要通过反射调用,提升运行时调用效率 回到协议暴露
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
spi又登场了 进入 org.apache.dubbo.common.extension.ExtensionLoader#createExtension
/**
 * Creates (or reuses) the extension instance registered under {@code name}, injects it,
 * wraps it with every cached wrapper class and initializes the result.
 *
 * @param name the extension name to look up in the extension classes map
 * @return the extension instance, wrapped by the wrapper classes if any are cached
 * @throws IllegalStateException if any step of instantiation, injection or wrapping fails
 */
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
// No class registered under this name.
throw findException(name);
}
try {
// Instances are cached per class in EXTENSION_INSTANCES.
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// putIfAbsent followed by get, so the instance read back is the one actually stored in the map.
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
// Each wrapper is built through its constructor taking the extension type,
// receiving the current instance; the new wrapper then becomes the current instance.
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
initExtension(instance);
return instance;
} catch (Throwable t) {
// Any failure is rethrown as IllegalStateException with the original cause attached.
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
如果有cachedWrapperClasses,遍历cachedWrapperClasses,分别用构造函数,包裹根据url protocol spi生成的instance
Wrapper
Wrapper机制,即扩展点自动包装。Wrapper 类同样实现了扩展点接口,但 Wrapper 不是扩展点的真正实现。它的用途主要是在 ExtensionLoader 返回扩展点时,包装在真正的扩展点实现外面。即从 ExtensionLoader 中返回的其实是 Wrapper 类的实例,Wrapper 持有了实际的扩展点实现类。扩展点的 Wrapper 类可以有多个,也可以根据需要新增。通过 Wrapper 类可以把所有扩展点的公共逻辑移至 Wrapper 中。新加的 Wrapper 会在所有的扩展点上添加逻辑,有些类似 AOP,即 Wrapper 代理了扩展点。
Wrapper的规范
Wrapper 机制不是通过注解实现的,而是通过一套 Wrapper 规范实现的。Wrapper 类在定义时需要遵循以下规范。
- 该类要实现 SPI 接口
- 该类中要有 SPI 接口的引用
- 该类中必须含有一个含参的构造方法,且参数只能有一个,类型为 SPI 接口
- 在接口实现方法中要调用 SPI 接口引用对象的相应方法
- 该类名称以 Wrapper 结尾
可以看出来Wrapper也是配置在spi中的,只不过它是特殊的实现类
这样执行export首先进入org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#export
/**
 * Exports the invoker through the wrapped {@code protocol}, adding the provider-side
 * filter chain unless the invoker's URL is a registry URL.
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// Registry URLs are passed through untouched: no filter chain is built for them.
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
// Otherwise wrap the invoker in the filter chain selected by SERVICE_FILTER_KEY for the provider group.
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
protocol属性是他扩展的目标,说白了这就是一个装饰器模式,不断装饰,不断扩展功能。 org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#buildInvokerChain
ProtocolFilterWrapper
/**
 * Wraps {@code invoker} in a chain of invokers, one per activated {@link Filter}.
 *
 * @param invoker the invoker at the end of the chain
 * @param key     URL parameter key passed to {@code getActivateExtension} to select filters
 * @param group   activation group passed to {@code getActivateExtension}
 * @return the head of the chain, or {@code invoker} itself when no filter is activated
 */
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
// Build from the last filter backwards, so the first filter in the list ends up outermost.
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
// Each link delegates metadata and lifecycle calls to the original invoker
// and routes invoke() through its filter.
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// The filter decides whether and how to call the next link.
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
// Synchronous failure: notify the filter's listener (if any), then rethrow.
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
try {
Filter.Listener listener = listenableFilter.listener(invocation);
if (listener != null) {
listener.onError(e, invoker, invocation);
}
} finally {
// The per-invocation listener is always removed, even if onError throws.
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
listener.onError(e, invoker, invocation);
}
throw e;
} finally {
}
// Completion callback: report the result or the error to the filter's listener.
return asyncResult.whenCompleteWithContext((r, t) -> {
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
});
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
filter
获取所有被激活(@Activate)的filter对象
@SPI
public interface Filter {
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
interface Listener {
void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
}
可见,filter接口的核心是 invoke 方法,实现内部在执行完自己的逻辑后,返回入参的 invoker.invoke(invocation),例如:
/**
 * Stores the current time on the invocation under {@code TIMEOUT_FILTER_START_TIME}
 * and then delegates to the next invoker.
 */
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// Start timestamp in milliseconds, recorded before the call is passed on.
invocation.put(TIMEOUT_FILTER_START_TIME, System.currentTimeMillis());
return invoker.invoke(invocation);
}
监听回调,结果,和错误 回头看创建invoker的方式 创建链式拦截器filter Invoker
asyncResult = filter.invoke(next, invocation);
内部使用执行filter逻辑,并在最后invoker.invoke(invocation); 这样链式调用,最终调用真正的invoker返回结果。 生产者使用这几个filter。
比较常见的有超时,和异常处理filter
QosProtocolWrapper
QoS(Quality of Service,服务质量)指一个网络能够利用各种基础技术,为指定的网络通信提供更好的服务能力,是网络的一种安全机制, 是用来解决网络延迟和阻塞等问题的一种技术。dubbo为用户提供类似的网络服务用来online和offline service来解决网络延迟,阻塞等问题。 QosProtocolWrapper 便完成了 Dubbo QOS 功能的处理。 QosProtocolWrapper部分代码实现如下:
/**
 * Delegates the export to the wrapped {@code protocol}; when the invoker's URL uses
 * the registry protocol, {@code startQosServer} is called first.
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// Only a registry-protocol URL triggers the QoS server start.
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
startQosServer(invoker.getUrl());
return protocol.export(invoker);
}
return protocol.export(invoker);
}
/**
 * Delegates the reference to the wrapped {@code protocol}; when {@code url} uses
 * the registry protocol, {@code startQosServer} is called first.
 */
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// Only a registry-protocol URL triggers the QoS server start.
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
startQosServer(url);
return protocol.refer(type, url);
}
return protocol.refer(type, url);
}
/**
 * Destroys the wrapped protocol and then calls {@code stopServer()}.
 */
@Override
public void destroy() {
protocol.destroy();
stopServer();
}
这里可以看到,当第一个服务向服务中心进行注册时或者第一次引用时,Dubbo 会启动Qos 服务,可以通过 qos.enable参数控制是否开启。
Qos 基础使用 Dubbo的QoS是默认开启的,端口为22222,可以通过配置修改端口 telnet localhost 22222
ls : 列出所有服务列表
online 服务名 : 上线某个服务
offline 服务名:下线某个服务
quit :退出服务
ProtocolListenerWrapper
/**
 * Exports through the wrapped {@code protocol}; for non-registry URLs the resulting
 * exporter is wrapped in a {@code ListenerExporterWrapper} carrying the activated
 * {@code ExporterListener} extensions.
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// Registry URLs are passed through without listener wrapping.
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
// Listeners are selected via EXPORTER_LISTENER_KEY on the invoker URL and handed over as an unmodifiable list.
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}
对于非 registry 协议,使用 ListenerExporterWrapper 包装。通过spi寻找ExporterListener,可以自定义,dubbo并没有自带此类型实现的spi
/**
 * Wraps {@code exporter} and immediately notifies every non-null listener via
 * {@code exported(this)}.
 *
 * @param exporter  the exporter to wrap; must not be null
 * @param listeners listeners to notify; may be null or empty
 * @throws IllegalArgumentException if {@code exporter} is null
 * @throws RuntimeException the last RuntimeException thrown by a listener, rethrown
 *                          after all listeners have been notified
 */
public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
if (exporter == null) {
throw new IllegalArgumentException("exporter == null");
}
this.exporter = exporter;
this.listeners = listeners;
if (CollectionUtils.isNotEmpty(listeners)) {
RuntimeException exception = null;
for (ExporterListener listener : listeners) {
if (listener != null) {
try {
listener.exported(this);
} catch (RuntimeException t) {
// Log and remember the failure, but keep notifying the remaining listeners.
logger.error(t.getMessage(), t);
exception = t;
}
}
}
// Only the most recent listener failure is propagated.
if (exception != null) {
throw exception;
}
}
}
如果实现,会执行对应listener接口的exported方法 最终进入
InjvmProtocol
/**
 * Creates an {@code InjvmExporter} for the invoker, keyed by the service key of its URL
 * and given the shared {@code exporterMap}.
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
返回InjvmExporter 最终
exporters.add(exporter);
把exporter存到exporters中,exportLocal结束
注册中心暴露
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
}
如果注册中心url不为空,暴露到注册中心
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
创建invoker的方式跟本地暴露创建invoker的方式相同,不再重复说明。此时的url是registry protocol,因此使用的是RegistryProtocol,其他过程和本地暴露相同,也是经过wrapper的扩展,
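注意,registry协议的暴露过程不会增加filter和监听器(ProtocolFilterWrapper、ProtocolListenerWrapper遇到registry url时直接放行);而qos服务恰恰是在这一步启动的(QosProtocolWrapper只在registry协议下调用startQosServer)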
org.apache.dubbo.registry.integration.RegistryProtocol#export
/**
 * Exports a service through the registry protocol: exports the provider locally via
 * {@code doLocalExport}, optionally registers the provider URL, subscribes an override
 * listener and returns a destroyable exporter.
 *
 * @param originInvoker the invoker the registry URL and provider URL are taken from
 * @return a {@code DestroyableExporter} wrapping the locally exported service
 */
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
URL providerUrl = getProviderUrl(originInvoker);
// Create an override listener for this provider and keep it in overrideListeners, keyed by its subscribe URL.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// Export the provider itself (using providerUrl) before touching the registry.
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// Registration is skipped when the provider URL carries REGISTER_KEY=false (defaults to true).
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
}
// The override subscription happens regardless of the register flag.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
notifyExport(exporter);
return new DestroyableExporter<>(exporter);
}
生成注册中心url(nacos/zookeeper) 生成生产者dubbo协议 url doLocalExport本地暴露dubbo接口
/**
 * Exports {@code originInvoker} under {@code providerUrl} through the wrapped protocol,
 * at most once per cache key.
 *
 * @param originInvoker the original invoker; also used to compute the cache key
 * @param providerUrl   the URL the delegate invoker is created with
 * @return the cached or newly created exporter wrapper
 */
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
// computeIfAbsent: the export lambda only runs when no exporter is cached in bounds for this key.
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
这是用生成的dubbo协议url进行暴露,和本地暴露相同,也是经过wrapper的扩展,暴露过程中会增加filter、监听器(qos服务是在registry协议暴露时启动的,这里不会再启动)。最终来到DubboProtocol的export方法
/**
 * Exports the invoker over the dubbo protocol: registers a {@code DubboExporter} in
 * {@code exporterMap} under the service key, then opens the server for the URL and
 * calls {@code optimizeSerialization}.
 *
 * @param invoker the invoker to export; its URL drives the key and the server setup
 * @return the exporter stored in {@code exporterMap}
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// Register the exporter under the service key computed from the URL.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// Stub-event sanity check: only logs a warning, it never aborts the export.
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer
/**
 * Ensures a server exists for the address of {@code url}: creates one if none is
 * registered in {@code serverMap}, otherwise resets the existing one with {@code url}.
 * Does nothing when the URL parameter {@code IS_SERVER_KEY} is false.
 *
 * @param url the provider URL; its address is the key into {@code serverMap}
 */
private void openServer(URL url) {
// Servers are keyed by the URL's address.
String key = url.getAddress();
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
// Re-check under the lock before creating a server for this address.
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// A server already exists for this address: reset it with the new URL.
server.reset(url);
}
}
}
根据url获取到IP地址和端口号判断是否有这个server,如果没有就创建一个 创建server的过程就是开启netty服务的过程
/**
 * Binds a server for {@code url}: the handler is wrapped in a {@code HeaderExchangeHandler}
 * and a {@code DecodeHandler}, passed to {@code Transporters.bind}, and the result is
 * wrapped in a {@code HeaderExchangeServer}.
 */
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
最终进入org.apache.dubbo.remoting.transport.AbstractServer
/**
 * Resolves the local and bind addresses from the URL, reads the accepts / idle-timeout
 * settings, opens the server via {@code doOpen()} and obtains the executor.
 *
 * @param url     the server URL; supplies addresses and parameters
 * @param handler the channel handler passed to the superclass
 * @throws RemotingException if {@code doOpen()} throws anything
 */
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// Bind IP/port come from the BIND_IP_KEY / BIND_PORT_KEY parameters, falling back to the URL's host/port.
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
// Use ANYHOST_VALUE when the URL sets ANYHOST_KEY or the bind IP is not a valid local host.
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
// Any failure while opening is wrapped in a RemotingException carrying the original cause.
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
executor = executorRepository.createExecutorIfAbsent(url);
}
开启绑定监听端口
如果server已经创建过了 执行
server.reset(url);
创建的nettyServer会重新设置url org.apache.dubbo.remoting.transport.AbstractServer#reset
/**
 * Applies settings from {@code url} to this server: accepts and idle timeout (only when
 * present and positive), the thread pool, and finally the URL parameters, which are
 * added onto the current URL.
 *
 * @param url the URL carrying the new settings; a null URL is ignored
 */
public void reset(URL url) {
if (url == null) {
return;
}
try {
if (url.hasParameter(ACCEPTS_KEY)) {
int a = url.getParameter(ACCEPTS_KEY, 0);
// Non-positive values leave the current setting untouched.
if (a > 0) {
this.accepts = a;
}
}
} catch (Throwable t) {
// A failure here is logged only, so the remaining settings are still applied.
logger.error(t.getMessage(), t);
}
try {
if (url.hasParameter(IDLE_TIMEOUT_KEY)) {
int t = url.getParameter(IDLE_TIMEOUT_KEY, 0);
if (t > 0) {
this.idleTimeout = t;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
executorRepository.updateThreadpool(url, executor);
// Add the incoming parameters onto the existing URL rather than replacing it.
super.setUrl(getUrl().addParameters(url.getParameters()));
}
在原有url的基础上扩展新的url参数,这样就完成了DubboProtocol的暴露。回到RegistryProtocol的暴露,将服务暴露到注册中心
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
}
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
notifyExport(exporter);
return new DestroyableExporter<>(exporter);
export暴露到注册中心
org.apache.dubbo.registry.integration.RegistryProtocol#notifyExport
/**
 * Calls {@code onExport} on every activated {@code RegistryProtocolListener} extension.
 *
 * @param exporter the exporter that was just created; its origin invoker's URL is used
 *                 to select the listeners
 */
private <T> void notifyExport(ExporterChangeableWrapper<T> exporter) {
// Listeners are selected via the "registry.protocol.listener" key on the origin invoker's URL.
List<RegistryProtocolListener> listeners = ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
.getActivateExtension(exporter.getOriginInvoker().getUrl(), "registry.protocol.listener");
if (CollectionUtils.isNotEmpty(listeners)) {
for (RegistryProtocolListener listener : listeners) {
listener.onExport(this, exporter);
}
}
}
没有默认实现的RegistryProtocolListener,所以这里什么也没做。我们也可以利用dubbo spi去自定义逻辑
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
最终暴露完成添加到exporters中
WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
if (metadataService != null) {
metadataService.publishServiceDefinition(url);
}
服务暴露完成后,将serviceDefinition发布到元数据中心
最后添加暴露url
this.urls.add(url);
dubbo服务暴露结束
暴露完成后进入exported
/**
 * Dispatches a {@code ServiceConfigExportedEvent} for this service config.
 */
public void exported() {
dispatch(new ServiceConfigExportedEvent(this));
}
ServiceNameMappingListener
会在注册中心创建映射
ServiceNameMapping
其作用为
/**
 * Map the specified Dubbo service interface, group, version and protocol to current Dubbo service name
 */
为当前dubboService name做一个映射
请求执行流程
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
处理来自消费者的请求
/**
 * Dispatches an incoming message according to its type: {@code Request},
 * {@code Response}, {@code String}, or anything else.
 *
 * @param channel the channel the message arrived on
 * @param message the received message
 */
public void received(Channel channel, Object message) throws RemotingException {
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
if (message instanceof Request) {
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
// Two-way request: handled by handleRequest.
handleRequest(exchangeChannel, request);
} else {
// One-way request: only the payload is passed to the handler.
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
// String messages are rejected (logged) on the client side and treated as telnet commands otherwise.
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
// Only a non-empty telnet result is sent back.
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
}
根据不同的消息类型进入对应的处理逻辑
- 如果是request请求,交给handler处理
- 如果是response ,通过future处理
- 如果是telnet协议,实际上直接请求的就是字符串,使用telnet处理器处理,请求是字符串,返回的也是字符串,请求由dubbo服务器处理,返回由telnet客户端处理
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest
/**
 * Handles a request by passing its payload to {@code handler.reply} and sending a
 * {@code Response} with the same id once the returned stage completes.
 *
 * @param channel the channel the response is sent on
 * @param req     the request to handle
 */
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
// The response reuses the request's id and version.
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
// Broken request: reply with BAD_REQUEST and a message built from the request data.
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
Object msg = req.getData();
try {
CompletionStage<Object> future = handler.reply(channel, msg);
// The response is sent from the completion callback.
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
// A failed send is only logged.
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
}
});
} catch (Throwable e) {
// Anything thrown inside the try block (by reply or by registering the callback) is reported as SERVICE_ERROR.
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
这里看起来像是异步处理 进入org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter#reply看看
/**
 * Looks up the invoker for an incoming {@code Invocation} and invokes it.
 *
 * @param channel the channel the message arrived on
 * @param message the received message; must be an {@code Invocation}
 * @return a future of the invocation result, or {@code null} when a callback
 *         invocation names a method that is not listed in the invoker URL
 * @throws RemotingException if {@code message} is not an {@code Invocation}
 */
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// For callback invocations, check the method name against the "methods" parameter of the invoker URL.
if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
// Zero or one method listed: compare directly (a null list never matches).
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
// Unknown callback method: log a warning and return null instead of a future.
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
// Identity mapping; the value is passed through unchanged.
return result.thenApply(Function.identity());
}
重点
Invoker<?> invoker = getInvoker(channel, inv);
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv));
}
return exporter.getInvoker();
从exporterMap中获取对应的invoker
invoker.invoke()
进到了前面讲到的dubbo Wrapper机制的ProtocolFilterWrapper
/**
 * Runs this chain node's filter around the next invoker and notifies the filter's
 * listener about the outcome.
 *
 * <p>If the filter throws synchronously, its listener's {@code onError} is called and
 * the exception is rethrown. Otherwise a completion callback is attached to the
 * returned result which calls {@code onResponse} or {@code onError}.
 *
 * @param invocation the invocation travelling through the filter chain
 * @return the result returned by the filter, with the listener callback attached
 * @throws RpcException if the filter fails synchronously
 */
@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result pending;
    try {
        pending = filter.invoke(next, invocation);
    } catch (Exception failure) {
        if (filter instanceof ListenableFilter) {
            ListenableFilter listenable = (ListenableFilter) filter;
            try {
                Filter.Listener callback = listenable.listener(invocation);
                if (callback != null) {
                    callback.onError(failure, invoker, invocation);
                }
            } finally {
                // Always drop the per-invocation listener, even if onError itself failed.
                listenable.removeListener(invocation);
            }
        } else if (filter instanceof Filter.Listener) {
            ((Filter.Listener) filter).onError(failure, invoker, invocation);
        }
        throw failure;
    }

    return pending.whenCompleteWithContext((value, error) -> {
        if (filter instanceof ListenableFilter) {
            ListenableFilter listenable = (ListenableFilter) filter;
            // The lookup happens outside the try block, exactly as in the synchronous path's counterpart.
            Filter.Listener callback = listenable.listener(invocation);
            try {
                if (callback != null) {
                    if (error != null) {
                        callback.onError(error, invoker, invocation);
                    } else {
                        callback.onResponse(value, invoker, invocation);
                    }
                }
            } finally {
                listenable.removeListener(invocation);
            }
        } else if (filter instanceof Filter.Listener) {
            Filter.Listener callback = (Filter.Listener) filter;
            if (error != null) {
                callback.onError(error, invoker, invocation);
            } else {
                callback.onResponse(value, invoker, invocation);
            }
        }
    });
}
EchoFilter
EchoFilter是消费者用来判断生产者服务是否可用的一个接口
/**
 * Provider-side filter that short-circuits echo requests.
 *
 * <p>An invocation whose method name equals {@code $ECHO} and that carries exactly one
 * argument is answered immediately with that argument; every other invocation is
 * passed on to the next invoker.
 */
@Activate(group = CommonConstants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {

    /**
     * @param invoker the next invoker in the chain
     * @param inv     the incoming invocation
     * @return the echoed argument for an echo request, otherwise the downstream result
     * @throws RpcException if the downstream invoker fails
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
        boolean echoRequest = inv.getMethodName().equals($ECHO)
                && inv.getArguments() != null
                && inv.getArguments().length == 1;
        if (!echoRequest) {
            return invoker.invoke(inv);
        }
        // Reply with the single argument without calling the service implementation.
        return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
    }
}
invoker变成了nextFilter封装的invoker,继续执行下一个invoker,filter
ClassLoaderFilter
/**
 * Provider-side filter that runs the downstream call with the thread context class
 * loader set to the class loader of the service interface, restoring the previous
 * loader afterwards.
 */
@Activate(group = CommonConstants.PROVIDER, order = -30000)
public class ClassLoaderFilter implements Filter {

    /**
     * @param invoker    the next invoker in the chain
     * @param invocation the incoming invocation
     * @return the downstream result
     * @throws RpcException if the downstream invoker fails
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(invoker.getInterface().getClassLoader());
        try {
            return invoker.invoke(invocation);
        } finally {
            // Restore the original loader whether the call succeeded or threw.
            current.setContextClassLoader(previous);
        }
    }
}
为当前线程设置classLoader
GenericFilter
GenericFilter的Invoke方法在methodName为generic,且参数长度为3时,通过struct2MapAll方法将oldParams转换为newParams,发起newInvocation
ContextFilter
/**
 * Populates the thread's {@link RpcContext} from the incoming invocation, invokes the
 * next invoker, and clears the context afterwards.
 *
 * <p>Attachments whose key is in {@code UNLOADING_KEYS} are not copied into the context.
 *
 * @param invoker    the next invoker in the chain
 * @param invocation the incoming invocation
 * @return the downstream result
 * @throws RpcException if the downstream invoker fails
 */
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    Map<String, Object> incoming = invocation.getObjectAttachments();
    Map<String, Object> carried = null;
    if (incoming != null) {
        carried = new HashMap<>(incoming.size());
        for (Map.Entry<String, Object> attachment : incoming.entrySet()) {
            if (UNLOADING_KEYS.contains(attachment.getKey())) {
                continue;
            }
            carried.put(attachment.getKey(), attachment.getValue());
        }
    }

    RpcContext context = RpcContext.getContext();
    context.setInvoker(invoker)
            .setInvocation(invocation)
            .setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

    // Prefer the application name sent with the invocation; otherwise use the one already in the context.
    String remoteApplication = (String) invocation.getAttachment(REMOTE_APPLICATION_KEY);
    if (!StringUtils.isNotEmpty(remoteApplication)) {
        context.setRemoteApplicationName((String) context.getAttachment(REMOTE_APPLICATION_KEY));
    } else {
        context.setRemoteApplicationName(remoteApplication);
    }

    if (carried != null) {
        if (context.getObjectAttachments() == null) {
            context.setObjectAttachments(carried);
        } else {
            context.getObjectAttachments().putAll(carried);
        }
    }

    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation) invocation).setInvoker(invoker);
    }

    try {
        context.clearAfterEachInvoke(false);
        return invoker.invoke(invocation);
    } finally {
        context.clearAfterEachInvoke(true);
        RpcContext.removeContext();
        RpcContext.removeServerContext();
    }
}
根据请求,组装RpcContext。后续可以直接取数据,实现服务间的数据透传
TraceFilter
1. dubbo telnet trace 演示 如果对dubbo telnet 服务治理命令不熟悉的同学可以去dubbo官网文档学习下:官网文档链接 我们这里演示 trace的使用,我们可以使用trace能干什么呢,主要是监听 某个接口的任意方法或者某个方法 n次,然后返回对应的执行时间。 借用下dubbo官网对于trace的解释:
我们这里演示下,我这里现在有个服务提供者跟服务调用者,然后我这里 telnet 一下服务提供者,看下某个接口的某个方法的执行时间。 注意: telnet 的ip就是服务提供者的机器ip ,端口是dubbo 端口,就是你配置的那个端口,我这里是18108
我们看到,监听三次后就不再监听了,每次打印会将你的接口名+方法名+参数+结果值+ 执行时间打印出来。其实我们主角TraceFilter 就与这个功能实现有关,下面我们看下TraceFilter。
2. TraceFilter解析 我们在解析TraceFilter 源码之前先讲解一下trace 大体的实现流程,我们telnet 命令会被dubbo handler 单独处理,然后将trace命令解析出来,交给TraceTelnetHandler ,然后它经过一顿操作,找到接口暴露的invoker,然后再向TraceFilter中添加这个tracer,其实就是将要监听接口名,方法名,监听次数,还有channel 给TraceFilter,接下来就是调用的时候收集 执行时间,将执行时间 找到对应的channel 然后发送给telnet客户端。 接下来我们就看下这个TraceFilter ,首先看下 添加tracer与移除tracer方法
/** Channels currently tracing, keyed by "interfaceName" or "interfaceName.methodName". */
private static final ConcurrentMap<String, Set<Channel>> tracers = new ConcurrentHashMap<>();

/**
 * Registers a channel as a tracer for an interface or for one of its methods.
 *
 * @param type    the traced interface
 * @param method  the traced method name; {@code null} or empty traces the whole interface
 * @param channel the channel that receives trace output
 * @param max     the value stored on the channel under {@code TRACE_MAX}
 */
public static void addTracer(Class<?> type, String method, Channel channel, int max) {
    channel.setAttribute(TRACE_MAX, max);
    channel.setAttribute(TRACE_COUNT, new AtomicInteger());

    String key;
    if (method == null || method.length() == 0) {
        key = type.getName();
    } else {
        key = type.getName() + "." + method;
    }

    Set<Channel> registered = tracers.get(key);
    if (registered == null) {
        // putIfAbsent keeps whichever set was stored first; re-read to pick up the stored one.
        tracers.putIfAbsent(key, new ConcurrentHashSet<Channel>());
        registered = tracers.get(key);
    }
    registered.add(channel);
}
上面这段代码就是添加tracer,先解释下参数Class<?> type 这个就是你监听接口class,method: 监听的方法,channel:你发起telnet那个通道,max: 这个就是监听的最大次数。 首先是往channel 塞了 trace.max 与trace.count 两个属性值,这个 trace.max 就是最大监听次数,trace.count 就是计数器,记录调用了几次了。接着就是拼装key,如果method没有的话就使用type的全类名, 如果有method话就是 全类名.方法名 ,先从tracers这个map里面查找有没有对应的channels,没有找到就创建set集合,最后添加到这个集合中。
/**
 * Unregisters a channel from tracing an interface or one of its methods and clears
 * the trace attributes stored on the channel.
 *
 * @param type    the traced interface
 * @param method  the traced method name; {@code null} or empty means the whole interface
 * @param channel the channel to unregister
 */
public static void removeTracer(Class<?> type, String method, Channel channel) {
    channel.removeAttribute(TRACE_MAX);
    channel.removeAttribute(TRACE_COUNT);

    boolean methodScoped = method != null && method.length() > 0;
    String key = methodScoped ? type.getName() + "." + method : type.getName();

    Set<Channel> registered = tracers.get(key);
    if (registered == null) {
        return;
    }
    registered.remove(channel);
}
这个是移除tracer的方法,先移除channel中的这两个属性,拼装key,根据key从tracers中获取channels,然后移除对应的channel。 接下来我们就要看下invoke方法了
/**
 * Invokes the next invoker and reports the call to every channel tracing this method
 * or its interface.
 *
 * <p>The elapsed time is measured around the {@code invoker.invoke} call only. A channel
 * is removed from the tracer set when it is disconnected, when reporting to it fails,
 * or once its counter has reached {@code max - 1}.
 *
 * @param invoker    the next invoker in the chain
 * @param invocation the incoming invocation
 * @return the downstream result, unchanged
 * @throws RpcException if the downstream invoker fails
 */
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    long start = System.currentTimeMillis();
    Result result = invoker.invoke(invocation);
    long end = System.currentTimeMillis();

    if (!(tracers.size() > 0)) {
        return result;
    }

    // Method-level tracers take precedence; fall back to interface-level tracers.
    Set<Channel> channels = tracers.get(invoker.getInterface().getName() + "." + invocation.getMethodName());
    if (channels == null || channels.isEmpty()) {
        channels = tracers.get(invoker.getInterface().getName());
    }
    if (channels == null || channels.isEmpty()) {
        return result;
    }

    // Iterate over a snapshot because channels are removed from the live set inside the loop.
    for (Channel channel : new ArrayList<Channel>(channels)) {
        if (!channel.isConnected()) {
            channels.remove(channel);
            continue;
        }
        try {
            Integer configuredMax = (Integer) channel.getAttribute(TRACE_MAX);
            int max = configuredMax != null ? (int) configuredMax : 1;

            AtomicInteger counter = (AtomicInteger) channel.getAttribute(TRACE_COUNT);
            if (counter == null) {
                counter = new AtomicInteger();
                channel.setAttribute(TRACE_COUNT, counter);
            }

            int count = counter.getAndIncrement();
            if (count < max) {
                String prompt = channel.getUrl().getParameter(Constants.PROMPT_KEY, Constants.DEFAULT_PROMPT);
                channel.send("\r\n" + RpcContext.getContext().getRemoteAddress() + " -> "
                        + invoker.getInterface().getName()
                        + "." + invocation.getMethodName()
                        + "(" + JSON.toJSONString(invocation.getArguments()) + ")" + " -> " + JSON.toJSONString(result.getValue())
                        + "\r\nelapsed: " + (end - start) + " ms."
                        + "\r\n\r\n" + prompt);
            }
            if (count >= max - 1) {
                channels.remove(channel);
            }
        } catch (Throwable e) {
            channels.remove(channel);
            logger.warn(e.getMessage(), e);
        }
    }
    return result;
}
我们可以看到在执行前后都记录了时间戳,然后判断如果有tracer的话,就要拼装key,先拼装全类名.方法名形式的,进行get,如果结果是空的话,就使用接口名获取channels。如果channels不是空,就遍历,如果channel是连接状态的话,先判断trace.max与trace.count。然后对trace.count这个计数器自增1,表示调用次数+1,如果调用次数小于 最大调用次数(就是咱们设置的那个times)的话,就向channel中发送 “接口+方法名+参数+结果值+ 执行时间”给我们。如果count大于等于max-1的话 就移除channel。 最后就是将执行结果result返回了。 好了,以上就是TraceFilter的全部的,还是很简单的。
TimeoutFilter
/**
 * Provider-side filter that logs a warning when an invocation took longer than the
 * {@code timeout} method parameter configured on the invoker URL. It only logs; it
 * does not interrupt or fail the invocation.
 */
@Activate(group = CommonConstants.PROVIDER)
public class TimeoutFilter implements Filter, Filter.Listener {

    private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);

    /** Invocation attribute under which the start timestamp (ms) is stored. */
    private static final String TIMEOUT_FILTER_START_TIME = "timeout_filter_start_time";

    /**
     * Records the start time on the invocation and delegates to the next invoker.
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        invocation.put(TIMEOUT_FILTER_START_TIME, System.currentTimeMillis());
        return invoker.invoke(invocation);
    }

    /**
     * Compares the elapsed time with the configured timeout and warns when it was exceeded.
     * Does nothing when no start time was recorded or the invoker has no URL.
     */
    @Override
    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
        Object startTime = invocation.get(TIMEOUT_FILTER_START_TIME);
        if (startTime == null) {
            return;
        }
        long elapsed = System.currentTimeMillis() - (Long) startTime;
        // Without a configured timeout the default is Integer.MAX_VALUE.
        boolean exceeded = invoker.getUrl() != null
                && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", Integer.MAX_VALUE);
        if (exceeded && logger.isWarnEnabled()) {
            logger.warn("invoke time out. method: " + invocation.getMethodName() + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
        }
    }

    /** Intentionally empty: failures are not reported by this filter. */
    @Override
    public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
    }
}
可以看出来,生产者对超时时间并没有做严格的控制,只是根据生产者配置的超时时间打印了一条警告日志
MonitorFilter
dubbo将监控独立出一个监控层,但是我认为监控只能算作是dubbo的一个功能,可以监控服务的健康程度或者做服务治理,监控功能不是dubbo必须的,可以不开启监控。而且在运行过程中,监控宕机,也不会影响服务访问。 dubbo提供了一个简单的监控中心,可以参考下面的文章。但这个监控中心只能满足简单监控功能,对于复杂的需要自行开发。本文对监控中心不做过多介绍。 一、总述 dubbo对服务端和客户端都进行了监控,监控内容主要是包含以下五大类信息:
服务调用或者服务请求的基本信息,包括服务地址,端口,服务名,方法名等; 服务调用消耗时间; 服务请求数据大小或者服务返回值大小; 服务请求的结果,成功或者失败; 服务方法的总调用次数。 监控功能主要由MonitorFilter实现,该类实现了Filter接口。在dubbo架构中,MonitorFilter可以拦截所有客户端发出请求或者服务端处理请求。 我的理解:从功能上来说,MonitorFilter不能算是过滤器,因为它不过滤任何东西,相反它是要做拦截,因此我觉得MonitorFilter更合适叫做拦截器。
二、MonitorFilter应用原理 MonitorFilter的定义如下:
public class MonitorFilter implements Filter, Filter.Listener {
MonitorFilter方法介绍 前面描述了MonitorFilter是如何被创建,如何对服务端和客户端进行拦截。下面我们深入到MonitorFilter内部,对MonitorFilter的每个方法进行分析。
1、invoke方法 当客户端访问远程服务或者服务端收到请求时,都会调用MonitorFilter的invoke方法,该方法比较简单,只是收集两个信息:服务调用开始时间和服务方法调用次数。
/**
 * Records the start time and increments the in-flight counter when the invoker URL
 * carries the monitor parameter, then delegates to the next invoker.
 *
 * @param invoker    the next invoker in the chain
 * @param invocation the current invocation
 * @return the downstream result
 * @throws RpcException if the downstream invoker fails
 */
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    boolean monitored = invoker.getUrl().hasParameter(MONITOR_KEY);
    if (monitored) {
        invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
        AtomicInteger inFlight = getConcurrent(invoker, invocation);
        inFlight.incrementAndGet();
    }
    return invoker.invoke(invocation);
}
如果使用监控功能,必须开启相应的dubbo-admin,否则
if (invoker.getUrl().hasParameter(MONITOR_KEY))
条件不会满足
代码getConcurrent如下:
/**
 * Returns the in-flight counter for the invoked "interfaceName.methodName", creating
 * it on first use.
 *
 * @param invoker    supplies the interface name
 * @param invocation supplies the method name
 * @return the counter stored for that key
 */
private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) {
    String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
    AtomicInteger counter = concurrents.get(key);
    if (counter != null) {
        return counter;
    }
    // putIfAbsent keeps whichever counter was stored first; re-read to return the stored one.
    concurrents.putIfAbsent(key, new AtomicInteger());
    return concurrents.get(key);
}
获取之后自增 1。 2、onMessage和onError MonitorFilter实现了Filter.Listener接口,onMessage和onError便是该接口要求实现的两个方法。这两个方法用于监听服务返回值,代码可以参见上文ProtocolFilterWrapper的buildInvokerChain方法。 onMessage和onError两个方法比较类似,唯一的区别是,服务访问成功调用onMessage,失败则调用onError。
/**
 * Reports a successful invocation to the monitor and decrements the in-flight counter.
 * Does nothing when the invoker URL has no monitor parameter.
 */
@Override
public void onMessage(Result result, Invoker<?> invoker, Invocation invocation) {
    if (!invoker.getUrl().hasParameter(MONITOR_KEY)) {
        return;
    }
    String remoteHost = RpcContext.getContext().getRemoteHost();
    long startedAt = (long) invocation.get(MONITOR_FILTER_START_TIME);
    // Collect before decrementing, so the reported concurrency still includes this call.
    collect(invoker, invocation, result, remoteHost, startedAt, false);
    getConcurrent(invoker, invocation).decrementAndGet();
}
/**
 * Reports a failed invocation to the monitor (with no result) and decrements the
 * in-flight counter. Does nothing when the invoker URL has no monitor parameter.
 */
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
    if (!invoker.getUrl().hasParameter(MONITOR_KEY)) {
        return;
    }
    String remoteHost = RpcContext.getContext().getRemoteHost();
    long startedAt = (long) invocation.get(MONITOR_FILTER_START_TIME);
    // Collect before decrementing, so the reported concurrency still includes this call.
    collect(invoker, invocation, null, remoteHost, startedAt, true);
    getConcurrent(invoker, invocation).decrementAndGet();
}
collect collect方法的最后一个参数error用于区分是onMessage调用还是onError调用。
/**
 * Builds the statistics URL for one invocation and hands it to the monitor resolved
 * from the invoker URL's monitor parameter. Any failure is logged and swallowed.
 *
 * @param invoker    the invoked service
 * @param invocation the invocation being reported
 * @param result     the invocation result, or {@code null} on error
 * @param remoteHost the remote peer's host
 * @param start      the invocation start time in milliseconds
 * @param error      {@code true} when reporting a failure
 */
private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
    try {
        Monitor monitor = monitorFactory.getMonitor(invoker.getUrl().getUrlParameter(MONITOR_KEY));
        if (monitor != null) {
            monitor.collect(createStatisticsUrl(invoker, invocation, result, remoteHost, start, error));
        }
    } catch (Throwable t) {
        logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
    }
}
/**
 * Assembles the statistics URL describing one invocation: application, interface,
 * method, remote peer, success/failure flag, elapsed time, concurrency, input/output
 * attachments, group and version.
 *
 * @param invoker    the invoked service
 * @param invocation the invocation being reported
 * @param result     the invocation result, or {@code null} on error
 * @param remoteHost the remote peer's host, used when this side is not the consumer
 * @param start      the invocation start time in milliseconds
 * @param error      {@code true} to record a failure, {@code false} to record a success
 * @return the statistics URL
 */
private URL createStatisticsUrl(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
    // Take the timestamp first so the bookkeeping below is not counted as elapsed time.
    long elapsed = System.currentTimeMillis() - start;
    int concurrent = getConcurrent(invoker, invocation).get();

    String application = invoker.getUrl().getParameter(APPLICATION_KEY);
    String service = invoker.getInterface().getName();
    String method = RpcUtils.getMethodName(invocation);
    String group = invoker.getUrl().getParameter(GROUP_KEY);
    String version = invoker.getUrl().getParameter(VERSION_KEY);

    // On the consumer side the remote peer is the provider address; otherwise it is the consumer host.
    boolean consumerSide = CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY));
    int localPort = consumerSide ? 0 : invoker.getUrl().getPort();
    String remoteKey = consumerSide ? MonitorService.PROVIDER : MonitorService.CONSUMER;
    String remoteValue = consumerSide ? invoker.getUrl().getAddress() : remoteHost;

    String input = invocation.getAttachment(INPUT_KEY) != null
            ? invocation.getAttachment(INPUT_KEY)
            : "";
    String output = result != null && result.getAttachment(OUTPUT_KEY) != null
            ? result.getAttachment(OUTPUT_KEY)
            : "";

    String outcomeKey = error ? MonitorService.FAILURE : MonitorService.SUCCESS;
    return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method,
            MonitorService.APPLICATION, application,
            MonitorService.INTERFACE, service,
            MonitorService.METHOD, method,
            remoteKey, remoteValue,
            outcomeKey, "1",
            MonitorService.ELAPSED, String.valueOf(elapsed),
            MonitorService.CONCURRENT, String.valueOf(concurrent),
            INPUT_KEY, input,
            OUTPUT_KEY, output,
            GROUP_KEY, group,
            VERSION_KEY, version);
}
在collect方法中创建了Monitor对象,因此Monitor对象的创建属于懒加载,只有在使用的时候才会创建。
四、总结 本文详细介绍了MonitorFilter的实现原理及方法实现。由于MonitorFilter作为过滤器,保证了所有的服务调用都会经过该过滤器。 MonitorFilter功能的实现还依赖于Monitor对象,下一篇文章介绍Monitor的实现原理。
ExceptionFilter
/**
 * Provider-side filter that inspects exceptions carried by a response.
 *
 * <p>Checked exceptions, exceptions declared on the service method, exceptions loaded
 * from the same code base as the service interface, {@code java.}/{@code javax.}
 * exceptions and {@link RpcException} are left untouched. Anything else is replaced
 * by a {@link RuntimeException} whose message is the string form of the original.
 */
@Activate(group = CommonConstants.PROVIDER)
public class ExceptionFilter implements Filter, Filter.Listener {

    // Not final: it can be replaced through setLogger.
    private Logger logger = LoggerFactory.getLogger(ExceptionFilter.class);

    /** Passes the invocation straight through; all work happens in the listener callbacks. */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

    /**
     * Decides whether the exception carried by {@code appResponse} is kept as is or
     * replaced by a plain {@link RuntimeException}.
     */
    @Override
    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
        if (!appResponse.hasException() || GenericService.class == invoker.getInterface()) {
            return;
        }
        try {
            Throwable exception = appResponse.getException();

            // Checked exceptions are kept as they are.
            boolean checked = !(exception instanceof RuntimeException) && (exception instanceof Exception);
            if (checked) {
                return;
            }

            // Exceptions whose exact class is declared on the method signature are kept as they are.
            try {
                Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                for (Class<?> declared : method.getExceptionTypes()) {
                    if (exception.getClass().equals(declared)) {
                        return;
                    }
                }
            } catch (NoSuchMethodException e) {
                return;
            }

            // From here on the exception is unchecked and undeclared: log it on the provider.
            logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);

            // Same code base as the interface (or unknown code base): keep as is.
            String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
            String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
            if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                return;
            }

            // JDK exceptions and Dubbo's own RpcException are kept as they are.
            String className = exception.getClass().getName();
            boolean jdkException = className.startsWith("java.") || className.startsWith("javax.");
            if (jdkException || exception instanceof RpcException) {
                return;
            }

            // Everything else is replaced by a RuntimeException carrying the original's string form.
            appResponse.setException(new RuntimeException(StringUtils.toString(exception)));
        } catch (Throwable e) {
            logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
        }
    }

    /** Logs the failure on the provider side. */
    @Override
    public void onError(Throwable e, Invoker<?> invoker, Invocation invocation) {
        logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
    }

    /** Replaces the logger used by this filter. */
    public void setLogger(Logger logger) {
        this.logger = logger;
    }
}
- 如果是受检异常(不是RuntimeException的Exception),直接抛出
- 如果方法签名上声明了该异常,直接抛出
- 如果异常是方法没有声明的,打印错误日志
- 如果是jdk或者dubbo自带的异常(RpcException),直接抛出
- 如果异常类和接口类同属一个jar包,直接抛出
- 其他情况,包装成RuntimeException抛出
appResponse.setException(new RuntimeException(StringUtils.toString(exception)));
抛出的异常最终由org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest 处理
// Excerpt from handleRequest: res, handler, msg and channel are defined earlier in the method (not shown here).
try {
// Hand the request to the handler; the reply may complete later.
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
// Completed normally: send the result back with OK status.
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
// Completed exceptionally: send the throwable's string form as the error message.
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
// Sending the response failed; it is only logged.
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
}
});
} catch (Throwable e) {
// reply() itself threw synchronously: answer with a service error right away.
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
执行真正的invoker
/**
 * Creates an invoker that calls methods on {@code proxy} through a {@link Wrapper}
 * instead of reflection.
 *
 * <p>The wrapper is built for the proxy's own class unless that class name contains
 * {@code '$'}, in which case the interface type is used.
 *
 * @param proxy the service implementation to invoke
 * @param type  the service interface
 * @param url   the service URL
 * @return an invoker delegating to {@code Wrapper.invokeMethod}
 */
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    Class<?> wrappedClass;
    if (proxy.getClass().getName().indexOf('$') < 0) {
        wrappedClass = proxy.getClass();
    } else {
        wrappedClass = type;
    }
    final Wrapper wrapper = Wrapper.getWrapper(wrappedClass);

    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T target, String method,
                                  Class<?>[] signature,
                                  Object[] args) throws Throwable {
            return wrapper.invokeMethod(target, method, signature, args);
        }
    };
}
执行doInvoke实际执行
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
// Decompiled view of the wrapper's dispatch method (class header not shown in this excerpt).
// var1 = target instance, var2 = method name, var3 = parameter types, var4 = arguments.
public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {
UserServiceImpl var5;
try {
// The target must be a UserServiceImpl; any cast failure becomes IllegalArgumentException.
var5 = (UserServiceImpl)var1;
} catch (Throwable var8) {
throw new IllegalArgumentException(var8);
}
try {
// Dispatch by method name and parameter count, calling the implementation directly.
if ("findUserList".equals(var2) && var3.length == 0) {
return var5.findUserList();
}
if ("findById".equals(var2) && var3.length == 1) {
return var5.findById((Integer)var4[0]);
}
if ("updateById".equals(var2) && var3.length == 1) {
var5.updateById((Integer)var4[0]);
return null;
}
} catch (Throwable var9) {
// Anything thrown by the service method is wrapped in InvocationTargetException.
throw new InvocationTargetException(var9);
}
// No branch matched the requested method.
// NOTE(review): NoSuchMethodException here is presumably a Dubbo unchecked type rather than java.lang's — confirm.
throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class com.xx.xx.samples.loader.service.impl.UserServiceImpl.");
}
// No-argument constructor of the wrapper class; it performs no initialization.
public Wrapper20() {
}
}
由动态生成的类执行,并返回结果 再把结果封装成AsyncRpcResult,发送给消费者
至此请求执行流程结束
|