IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Dubbo学习之ReferenceBean -> 正文阅读

[Java知识库]Dubbo学习之ReferenceBean

相关阅读

简介

本文基于Spring Boot 2.6.6dubbo-spring-boot-starter 3.0.6环境。

由上文Dubbo学习之DubboReference可知,标注了注解DubboReference的Bean最终会被注册为ReferenceBean;本文主要分析ReferenceBean的实例化过程,以及Dubbo Reference的创建过程;

Demo

核心依赖:

<dependency>
    <groupId>org.apache.dubbo</groupId>
    <artifactId>dubbo-spring-boot-starter</artifactId>
    <version>3.0.6</version>
</dependency>

<dependency>
    <groupId>org.apache.dubbo</groupId>
    <artifactId>dubbo-registry-zookeeper</artifactId>
    <version>3.0.6</version>
</dependency>

消费者核心配置参数:

server:
  port: 8080
dubbo:
  application:
    id: dubbo-consumer
    name: dubbo-consumer
  registry:
    address: zookeeper://127.0.0.1:2181
  protocol:
    port: 20880

消费者 Reference 配置代码:
方式一:

@DubboReference
private DemoService demoService;

方式二(推荐):

@Configuration
public class ReferenceConfig {

    @Bean
    @DubboReference
    public ReferenceBean<DemoService> demoService() {
        return new ReferenceBean<>();
    }
}

ReferenceBean实例化

初始化

ReferenceBean实现了InitializingBean接口,故在其被创建后进行初始化时,会调用其afterPropertiesSet方法,代码如下:

public void afterPropertiesSet() throws Exception {
    ConfigurableListableBeanFactory beanFactory = getBeanFactory();

    Assert.notEmptyString(getId(), "The id of ReferenceBean cannot be empty");
    BeanDefinition beanDefinition = beanFactory.getBeanDefinition(getId());
    this.interfaceClass = (Class<?>) beanDefinition.getAttribute(ReferenceAttributes.INTERFACE_CLASS);
    this.interfaceName = (String) beanDefinition.getAttribute(ReferenceAttributes.INTERFACE_NAME);
    Assert.notNull(this.interfaceClass, "The interface class of ReferenceBean is not initialized");

    if (beanDefinition.hasAttribute(Constants.REFERENCE_PROPS)) {
        referenceProps = (Map<String, Object>) beanDefinition.getAttribute(Constants.REFERENCE_PROPS);
    } else {
        if (beanDefinition instanceof AnnotatedBeanDefinition) {
            if (referenceProps == null) {
                referenceProps = new LinkedHashMap<>();
            }
            ReferenceBeanSupport.convertReferenceProps(referenceProps, interfaceClass);
            if (this.interfaceName == null) {
                this.interfaceName = (String) referenceProps.get(ReferenceAttributes.INTERFACE);
            }
        } else {
            propertyValues = beanDefinition.getPropertyValues();
        }
    }
    Assert.notNull(this.interfaceName, "The interface name of ReferenceBean is not initialized");

    ReferenceBeanManager referenceBeanManager = beanFactory.getBean(ReferenceBeanManager.BEAN_NAME, ReferenceBeanManager.class);
    // 添加Reference
    referenceBeanManager.addReference(this);
}

afterPropertiesSet方法中除了根据BeanDefinition给相关字段赋值外,最重要的就是向ReferenceBeanManager中添加本Reference,代码如下:

public void addReference(ReferenceBean referenceBean) throws Exception {
    String referenceBeanName = referenceBean.getId();
    Assert.notEmptyString(referenceBeanName, "The id of ReferenceBean cannot be empty");

    // 在DubboConfigBeanInitializer.afterPropertiesSet中完成初始化
    if (!initialized) {
        logger.warn("Early initialize reference bean before DubboConfigBeanInitializer," +
                " the BeanPostProcessor has not been loaded at this time, which may cause abnormalities in some components (such as seata): " +
                referenceBeanName + " = " + ReferenceBeanSupport.generateReferenceKey(referenceBean, applicationContext));
    }

    String referenceKey = ReferenceBeanSupport.generateReferenceKey(referenceBean, applicationContext);
    ReferenceBean oldReferenceBean = referenceBeanMap.get(referenceBeanName);
    // 此时并不存在oldReferenceBean
    if (oldReferenceBean != null) {
        if (referenceBean != oldReferenceBean) {
            String oldReferenceKey = ReferenceBeanSupport.generateReferenceKey(oldReferenceBean, applicationContext);
            throw new IllegalStateException("Found duplicated ReferenceBean with id: " + referenceBeanName +
                    ", old: " + oldReferenceKey + ", new: " + referenceKey);
        }
        return;
    }
    // 缓存本Reference
    referenceBeanMap.put(referenceBeanName, referenceBean);
    this.registerReferenceKeyAndBeanName(referenceKey, referenceBeanName);

    if (initialized) {
        // 初始化本Reference
        initReferenceBean(referenceBean);
    }
}

private synchronized void initReferenceBean(ReferenceBean referenceBean) throws Exception {

    if (referenceBean.getReferenceConfig() != null) {
        // 如果已存在ReferenceConfig,则无需处理
        return;
    }

    String referenceKey = ReferenceBeanSupport.generateReferenceKey(referenceBean, applicationContext);

    ReferenceConfig referenceConfig = referenceConfigMap.get(referenceKey);
    if (referenceConfig == null) {
        // ReferenceConfig还不存在,则创建

        Map<String, Object> referenceAttributes = ReferenceBeanSupport.getReferenceAttributes(referenceBean);
        referenceConfig = ReferenceCreator.create(referenceAttributes, applicationContext)
                .defaultInterfaceClass(referenceBean.getObjectType())
                .build();

        if (referenceBean.getId() != null && !referenceBean.getId().contains("#")) {
            referenceConfig.setId(referenceBean.getId());
        }

        // 缓存ReferenceConfig
        referenceConfigMap.put(referenceKey, referenceConfig);

        // 注册ReferenceConfig
        moduleModel.getConfigManager().addReference(referenceConfig);
    }

    // 关联ReferenceBean、ReferenceConfig
    referenceBean.setKeyAndReferenceConfig(referenceKey, referenceConfig);
}

核心便是构建ReferenceConfig(用于后续创建Dubbo Reference),该动作由ReferenceCreator.build()完成,代码如下:

public final ReferenceConfig build() throws Exception {
    ReferenceConfig configBean = new ReferenceConfig();

    // 配置ReferenceConfig
    configureBean(configBean);

    if (logger.isInfoEnabled()) {
        logger.info("The configBean[type:" + configBean.getClass().getSimpleName() + "] has been built.");
    }

    return configBean;
}

protected void configureBean(ReferenceConfig configBean) throws Exception {
    // 填充属性
    populateBean(configBean);

    // deprecate application reference
    //configureApplicationConfig(configBean);

    // 配置Monitor
    configureMonitorConfig(configBean);
    // 配置Module
    configureModuleConfig(configBean);
    // 配置Consumer
    configureConsumerConfig(configBean);
}

获取真正实例

ReferenceBean实现了FactoryBean接口,故在获取实例时,会调用其getObject方法,代码如下:

public T getObject() {
    if (lazyProxy == null) {
        // 懒创建Reference的代理
        createLazyProxy();
    }
    return (T) lazyProxy;
}

private void createLazyProxy() {
    ProxyFactory proxyFactory = new ProxyFactory();
    proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
    proxyFactory.addInterface(interfaceClass);
    Class<?>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
    for (Class<?> anInterface : internalInterfaces) {
        proxyFactory.addInterface(anInterface);
    }
    if (!StringUtils.isEquals(interfaceClass.getName(), interfaceName)) {
        try {
            Class<?> serviceInterface = ClassUtils.forName(interfaceName, beanClassLoader);
            proxyFactory.addInterface(serviceInterface);
        } catch (ClassNotFoundException e) {
        }
    }
    // 创建代理
    this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader);
}

代理的targetSourceDubboReferenceLazyInitTargetSource,其getTarget方法代码如下:

public synchronized Object getTarget() throws Exception {
    if (this.lazyTarget == null) {
        // 懒加载
        logger.debug("Initializing lazy target object");
        this.lazyTarget = createObject();
    }
    return this.lazyTarget;
}

protected Object createObject() throws Exception {
    // 获取真实Service引用
    return getCallProxy();
}


// ReferenceBean.java
private Object getCallProxy() throws Exception {
    if (referenceConfig == null) {
        throw new IllegalStateException("ReferenceBean is not ready yet, please make sure to call reference interface method after dubbo is started.");
    }
    // 可见Dubbo Reference由referenceConfig管理
    return referenceConfig.get();
}

至此,ReferenceBean的实例化流程就结束了,其中最重要的就是为ReferenceBean构建对应的ReferenceConfig;而ReferenceConfig就是用于创建Dubbo Reference;

Dubbo Reference创建

AbstractApplicationContext完成fresh后,就会发布事件ContextRefreshedEvent,Dubbo中DubboDeployApplicationListener会关注该事件,并做相关处理,代码如下:

public void onApplicationEvent(ApplicationContextEvent event) {
    if (nullSafeEquals(applicationContext, event.getSource())) {
        if (event instanceof ContextRefreshedEvent) {
            onContextRefreshedEvent((ContextRefreshedEvent) event);
        } else if (event instanceof ContextClosedEvent) {
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }
}

private void onContextRefreshedEvent(ContextRefreshedEvent event) {
    ModuleDeployer deployer = moduleModel.getDeployer();
    Assert.notNull(deployer, "Module deployer is null");
    // 启动Module
    Future future = deployer.start();

    if (!deployer.isBackground()) {
        try {
            // 同步等待Module启动完成
            future.get();
        } catch (InterruptedException e) {
            logger.warn("Interrupted while waiting for dubbo module start: " + e.getMessage());
        } catch (Exception e) {
            logger.warn("An error occurred while waiting for dubbo module start: " + e.getMessage(), e);
        }
    }
}

当Dubbo监听到ContextRefreshedEvent,便启动ModuleModel,代码如下:

public synchronized Future start() throws IllegalStateException {
    if (isStopping() || isStopped() || isFailed()) {
        throw new IllegalStateException(getIdentifier() + " is stopping or stopped, can not start again");
    }

    try {
        if (isStarting() || isStarted()) {
            // 正在启动或启动完成,则直接返回
            return startFuture;
        }

        // 设置STARTING状态
        onModuleStarting();

        applicationDeployer.initialize();
        initialize();

        // 消费者端,无Service需要暴露
        exportServices();

    // 先启动内部ModuleModel若两者不一致的话
    if (moduleModel != moduleModel.getApplicationModel().getInternalModule()) {
        applicationDeployer.prepareInternalModule();
    }

        // 引用Service
        // 即处理@DubboReference产生的ReferenceBean相对应的ReferenceConfig
        referServices();

        if (asyncExportingFutures.isEmpty() && asyncReferringFutures.isEmpty()) {
            // 直接设置STARTED状态
            onModuleStarted();
        } else {
            executorRepository.getSharedExecutor().submit(() -> {
                try {
                    // 等待暴露Service完成
                    waitExportFinish();
                    // 等待引用Service完成
                    waitReferFinish();
                } catch (Throwable e) {
                    logger.warn("wait for export/refer services occurred an exception", e);
                } finally {
                    // 最后设置STARTED状态
                    onModuleStarted();
                }
            });
        }
    } catch (Throwable e) {
        onModuleFailed(getIdentifier() + " start failed: " + e, e);
        throw e;
    }
    return startFuture;
}

private void referServices() {
    // 遍历ReferenceConfig
    configManager.getReferences().forEach(rc -> {
        try {
            ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;
            if (!referenceConfig.isRefreshed()) {
                // 还没刷新则刷新ReferenceConfig的属性若需要的话
                referenceConfig.refresh();
            }

            if (rc.shouldInit()) {
                if (referAsync || rc.shouldReferAsync()) {
                    // 异步方式
                    ExecutorService executor = executorRepository.getServiceReferExecutor();
                    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                        try {
                            referenceCache.get(rc);
                        } catch (Throwable t) {
                            logger.error(getIdentifier() + " refer async catch error : " + t.getMessage(), t);
                        }
                    }, executor);

                    asyncReferringFutures.add(future);
                } else {
                    // 同步方式
                    referenceCache.get(rc);
                }
            }
        } catch (Throwable t) {
            logger.error(getIdentifier() + " refer catch error.");
            referenceCache.destroy(rc);
            throw t;
        }
    });
}

referenceCache.get(rc)动作会调用到ReferenceConfig.get()方法,从而触发Dubbo Reference的创建,代码如下:

public T get() {
    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    }

    if (ref == null) {
        // 确保Module启动
        getScopeModel().getDeployer().start();

        synchronized (this) {
            // DCL
            if (ref == null) {
                // 引用还未创建,则创建
                init();
            }
        }
    }

    return ref;
}

protected synchronized void init() {
    if (initialized) {
        return;
    }
    initialized = true;

    if (!this.isRefreshed()) {
        this.refresh();
    }

    initServiceMetadata(consumer);

    serviceMetadata.setServiceType(getServiceInterfaceClass());
    serviceMetadata.setServiceKey(URL.buildKey(interfaceName, group, version));

Map<String, String> referenceParameters = appendConfig();
    initServiceAppsMapping(referenceParameters);

    ModuleServiceRepository repository = getScopeModel().getServiceRepository();
    ServiceDescriptor serviceDescriptor = repository.registerService(interfaceClass);
    consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor, this,
        getScopeModel(), serviceMetadata, createAsyncMethodInfo());

    repository.registerConsumer(consumerModel);

    serviceMetadata.getAttachments().putAll(referenceParameters);

    // 基于Invoker创建代理
    ref = createProxy(referenceParameters);

    serviceMetadata.setTarget(ref);
    serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);

    consumerModel.setProxyObject(ref);
    consumerModel.initMethodModels();

    // 校验Invoker是否可访问
    // 如果不可访问,就会抛出异常IllegalStateException,最终终止本项目启动
    checkInvokerAvailable();
}

private T createProxy(Map<String, String> referenceParameters) {
    if (shouldJvmRefer(referenceParameters)) {
        createInvokerForLocal(referenceParameters);
    } else {
        urls.clear();
        if (StringUtils.isNotEmpty(url)) {
            parseUrl(referenceParameters);
        } else {
            if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                aggregateUrlFromRegistry(referenceParameters);
            }
        }
        // 创建Invoker
        createInvokerForRemote();
    }

    if (logger.isInfoEnabled()) {
        logger.info("Referred dubbo service: [" + referenceParameters.get(INTERFACE_KEY) + "]." +
            (Boolean.parseBoolean(referenceParameters.get(GENERIC_KEY)) ?
                " it's GenericService reference" : " it's not GenericService reference"));
    }

    URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
        referenceParameters.get(INTERFACE_KEY), referenceParameters);
    consumerUrl = consumerUrl.setScopeModel(getScopeModel());
    consumerUrl = consumerUrl.setServiceModel(consumerModel);
    MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());

    // 基于Invoker创建Service代理,即Dubbo Reference
    // ProxyFactory$Adaptive
    return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}

总结

接口调用流程如下:

  1. 标注了注解DubboReference的Bean最终在Spring容器中是以JdkDynamicAopProxy(targetSource为DubboReferenceLazyInitTargetSource)形式存在;
  2. JdkDynamicAopProxy.invoke中,通过targetSource.getTarget()最终调用到ReferenceConfig.get(),从而找到真实服务的Dubbo Reference并进行调用;
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-05-11 16:17:11  更:2022-05-11 16:19:12 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 23:33:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码