dubbo 应用升级
?????????
官网:https://dubbo.apache.org/zh/docs/migration/
???????????????
????????????????????????????????????
应用升级
???????????
dubbo 3.x 升级依赖
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>3.0.5</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>5.2.0</version>
</dependency>
<!-- tri协议序列化使用,如不使用triple协议,可不导入 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>4.0.0-rc-2</version>
</dependency>
??????????????
在地址注册与发现环节,3.x(应用粒度、接口粒度)与2.x(接口粒度)是完全兼容的,升级步骤如下:
# 服务端双注册
升级jar包依赖到最新版本,配置双注册开关:dubbo.application.register-mode
register-mode可选值:all(双注册,默认)、interface(接口粒度)、
instance(应用粒度)
# 消费端双订阅
升级jar包依赖到最新版本,配置双订阅开关:dubbo.application.service-discovery.migration=APPLICATION_FIRST
service-discovery.migration可选值:APPLICATION_FIRST(应用粒度优先)、
FORCE_INTERFACE(强制使用接口粒度)、
FORCE_APPLICATION(强制使用应用粒度)
# 服务端单注册(注册粒度修改为应用粒度)
服务端在所有消费端切换到应用粒度订阅后,切换为单注册:register-mode修改为instance
??????????????????
**************
服务端双注册
??????????
??????????????????
?????????????????
相关说明:
# 消费端服务调用影响
服务端升级到3.x后,同时向注册中心注册接口级、应用级的服务;
2.x、3.x的消费端均可从注册中心拉取到服务信息,不影响服务使用;
# 注册中心存储空间影响
双注册不可避免带来额外的存储需求,但是应用级别服务相对于接口粒度所需的存储空间很小;
对于一个普通集群,应用粒度注册数据所需的存储空间大致为接口粒度注册数据的1/100~1/1000
?????????????????
**************
消费端双订阅
???????
??????????????????
? ? ? ? ? ? ? ? ? ? ? ??
相关说明:
dubbo3.x 默认支持双订阅,可通过参数dubbo.application.service-discovery.migration设置;
# dubbo.application.service-discovery.migration可选值:
APPLICATION_FIRST:智能决策接口级、应用级地址(MigrationAddressComparator.shouldMigrate判断),双订阅(默认)
FORCE_INTERFACE:强制消费接口级地址,如无地址则报错,单订阅2.x地址
FORCE_APPLICATION:强制消费应用级地址,如无地址则报错,单订阅3.x地址
# APPLICATION_FIRST:双订阅说明
对于双订阅的场景,消费端虽然可同时持有 2.x 地址与 3.x 地址,
但选址过程中两份地址是完全隔离的:要么用2.x 地址,要么用3.x 地址,
不存在两份地址混合调用的情况,这个决策过程是在收到第一次地址通知后就完成了的。
???????????? ?????????????
??????????????????????? ???????????
????????????????????????????????????
订阅策略实现原理
???
application.yml
dubbo:
  application:
    name: dubbo-consumer
    service-discovery:
      migration: FORCE_INTERFACE #消费端订阅策略设置为接口粒度
  registry:
    protocol: zookeeper
    address: localhost:2181
    group: dubbo
    #register-mode: instance
  protocol:
    name: dubbo
    #port: 20880
server:
  port: 8081
??????????
消费端获取订阅策略调用栈
# MigrationRule.getStep:获取订阅策略
at org.apache.dubbo.registry.client.migration.model.MigrationRule.getStep(MigrationRule.java:183)
at org.apache.dubbo.registry.client.migration.MigrationRuleHandler.doMigrate(MigrationRuleHandler.java:51)
- locked <0x259c> (a org.apache.dubbo.registry.client.migration.MigrationRuleHandler)
# refer操作
at org.apache.dubbo.registry.client.migration.MigrationRuleListener.onRefer(MigrationRuleListener.java:241)
at org.apache.dubbo.registry.integration.RegistryProtocol.interceptInvoker(RegistryProtocol.java:531)
at org.apache.dubbo.registry.integration.RegistryProtocol.doRefer(RegistryProtocol.java:500)
at org.apache.dubbo.registry.integration.RegistryProtocol.refer(RegistryProtocol.java:485)
at org.apache.dubbo.qos.protocol.QosProtocolWrapper.refer(QosProtocolWrapper.java:83)
at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:74)
at org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:71)
at org.apache.dubbo.rpc.protocol.ProtocolSerializationWrapper.refer(ProtocolSerializationWrapper.java:52)
at org.apache.dubbo.rpc.Protocol$Adaptive.refer(Protocol$Adaptive.java:-1)
# ReferenceConfig类
at org.apache.dubbo.config.ReferenceConfig.createInvokerForRemote(ReferenceConfig.java:481)
at org.apache.dubbo.config.ReferenceConfig.createProxy(ReferenceConfig.java:386)
at org.apache.dubbo.config.ReferenceConfig.init(ReferenceConfig.java:275)
- locked <0x259d> (a org.apache.dubbo.config.ReferenceConfig)
at org.apache.dubbo.config.ReferenceConfig.get(ReferenceConfig.java:216)
# SimpleReferenceCache类
at org.apache.dubbo.config.utils.SimpleReferenceCache.get(SimpleReferenceCache.java:110)
# DefaultModuleDeployer类
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.lambda$referServices$6(DefaultModuleDeployer.java:384)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer$$Lambda$905/0x0000000801116b38.accept(Unknown Source:-1)
at java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4780)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.referServices(DefaultModuleDeployer.java:364)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.start(DefaultModuleDeployer.java:151)
- locked <0x259e> (a org.apache.dubbo.config.deploy.DefaultModuleDeployer)
# DubboDeployApplicationListener类
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onContextRefreshedEvent(DubboDeployApplicationListener.java:108)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:98)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:44)
# SimpleApplicationEventMulticaster类
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
# AbstractApplicationContext类
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:421)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:378)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:938)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586)
- locked <0x259f> (a java.lang.Object)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145)
# SpringApplication类
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:730)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:302)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1290)
# 应用启动入口
at com.example.demo.DemoApplication.main(DemoApplication.java:14)
???????????????
MigrationRule:获取订阅规则
public class MigrationRule {
/**
 * Resolves the migration step (subscription strategy) that applies to the given consumer URL.
 *
 * <p>Lookup order:
 * <ol>
 *   <li>the interface-level rule keyed by the URL's display service key;</li>
 *   <li>the application-level rules of every service name mapped to the URL;</li>
 *   <li>the rule-wide {@code step} field, lazily initialised on first use.</li>
 * </ol>
 *
 * @param consumerURL the consumer URL whose subscription strategy is being resolved
 * @return the first non-null step found in the order above
 * @throws IllegalArgumentException if the configured step name is not a {@link MigrationStep} constant
 *         (raised by {@link Enum#valueOf(Class, String)})
 */
public MigrationStep getStep(URL consumerURL) {
    if (interfaceRules != null) {
        SubMigrationRule rule = interfaceRules.get(consumerURL.getDisplayServiceKey());
        if (rule != null && rule.getStep() != null) {
            return rule.getStep();
        }
    }

    if (applications != null) {
        ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension(consumerURL.getScopeModel());
        Set<String> services = serviceNameMapping.getServices(consumerURL);
        if (CollectionUtils.isNotEmpty(services)) {
            for (String service : services) {
                SubMigrationRule rule = applicationRules.get(service);
                // A mapped service name may have no application rule; skip it instead of
                // dereferencing null (mirrors the null check in the interface-rule branch).
                if (rule != null && rule.getStep() != null) {
                    return rule.getStep();
                }
            }
        }
    }

    /**
     * FIXME, it's really hard to follow setting default values here.
     */
    if (step == null) {
        // Initial step: APPLICATION_FIRST. It is assigned to the field before the lookup so
        // that its name can serve as the fallback value of the configuration lookup below.
        step = MigrationStep.APPLICATION_FIRST;
        // The URL parameter MIGRATION_STEP_KEY takes precedence; otherwise the cached dynamic
        // property DUBBO_SERVICEDISCOVERY_MIGRATION is used (per the original note, its key is
        // dubbo.application.service-discovery.migration).
        step = Enum.valueOf(MigrationStep.class,
            consumerURL.getParameter(MIGRATION_STEP_KEY,
                ConfigurationUtils.getCachedDynamicProperty(consumerURL.getScopeModel(), DUBBO_SERVICEDISCOVERY_MIGRATION, step.name())));
    }

    return step;
}
??????????????????
???????????????
MigrationStep:订阅策略
/**
 * Consumer-side subscription strategies used while migrating from interface-level to
 * application-level service discovery.
 */
public enum MigrationStep {

    /** Consume interface-level addresses only. */
    FORCE_INTERFACE,

    /** Subscribe to both address sets and decide at runtime which one to use. */
    APPLICATION_FIRST,

    /** Consume application-level addresses only. */
    FORCE_APPLICATION;
}
???????????????
????????????????
????????????????????????????????????
双订阅选址实现原理
???
application.yml
dubbo:
  application:
    name: dubbo-consumer
    #service-discovery:
    #  migration: FORCE_INTERFACE #设置消费端订阅模式,默认APPLICATION_FIRST
  registry:
    protocol: zookeeper
    address: localhost:2181
    group: dubbo
    #register-mode: instance
  protocol:
    name: dubbo
    #port: 20880
server:
  port: 8081
??????????????
消费端双订阅地址选址决策调用栈
# MigrationInvoker.calcPreferredInvoker:选择使用的invoker
at org.apache.dubbo.registry.client.migration.MigrationInvoker.calcPreferredInvoker(MigrationInvoker.java:472)
- locked <0x290a> (a org.apache.dubbo.registry.client.migration.MigrationInvoker)
at org.apache.dubbo.registry.client.migration.MigrationInvoker.migrateToApplicationFirstInvoker(MigrationInvoker.java:244)
# MigrationRuleHandler类
at org.apache.dubbo.registry.client.migration.MigrationRuleHandler.refreshInvoker(MigrationRuleHandler.java:73)
at org.apache.dubbo.registry.client.migration.MigrationRuleHandler.doMigrate(MigrationRuleHandler.java:57)
- locked <0x2947> (a org.apache.dubbo.registry.client.migration.MigrationRuleHandler)
# refer操作
at org.apache.dubbo.registry.client.migration.MigrationRuleListener.onRefer(MigrationRuleListener.java:241)
at org.apache.dubbo.registry.integration.RegistryProtocol.interceptInvoker(RegistryProtocol.java:531)
at org.apache.dubbo.registry.integration.RegistryProtocol.doRefer(RegistryProtocol.java:500)
at org.apache.dubbo.registry.integration.RegistryProtocol.refer(RegistryProtocol.java:485)
at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:74)
at org.apache.dubbo.qos.protocol.QosProtocolWrapper.refer(QosProtocolWrapper.java:83)
at org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:71)
at org.apache.dubbo.rpc.protocol.ProtocolSerializationWrapper.refer(ProtocolSerializationWrapper.java:52)
at org.apache.dubbo.rpc.Protocol$Adaptive.refer(Protocol$Adaptive.java:-1)
# ReferenceConfig类
at org.apache.dubbo.config.ReferenceConfig.createInvokerForRemote(ReferenceConfig.java:481)
at org.apache.dubbo.config.ReferenceConfig.createProxy(ReferenceConfig.java:386)
at org.apache.dubbo.config.ReferenceConfig.init(ReferenceConfig.java:275)
- locked <0x2948> (a org.apache.dubbo.config.ReferenceConfig)
at org.apache.dubbo.config.ReferenceConfig.get(ReferenceConfig.java:216)
# SimpleReferenceCache类
at org.apache.dubbo.config.utils.SimpleReferenceCache.get(SimpleReferenceCache.java:110)
# DefaultModuleDeployer类
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.lambda$referServices$6(DefaultModuleDeployer.java:384)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer$$Lambda$905/0x0000000801117538.accept(Unknown Source:-1)
at java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4780)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.referServices(DefaultModuleDeployer.java:364)
at org.apache.dubbo.config.deploy.DefaultModuleDeployer.start(DefaultModuleDeployer.java:151)
- locked <0x2949> (a org.apache.dubbo.config.deploy.DefaultModuleDeployer)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onContextRefreshedEvent(DubboDeployApplicationListener.java:108)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:98)
at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:44)
# SimpleApplicationEventMulticaster类
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
# AbstractApplicationContext类
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:421)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:378)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:938)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586)
- locked <0x294a> (a java.lang.Object)
# ServletWebServerApplicationContext类
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145)
# SpringApplication类
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:730)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:302)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1290)
# 项目启动入口
at com.example.demo.DemoApplication.main(DemoApplication.java:14)
? ? ? ? ? ? ? ? ? ?
MigrationInvoker:是否使用应用粒度服务发现invoker
public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
/**
 * Chooses which invoker becomes {@code currentAvailableInvoker}: the service-discovery
 * (application-level) invoker or the interface-level invoker.
 *
 * <p>Does nothing when either invoker is missing or when no
 * {@link MigrationAddressComparator} extension is available.
 *
 * @param migrationRule the rule handed to every comparator's {@code shouldMigrate} call
 */
private synchronized void calcPreferredInvoker(MigrationRule migrationRule) {
    if (serviceDiscoveryInvoker == null || invoker == null) {
        return;
    }

    Set<MigrationAddressComparator> comparators = ScopeModelUtil
        .getApplicationModel(consumerUrl == null ? null : consumerUrl.getScopeModel())
        .getExtensionLoader(MigrationAddressComparator.class)
        .getSupportedExtensionInstances();
    if (!CollectionUtils.isNotEmpty(comparators)) {
        return;
    }

    // pick preferred invoker
    // the real invoker choice in invocation will be affected by promotion
    // The service-discovery invoker is chosen only if every comparator agrees to migrate.
    boolean everyComparatorAgrees = comparators.stream()
        .allMatch(comparator -> comparator.shouldMigrate(serviceDiscoveryInvoker, invoker, migrationRule));
    this.currentAvailableInvoker = everyComparatorAgrees ? serviceDiscoveryInvoker : invoker;
}
????????????
DefaultMigrationAddressComparator:判断是否使用新地址
/**
 * {@link MigrationAddressComparator} that decides between the new and the old invoker by
 * comparing how many addresses each of them holds against a configurable threshold.
 *
 * <p>The sizes seen by the latest comparison are recorded per display service key and can be
 * read back through {@link #getAddressSize(String)}.
 */
public class DefaultMigrationAddressComparator implements MigrationAddressComparator {
    public static final String OLD_ADDRESS_SIZE = "OLD_ADDRESS_SIZE";
    public static final String NEW_ADDRESS_SIZE = "NEW_ADDRESS_SIZE";

    private static final Logger logger = LoggerFactory.getLogger(DefaultMigrationAddressComparator.class);
    private static final String MIGRATION_THRESHOLD = "dubbo.application.migration.threshold";
    private static final String DEFAULT_THRESHOLD_STRING = "0.0";
    private static final float DEFAULT_THRESHOLD = 0f;

    /** Display service key -> {OLD_ADDRESS_SIZE, NEW_ADDRESS_SIZE} of the latest comparison. */
    private final Map<String, Map<String, Integer>> serviceMigrationData = new ConcurrentHashMap<>();

    /**
     * Decides whether the new invoker should be used instead of the old one.
     *
     * @param newInvoker the candidate invoker to migrate to
     * @param oldInvoker the invoker currently representing the old address set
     * @param rule       migration rule that may supply a threshold; may be {@code null}
     * @return {@code true} to use {@code newInvoker}, {@code false} to stay on {@code oldInvoker}
     */
    @Override
    public <T> boolean shouldMigrate(ClusterInvoker<T> newInvoker, ClusterInvoker<T> oldInvoker, MigrationRule rule) {
        Map<String, Integer> sizes = serviceMigrationData.computeIfAbsent(
            oldInvoker.getUrl().getDisplayServiceKey(), key -> new ConcurrentHashMap<>());

        // No new addresses at all: record -1 for the new side and stay on the old invoker.
        if (!newInvoker.hasProxyInvokers()) {
            sizes.put(OLD_ADDRESS_SIZE, getAddressSize(oldInvoker));
            sizes.put(NEW_ADDRESS_SIZE, -1);
            logger.info("No " + getInvokerType(newInvoker) + " address available, stop compare.");
            return false;
        }

        // No old addresses at all: record -1 for the old side and migrate.
        if (!oldInvoker.hasProxyInvokers()) {
            sizes.put(OLD_ADDRESS_SIZE, -1);
            sizes.put(NEW_ADDRESS_SIZE, getAddressSize(newInvoker));
            logger.info("No " + getInvokerType(oldInvoker) + " address available, stop compare.");
            return true;
        }

        int newCount = getAddressSize(newInvoker);
        int oldCount = getAddressSize(oldInvoker);
        sizes.put(OLD_ADDRESS_SIZE, oldCount);
        sizes.put(NEW_ADDRESS_SIZE, newCount);

        float threshold = resolveThreshold(oldInvoker, rule);

        logger.info("serviceKey:" + oldInvoker.getUrl().getServiceKey() + " Instance address size " + newCount + ", interface address size " + oldCount + ", threshold " + threshold);

        if (oldCount == 0) {
            // Only new addresses exist -> migrate; neither side has any -> do not migrate.
            return newCount != 0;
        }
        // Migrate once the new/old address ratio reaches the threshold.
        return ((float) newCount / (float) oldCount) >= threshold;
    }

    /**
     * Resolves the comparison threshold: a non-negative threshold from the rule wins; otherwise
     * the cached dynamic property {@code dubbo.application.migration.threshold} is read, with
     * "0.0" as its default. A value that cannot be parsed is logged and replaced by 0.
     */
    private <T> float resolveThreshold(ClusterInvoker<T> oldInvoker, MigrationRule rule) {
        Float ruleThreshold = rule == null ? null : rule.getThreshold(oldInvoker.getUrl());

        String rawThreshold;
        if (ruleThreshold != null && ruleThreshold >= 0) {
            rawThreshold = String.valueOf(ruleThreshold);
        } else {
            rawThreshold = ConfigurationUtils.getCachedDynamicProperty(MIGRATION_THRESHOLD, DEFAULT_THRESHOLD_STRING);
        }

        try {
            return Float.parseFloat(rawThreshold);
        } catch (Exception e) {
            logger.error("Invalid migration threshold " + rawThreshold);
            return DEFAULT_THRESHOLD;
        }
    }

    /**
     * Counts the invokers held by the given cluster invoker's directory.
     *
     * @return -1 for a {@code null} invoker, 0 when the directory reports no invokers,
     *         otherwise the number of invokers
     */
    private <T> int getAddressSize(ClusterInvoker<T> invoker) {
        if (invoker == null) {
            return -1;
        }
        List<Invoker<T>> all = invoker.getDirectory().getAllInvokers();
        return all == null ? 0 : all.size();
    }

    /**
     * Returns the sizes recorded by the latest comparison for the given display service key,
     * or {@code null} if that key has never been compared.
     */
    public Map<String, Integer> getAddressSize(String displayServiceKey) {
        return serviceMigrationData.get(displayServiceKey);
    }

    /** Label used in log messages: "instance" for service-discovery invokers, else "interface". */
    private String getInvokerType(ClusterInvoker<?> invoker) {
        return invoker.isServiceDiscovery() ? "instance" : "interface";
    }
}
?? ? ? ? ? ? ? ?? ?
????????????????????????
|