Cluster层概述
Cluster层中包含Cluster、Directory、Router、LoadBalance几大核心接口 Cluster层是抽象概念,表示的是 对外的 整个集群容错层 Cluster接口 是 容错接口,提供 容错策略
Cluster(集群容错)的具体实现
用户可以在 <dubbo:service> <dubbo:reference> <dubbo:consumer> <dubbo:provider> 标签上 通过 cluster属性 设置
Cluster扩展接口
@SPI(Cluster.DEFAULT)
public interface Cluster {
String DEFAULT = "failover";
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;
static Cluster getCluster(String name) {
return getCluster(name, true);
}
static Cluster getCluster(String name, boolean wrap) {
if (StringUtils.isEmpty(name)) {
name = Cluster.DEFAULT;
}
return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap);
}
}
默认实现类是FailoverCluster
Cluster扩展接口 与 ClusterInvoker接口的关系
Cluster扩展接口下有多种实现类,每种实现类中都需要实现join 方法,在方法中都会new一个对应的ClusterInvoker实现
AbstractCluster抽象类(待完善)
AbstractClusterInvoker抽象类
AbstractClusterInvoker实现了ClusterInvoker接口 ClusterInvoker接口继承Invoker Invoker接口继承Node(节点)接口
Node(节点)接口:
- getUrl
- isAvailable 判断节点是否可用
- destroy 销毁节点
Invoker接口:
- getInterface 获取服务接口
- invoke 调用
ClusterInvoker接口:
- getRegistryUrl 获取注册中心地址
- getDirectory 获取 服务目录
- isDestroyed(待整理)
- isServiceDiscovery(待整理)
- hasProxyInvokers(待整理)
所以,AbstractClusterInvoker需要实现以上这些接口的功能
public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(AbstractClusterInvoker.class);
protected Directory<T> directory;
protected boolean availablecheck;
private AtomicBoolean destroyed = new AtomicBoolean(false);
private volatile Invoker<T> stickyInvoker = null;
public AbstractClusterInvoker() {
}
public AbstractClusterInvoker(Directory<T> directory) {
this(directory, directory.getUrl());
}
public AbstractClusterInvoker(Directory<T> directory, URL url) {
if (directory == null) {
throw new IllegalArgumentException("service directory == null");
}
this.directory = directory;
this.availablecheck = url.getParameter(CLUSTER_AVAILABLE_CHECK_KEY, DEFAULT_CLUSTER_AVAILABLE_CHECK);
}
}
AbstractClusterInvoker实现Node接口方法
@Override
public URL getUrl() {
return directory.getConsumerUrl();
}
@Override
public boolean isAvailable() {
Invoker<T> invoker = stickyInvoker;
if (invoker != null) {
return invoker.isAvailable();
}
return directory.isAvailable();
}
@Override
public void destroy() {
if (destroyed.compareAndSet(false, true)) {
directory.destroy();
}
}
directory方法可能是StaticDirectory、RegistryDirectory 它们的getConsumerUrl、isAvailable、destroy 方法请见Dubbo-Directory(服务目录)、StaticDirectory、RegistryDirectory
AbstractClusterInvoker实现Invoker接口方法
@Override
public Class<T> getInterface() {
return directory.getInterface();
}
AbstractClusterInvoker#invoker(重要!!!)
@Override
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
AbstractClusterInvoker实现ClusterInvoker接口方法
public URL getRegistryUrl() {
return directory.getUrl();
}
public Directory<T> getDirectory() {
return directory;
}
@Override
public boolean isDestroyed() {
return destroyed.get();
}
AbstractClusterInvoker其他方法(逐步添加)
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
return directory.list(invocation);
}
涉及到directory的都后续整理
doInvoke
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
LoadBalance loadbalance) throws RpcException;
checkInvokers
protected void checkInvokers(List<Invoker<T>> invokers, Invocation invocation) {
if (CollectionUtils.isEmpty(invokers)) {
throw new RpcException(RpcException.NO_INVOKER_AVAILABLE_AFTER_FILTER, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + getInterface().getName()
+ ". No provider available for the service " + directory.getConsumerUrl().getServiceKey()
+ " from registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion()
+ ". Please check if the providers have been started and registered.");
}
}
Cluster(集群容错)总体工作流程
- 生成Invoker对象,不同的Cluster实现类 会生成 不同类型的 ClusterInvoker实例,然后调用ClusterInvoker#invoke方法,正式开始调用流程
- 获得 可调用的服务列表,首先会做前置校验,检查 远程服务 是否已经被销毁
checkWhetherDestroyed 。然后通过directory.list(invocation) 方法获取 所有可用的服务列表,接着使用 Router接口 处理该服务列表,根据 路由规则 过滤一部分服务,最终返回 剩余的服务列表 - 做负载均衡
initLoadBalance ,通过 不同的 负载均衡策略 在2中的列表中选出一个服务,用作最后的调用。调用ExtensionLoader 获取 不同负载均衡策略 的 扩展点实现。如果是 异步调用 则 设置调用编号,接着调用 子类实现的doInvoker 方法 - 做RPC调用,首先保存 每次调用的Invoker 到 RPC上下文-RpcContext,并做RPC调用,然后处理调用结果,对应调用出现的异常、成功、失败等情况,每种负载均衡策略 都会有不同的处理方式
集群容错策略(10种)
各节点关系:
- 这里的Invoker 是 Provider的一个 可调用Service的抽象,Invoker封装了 Provider地址 及 Service接口信息
- Directory代表多个Invoker,可以把它看成 List<Invoker> ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更
- Cluster将Directory中的多个Invoker伪装成一个Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个
- Router负责从多个Invoker中 按路由规则 选出子集,比如读写分离,应用隔离等
- LoadBalance 负责从多个Invoker中 选出 具体的一个Invoker 用于本次调用,选的过程 包含了 负载均衡算法,调用失败后,需要重选
Failover Cluster-失败重试
Dubbo的默认容错机制,会对请求 做 负载均衡 当服务消费方 调用服务提供者 失败后,会 自动切换到 其他服务提供者服务器 进行重试 这 通常用于 读操作 或者 具有幂等的写操作 需要注意的是,重试会带来更长延迟,在下游机器负载已经达到极限时,重试容易加重下游服务的负载 可以通过retries="2" 来设置重试次数(不含第1次) 可以使用<dubbo:reference retries="2"/> 来进行接口级别配置的重试次数,当服务消费方调用服务失败后,此例子会再重试两次,也就是说最多会做3次调用,这种配置 对 该接口的所有方法 生效
也可以针对某个方法配置重试次数
Failfast Cluster-快速失败
该容错机制 会对请求 做 负载均衡,受 网络都抖动的影响较大 当服务消费方调用服务提供者失败后,立即报错,也就是只调用一次 通常,这种模式用于非幂等性的写操作
Failsafe Cluster-安全失败
当服务消费者调用服务出现异常时,直接忽略异常,不关心调用是否成功,也不想抛出异常影响外层调用 这种模式 通常用于 写入审计日志等操作
Failback Cluster-失败自动恢复
该容错机制 会对请求 做 负载均衡 当服务消费端调用服务出现异常后,在后台 记录 失败的请求,记录到失败队列中,并 由一个线程池 按照 一定的策略 后期再进行重试 这种模式 适用于 一些 异步 或 最终一致性的请求,通常用于消息通知操作
Forking Cluster-并行调用
当消费方调用一个接口方法后,Dubbo Client 会 并行调用 多个服务提供者的服务 只要其中有一个成功即返回 这种模式通常用于 实时性要求较高 的 读操作,但需要浪费更多服务资源 可通过forks="最大并行调用数n" 来设置最大并行数,如下: 当 可用的服务数 < 最大并行调用数n,请求所有可用服务v
Broadcast Cluster-广播调用
当消费者调用一个接口方法后,Dubbo Client 会 逐个调用 所有服务提供者 任意一台服务器 调用异常 则 这次调用就标志失败 这种模式通常用于 通知 所有提供者 更新缓存或日志等本地资源信息
Mock Cluster
提供 在调用失败时,返回 伪造的响应结果,或 直接强制返回 伪造的结果,不会发起远程调用
Available Cluster
最简单的方式,请求不会做负载均衡,遍历所有服务列表,找到 第一个可用的节点,直接请求 并 返回结果 如果没有可用的节点,则 直接抛出异常
Mergeable Cluster
可以 自动 把 多个节点 请求得到的结果 进行合并 用户可以在<dubbo:reference> 标签中通过merge="true" 开启 合并时 可以通过 group="*" 属性 指定 需要合并哪些分组的结果
默认会根据 方法的返回值 自动匹配 合并器,如果 同一个类型 有两个不同的合并器实现,则需要在参数中指定合并器的名字merge="合并器名"
ZoneAware Cluster
多注册中心订阅的场景,注册中心集群间的负载均衡 对于多注册中心间的选址策略有如下四种
- 指定优先级:
preferred="true" 注册中心的地址将被优先选择
<dubbo:registry address="zookeeper://127.0.0.1:2181" preferred="true" />
- 同中心优先:检查当前请求所属的区域,优先选择具有相同区域的注册中心
<dubbo:registry address="zookeeper://127.0.0.1:2181" zone="beijing" />
<dubbo:registry id="beijing" address="zookeeper://127.0.0.1:2181" weight="100" />
<dubbo:registry id="shanghai" address="zookeeper://127.0.0.1:2182" weight="10" />
集群容错策略详解
先看一个简化的时序图 从中可以看到,调用集群容错 是在 服务降级策略 后 进行的 集群容错FailoverClusterInvoker内部 首先会 调用父类AbstractClusterInvoker的list 方法 来获取 invoker列表 即 从RegistryDirectory管理的 RouterChain的route 方法中 获取保存的invoker列表
Failover Cluster-失败重试
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
int len = calculateInvokeTimes(methodName);
RpcException le = null;
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
checkInvokers(copyInvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
private int calculateInvokeTimes(String methodName) {
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
RpcContext rpcContext = RpcContext.getContext();
Object retry = rpcContext.getObjectAttachment(RETRIES_KEY);
if (null != retry && retry instanceof Number) {
len = ((Number) retry).intValue() + 1;
rpcContext.removeAttachment(RETRIES_KEY);
}
if (len <= 0) {
len = 1;
}
return len;
}
}
如果第一次调用出现异常,则会循环,这时候i=1,所以会执行代码checkWhetherDestroyed 以检查 是否 有线程调用了 当前ReferenceConfig的destroy方法,销毁了当前消费者 如果当前消费者实例已经被销毁,那么重试就没意义了,所以会抛出RpcException异常
如果当前消费者实例没有被销毁,则执行代码list(invocation) 以重新获取当前服务提供者列表 这是因为从第一次调开始到现在可能提供者列表已经变化了,获取列表后 执行代码checkInvokers 又进行了一次校验 如果校验通过,则执行代码Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); ,以便根据负载均衡策略选择一个服务提供者,然后再次尝试调用
Failfast Cluster-快速失败
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
public FailfastClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) {
throw (RpcException) e;
}
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
"Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
+ " select from all providers " + invokers + " for service " + getInterface().getName()
+ " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
e.getCause() != null ? e.getCause() : e);
}
}
}
了解一下构造方法中directory.getUrl() 的内容
Failsafe Cluster-安全失败
public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);
public FailsafeClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation);
}
}
}
Failback Cluster-失败自动恢复
@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
Invoker<T> invoker = null;
try {
checkInvokers(invokers, invocation);
invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
addFailed(loadbalance, invocation, invokers, invoker);
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation);
}
}
FailbackClusterInvoker类
public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);
private static final long RETRY_FAILED_PERIOD = 5;
private final int retries;
private final int failbackTasks;
private volatile Timer failTimer;
public FailbackClusterInvoker(Directory<T> directory) {
super(directory);
int retriesConfig = getUrl().getParameter(RETRIES_KEY, DEFAULT_FAILBACK_TIMES);
if (retriesConfig <= 0) {
retriesConfig = DEFAULT_FAILBACK_TIMES;
}
int failbackTasksConfig = getUrl().getParameter(FAIL_BACK_TASKS_KEY, DEFAULT_FAILBACK_TASKS);
if (failbackTasksConfig <= 0) {
failbackTasksConfig = DEFAULT_FAILBACK_TASKS;
}
retries = retriesConfig;
failbackTasks = failbackTasksConfig;
}
}
String RETRIES_KEY = "retries";
int DEFAULT_FAILBACK_TIMES = 3;
String FAIL_BACK_TASKS_KEY = "failbacktasks";
int DEFAULT_FAILBACK_TASKS = 100;
FailbackClusterInvoker#addFailed
private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
if (failTimer == null) {
synchronized (this) {
if (failTimer == null) {
failTimer = new HashedWheelTimer(
new NamedThreadFactory("failback-cluster-timer", true),
1,
TimeUnit.SECONDS, 32, failbackTasks);
}
}
}
RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
try {
failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
} catch (Throwable e) {
logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
}
}
RetryTimerTask
实现了定时任务接口TimerTask,重点是run方法
private class RetryTimerTask implements TimerTask {
private final Invocation invocation;
private final LoadBalance loadbalance;
private final List<Invoker<T>> invokers;
private final int retries;
private final long tick;
private Invoker<T> lastInvoker;
private int retryTimes = 0;
RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
this.loadbalance = loadbalance;
this.invocation = invocation;
this.invokers = invokers;
this.retries = retries;
this.tick = tick;
this.lastInvoker=lastInvoker;
}
@Override
public void run(Timeout timeout) {
try {
Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
lastInvoker = retryInvoker;
retryInvoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
if ((++retryTimes) >= retries) {
logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
} else {
rePut(timeout);
}
}
}
private void rePut(Timeout timeout) {
if (timeout == null) {
return;
}
Timer timer = timeout.timer();
if (timer.isStop() || timeout.isCancelled()) {
return;
}
timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
}
}
Forking Cluster-并行调用
String FORKS_KEY = "forks";
int DEFAULT_FORKS = 2;
String TIMEOUT_KEY = "timeout";
int DEFAULT_TIMEOUT = 1000;
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
private final ExecutorService executor = Executors.newCachedThreadPool(
new NamedInternalThreadFactory("forking-cluster-timer", true));
public ForkingClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<>(forks);
while (selected.size() < forks) {
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {
selected.add(invoker);
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
for (final Invoker<T> invoker : selected) {
executor.execute(() -> {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
}
}
});
}
try {
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
} finally {
RpcContext.getContext().clearAttachments();
}
}
}
count用来记录 远程调用过程中 出现异常的次数 如果 异常次数 大于等于 forks参数值,则说明 有forks个 并发调用全部失败了 这时候 将把异常 放到 并发安全的队列ref中
调用队列ref的poll 方法,poll 方法带有超时时间,并且当队列ref中含有元素 或者 超时时间到了的时候 返回 并发启动了forks个请求,只要有一个请求返回了,ref中就有元素了,这时poll 方法就直接返回了远程调用的结果 如果forks个调用都失败了,由于 远程调用过程的超时时间 和 poll 方法的超时时间相同,并且远程调用是先发起的,那么这时poll 方法会返回 调用的异常信息
这里需要注意的是,即使poll 返回了一个RPC调用的结果,其他并行的调用还是要继续进行的
Broadcast Cluster-广播(串行)调用
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
private static final String BROADCAST_FAIL_PERCENT_KEY = "broadcast.fail.percent";
private static final int MAX_BROADCAST_FAIL_PERCENT = 100;
private static final int MIN_BROADCAST_FAIL_PERCENT = 0;
public BroadcastClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
RpcContext.getContext().setInvokers((List) invokers);
RpcException exception = null;
Result result = null;
URL url = getUrl();
int broadcastFailPercent = url.getParameter(BROADCAST_FAIL_PERCENT_KEY, MAX_BROADCAST_FAIL_PERCENT);
if (broadcastFailPercent < MIN_BROADCAST_FAIL_PERCENT || broadcastFailPercent > MAX_BROADCAST_FAIL_PERCENT) {
logger.info(String.format("The value corresponding to the broadcast.fail.percent parameter must be between 0 and 100. " +
"The current setting is %s, which is reset to 100.", broadcastFailPercent));
broadcastFailPercent = MAX_BROADCAST_FAIL_PERCENT;
}
int failThresholdIndex = invokers.size() * broadcastFailPercent / MAX_BROADCAST_FAIL_PERCENT;
int failIndex = 0;
for (Invoker<T> invoker : invokers) {
try {
result = invoker.invoke(invocation);
if (null != result && result.hasException()) {
Throwable resultException = result.getException();
if (null != resultException) {
exception = getRpcException(result.getException());
logger.warn(exception.getMessage(), exception);
if (failIndex == failThresholdIndex) {
break;
} else {
failIndex++;
}
}
}
} catch (Throwable e) {
exception = getRpcException(e);
logger.warn(exception.getMessage(), exception);
if (failIndex == failThresholdIndex) {
break;
} else {
failIndex++;
}
}
}
if (exception != null) {
if (failIndex == failThresholdIndex) {
logger.debug(
String.format("The number of BroadcastCluster call failures has reached the threshold %s", failThresholdIndex));
} else {
logger.debug(String.format("The number of BroadcastCluster call failures has not reached the threshold %s, fail size is %s",
failThresholdIndex, failIndex));
}
throw exception;
}
return result;
}
private RpcException getRpcException(Throwable throwable) {
RpcException rpcException = null;
if (throwable instanceof RpcException) {
rpcException = (RpcException) throwable;
} else {
rpcException = new RpcException(throwable.getMessage(), throwable);
}
return rpcException;
}
}
现在广播调用中,可以通过 broadcast.fail.percent 配置节点调用失败的比例 当达到这个比例后,BroadcastClusterInvoker 将不再调用其他节点,直接抛出异常 broadcast.fail.percent 取值在 0~100 范围内 默认情况下 当 全部调用失败后,才会抛出异常 broadcast.fail.percent 只是 控制 当失败后 是否 继续调用 其他节点,并不改变结果(任意一台报错则报错 broadcast.fail.percent 参数 在 dubbo2.7.10 及以上版本生效
ex:broadcast.fail.percent=20 代表了当 20% 的节点调用失败就抛出异常,不再调用其他节点
@reference(cluster = "broadcast", parameters = {"broadcast.fail.percent", "20"})
基于扩展接口自定义集群容错策略
- 首先需要实现Cluster接口,在join方法中要new自定义的ClusterInvoker
- 需要继承 AbstractClusterInvoker类 创建 自己的ClusterInvoker类
- 在org.apache.dubbo.rpc.cluster.Cluster目录下创建文件,并在文件里添加myCluster=xx.xx.xx.MyCluster
- 在
dubbo:reference 配置中使用自己的集群容错模式
|