IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Dubbo系列讲解之服务发现【3万字长文分享】 -> 正文阅读

[Java知识库]Dubbo系列讲解之服务发现【3万字长文分享】

请添加图片描述

服务注册的几个步骤

??对于RPC框架的服务注册,一般包含了如下的流程:

  • 加载服务提供者,可能是通过xml配置的,也可能是通过扫描注解的
  • 实例化服务提供者,并以服务接口作为key,实现类作为value存储到一个map容器中
  • 开启网络监听
  • 将服务提供者的地址路径(ip:port/服务名?参数等)注册到注册中心
  • 当网络监听接收到请求时,根据请求过来的服务名及参数等,从容器中获取到服务提供者实现,通过消费端调用时传送的方法名称反射调用服务提供者的相关方法

Dubbo源码分析

Dubbo与Spring的整合

??在实际的开发过程中,Dubbo大部分情况都是与Spring的生态进行整合使用的,所以在真正进入Dubbo的服务注册之前,我们需要先了解Dubbo是怎么将自己的环境嵌入到Spring生态中的。

??在Spring中使用Dubbo的方式有两种,一种是通过XML配置文件,一种是通过注解的方式,由于当下Spring Boot盛行,所以这里会比较深入的分析Dubbo在Spring Boot中的整合。不过其实两种方式最终的都是将Dubbo的相关组件注入到Spring 的容器中

??在Spring 中提供了一种NamespaceHandler的机制,用于对Spring标签的扩展,所以在Spring使用xml的方式时,Dubbo中会提供一个名为DubboNamespaceHandler的处理器,用于解析spring 的xml中的各种dubbo标签,并注入到容器中,这里不再深入。DubboNamespaceHandler源码如下:

public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    @Override
    public void init() {
      // 将xml中的相关标签注入到spring中
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
        registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
        registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }

    /**
     * Override {@link NamespaceHandlerSupport#parse(Element, ParserContext)} method
     *
     * @param element       {@link Element}
     * @param parserContext {@link ParserContext}
     * @return
     * @since 2.7.5
     */
    @Override
    public BeanDefinition parse(Element element, ParserContext parserContext) {
        BeanDefinitionRegistry registry = parserContext.getRegistry();
        registerAnnotationConfigProcessors(registry);
        /**
         * @since 2.7.8
         * issue : https://github.com/apache/dubbo/issues/6275
         */
        registerCommonBeans(registry);
        BeanDefinition beanDefinition = super.parse(element, parserContext);
        setSource(beanDefinition);
        return beanDefinition;
    }

    /**
     * Register the processors for the Spring Annotation-Driven features
     *
     * @param registry {@link BeanDefinitionRegistry}
     * @see AnnotationConfigUtils
     * @since 2.7.5
     */
    private void registerAnnotationConfigProcessors(BeanDefinitionRegistry registry) {
        AnnotationConfigUtils.registerAnnotationConfigProcessors(registry);
    }
}

??接下来重点来看在Spring Boot中的整合。

??在Dubbo与Spring的整合,有两个入口可以让我们进入到Dubbo主见初始化的,第一种就是通过@EnableDubbo注解上的DubboComponentScan注解,它是一个@Import注解,Spring会通过引入一个DubboComponentScanRegistrar注册器在其registerBeanDefinitions方法上注入一个ServiceAnnotationBeanPostProcessor后置处理器。基于Spring的后置处理器原理,我们可以知道,它将会在bean实例化完成,初始化之前和之后各自产生回调,具体稍后再叙。注入方法如下:

private void registerServiceAnnotationBeanPostProcessor(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
    BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(ServiceAnnotationBeanPostProcessor.class);
    builder.addConstructorArgValue(packagesToScan);
    builder.setRole(2);
    AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
    BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry);
}

??另一个入口就是Spring Boot的自动装配,在Dubbo中存在一个DubboAutoConfiguration类,该类通过Spring Boot中的自动装配原理注册到IOC容器中,同时该类是一个配置类,在该类中同时也装配了一个ServiceAnnotationBeanPostProcessor的bean,方法如下:

@ConditionalOnProperty(prefix = DUBBO_SCAN_PREFIX, name = BASE_PACKAGES_PROPERTY_NAME)
    @ConditionalOnBean(name = BASE_PACKAGES_PROPERTY_RESOLVER_BEAN_NAME)
    @Bean
    public ServiceAnnotationBeanPostProcessor serviceAnnotationBeanPostProcessor(
            @Qualifier(BASE_PACKAGES_PROPERTY_RESOLVER_BEAN_NAME) PropertyResolver propertyResolver) {
        Set<String> packagesToScan = propertyResolver.getProperty(BASE_PACKAGES_PROPERTY_NAME, Set.class, emptySet());
        return new ServiceAnnotationBeanPostProcessor(packagesToScan);
    }

??接下来就进入到ServiceAnnotationBeanPostProcessor一探究竟。

??首先进入构造方法

 public ServiceAnnotationBeanPostProcessor(Set<String> packagesToScan) {
        super(packagesToScan);
 }

??这里将传入的packagesToScan往父类进行传递,由于它继承了ServiceClassPostProcessor,现在进入ServiceClassPostProcessor类的构造方法:

public ServiceClassPostProcessor(Set<String> packagesToScan) {
    this.packagesToScan = packagesToScan;
}

ServiceClassPostProcessor只是将传入的扫包路径赋值给packagesToScan

根据BeanPostProcessor的特性,现在进入到postProcessBeanDefinitionRegistry方法

@Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {

        // @since 2.7.5
        registerBeans(registry, DubboBootstrapApplicationListener.class);

        Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);

        if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
            registerServiceBeans(resolvedPackagesToScan, registry);
        } else {
            if (logger.isWarnEnabled()) {
                logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
            }
        }

    }

该方法主要做了以下几件事:

  • 注册了一个DubboBootstrapApplicationListener监听,具体作用稍后再叙
  • 调用resolvePackagesToScan方法解析所有包名的路径。可能包名中存在一Placeholders的特殊定义
  • 调用registerServiceBeans方法进行注册

??具体怎么解析包路径不在本次讨论范围,所有就先不深入了,现在直接进入到registerServiceBeans方法中

private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {

        DubboClassPathBeanDefinitionScanner scanner =
                new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);

        BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);

        scanner.setBeanNameGenerator(beanNameGenerator);

        // refactor @since 2.7.7
        serviceAnnotationTypes.forEach(annotationType -> {
            scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType));
        });

        for (String packageToScan : packagesToScan) {

            // Registers @Service Bean first
            scanner.scan(packageToScan);

            // Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
            Set<BeanDefinitionHolder> beanDefinitionHolders =
                    findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);

            if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {

                for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
                    registerServiceBean(beanDefinitionHolder, registry, scanner);
                }

                if (logger.isInfoEnabled()) {
                    logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +
                            beanDefinitionHolders +
                            " } were scanned under package[" + packageToScan + "]");
                }

            } else {

                if (logger.isWarnEnabled()) {
                    logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["
                            + packageToScan + "]");
                }

            }

        }

    }

??该方法有主要做了以下几件事

  • 构建了一个DubboClassPathBeanDefinitionScanner对象,该对象继承自Spring的ClassPathBeanDefinitionScanner。在Spring中,ClassPathBeanDefinitionScanner是一个扫描程序,主要用来扫描Classpath下符合条件的对象,然后将对象注入到给定的registry中

  • 定义一个为Bean生成名称的BeanNameGenerator,这里生成的是AnnotationBeanNameGenerator这个策略

  • 将bean名称策略set到scanner中

  • 添加过滤Filter,这里遍历serviceAnnotationTypes,获取到所有的过滤条件,这里是基于注解的拦截,到serviceAnnotationTypes赋值的地方,可以看到。初始化了以下三种注解作为拦截

private final static List<Class<? extends Annotation>> serviceAnnotationTypes = asList(
        // @since 2.7.7 Add the @DubboService , the issue : https://github.com/apache/dubbo/issues/6007
        DubboService.class,
        // @since 2.7.0 the substitute @com.alibaba.dubbo.config.annotation.Service
        Service.class,
        // @since 2.7.3 Add the compatibility for legacy Dubbo's @Service , the issue : https://github.com/apache/dubbo/issues/4330
        com.alibaba.dubbo.config.annotation.Service.class
);
  • 遍历解析后的扫描的包,调用scanner.scan(packageToScan)注册所有标注了@Service的bean注入到ioc容器中

  • 调用findServiceBeanDefinitionHolders查找所有标注了@Service的Class封装成BeanDefinitionHolders,不管是否被@ComponentScan扫描

  • 如果beanDefinitionHolders存在元素,遍历beanDefinitionHolders,调用registerServiceBean注册

??将标注了@Service注解的bean注入到ioc容器不属于本次讨论内容,这里也不做详细说明
下面进入到findServiceBeanDefinitionHolders方法,了解一下该方法都返回了那些类型的BeanDefinitionHolder

private Set<BeanDefinitionHolder> findServiceBeanDefinitionHolders(
            ClassPathBeanDefinitionScanner scanner, String packageToScan, BeanDefinitionRegistry registry,
            BeanNameGenerator beanNameGenerator) {

        Set<BeanDefinition> beanDefinitions = scanner.findCandidateComponents(packageToScan);

        Set<BeanDefinitionHolder> beanDefinitionHolders = new LinkedHashSet<>(beanDefinitions.size());

        for (BeanDefinition beanDefinition : beanDefinitions) {

            String beanName = beanNameGenerator.generateBeanName(beanDefinition, registry);
            BeanDefinitionHolder beanDefinitionHolder = new BeanDefinitionHolder(beanDefinition, beanName);
            beanDefinitionHolders.add(beanDefinitionHolder);

        }

        return beanDefinitionHolders;

    }
  • 首先扫描传入的packageToScan包下的所有的符合在scanner中定义的过滤注解的.class文件,封装成BeanDefinition
  • 遍历扫描到的beanDefinitions,通过名称策略,为Bean生成名称,同时用Bean和名称构建成BeanDefinitionHolder,加入到beanDefinitionHolders中,返回该集合

??beanDefinitionHolders的获取到这里就已经完成了,接下来进入到 registerServiceBean方法中,看看具体的注册流程

private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,
                                 DubboClassPathBeanDefinitionScanner scanner) {

  // 通过beanDefinitionHolder中的BeanDefinition中保存的全类名通过Class.forName加载成class对象
    Class<?> beanClass = resolveClass(beanDefinitionHolder);

    Annotation service = findServiceAnnotation(beanClass);

    /**
     * The {@link AnnotationAttributes} of @Service annotation
     */
    AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false);

    Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);

    String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();

    AbstractBeanDefinition serviceBeanDefinition =
            buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName);

    // ServiceBean Bean name
    String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass);

    if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean
        registry.registerBeanDefinition(beanName, serviceBeanDefinition);

        if (logger.isInfoEnabled()) {
            logger.info("The BeanDefinition[" + serviceBeanDefinition +
                    "] of ServiceBean has been registered with name : " + beanName);
        }

    } else {

        if (logger.isWarnEnabled()) {
            logger.warn("The Duplicated BeanDefinition[" + serviceBeanDefinition +
                    "] of ServiceBean[ bean name : " + beanName +
                    "] was be found , Did @DubboComponentScan scan to same package in many times?");
        }

    }

}
  • 获取扫描到的类的字节码的class对象
  • 获取 beanClass上的注解,会跟初始化时的serviceAnnotationTypes属性中的注解进行匹配,返回匹配到的注解
  • 获取匹配到的注解service上的所有属性及其属性值serviceAnnotationAttributes
  • 获取扫描到的.class对象实现的接口的class对象interfaceClass
  • 获取组装BeanDefinitionHolder是为Bean生成的名称
  • 将所有参数传入到buildServiceBeanDefinition方法中,构建一个AbstractBeanDefinition的对象,这里我们暂时是不知道AbstractBeanDefinition保存的是哪个Bean的定义
  • 通过generateServiceBeanName方法构建一个ServiceBean的名称。
  • 检查是否存在重复名称的bean,如果不存在,则直接注入AbstractBeanDefinition的定义到ioc容器中

??现在我们先来探索一下在buildServiceBeanDefinition中构建的是一个什么Bean的定义,由于方法比较长,这里就不贴代码了,该方法的大概的流程就是创建了一个ServiceBean的BeanDefinition。然后组装前面解析到的注解的参数和获取到的实现类的接口等为ServiceBean的属性进行赋值,然后最后返回一个ServiceBean的BeanDefinition。

??然后再来看看ServiceBean的命名规则是怎么样的

private String generateServiceBeanName(AnnotationAttributes serviceAnnotationAttributes, Class<?> interfaceClass) {
    ServiceBeanNameBuilder builder = create(interfaceClass, environment)
            .group(serviceAnnotationAttributes.getString("group"))
            .version(serviceAnnotationAttributes.getString("version"));
    return builder.build();
}

??可以看到,ServiceBean的命名规则是通过接口的全类名以及group,version等一起来保证唯一名称的,或许是长这样的ServiceBean:com.bobo.dubbo.api.HelloService:2.0.1这一系列操作下来,就是为了构建一个ServiceBean。而我们在DubboNamespaceHandler的方式中,也可以看到,最终也注入了一个ServiceBean。那么ServiceBean到底有何神奇之处呢?马上揭晓!进入到ServiceBean,看到其继承了ServiceConfig,同时实现了InitializingBean,DisposableBean,ApplicationContextAware,BeanNameAware,ApplicationEventPublisherAware等接口。而ServiceConfig又继承了AbstractConfig类,它是,比如Service,Refrence,application,Monitor等配置类的父类,我们进入AbstractConfig类,发现它存在一个@PostConstruct注解标注的方法,它会在spring的bean初始化完成之后执行,我们进入该方法

@PostConstruct
public void addIntoConfigManager() {
    ApplicationModel.getConfigManager().addConfig(this);
}

进入到ApplicationModel.getConfigManager()方法

public static ConfigManager getConfigManager() {
    return (ConfigManager) LOADER.getExtension(ConfigManager.NAME);
}

??看到这里,上一篇的SPI知识就排上了用场了,这是一个用来获取FrameworkExt接口的扩展实现的扩展点,同时ConfigManager.NAME指定了需要获取的扩展点的名称为config。所以就到FrameworkExt的实现类中查找一个名为config的扩展点实现即可得到。其实这里得到的就是ConfigManager本身。

??所以在addIntoConfigManager方法中,实际上是将当前的bean保存到了ConfigManager的对象中,最终保存到了ConfigManager的configsCache中。ConfigManager主要是用来管理Dubbo中的所有继承了AbstractConfig的配置

??而在ServiceBean中,我们并没有看到有任何任何有价值的东西,到这里看起来似乎前路已断,不知道怎么样入手了。

??此时突然想起来我们在进行扫包的一系列操作之前,貌似注册了一个监听器,是不是可以从监听器入手呢?

进入到前面注册的DubboBootstrapApplicationListener监听器中

public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
        implements Ordered {

    /**
     * The bean name of {@link DubboBootstrapApplicationListener}
     *
     * @since 2.7.6
     */
    public static final String BEAN_NAME = "dubboBootstrapApplicationListener";

    private final DubboBootstrap dubboBootstrap;

    public DubboBootstrapApplicationListener() {
        this.dubboBootstrap = DubboBootstrap.getInstance();
    }

    @Override
    public void onApplicationContextEvent(ApplicationContextEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            onContextRefreshedEvent((ContextRefreshedEvent) event);
        } else if (event instanceof ContextClosedEvent) {
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }

    private void onContextRefreshedEvent(ContextRefreshedEvent event) {
        dubboBootstrap.start();
    }

    private void onContextClosedEvent(ContextClosedEvent event) {
        dubboBootstrap.stop();
    }

    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }
}

??可以看到,该监听器监听了容器的容器的刷新和关闭,我们前面的操作已经将ServiceBean注入到了ioc容器中,根据ioc的容器初始化的几个周期,可以知道在Refreshd容器时,我们所有的服务提供者对应的ServiceBean已经全部装载到了容器中。

??继续往下,当产生ContextRefreshedEvent事件时,调用了onContextRefreshedEvent方法,该方法中调用dubboBootstrap.start();

??到这里,跟Spring相关的东西已经走完了,下面做一个总结

  • 通过Spring Boot的自动装配或@EnableDubbo注解自动注入一个ServiceAnnotationBeanPostProcessor传入需要扫描的包的路径
  • 注册了一个``DubboBootstrapApplicationListener`监听
  • 根据BeanPostProcessor的特性,调用postProcessBeanDefinitionRegistry方法,根据传入的扫包路径进行扫描,然后将所有的标注了@Service注解的bean注入到ioc容器中
  • 继续扫描包,获得标注了Service/DubboService等注解的所有BeanDefinitionHolders
  • 遍历BeanDefinitionHolders,解析出每个BeanDefinition中的接口,标注的注解,及注解上定义的参数等。
  • 通过解析出来的一系列信息生成一个ServiceBean.然后将ServiceBean注入到ioc容器中。
  • 同时在ServiceBean的父类AbstractConfig中,会存在一个标注了@PostConstruct注解的方法,它会在bean初始化完成之后,将当前bean保存到一个ConfigManager对象中,它dubbo环境中是一个单例的存在。
  • 在Spring进行Refresh容器时,会触发一个事件,调用dubboBootstrap.start();方法,启动

??接下来就真正的进入到Dubbo的服务发布,注册的世界,一探究竟吧

Dubbo的服务注册与发布

??在进入start()方法之前,首先需要看看dubboBootstrap的初始化过程,它是一个单例的对象,直接进入DubboBootstrap的构造方法

private DubboBootstrap() {
    configManager = ApplicationModel.getConfigManager();
    environment = ApplicationModel.getEnvironment();

    DubboShutdownHook.getDubboShutdownHook().register();
    ShutdownHookCallbacks.INSTANCE.addCallback(new ShutdownHookCallback() {
        @Override
        public void callback() throws Throwable {
            DubboBootstrap.this.destroy();
        }
    });
}

??可以看到,初始化时构建了configManager和environment,其中configManager主要用于管理Dubbo中的所有配置。Environment展示先不关注

??根据以上的分析,我们现在进入到start()方法

public DubboBootstrap start() {
  	// 已经启动过后,就不用再次启动了
    if (started.compareAndSet(false, true)) {
        ready.set(false);
      // 初始化方法,就是检查一些配置,启动配置中心等等
        initialize();
        if (logger.isInfoEnabled()) {
            logger.info(NAME + " is starting...");
        }
        // 1. export Dubbo Services
      // 真正执行发布服务的方法
        exportServices();

        // Not only provider register
        if (!isOnlyRegisterProvider() || hasExportedServices()) {
            // 2. export MetadataService
            exportMetadataService();
            //3. Register the local ServiceInstance if required
            registerServiceInstance();
        }

        referServices();
        if (asyncExportingFutures.size() > 0) {
            new Thread(() -> {
                try {
                    this.awaitFinish();
                } catch (Exception e) {
                    logger.warn(NAME + " exportAsync occurred an exception.");
                }
                ready.set(true);
                if (logger.isInfoEnabled()) {
                    logger.info(NAME + " is ready.");
                }
            }).start();
        } else {
            ready.set(true);
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " is ready.");
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info(NAME + " has started.");
        }
    }
    return this;
}

??调用了exportServices方法进行了服务的发布和注册,调用referServices方法进行服务的发现,服务发现将留到下一篇去,今天只对服务的注册进行探索。

??进入到exportServices方法

private void exportServices() {
    configManager.getServices().forEach(sc -> {
        // TODO, compatible with ServiceConfig.export()
        ServiceConfig serviceConfig = (ServiceConfig) sc;
        serviceConfig.setBootstrap(this);

        if (exportAsync) {
            ExecutorService executor = executorRepository.getServiceExporterExecutor();
            Future<?> future = executor.submit(() -> {
                sc.export();
                exportedServices.add(sc);
            });
            asyncExportingFutures.add(future);
        } else {
            sc.export();
            exportedServices.add(sc);
        }
    });
}

??遍历我们在Spring Boot环节时添加到configManager的所有ServiceConfig,将当前的对象传入到ServiceConfig中,同步或异步调用ServiceConfig的export方法。

??进入到export方法

public synchronized void export() {
    if (!shouldExport()) {
        return;
    }

    if (bootstrap == null) {
        bootstrap = DubboBootstrap.getInstance();
        bootstrap.init();
    }

    checkAndUpdateSubConfigs();

    //init serviceMetadata
    serviceMetadata.setVersion(version);
    serviceMetadata.setGroup(group);
    serviceMetadata.setDefaultGroup(group);
    serviceMetadata.setServiceType(getInterfaceClass());
    serviceMetadata.setServiceInterfaceName(getInterface());
    serviceMetadata.setTarget(getRef());

    if (shouldDelay()) {
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        doExport();
    }

    exported();
}

??该方法算是Dubbo服务发布的入口流程方法了。

  • 判断是否应该发布本服务
  • 如果DubboBootstrap对象为null,初始化一个DubboBootstrap对象
  • 检查是否更新存根配置
  • 初始化ServiceMetadata,将注入Bean时初始化的一些参数保存到serviceMetadata中
  • 延时或同步调用doExport

??进入doExport

protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
    }
    if (exported) {
        return;
    }
    exported = true;

    if (StringUtils.isEmpty(path)) {
        path = interfaceName;
    }
    doExportUrls();
}

??做了一系列判断,标识等初始化之后,再调用doExportUrls方法

private void doExportUrls() {
    ServiceRepository repository = ApplicationModel.getServiceRepository();
    ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
    repository.registerProvider(
            getUniqueServiceName(),
            ref,
            serviceDescriptor,
            this,
            serviceMetadata
    );

    List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig)
                .map(p -> p + "/" + path)
                .orElse(path), group, version);
        // In case user specified path, register service one more time to map it to path.
        repository.registerService(pathKey, interfaceClass);
        // TODO, uncomment this line once service key is unified
        serviceMetadata.setServiceKey(pathKey);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
  • 获取到一个ServiceRepository,根据前面分析的经验,可以看出这里得到的就是一个ServiceRepository对象。
  • 根据服务的接口名称和字节码封装一个ServiceDescriptor保存到repository的services中。并返回ServiceDescriptor
  • 调用registerProvider方法,将唯一服务名,服务的Provider,serviceDescriptor,当前对象,ServiceMetadata等传入方法。

??解析注册中心的URLregistryURLs,这里返回的是如下的URL

registry://192.168.100.127:2181/org.apache.dubbo.registry.RegistryService?application=spring-cloud-alibaba-boot-dubbo-provider&default=true&dubbo=2.0.2&pid=48053&preferred=true&qos.enable=false&registry=zookeeper&release=2.7.7&timeout=10000&timestamp=1598411022992
  • 遍历protocols,根据遍历到的协议拼接成不同的pathKey,调用registerService进行注册,保存服务源信息
  • 根据不同的协议,调用doExportUrlsFor1Protocol方法进行注册

??进入registerProvider方法,该方法会将传入的对象构建成一个ProviderModel对象。保存到相应的集合中,同时在ProviderModel对象初始化时,会调用将该接口的所有方法遍历,构建成一个ProviderMethodModel保存到methods中。

??然后进入到doExportUrlsFor1Protocol,方法过长,这里就不贴代码了。其主要完成了以下功能

  • 根据初始化ServiceBean时传入的各个参数,封装成一个map
  • 获取当前服务器的host
  • 获取当前服务需要监听的port
  • 根据封装的参数,协议,host,port构建一个URL
  • 发布一个本地服务-injvm
  • 获取到配置的注册中心的URL,可以存在多个注册中心,这就是Dubbo对多注册中心的支持
  • 添加注册中心的URL的参数
  • 生成monitor的URL
  • 再次封装发布服务的URL的参数
  • 通过Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));将当前服务的url作为参数添加到注册中心的url上,然后使用registryURL和当前服务接口字节码,服务实现构建一个invoker,这是一个属于注册中心的invoker;
  • 使用Invoker和当前的ServiceConfig构建一个 DelegateProviderMetaDataInvoker对象
  • 调用Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);进行服务的发布
  • 将返回的exporter添加到exporters中

??接下来看看是怎么获取到Invoker的,根据我们的SPI的知识,在没有参数中没有指定扩展点时,会使用默认@SPI注解上默认指定的扩展点,由于在ProxyFactory类上的注解为@SPI(“javassist”),所以可以知道这里获取到的扩展点为JavassistProxyFactory的对象,在进入JavassistProxyFactory的getInvoker()方法之前,根据我们学习SPI的知识,或许该扩展点存在一些包装,这里就不详细说明了,主要讲服务发布的主要流程。进入该类的getInvoker()方法。

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

??根据传入的服务接口的class对象,动态生成的一个包装器,该包装器继承了Wrapper了,重写了invokeMethod()方法。重写方法如下

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
        com.wangx.spring.cloud.alibaba.provider.HelloServiceImpl w;
        try {
            w = ((com.wangx.spring.cloud.alibaba.provider.HelloServiceImpl) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            if ("hello".equals($2) && $3.length == 1) {
                return ($w) w.hello((java.lang.String) $4[0]);
            }
        } catch (Throwable e) {
            throw new java.lang.reflect.InvocationTargetException(e);
        }
        throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.wangx.spring.cloud.alibaba.provider.HelloServiceImpl.");
    }

??所以,当AbstractProxyInvoker的doInvoke方法被调用的时候,会直接执行被传入服务提供者的具体方法。这样做的好处就是在服务启动时就将方法调用准备好,在被远程调用时,直接通过引用调用,而不需要通过反射调用。提高性能。回到doExportUrlsFor1Protocol方法,根据返回的invoker和当前对象包装一个DelegateProviderMetaDataInvoker对象,接下来调用Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);方法。

??现在先来确定PROCTOL的具体实现是什么,PROCTOL是一个自适应的扩展点,它会生成一个Proctol&Adaptie的类,该类实现了Protocol接口,重写了Protocol的export和refer()方法。这里只讨论生成的export方法,如下:

public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
    if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null)
        throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
    org.apache.dubbo.common.URL url = arg0.getUrl();
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null)
        throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
    org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.export(arg0);
}

??该方法会根据传入的invoker对象中的 protocol作为扩展名,获取Protocol的扩展实现,因为我上一步我们传入的是registryURl,根据注解的协议可以知道,通过自适应扩展对象获取到的扩展实现为RegistryProtocol的对象,进入该类的export方法。

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

    // decide if we need to delay publish
    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        register(registryUrl, registeredProviderUrl);
    }

    // register stated url on provider model
    registerStatedUrl(registryUrl, registeredProviderUrl, register);

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);

    notifyExport(exporter);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}
  • 调用getRegistryUrl(originInvoker);方法获取注册中心的真正的URL,这里会将registry替换成我们配置的zookeeper,如果配置的是nacos,则返回nacos
  • 调用getProviderUrl方法获取服务发布的url,稍后会将该服务发布到注册中心上
  • 调用doLocalExport(originInvoker, providerUrl);发布一个本地服务
  • 调用getRegistry(originInvoker);获取真正配置的注册中心的Registry.
  • 将服务注册到注册中心
    进入doLocalExport(originInvoker, providerUrl);方法
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);

    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}

??首先获取服务的的key,格式如下:

dubbo://192.168.100.127:20880/com.wangx.dubbo.api.HelloService?anyhost=true&application=spring-cloud-alibaba-boot-dubbo-provider&bind.ip=192.168.100.127&bind.port=20880&deprecated=false&dubbo=2.0.2&generic=false&interface=com.wangx.dubbo.api.HelloService&methods=hello&pid=58949&qos.enable=false&release=2.7.7&revision=2.0.1&side=provider&timestamp=1598493906319&version=2.0.1

??包含了服务的协议,端口,服务名称,应用名称,版本等信息。将入传入的Invoker和providerUrl构建一个InvokerDelegate类型的对象。并将服务提供者URL赋值为InvokerDelegate对象的url属性,将该对象传入到protocol.export(invokerDelegate)方法中。根据SPI的原理,这里会依赖注入一个Procotol$Adaptive的自适应扩展类,此时传入的是Dubbo协议的URL,所以这里实际会执行的是DubboProtocol中的方法。进入DubboProtocol类中的export方法

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }

        }
    }

    openServer(url);
    optimizeSerialization(url);

    return exporter;
}
  • 根据URL获取一个service的key,构建一个DubboExporter对象,该对象包含了传入的key,invoker,exporterMap等,在invoker中有嵌套了最开始生成的能够实际执行服务提供者方法的代理对象。
  • 将DubboExporter的对象保存到exporterMap集合中。
  • 调用openServer方法发布服务
  • 序列化url

进入openServer(url)

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client can export a service which's only for server to invoke
    boolean isServer = url.getParameter(IS_SERVER_KEY, true);
    if (isServer) {
        ProtocolServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));
                }
            }
        } else {
            // server supports reset, use together with override
            server.reset(url);
        }
    }
}

??刚方法主要双重检查是否存在当前地址的ProtocolServer,不存在创建一个保存到serverMap中。进入createServer方法该方法主要是调用Exchangers.bind(url, requestHandler);方法对地址和端口进行监听,然后转入一个requestHandler对象,当接收到请求时,调用requestHandler对象的reply方法进行处理

??这里不再深入到netty网络部分,直接来看看接收到请求的时候,是怎么处理的,进入到reply方法

@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

    if (!(message instanceof Invocation)) {
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    Invocation inv = (Invocation) message;
    Invoker<?> invoker = getInvoker(channel, inv);
    // need to consider backward-compatibility if it's a callback
    if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        if (methodsStr == null || !methodsStr.contains(",")) {
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            String[] methods = methodsStr.split(",");
            for (String method : methods) {
                if (inv.getMethodName().equals(method)) {
                    hasMethod = true;
                    break;
                }
            }
        }
        if (!hasMethod) {
            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                    + " not found in callback service interface ,invoke will be ignored."
                    + " please update the api interface. url is:"
                    + invoker.getUrl()) + " ,invocation is :" + inv);
            return null;
        }
    }
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    Result result = invoker.invoke(inv);
    return result.thenApply(Function.identity());
    }
   

??将接收到的message转成Invocation对象,这是rpc在传输过程中对于调用端的一些参数的封装,包含了服务名,端口,方法名,方法参数等,通过inv获取到invoker,进入getInvoker该方法主要通过inv封装成一个serviceKey,然后通过该serviceKey从exporterMap容器中获取到对应的DubboExporter,然后从该DubboExporter中获取到初始化时传入的invoker回到reply方法,会调用返回的invoker.invoker方法,该invoker中最底层封装了一个最原始的AbstractProxyInvoker的invoker,所以最终会调用到该类型的对象的invoker,如下:

public Result invoke(Invocation invocation) throws RpcException {
     try {
         Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value);
         CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
             AppResponse result = new AppResponse();
             if (t != null) {
                 if (t instanceof CompletionException) {
                     result.setException(t.getCause());
                 } else {
                     result.setException(t);
                 }
             } else {
                 result.setValue(obj);
             }
             return result;
         });
         return new AsyncRpcResult(appResponseFuture, invocation);
     } catch (InvocationTargetException e) {
         if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
             logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
         }
         return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
     } catch (Throwable e) {
         throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
     }
 }

??然后这里会调用一个doInvoker()方法,这是一个模板方法,正好对应前面创建的如下代码

 return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };

??所以最终调用wrapper.invokerMethod方法,然后调用对应服务的对应的方法。

??到这里服务本地监听和当请求过来时的处理就大致聊完了,接下来重新回到RegistryProtocol类的export方法

??进入到register(registryUrl, registeredProviderUrl);方法

private void register(URL registryUrl, URL registeredProviderUrl) {
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registeredProviderUrl);
}

??这里的registryUrl经过转换,已经变成了我们配置的zokeeper协议的url,所以这里我们将会获得一个ZookeeperRegistry的对象。将registeredProviderUrl传入到register方法中,在进入到ZookeeperRegistry中时,发现并没有register方法,那么只可能存在于它的父类中,在它的父类FailbackRegistry中,我们找到了这个方法,进入

public void register(URL url) {
    if (!acceptable(url)) {
        logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
        return;
    }
    super.register(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Sending a registration request to the server side
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        addFailedRegistered(url);
    }
}

??该发放做了一系列操作之后,实际调用了doRegister()方法,该方法是一个抽象方法,由子类实现,这里就是有zookeeperRegistry进行实现的。

??进入到ZookeeperRegistry中的doRegister()方法

@Override
public void doRegister(URL url) {
    try {
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

??发现就是调用zkClient在注册中心上创建一些节点,至此,我们的整个服务发布和注册功能都已经完成了

接下来总结一下在Dubbo的整个阶段,有哪些主要的过程

  • ServiceBean注册到IOC容器时会将自身对象保存到ConfigManager的对象中
  • 在Dubbo调用DubboBoostrap.start()中的发布方法时,遍历ConfigManager对象中保存的ServiceBean,开始了服务注册之旅
  • 调用ServiceBean的父类的export()方法,进行真正的服务注册,该对象中会保存一个ref属性,这是一个服务真正的提供者
  • 获取配置的注册中心,因为可能存在多个注册中心,需要遍历多个,这就是多注册中心的实现,拿到注册中心的url
  • 构建服务提供这的URL并封装各项参数,根据参数和注册中心的url接服务提供者实现生成一个Invoker,该invoker中会保存一个代理对象,该对象那个的invokerMethod()方法将会直接调用服务提供者的对应的方法
  • 将该invoker进行一些列封装,然后保存到一个map集合中,使用netty开启一个服务提供者的端口监听,并将一个requestHandler进行绑定,当接收到请求时,调用该对象的reply方法,该方法最终会执行上一步骤生成的invokerMethod()方法,这就形成了一个发布和调用的闭环
  • 在发布完成本地服务,开启了端口的监听之后,需要将服务提供者的URL注册到注册中心,根据注册中心配置的协议,这里获取到了Zookeeper的注册中心,然后将服务提供者的URL通过zkClint在注册中心上创建一个node,表示注册完成

搞定~

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-08-17 15:15:56  更:2021-08-17 15:16:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 9:21:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码