IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> Dubbo集群容错与负载均衡策略(一)-集群容错策略 -> 正文阅读

[系统运维]Dubbo集群容错与负载均衡策略(一)-集群容错策略

Cluster层概述

在这里插入图片描述
Cluster层中包含Cluster、Directory、Router、LoadBalance几大核心接口
Cluster层是抽象概念,表示的是 对外的 整个集群容错层
Cluster接口 是 容错接口,提供 容错策略

Cluster(集群容错)的具体实现

用户可以在 <dubbo:service> <dubbo:reference> <dubbo:consumer> <dubbo:provider> 标签上 通过 cluster属性 设置

Cluster扩展接口

在这里插入图片描述

@SPI(Cluster.DEFAULT)
public interface Cluster {

    String DEFAULT = "failover";

    /**
     * Merge the directory invokers to a virtual invoker.
     * 合并多个invokers 成 一个虚拟的invoker
     *
     * @param <T>
     * @param directory
     * @return cluster invoker
     * @throws RpcException
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;

	// 根据name获取Cluster的 扩展实现封装(wrap)类,默认是FailoverCluster
    static Cluster getCluster(String name) {
        return getCluster(name, true);
    }

    static Cluster getCluster(String name, boolean wrap) {
        if (StringUtils.isEmpty(name)) {
            name = Cluster.DEFAULT;
        }
        return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap);
    }
}

默认实现类是FailoverCluster

Cluster扩展接口 与 ClusterInvoker接口的关系

Cluster扩展接口下有多种实现类,每种实现类中都需要实现join方法,在方法中都会new一个对应的ClusterInvoker实现

AbstractCluster抽象类(待完善)

AbstractClusterInvoker抽象类

在这里插入图片描述

AbstractClusterInvoker实现了ClusterInvoker接口
ClusterInvoker接口继承Invoker
Invoker接口继承Node(节点)接口

Node(节点)接口:

  • getUrl
  • isAvailable 判断节点是否可用
  • destroy 销毁节点

Invoker接口:

  • getInterface 获取服务接口
  • invoke 调用

ClusterInvoker接口:

  • getRegistryUrl 获取注册中心地址
  • getDirectory 获取 服务目录
  • isDestroyed(待整理)
  • isServiceDiscovery(待整理)
  • hasProxyInvokers(待整理)

所以,AbstractClusterInvoker需要实现以上这些接口的功能

public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(AbstractClusterInvoker.class);

    protected Directory<T> directory;

    protected boolean availablecheck;

    private AtomicBoolean destroyed = new AtomicBoolean(false);

    private volatile Invoker<T> stickyInvoker = null;

    public AbstractClusterInvoker() {
    }

    public AbstractClusterInvoker(Directory<T> directory) {
        this(directory, directory.getUrl());
    }

    public AbstractClusterInvoker(Directory<T> directory, URL url) {
        if (directory == null) {
            throw new IllegalArgumentException("service directory == null");
        }

        this.directory = directory;
        //sticky: invoker.isAvailable() should always be checked before using when availablecheck is true.
        this.availablecheck = url.getParameter(CLUSTER_AVAILABLE_CHECK_KEY, DEFAULT_CLUSTER_AVAILABLE_CHECK);
    }

	// ...
}

AbstractClusterInvoker实现Node接口方法

	// 返回的是 服务目录中消费端地址
    @Override
    public URL getUrl() {
        return directory.getConsumerUrl();
    }
	
	// 返回invoker是否可用 或 服务目录是否可用
    @Override
    public boolean isAvailable() {
        Invoker<T> invoker = stickyInvoker;
        if (invoker != null) {
            return invoker.isAvailable();
        }
        return directory.isAvailable();
    }

	// 销毁服务目录
    @Override
    public void destroy() {
        if (destroyed.compareAndSet(false, true)) {
            directory.destroy();
        }
    }

directory方法可能是StaticDirectory、RegistryDirectory
它们的getConsumerUrl、isAvailable、destroy方法请见Dubbo-Directory(服务目录)、StaticDirectory、RegistryDirectory

AbstractClusterInvoker实现Invoker接口方法

    @Override
    public Class<T> getInterface() {
        return directory.getInterface();
    }

AbstractClusterInvoker#invoker(重要!!!)

    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
    	// 是否被销毁
        checkWhetherDestroyed();

        // binding attachments into invocation.
        // 通过RpcContext上下文获取到额外参数,放入invocation
        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
        }

		// 通过invocation列举invokers 
        List<Invoker<T>> invokers = list(invocation);
        // 初始化负载均衡,整理负载均衡时再看
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        // 调用 doInvoke 进行后续集群容错等操作
        // 抽象方法,由子类实现
        // 这里的getUrl返回的是消费端的url
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }

AbstractClusterInvoker实现ClusterInvoker接口方法

    public URL getRegistryUrl() {
        return directory.getUrl();
    }

   	public Directory<T> getDirectory() {
        return directory;
    }

    @Override
    public boolean isDestroyed() {
        return destroyed.get();
    }

AbstractClusterInvoker其他方法(逐步添加)

    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        return directory.list(invocation);
    }

涉及到directory的都后续整理

doInvoke

    protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                       LoadBalance loadbalance) throws RpcException;

checkInvokers

protected void checkInvokers(List<Invoker<T>> invokers, Invocation invocation) {
        if (CollectionUtils.isEmpty(invokers)) {
            throw new RpcException(RpcException.NO_INVOKER_AVAILABLE_AFTER_FILTER, "Failed to invoke the method "
                    + invocation.getMethodName() + " in the service " + getInterface().getName()
                    + ". No provider available for the service " + directory.getConsumerUrl().getServiceKey()
                    + " from registry " + directory.getUrl().getAddress()
                    + " on the consumer " + NetUtils.getLocalHost()
                    + " using the dubbo version " + Version.getVersion()
                    + ". Please check if the providers have been started and registered.");
        }
    }

Cluster(集群容错)总体工作流程

  1. 生成Invoker对象,不同的Cluster实现类 会生成 不同类型的 ClusterInvoker实例,然后调用ClusterInvoker#invoke方法,正式开始调用流程
  2. 获得 可调用的服务列表,首先会做前置校验,检查 远程服务 是否已经被销毁checkWhetherDestroyed。然后通过directory.list(invocation)方法获取 所有可用的服务列表,接着使用 Router接口 处理该服务列表,根据 路由规则 过滤一部分服务,最终返回 剩余的服务列表
  3. 做负载均衡initLoadBalance,通过 不同的 负载均衡策略 在2中的列表中选出一个服务,用作最后的调用。调用ExtensionLoader 获取 不同负载均衡策略 的 扩展点实现。如果是 异步调用 则 设置调用编号,接着调用 子类实现的doInvoker方法
  4. 做RPC调用,首先保存 每次调用的Invoker 到 RPC上下文-RpcContext,并做RPC调用,然后处理调用结果,对应调用出现的异常、成功、失败等情况,每种负载均衡策略 都会有不同的处理方式

集群容错策略(10种)

在这里插入图片描述
各节点关系:

  • 这里的Invoker 是 Provider的一个 可调用Service的抽象,Invoker封装了 Provider地址 及 Service接口信息
  • Directory代表多个Invoker,可以把它看成 List<Invoker> ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更
  • Cluster将Directory中的多个Invoker伪装成一个Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个
  • Router负责从多个Invoker中 按路由规则 选出子集,比如读写分离,应用隔离等
  • LoadBalance 负责从多个Invoker中 选出 具体的一个Invoker 用于本次调用,选的过程 包含了 负载均衡算法,调用失败后,需要重选

Failover Cluster-失败重试

Dubbo的默认容错机制,会对请求 做 负载均衡
当服务消费方 调用服务提供者 失败后,会 自动切换到 其他服务提供者服务器 进行重试
通常用于 读操作 或者 具有幂等的写操作
需要注意的是,重试会带来更长延迟,在下游机器负载已经达到极限时,重试容易加重下游服务的负载
可以通过retries="2"来设置重试次数(不含第1次)
可以使用<dubbo:reference retries="2"/>来进行接口级别配置的重试次数,当服务消费方调用服务失败后,此例子会再重试两次,也就是说最多会做3次调用,这种配置 对 该接口的所有方法 生效

也可以针对某个方法配置重试次数
在这里插入图片描述

Failfast Cluster-快速失败

该容错机制 会对请求 做 负载均衡,受 网络都抖动的影响较大
当服务消费方调用服务提供者失败后,立即报错,也就是只调用一次
通常,这种模式用于非幂等性的写操作

Failsafe Cluster-安全失败

当服务消费者调用服务出现异常时,直接忽略异常,不关心调用是否成功,也不想抛出异常影响外层调用
这种模式 通常用于 写入审计日志等操作

Failback Cluster-失败自动恢复

该容错机制 会对请求 做 负载均衡
当服务消费端调用服务出现异常后,在后台 记录 失败的请求,记录到失败队列中,并 由一个线程池 按照 一定的策略 后期再进行重试
这种模式 适用于 一些 异步 或 最终一致性的请求,通常用于消息通知操作

Forking Cluster-并行调用

当消费方调用一个接口方法后,Dubbo Client 会 并行调用 多个服务提供者的服务
只要其中有一个成功即返回
这种模式通常用于 实时性要求较高 的 读操作,但需要浪费更多服务资源
可通过forks="最大并行调用数n"来设置最大并行数,如下:
在这里插入图片描述
当 可用的服务数 < 最大并行调用数n,请求所有可用服务v

Broadcast Cluster-广播调用

当消费者调用一个接口方法后,Dubbo Client 会 逐个调用 所有服务提供者
任意一台服务器 调用异常 则 这次调用就标志失败
这种模式通常用于 通知 所有提供者 更新缓存或日志等本地资源信息

Mock Cluster

提供 在调用失败时,返回 伪造的响应结果,或 直接强制返回 伪造的结果,不会发起远程调用

Available Cluster

最简单的方式,请求不会做负载均衡,遍历所有服务列表,找到 第一个可用的节点,直接请求 并 返回结果
如果没有可用的节点,则 直接抛出异常

Mergeable Cluster

可以 自动 把 多个节点 请求得到的结果 进行合并
用户可以在<dubbo:reference>标签中通过merge="true"开启
合并时 可以通过 group="*"属性 指定 需要合并哪些分组的结果

默认会根据 方法的返回值 自动匹配 合并器,如果 同一个类型 有两个不同的合并器实现,则需要在参数中指定合并器的名字merge="合并器名"

ZoneAware Cluster

多注册中心订阅的场景,注册中心集群间的负载均衡
对于多注册中心间的选址策略有如下四种

  • 指定优先级:preferred="true"注册中心的地址将被优先选择
<dubbo:registry address="zookeeper://127.0.0.1:2181" preferred="true" />
  • 同中心优先:检查当前请求所属的区域,优先选择具有相同区域的注册中心
<dubbo:registry address="zookeeper://127.0.0.1:2181" zone="beijing" />
  • 权重轮询:根据每个注册中心的权重分配流量
<dubbo:registry id="beijing" address="zookeeper://127.0.0.1:2181" weight="100" />
<dubbo:registry id="shanghai" address="zookeeper://127.0.0.1:2182" weight="10" />
  • 缺省值:选择一个可用的注册中心

集群容错策略详解

先看一个简化的时序图
在这里插入图片描述
从中可以看到,调用集群容错 是在 服务降级策略 后 进行的
集群容错FailoverClusterInvoker内部 首先会 调用父类AbstractClusterInvoker的list方法 来获取 invoker列表
即 从RegistryDirectory管理的 RouterChain的route方法中 获取保存的invoker列表

Failover Cluster-失败重试

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        // 获取重试次数
        int len = calculateInvokeTimes(methodName);
        // retry loop.
        RpcException le = null; // last exception.
        // 已调用过的invokers
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            // 重试时,进行重新选择,避免 重试时invoker列表已经发生变化
            // 如果列表发生了变化,则invoked判断会失效
            if (i > 0) {
            	// 校验invoker是否已经被销毁,抛出异常
                checkWhetherDestroyed();
                // 通过AbstractClusterInvoker#list方法重新获得 所有服务提供者
                copyInvokers = list(invocation);
                // check again 重新检查
                checkInvokers(copyInvokers, invocation);
            }
            // 根据负载均衡策略选择一个invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            // 放入已调用的invoker列表中
            invoked.add(invoker);
            // 更新RpcContext上下文的invoker列表
            RpcContext.getContext().setInvokers((List) invoked);
            try {
            	// 发起远程调用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                	// 如果RpcException是业务异常,则直接抛出
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
            	// 封装成RpcException
                le = new RpcException(e.getMessage(), e);
            } finally {
            	// 把使用过的invoker的url放入到providers集合中
                providers.add(invoker.getUrl().getAddress());
            }
        }

		// 这里是循环外了
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }

    private int calculateInvokeTimes(String methodName) {
    	// 从URL中获取方法名为methodName的参数retries的值,默认是2次
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        RpcContext rpcContext = RpcContext.getContext();
        // 从RpcContext上下文中获取当前重试次数
        Object retry = rpcContext.getObjectAttachment(RETRIES_KEY);
        if (null != retry && retry instanceof Number) {
        	// 重试次数+1,然后从RpcContext上下文中移除这个参数
            len = ((Number) retry).intValue() + 1;
            rpcContext.removeAttachment(RETRIES_KEY);
        }
        if (len <= 0) {
            len = 1;
        }
		
		// 返回 应 重试次数
        return len;
    }

}

如果第一次调用出现异常,则会循环,这时候i=1,所以会执行代码checkWhetherDestroyed以检查 是否 有线程调用了 当前ReferenceConfig的destroy方法,销毁了当前消费者
如果当前消费者实例已经被销毁,那么重试就没意义了,所以会抛出RpcException异常

如果当前消费者实例没有被销毁,则执行代码list(invocation)以重新获取当前服务提供者列表
这是因为从第一次调开始到现在可能提供者列表已经变化了,获取列表后
执行代码checkInvokers又进行了一次校验
如果校验通过,则执行代码Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);,以便根据负载均衡策略选择一个服务提供者,然后再次尝试调用

Failfast Cluster-快速失败

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailfastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    	// 校验从AbstractClusterInvoker传入的Invoker列表是否为空
        checkInvokers(invokers, invocation);
        // 使用负载均衡策略选择一个服务提供者
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
        	// 执行远程调用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
        	// 出错则抛出RpcException异常
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                    "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                            + " select from all providers " + invokers + " for service " + getInterface().getName()
                            + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                            + " use dubbo version " + Version.getVersion()
                            + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                    e.getCause() != null ? e.getCause() : e);
        }
    }
}

了解一下构造方法中directory.getUrl()的内容
在这里插入图片描述
在这里插入图片描述

Failsafe Cluster-安全失败

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            // 如果调用出现异常 则 返回一个默认null结果
            return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
        }
    }
}
  • AsyncRpcResult这里可能会涉及到调用的同步异步,后续整理

Failback Cluster-失败自动恢复

    @Override
    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        Invoker<T> invoker = null;
        try {
            checkInvokers(invokers, invocation);
            invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                    + e.getMessage() + ", ", e);
            // 调用失败执行addFailed方法
            addFailed(loadbalance, invocation, invokers, invoker);
            return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
        }
    }

FailbackClusterInvoker类

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

	// 重试间隔
    private static final long RETRY_FAILED_PERIOD = 5;

	// 重试次数
    private final int retries;

	// 失败任务数
    private final int failbackTasks;

	// 定时器
    private volatile Timer failTimer;

    public FailbackClusterInvoker(Directory<T> directory) {
        super(directory);

        int retriesConfig = getUrl().getParameter(RETRIES_KEY, DEFAULT_FAILBACK_TIMES);
        if (retriesConfig <= 0) {
            retriesConfig = DEFAULT_FAILBACK_TIMES;
        }
        int failbackTasksConfig = getUrl().getParameter(FAIL_BACK_TASKS_KEY, DEFAULT_FAILBACK_TASKS);
        if (failbackTasksConfig <= 0) {
            failbackTasksConfig = DEFAULT_FAILBACK_TASKS;
        }
        retries = retriesConfig;
        failbackTasks = failbackTasksConfig;
    }

	// ...
}


    String RETRIES_KEY = "retries";
    int DEFAULT_FAILBACK_TIMES = 3;

    String FAIL_BACK_TASKS_KEY = "failbacktasks";
	int DEFAULT_FAILBACK_TASKS = 100;

FailbackClusterInvoker#addFailed

    private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                	// new一个定时器
                 	// tickDuration=1 一个tick的时间间隔
                 	// unit=SECONDS 一个tick的时间间隔单位
                 	// ticksPerWheel 时间轮 的 长度,一圈下来有多少格
                 	// maxPendingTimeouts(failbackTasks) 最大挂起Timeouts数量
                    failTimer = new HashedWheelTimer(
                            new NamedThreadFactory("failback-cluster-timer", true),
                            1,
                            TimeUnit.SECONDS, 32, failbackTasks);
                }
            }
        }
        // 定时重试任务 内部类
        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
        try {
        	// 将定时重试任务 加入到 定时器,延迟5s执行
            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
        } catch (Throwable e) {
            logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
        }
    }

RetryTimerTask

实现了定时任务接口TimerTask,重点是run方法

private class RetryTimerTask implements TimerTask {
        private final Invocation invocation;
        private final LoadBalance loadbalance;
        private final List<Invoker<T>> invokers;
        private final int retries;
        private final long tick;
        private Invoker<T> lastInvoker;
        private int retryTimes = 0;

        RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
            this.loadbalance = loadbalance;
            this.invocation = invocation;
            this.invokers = invokers;
            this.retries = retries;
            this.tick = tick;
            this.lastInvoker=lastInvoker;
        }

        @Override
        public void run(Timeout timeout) {
            try {
            	// 负载均衡器 选择一个invoker
                Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
                lastInvoker = retryInvoker;
                retryInvoker.invoke(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
                // 如果调用异常,重试次数+1,然后判断如果没有超过最大重试次数,则继续rePut
                if ((++retryTimes) >= retries) {
                    logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
                } else {
                    rePut(timeout);
                }
            }
        }

        private void rePut(Timeout timeout) {
            if (timeout == null) {
                return;
            }

            Timer timer = timeout.timer();
            if (timer.isStop() || timeout.isCancelled()) {
                return;
            }

			// 再次重试
            timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
        }
    }
  • TimerTask、Timeout 、Timer

Forking Cluster-并行调用

    String FORKS_KEY = "forks";
    int DEFAULT_FORKS = 2;

    String TIMEOUT_KEY = "timeout";
    int DEFAULT_TIMEOUT = 1000;
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

    /**
     * Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}
     * which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
     */
    private final ExecutorService executor = Executors.newCachedThreadPool(
            new NamedInternalThreadFactory("forking-cluster-timer", true));

    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            // 已选择的invokers
            final List<Invoker<T>> selected;
            // 获取并行参数
            final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
            // 获取超时参数
            final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (forks <= 0 || forks >= invokers.size()) {
            	// 如果没有设置并行数量 或者 并行数量>invoker数量,则最多并行invokers数量
                selected = invokers;
            } else {
                selected = new ArrayList<>(forks);
                // 如果已选择的invoker个数 < 并行数量,则可以继续选择invoker
				// 目的是当invokers > forks时,不选超
                while (selected.size() < forks) {
                	// 先根据 复杂均衡选择一个invoker,然后判断是否在selected列表中,如果不在,则加入selected列表
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    if (!selected.contains(invoker)) {
                        //Avoid add the same invoker several times.
                        selected.add(invoker);
                    }
                }
            }
            // 更新上下文中的 已选择的invoker列表
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            // 链表阻塞队列
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            // 循环这个已选择过的列表selected
            for (final Invoker<T> invoker : selected) {
            	// 使用线程池执行
                executor.execute(() -> {
                    try {
                        Result result = invoker.invoke(invocation);
                        // 将 调用结果 添加到队列的最后一个元素
                        ref.offer(result);
                    } catch (Throwable e) {
                    	// 如果调用异常,则count自增1
                        int value = count.incrementAndGet();
                        // 如果value >= 已选择的invoker列表大小,将(最后一个)异常 添加搭配阻塞队列的最后一个元素
                        if (value >= selected.size()) {
                            ref.offer(e);
                        }
                    }
                });
            }
            try {
            	// 从队列中获取第一个元素返回结果
            	// 如果都失败了,则阻塞队列中只会有一个元素
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                if (ret instanceof Throwable) { // 如果是异常则封装成RpcException然后抛出
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                // 不是异常返回结果
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
            }
        } finally {
            // clear attachments which is binding to current thread.
            RpcContext.getContext().clearAttachments();
        }
    }
}

count用来记录 远程调用过程中 出现异常的次数
如果 异常次数 大于等于 forks参数值,则说明 有forks个 并发调用全部失败了
这时候 将把异常 放到 并发安全的队列ref中

调用队列ref的poll方法,poll方法带有超时时间,并且当队列ref中含有元素 或者 超时时间到了的时候 返回
并发启动了forks个请求,只要有一个请求返回了,ref中就有元素了,这时poll方法就直接返回了远程调用的结果
如果forks个调用都失败了,由于 远程调用过程的超时时间 和 poll方法的超时时间相同,并且远程调用是先发起的,那么这时poll方法会返回 调用的异常信息

这里需要注意的是,即使poll返回了一个RPC调用的结果,其他并行的调用还是要继续进行的

Broadcast Cluster-广播(串行)调用

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
    private static final String BROADCAST_FAIL_PERCENT_KEY = "broadcast.fail.percent";
    private static final int MAX_BROADCAST_FAIL_PERCENT = 100;
    private static final int MIN_BROADCAST_FAIL_PERCENT = 0;

    public BroadcastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;
        URL url = getUrl();
        // The value range of broadcast.fail.threshold must be 0~100.
        // 100 means that an exception will be thrown last, and 0 means that as long as an exception occurs, it will be thrown.
        // see https://github.com/apache/dubbo/pull/7174
        // 广播失败百分比
        int broadcastFailPercent = url.getParameter(BROADCAST_FAIL_PERCENT_KEY, MAX_BROADCAST_FAIL_PERCENT);

        if (broadcastFailPercent < MIN_BROADCAST_FAIL_PERCENT || broadcastFailPercent > MAX_BROADCAST_FAIL_PERCENT) {
            logger.info(String.format("The value corresponding to the broadcast.fail.percent parameter must be between 0 and 100. " +
                    "The current setting is %s, which is reset to 100.", broadcastFailPercent));
            broadcastFailPercent = MAX_BROADCAST_FAIL_PERCENT;
        }

		// 阈值 = invokers*配置百分比/最大百分比
        int failThresholdIndex = invokers.size() * broadcastFailPercent / MAX_BROADCAST_FAIL_PERCENT;
        int failIndex = 0;
        for (Invoker<T> invoker : invokers) {
            try {
                result = invoker.invoke(invocation);
                if (null != result && result.hasException()) {
                    Throwable resultException = result.getException();
                    if (null != resultException) {
                        exception = getRpcException(result.getException());
                        logger.warn(exception.getMessage(), exception);
                        if (failIndex == failThresholdIndex) {
                        	// 如果达到阈值,就不再调用后面的invoker
                            break;
                        } else {
                            failIndex++;
                        }
                    }
                }
            } catch (Throwable e) {
                exception = getRpcException(e);
                logger.warn(exception.getMessage(), exception);
                if (failIndex == failThresholdIndex) {
                    break;
                } else {
                    failIndex++;
                }
            }
        }

        if (exception != null) {
            if (failIndex == failThresholdIndex) {
                logger.debug(
                        String.format("The number of BroadcastCluster call failures has reached the threshold %s", failThresholdIndex));
            } else {
                logger.debug(String.format("The number of BroadcastCluster call failures has not reached the threshold %s, fail size is %s",
                        failThresholdIndex, failIndex));
            }
            throw exception;
        }

        return result;
    }

    private RpcException getRpcException(Throwable throwable) {
        RpcException rpcException = null;
        if (throwable instanceof RpcException) {
            rpcException = (RpcException) throwable;
        } else {
            rpcException = new RpcException(throwable.getMessage(), throwable);
        }
        return rpcException;
    }
}

现在广播调用中,可以通过 broadcast.fail.percent 配置节点调用失败的比例
当达到这个比例后,BroadcastClusterInvoker 将不再调用其他节点,直接抛出异常
broadcast.fail.percent 取值在 0~100 范围内
默认情况下 当 全部调用失败后,才会抛出异常
broadcast.fail.percent 只是 控制 当失败后 是否 继续调用 其他节点,并不改变结果(任意一台报错则报错
broadcast.fail.percent 参数 在 dubbo2.7.10 及以上版本生效

ex:broadcast.fail.percent=20 代表了当 20% 的节点调用失败就抛出异常,不再调用其他节点

@reference(cluster = "broadcast", parameters = {"broadcast.fail.percent", "20"})

基于扩展接口自定义集群容错策略

  1. 首先需要实现Cluster接口,在join方法中要new自定义的ClusterInvoker
  2. 需要继承 AbstractClusterInvoker类 创建 自己的ClusterInvoker类
  3. 在org.apache.dubbo.rpc.cluster.Cluster目录下创建文件,并在文件里添加myCluster=xx.xx.xx.MyCluster
  4. dubbo:reference配置中使用自己的集群容错模式
  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章      下一篇文章      查看所有文章
加:2021-09-12 13:32:23  更:2021-09-12 13:34:08 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/15 15:06:03-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码