IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> dubbo 应用升级 -> 正文阅读

[大数据]dubbo 应用升级


dubbo 应用升级

?????????

官网:https://dubbo.apache.org/zh/docs/migration/
???????????

???????????????

????????????????????????????????????

应用升级

???????????

dubbo 3.x 升级依赖

        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>3.0.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-x-discovery</artifactId>
            <version>5.2.0</version>
        </dependency>

        <!-- tri协议序列化使用,如不使用triple协议,可不导入 -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>4.0.0-rc-2</version>
        </dependency>

??????????????

在地址注册与发现环节,3.x(应用粒度、接口粒度)与2.x(接口粒度)是完全兼容的,升级步骤如下:

# 服务端双注册
升级jar包依赖到最新版本,配置双注册开关:dubbo.application.register-mode
register-mode可选值:all(默认)、interface(接口粒度)、
                   instance(应用粒度)

# 消费端双订阅
升级jar包依赖到最新版本,配置双订阅开关:dubbo.application.service-discovery.migration=APPLICATION_FIRST
service-discovery.migration可选值:APPLICATION_FIRST(应用粒度优先)、
                                  FORCE_INTERFACE(强制使用接口粒度)、
                                  FORCE_APPLICATION(强制使用应用粒度)

# 服务端单注册(注册粒度修改为应用粒度)
服务端在所有消费端切换到应用粒度注册后,变换为单注册:register-mode修改为instance

??????????????????

**************

服务端双注册

??????????

??????????????????

?????????????????

相关说明:

# 消费端服务调用影响
服务端升级到3.x后,同时向注册中心注册接口级、应用级的服务;
2.x、3.x的消费端均可从注册中心拉取到服务信息,不影响服务使用;

# 注册中心存储空间影响
双注册不可避免带来额外的存储需求,但是应用级别服务相对于接口粒度所需的存储空间很小;
对于一个普通集群,数应用粒度注册数据所需的存储空间大致为接口粒度注册数据1/100~1/1000

?????????????????

**************

消费端双订阅

???????

??????????????????

? ? ? ? ? ? ? ? ? ? ? ??

相关说明:

dubbo3.x 默认支持双订阅,可通过参数dubbo.application.service-discovery.migration设置;

# dubbo.application.service-discovery.migration可选值:
APPLICATION_FIRST:智能决策接口级、应用级地址(MigrationAddressComparator.shouldMigrate判断),双订阅(默认)
FORCE_INTERFACE:强制消费接口级地址,如无地址则报错,单订阅2.x地址
FORCE_APPLICATION:强制消费应用级地址,如无地址则报错,单订阅3.x地址

# APPLICATION_FIRST:双订阅说明
对于双订阅的场景,消费端虽然可同时持有 2.x 地址与 3.x 地址,
但选址过程中两份地址是完全隔离的:要么用2.x 地址,要么用3.x 地址,
不存在两份地址混合调用的情况,这个决策过程是在收到第一次地址通知后就完成了的。

???????????? ?????????????

??????????????????????? ???????????

????????????????????????????????????

订阅策略实现原理

???

application.yml

dubbo:
  application:
    name: dubbo-consumer
    service-discovery:
      migration: FORCE_INTERFACE     #消费端订阅策略设置为接口粒度
  registry:
    protocol: zookeeper
    address: localhost:2181
    group: dubbo
    #register-mode: instance
  protocol:
    name: dubbo
    #port: 20880

server:
  port: 8081

??????????

消费端获取订阅策略调用栈

# MigrationRule.getStep:获取订阅策略
at org.apache.dubbo.registry.client.migration.model.MigrationRule.getStep(MigrationRule.java:183)
at org.apache.dubbo.registry.client.migration.MigrationRuleHandler.doMigrate(MigrationRuleHandler.java:51)
	  - locked <0x259c> (a org.apache.dubbo.registry.client.migration.MigrationRuleHandler)

# refer操作
at org.apache.dubbo.registry.client.migration.MigrationRuleListener.onRefer(MigrationRuleListener.java:241)
at org.apache.dubbo.registry.integration.RegistryProtocol.interceptInvoker(RegistryProtocol.java:531)
at org.apache.dubbo.registry.integration.RegistryProtocol.doRefer(RegistryProtocol.java:500)
at org.apache.dubbo.registry.integration.RegistryProtocol.refer(RegistryProtocol.java:485)
at org.apache.dubbo.qos.protocol.QosProtocolWrapper.refer(QosProtocolWrapper.java:83)
at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:74)
at org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:71)
at org.apache.dubbo.rpc.protocol.ProtocolSerializationWrapper.refer(ProtocolSerializationWrapper.java:52)
at org.apache.dubbo.rpc.Protocol$Adaptive.refer(Protocol$Adaptive.java:-1)

# ReferenceConfig类
at org.apache.dubbo.config.ReferenceConfig.createInvokerForRemote(ReferenceConfig.java:481)
at org.apache.dubbo.config.ReferenceConfig.createProxy(ReferenceConfig.java:386)
at org.apache.dubbo.config.ReferenceConfig.init(ReferenceConfig.java:275)
	  - locked <0x259d> (a org.apache.dubbo.config.ReferenceConfig)
at org.apache.dubbo.config.ReferenceConfig.get(ReferenceConfig.java:216)

# SimpleReferenceCache类
at org.apache.dubbo.config.utils.SimpleReferenceCache.get(SimpleReferenceCache.java:110)

# DefaultModuleDeployer类
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.lambda$referServices$6(DefaultModuleDeployer.java:384)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer$$Lambda$905/0x0000000801116b38.accept(Unknown Source:-1)
	  at java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4780)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.referServices(DefaultModuleDeployer.java:364)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.start(DefaultModuleDeployer.java:151)
	  - locked <0x259e> (a org.apache.dubbo.config.deploy.DefaultModuleDeployer)

# DubboDeployApplicationListener类
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onContextRefreshedEvent(DubboDeployApplicationListener.java:108)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:98)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:44)

# SimpleApplicationEventMulticaster类
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)

# AbstractApplicationContext类
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:421)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:378)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:938)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586)
	  - locked <0x259f> (a java.lang.Object)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145)

# SpringApplication类
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:730)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:302)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1290)

# 应用启动入口
at com.example.demo.DemoApplication.main(DemoApplication.java:14)

???????????????

MigrationRule:获取订阅规则

public class MigrationRule {

    public MigrationStep getStep(URL consumerURL) {
        if (interfaceRules != null) {
            SubMigrationRule rule = interfaceRules.get(consumerURL.getDisplayServiceKey());
            if (rule != null) {
                if (rule.getStep() != null) {
                    return rule.getStep();
                }
            }
        }

        if (applications != null) {
            ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension(consumerURL.getScopeModel());
            Set<String> services = serviceNameMapping.getServices(consumerURL);
            if (CollectionUtils.isNotEmpty(services)) {
                for (String service : services) {
                    SubMigrationRule rule = applicationRules.get(service);
                    if (rule.getStep() != null) {
                        return rule.getStep();
                    }
                }
            }
        }

        /**
         * FIXME, it's really hard to follow setting default values here.
         */
        if (step == null) {        //如果step为null,默认将其设置为APPLICATION_FIRST
            // initial step : APPLICATION_FIRST
            step = MigrationStep.APPLICATION_FIRST;
            step = Enum.valueOf(MigrationStep.class,
                consumerURL.getParameter(MIGRATION_STEP_KEY,
                    ConfigurationUtils.getCachedDynamicProperty(consumerURL.getScopeModel(), DUBBO_SERVICEDISCOVERY_MIGRATION, step.name())));
                                   //从配置中心读取step配置,key为dubbo.application.service-discovery.migration
                                   
        }

        return step;
    }

??????????????????

???????????????

MigrationStep:订阅策略

public enum MigrationStep {
    FORCE_INTERFACE,
    APPLICATION_FIRST,
    FORCE_APPLICATION
}

???????????????

????????????????

????????????????????????????????????

双订阅选址实现原理

???

application.yml

dubbo:
  application:
    name: dubbo-consumer
    #service-discovery:
     # migration: FORCE_INTERFACE      #设置消费端订阅模式,默认APPLICATION_FIRST
  registry:
    protocol: zookeeper
    address: localhost:2181
    group: dubbo
    #register-mode: instance
  protocol:
    name: dubbo
    #port: 20880

server:
  port: 8081

??????????????

消费端双订阅地址选址决策调用栈

# MigrationInvoker.calcPreferredInvoker:选择使用的invoker
at org.apache.dubbo.registry.client.migration.MigrationInvoker.calcPreferredInvoker(MigrationInvoker.java:472)
	  - locked <0x290a> (a org.apache.dubbo.registry.client.migration.MigrationInvoker)
at org.apache.dubbo.registry.client.migration.MigrationInvoker.migrateToApplicationFirstInvoker(MigrationInvoker.java:244)

# MigrationRuleHandler类
at org.apache.dubbo.registry.client.migration.MigrationRuleHandler.refreshInvoker(MigrationRuleHandler.java:73)
at org.apache.dubbo.registry.client.migration.MigrationRuleHandler.doMigrate(MigrationRuleHandler.java:57)
	  - locked <0x2947> (a org.apache.dubbo.registry.client.migration.MigrationRuleHandler)

# refer操作
at org.apache.dubbo.registry.client.migration.MigrationRuleListener.onRefer(MigrationRuleListener.java:241)
at org.apache.dubbo.registry.integration.RegistryProtocol.interceptInvoker(RegistryProtocol.java:531)
at org.apache.dubbo.registry.integration.RegistryProtocol.doRefer(RegistryProtocol.java:500)
at org.apache.dubbo.registry.integration.RegistryProtocol.refer(RegistryProtocol.java:485)
at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:74)
at org.apache.dubbo.qos.protocol.QosProtocolWrapper.refer(QosProtocolWrapper.java:83)
at org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:71)
at org.apache.dubbo.rpc.protocol.ProtocolSerializationWrapper.refer(ProtocolSerializationWrapper.java:52)
at org.apache.dubbo.rpc.Protocol$Adaptive.refer(Protocol$Adaptive.java:-1)

# ReferenceConfig类
at org.apache.dubbo.config.ReferenceConfig.createInvokerForRemote(ReferenceConfig.java:481)
at org.apache.dubbo.config.ReferenceConfig.createProxy(ReferenceConfig.java:386)
at org.apache.dubbo.config.ReferenceConfig.init(ReferenceConfig.java:275)
	  - locked <0x2948> (a org.apache.dubbo.config.ReferenceConfig)
at org.apache.dubbo.config.ReferenceConfig.get(ReferenceConfig.java:216)

# SimpleReferenceCache类
at org.apache.dubbo.config.utils.SimpleReferenceCache.get(SimpleReferenceCache.java:110)

# DefaultModuleDeployer类
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.lambda$referServices$6(DefaultModuleDeployer.java:384)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer$$Lambda$905/0x0000000801117538.accept(Unknown Source:-1)
	  at java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4780)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.referServices(DefaultModuleDeployer.java:364)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.start(DefaultModuleDeployer.java:151)
	  - locked <0x2949> (a org.apache.dubbo.config.deploy.DefaultModuleDeployer)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onContextRefreshedEvent(DubboDeployApplicationListener.java:108)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:98)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:44)

# SimpleApplicationEventMulticaster类
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)

# AbstractApplicationContext类
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:421)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:378)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:938)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586)
	  - locked <0x294a> (a java.lang.Object)

# ServletWebServerApplicationContext类
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145)

# SpringApplication类
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:730)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:302)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1290)

# 项目启动入口
at com.example.demo.DemoApplication.main(DemoApplication.java:14)

? ? ? ? ? ? ? ? ? ?

MigrationInvoker:是否使用应用粒度服务发现invoker

public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {

    private synchronized void calcPreferredInvoker(MigrationRule migrationRule) {
        if (serviceDiscoveryInvoker == null || invoker == null) {
            return;
        }
        Set<MigrationAddressComparator> detectors = ScopeModelUtil.getApplicationModel(consumerUrl == null ? null : consumerUrl.getScopeModel())
            .getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
        if (CollectionUtils.isNotEmpty(detectors)) {
            // pick preferred invoker
            // the real invoker choice in invocation will be affected by promotion
            if (detectors.stream().allMatch(comparator -> comparator.shouldMigrate(serviceDiscoveryInvoker, invoker, migrationRule))) {
                                                                   //是否使用应用粒度服务发现invoker
                this.currentAvailableInvoker = serviceDiscoveryInvoker;
            } else {
                this.currentAvailableInvoker = invoker;
            }
        }
    }

????????????

DefaultMigrationAddressComparator:判断是否使用新地址

public class DefaultMigrationAddressComparator implements MigrationAddressComparator {
    private static final Logger logger = LoggerFactory.getLogger(DefaultMigrationAddressComparator.class);
    private static final String MIGRATION_THRESHOLD = "dubbo.application.migration.threshold";
    private static final String DEFAULT_THRESHOLD_STRING = "0.0";
    private static final float DEFAULT_THREAD = 0f;

    public static final String OLD_ADDRESS_SIZE = "OLD_ADDRESS_SIZE";
    public static final String NEW_ADDRESS_SIZE = "NEW_ADDRESS_SIZE";

    private Map<String, Map<String, Integer>> serviceMigrationData = new ConcurrentHashMap<>();

    @Override
    public <T> boolean shouldMigrate(ClusterInvoker<T> newInvoker, ClusterInvoker<T> oldInvoker, MigrationRule rule) {
        Map<String, Integer> migrationData = serviceMigrationData.computeIfAbsent(oldInvoker.getUrl().getDisplayServiceKey(), _k -> new ConcurrentHashMap<>());

        if (!newInvoker.hasProxyInvokers()) {
            migrationData.put(OLD_ADDRESS_SIZE, getAddressSize(oldInvoker));
            migrationData.put(NEW_ADDRESS_SIZE, -1);
            logger.info("No " + getInvokerType(newInvoker) + " address available, stop compare.");
            return false;
        }
        if (!oldInvoker.hasProxyInvokers()) {
            migrationData.put(OLD_ADDRESS_SIZE, -1);
            migrationData.put(NEW_ADDRESS_SIZE, getAddressSize(newInvoker));
            logger.info("No " + getInvokerType(oldInvoker) + " address available, stop compare.");
            return true;
        }

        int newAddressSize = getAddressSize(newInvoker);  //新地址大小
        int oldAddressSize = getAddressSize(oldInvoker);  //旧地址大小

        migrationData.put(OLD_ADDRESS_SIZE, oldAddressSize);
        migrationData.put(NEW_ADDRESS_SIZE, newAddressSize);

        String rawThreshold = null;
        Float configedThreshold = rule == null ? null : rule.getThreshold(oldInvoker.getUrl());
        if (configedThreshold != null && configedThreshold >= 0) {
            rawThreshold = String.valueOf(configedThreshold);
        }
        rawThreshold = StringUtils.isNotEmpty(rawThreshold) ? rawThreshold : ConfigurationUtils.getCachedDynamicProperty(MIGRATION_THRESHOLD, DEFAULT_THRESHOLD_STRING);
                       //比较阀值,dubbo.application.migration.threshold设置,默认为0.0
        float threshold;
        try {
            threshold = Float.parseFloat(rawThreshold);
        } catch (Exception e) {
            logger.error("Invalid migration threshold " + rawThreshold);
            threshold = DEFAULT_THREAD;
        }

        logger.info("serviceKey:" + oldInvoker.getUrl().getServiceKey() + " Instance address size " + newAddressSize + ", interface address size " + oldAddressSize + ", threshold " + threshold);

        if (newAddressSize != 0 && oldAddressSize == 0) {
            return true;   //新地址不等于0,旧地址等于0,表示只有3.x的地址,返回true
        }
        if (newAddressSize == 0 && oldAddressSize == 0) {
            return false;  //新地址等于0,旧地址等于0,,返回false
        }

        if (((float) newAddressSize / (float) oldAddressSize) >= threshold) {
            return true;   //新地址大小/旧地址大小 >=0,返回true
        }
        return false;  //如果都不满足,返回false,表示使用旧地址,不进行迁移
    }

    private <T> int getAddressSize(ClusterInvoker<T> invoker) {
        if (invoker == null) {
            return -1;
        }
        List<Invoker<T>> invokers = invoker.getDirectory().getAllInvokers();
        return CollectionUtils.isNotEmpty(invokers) ? invokers.size() : 0;
    }

    public Map<String, Integer> getAddressSize(String displayServiceKey) {
        return serviceMigrationData.get(displayServiceKey);
    }

    private String getInvokerType(ClusterInvoker<?> invoker) {
        if (invoker.isServiceDiscovery()) {
            return "instance";
        }
        return "interface";
    }


}

?? ? ? ? ? ? ? ?? ?

????????????????????????

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-26 11:37:32  更:2022-02-26 11:38:52 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 0:11:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码