相关阅读
简介
本文基于Spring Boot 2.6.6 ,dubbo-spring-boot-starter 3.0.6 环境。
由上文Dubbo学习之DubboReference可知,标注了注解DubboReference 的Bean最终会被注册为ReferenceBean ;本文主要分析ReferenceBean 的实例化过程,以及Dubbo Reference的创建过程;
Demo
核心依赖:
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>3.0.6</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-zookeeper</artifactId>
<version>3.0.6</version>
</dependency>
消费者核心配置参数:
server:
port: 8080
dubbo:
application:
id: dubbo-consumer
name: dubbo-consumer
registry:
address: zookeeper://127.0.0.1:2181
protocol:
port: 20880
消费者 Reference 配置代码: 方式一:
@DubboReference
private DemoService demoService;
方式二(推荐):
@Configuration
public class ReferenceConfig {
@Bean
@DubboReference
public ReferenceBean<DemoService> demoService() {
return new ReferenceBean<>();
}
}
ReferenceBean实例化
初始化
ReferenceBean 实现了InitializingBean 接口,故在其被创建后进行初始化时,会调用其afterPropertiesSet 方法,代码如下:
public void afterPropertiesSet() throws Exception {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
Assert.notEmptyString(getId(), "The id of ReferenceBean cannot be empty");
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(getId());
this.interfaceClass = (Class<?>) beanDefinition.getAttribute(ReferenceAttributes.INTERFACE_CLASS);
this.interfaceName = (String) beanDefinition.getAttribute(ReferenceAttributes.INTERFACE_NAME);
Assert.notNull(this.interfaceClass, "The interface class of ReferenceBean is not initialized");
if (beanDefinition.hasAttribute(Constants.REFERENCE_PROPS)) {
referenceProps = (Map<String, Object>) beanDefinition.getAttribute(Constants.REFERENCE_PROPS);
} else {
if (beanDefinition instanceof AnnotatedBeanDefinition) {
if (referenceProps == null) {
referenceProps = new LinkedHashMap<>();
}
ReferenceBeanSupport.convertReferenceProps(referenceProps, interfaceClass);
if (this.interfaceName == null) {
this.interfaceName = (String) referenceProps.get(ReferenceAttributes.INTERFACE);
}
} else {
propertyValues = beanDefinition.getPropertyValues();
}
}
Assert.notNull(this.interfaceName, "The interface name of ReferenceBean is not initialized");
ReferenceBeanManager referenceBeanManager = beanFactory.getBean(ReferenceBeanManager.BEAN_NAME, ReferenceBeanManager.class);
referenceBeanManager.addReference(this);
}
afterPropertiesSet 方法中除了根据BeanDefinition 给相关字段赋值外,最重要的就是向ReferenceBeanManager 中添加本Reference,代码如下:
public void addReference(ReferenceBean referenceBean) throws Exception {
String referenceBeanName = referenceBean.getId();
Assert.notEmptyString(referenceBeanName, "The id of ReferenceBean cannot be empty");
if (!initialized) {
logger.warn("Early initialize reference bean before DubboConfigBeanInitializer," +
" the BeanPostProcessor has not been loaded at this time, which may cause abnormalities in some components (such as seata): " +
referenceBeanName + " = " + ReferenceBeanSupport.generateReferenceKey(referenceBean, applicationContext));
}
String referenceKey = ReferenceBeanSupport.generateReferenceKey(referenceBean, applicationContext);
ReferenceBean oldReferenceBean = referenceBeanMap.get(referenceBeanName);
if (oldReferenceBean != null) {
if (referenceBean != oldReferenceBean) {
String oldReferenceKey = ReferenceBeanSupport.generateReferenceKey(oldReferenceBean, applicationContext);
throw new IllegalStateException("Found duplicated ReferenceBean with id: " + referenceBeanName +
", old: " + oldReferenceKey + ", new: " + referenceKey);
}
return;
}
referenceBeanMap.put(referenceBeanName, referenceBean);
this.registerReferenceKeyAndBeanName(referenceKey, referenceBeanName);
if (initialized) {
initReferenceBean(referenceBean);
}
}
private synchronized void initReferenceBean(ReferenceBean referenceBean) throws Exception {
if (referenceBean.getReferenceConfig() != null) {
return;
}
String referenceKey = ReferenceBeanSupport.generateReferenceKey(referenceBean, applicationContext);
ReferenceConfig referenceConfig = referenceConfigMap.get(referenceKey);
if (referenceConfig == null) {
Map<String, Object> referenceAttributes = ReferenceBeanSupport.getReferenceAttributes(referenceBean);
referenceConfig = ReferenceCreator.create(referenceAttributes, applicationContext)
.defaultInterfaceClass(referenceBean.getObjectType())
.build();
if (referenceBean.getId() != null && !referenceBean.getId().contains("#")) {
referenceConfig.setId(referenceBean.getId());
}
referenceConfigMap.put(referenceKey, referenceConfig);
moduleModel.getConfigManager().addReference(referenceConfig);
}
referenceBean.setKeyAndReferenceConfig(referenceKey, referenceConfig);
}
核心便是构建ReferenceConfig (用于后续创建Dubbo Reference),该动作由ReferenceCreator.build() 完成,代码如下:
public final ReferenceConfig build() throws Exception {
ReferenceConfig configBean = new ReferenceConfig();
configureBean(configBean);
if (logger.isInfoEnabled()) {
logger.info("The configBean[type:" + configBean.getClass().getSimpleName() + "] has been built.");
}
return configBean;
}
protected void configureBean(ReferenceConfig configBean) throws Exception {
populateBean(configBean);
configureMonitorConfig(configBean);
configureModuleConfig(configBean);
configureConsumerConfig(configBean);
}
获取真正实例
ReferenceBean 实现了FactoryBean 接口,故在获取实例时,会调用其getObject 方法,代码如下:
public T getObject() {
if (lazyProxy == null) {
createLazyProxy();
}
return (T) lazyProxy;
}
private void createLazyProxy() {
ProxyFactory proxyFactory = new ProxyFactory();
proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
proxyFactory.addInterface(interfaceClass);
Class<?>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
for (Class<?> anInterface : internalInterfaces) {
proxyFactory.addInterface(anInterface);
}
if (!StringUtils.isEquals(interfaceClass.getName(), interfaceName)) {
try {
Class<?> serviceInterface = ClassUtils.forName(interfaceName, beanClassLoader);
proxyFactory.addInterface(serviceInterface);
} catch (ClassNotFoundException e) {
}
}
this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader);
}
代理的targetSource 为DubboReferenceLazyInitTargetSource ,其getTarget 方法代码如下:
public synchronized Object getTarget() throws Exception {
if (this.lazyTarget == null) {
logger.debug("Initializing lazy target object");
this.lazyTarget = createObject();
}
return this.lazyTarget;
}
protected Object createObject() throws Exception {
return getCallProxy();
}
private Object getCallProxy() throws Exception {
if (referenceConfig == null) {
throw new IllegalStateException("ReferenceBean is not ready yet, please make sure to call reference interface method after dubbo is started.");
}
return referenceConfig.get();
}
至此,ReferenceBean 的实例化流程就结束了,其中最重要的就是为ReferenceBean 构建对应的ReferenceConfig ;而ReferenceConfig 就是用于创建Dubbo Reference;
Dubbo Reference创建
当AbstractApplicationContext 完成fresh 后,就会发布事件ContextRefreshedEvent ,Dubbo中DubboDeployApplicationListener 会关注该事件,并做相关处理,代码如下:
public void onApplicationEvent(ApplicationContextEvent event) {
if (nullSafeEquals(applicationContext, event.getSource())) {
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
}
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
ModuleDeployer deployer = moduleModel.getDeployer();
Assert.notNull(deployer, "Module deployer is null");
Future future = deployer.start();
if (!deployer.isBackground()) {
try {
future.get();
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting for dubbo module start: " + e.getMessage());
} catch (Exception e) {
logger.warn("An error occurred while waiting for dubbo module start: " + e.getMessage(), e);
}
}
}
当Dubbo监听到ContextRefreshedEvent ,便启动ModuleModel ,代码如下:
public synchronized Future start() throws IllegalStateException {
if (isStopping() || isStopped() || isFailed()) {
throw new IllegalStateException(getIdentifier() + " is stopping or stopped, can not start again");
}
try {
if (isStarting() || isStarted()) {
return startFuture;
}
onModuleStarting();
applicationDeployer.initialize();
initialize();
exportServices();
if (moduleModel != moduleModel.getApplicationModel().getInternalModule()) {
applicationDeployer.prepareInternalModule();
}
referServices();
if (asyncExportingFutures.isEmpty() && asyncReferringFutures.isEmpty()) {
onModuleStarted();
} else {
executorRepository.getSharedExecutor().submit(() -> {
try {
waitExportFinish();
waitReferFinish();
} catch (Throwable e) {
logger.warn("wait for export/refer services occurred an exception", e);
} finally {
onModuleStarted();
}
});
}
} catch (Throwable e) {
onModuleFailed(getIdentifier() + " start failed: " + e, e);
throw e;
}
return startFuture;
}
private void referServices() {
configManager.getReferences().forEach(rc -> {
try {
ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;
if (!referenceConfig.isRefreshed()) {
referenceConfig.refresh();
}
if (rc.shouldInit()) {
if (referAsync || rc.shouldReferAsync()) {
ExecutorService executor = executorRepository.getServiceReferExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
referenceCache.get(rc);
} catch (Throwable t) {
logger.error(getIdentifier() + " refer async catch error : " + t.getMessage(), t);
}
}, executor);
asyncReferringFutures.add(future);
} else {
referenceCache.get(rc);
}
}
} catch (Throwable t) {
logger.error(getIdentifier() + " refer catch error.");
referenceCache.destroy(rc);
throw t;
}
});
}
referenceCache.get(rc) 动作会调用到ReferenceConfig.get() 方法,从而触发Dubbo Reference的创建,代码如下:
public T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
getScopeModel().getDeployer().start();
synchronized (this) {
if (ref == null) {
init();
}
}
}
return ref;
}
protected synchronized void init() {
if (initialized) {
return;
}
initialized = true;
if (!this.isRefreshed()) {
this.refresh();
}
initServiceMetadata(consumer);
serviceMetadata.setServiceType(getServiceInterfaceClass());
serviceMetadata.setServiceKey(URL.buildKey(interfaceName, group, version));
Map<String, String> referenceParameters = appendConfig();
initServiceAppsMapping(referenceParameters);
ModuleServiceRepository repository = getScopeModel().getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(interfaceClass);
consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor, this,
getScopeModel(), serviceMetadata, createAsyncMethodInfo());
repository.registerConsumer(consumerModel);
serviceMetadata.getAttachments().putAll(referenceParameters);
ref = createProxy(referenceParameters);
serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
consumerModel.setProxyObject(ref);
consumerModel.initMethodModels();
checkInvokerAvailable();
}
private T createProxy(Map<String, String> referenceParameters) {
if (shouldJvmRefer(referenceParameters)) {
createInvokerForLocal(referenceParameters);
} else {
urls.clear();
if (StringUtils.isNotEmpty(url)) {
parseUrl(referenceParameters);
} else {
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
aggregateUrlFromRegistry(referenceParameters);
}
}
createInvokerForRemote();
}
if (logger.isInfoEnabled()) {
logger.info("Referred dubbo service: [" + referenceParameters.get(INTERFACE_KEY) + "]." +
(Boolean.parseBoolean(referenceParameters.get(GENERIC_KEY)) ?
" it's GenericService reference" : " it's not GenericService reference"));
}
URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
referenceParameters.get(INTERFACE_KEY), referenceParameters);
consumerUrl = consumerUrl.setScopeModel(getScopeModel());
consumerUrl = consumerUrl.setServiceModel(consumerModel);
MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
总结
接口调用流程如下:
- 标注了注解
DubboReference 的Bean最终在Spring容器中是以JdkDynamicAopProxy (targetSource为DubboReferenceLazyInitTargetSource )形式存在; JdkDynamicAopProxy.invoke 中,通过targetSource.getTarget() 最终调用到ReferenceConfig.get() ,从而找到真实服务的Dubbo Reference并进行调用;
|