IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Spring Cloud Alibaba 核心组件解析 -> 正文阅读

[Java知识库]Spring Cloud Alibaba 核心组件解析

一. Nacos

1. 概述

对于 Nacos 是什么,有什么用,怎么用,就不再赘述了。

官网地址:https://nacos.io/zh-cn/

Nacos 系统架构图:
在这里插入图片描述
主要包括:

  • Provider APP:提供可复用可调用服务的应用。
  • Consumer APP:会对某个服务发起调用的应用方。
  • Name Server:分布式系统中通过 DNS、VIP(Vritual IP)或者地址来实现集群的服务路由。
  • Nacos Server:Nacos 服务的提供者。
    • OpenAPI:提供给外部的功能访问入口。
    • Config Service:配置服务。
    • Naming Service:提供分布式系统中所有对象(Object)、实体(Entity)的“名字”到关联的元数据之间的映射管理服务。
    • Nacos Core:Nacos 的核心源码部分。
    • Consistency Protocol:保证分布式共识的协议算法。
  • Nacos Console:Nacos 的控制台中心。

2. 服务注册发现实现原理

对于 Nacos 作为注册中心的源码分析主要包括三个方面:

  • 服务的注册;
  • 服务地址的获取;
  • 服务地址变化的感知;

2.1 服务注册

Spring Cloud 提供了服务注册的接口标准,集成到 Spring Cloud 中的注册中心会实现这些方法。

public interface ServiceRegistry<R extends Registration> {

    /**
     * 注册服务方法
     * @param registration
     */
    void register(R registration);

    /**
     * 取消注册服务方法
     * @param registration
     */
    void deregister(R registration);

    /**
     * 关闭注册中心方法(生命周期方法)
     */
    void close();

    /**
     * 设置注册服务的状态
     * @param registration
     * @param status
     */
    void setStatus(R registration, String status);

    /**
     * 获取注册服务的状态
     * @param registration
     * @param <T>
     * @return
     */
    <T> T getStatus(R registration);
}

Nacos 通过实现类 NacosServiceRegistry 实现了这些方法完成对 Nacos 的操作。

并且在SpringCloud 整合的时候,为容器导入了一个 AutoServiceRegistration 类,有一个 AbstractAutoServiceRegistration 类继承了该类,并且实现了如下方法,在 WebServerInitializedEvent 监听服务器初始化事件的发生,然后在 bind() 方法中的 start() 方法中会调用 register(); 进行服务的注册。

	public void onApplicationEvent(WebServerInitializedEvent event) {
		bind(event);
	}
  • register()方法:将传入的注册信息实体注册到 Nacos 中。
    @Override
    public void register(Registration registration) {
        // 判断注册实体是否有 ID 来进行标识。
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }
        // 获取操作 Nacos 的客户端服务类
        NamingService namingService = namingService();
        // 获取注册信息的服务 ID,其实就是服务名
        String serviceId = registration.getServiceId();
        // 从配置中获取注册到对应的组
        String group = nacosDiscoveryProperties.getGroup();
        // 通过注册信息的对应主机端口等获取提供的服务信息主体
        Instance instance = getNacosInstanceFromRegistration(registration);
        try {
            // 通过 nacos 客户端注册对应服务
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                    instance.getIp(), instance.getPort());
        }
        catch (Exception e) {
            log.error("nacos registry, {} register failed...{},", serviceId,
                    registration.toString(), e);
            rethrowRuntimeException(e);
        }
    }
  • registerInstance():注册实例的方法,包括了对心跳检测的集成和真实服务的注册。
    /**
     *
     * @param serviceName:注册的服务名
     * @param groupName:注册到的对应组
     * @param instance:服务的对应具体信息实体
     * @throws NacosException
     */
    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        // 获取对应的组名服务名,形式为 GroupName@@ServiceName
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        // 通过这个是否短命的标志来判断是否需要心跳检测
        if (instance.isEphemeral()) {
            // 构造一个心跳检测的具体信息,包括心跳检测的,IP,端口,时间间隔等
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            // 添加这个心跳检测的具体信息,定时发送数据包进行检测
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        // 真正进行服务的注册
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }
心跳检测

心跳检测的实现,将心跳检测的信息放入到一个Map中,使用定时任务发送心跳检测。

    /**
     * 增加心跳检测实例
     * 启动定时任务进行心跳检测
     * @param serviceName
     * @param beatInfo
     */
    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        // 构造一个对应服务的 key
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        // 如果原本就存在,则移除,停止心跳检测
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        // 将新加入的心跳检测的服务加入 dom2Beat(一个 ConcurrentHashMap)
        dom2Beat.put(key, beatInfo);
        // 使用定时的线程池执行一个定时任务,定时发送心跳检测任务
        executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        // 给指标的监视器设置心跳检测服务集合的大小
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }
服务注册

真正进行服务的注册,封装参数之后,通过 Nacos 的客户端发送请求注册服务。

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
                instance);
        // 封装注册的参数
        final Map<String, String> params = new HashMap<String, String>(16);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
        // 真正使用 Nacos 的客户端调用暴露的Api在注册中心注册
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
    }

客户端调用的 Api 在 nacos-naming 模块下的 InstanceController 中。


@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {

    . . .

    /**
     * 注册服务的接口
     * @param request
     * @return
     * @throws Exception
     */
    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {

        // 得到对应的参数并进行校验
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        final Instance instance = parseInstance(request);
        // 真正执行注册
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

    . . .
}

Controller 中使用 serviceManager 对服务实例进行真正的注册。

    /**
     * 对实例进行注册,如果没有对应服务会先创建服务并进行初始化
     * 其实所有的服务集合是一个 ConcurrenrtHashMap<namespaceId 和 serviceName 组成的键, 值是对应的服务 service>
     * 每个服务又有一个 ConcurrentHashMap 键是对应的服务实例的命名空间id 和服务名,值是对应实例的具体信息
     * @param namespaceId
     * @param serviceName
     * @param instance
     * @throws NacosException
     */
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        /**
         * 创建一个空的服务如果没有被创建的话,实际就是初始化 service map
         * 主要做了三件事:
         *      (1)创建该服务对象,putService() 加入到缓存中。
         *      (2)service.init(),初始化服务:
         *          启动定时任务,对客户端发送心跳包的最后时间进行更新,
         *          如果超时则代表客户端不健康,将 healthy 属性设置为 false
         *      (3)consistencyService.listen() 对其他节点数据进行监听,保证数据的一致性
         */
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        // 通过这两个参数从map中得到对应的服务对象
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // 对创建的服务添加实例
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }
总结
  • 在SpringBoot 项目启动的时候,如果倒入了相关依赖,并且开启了@EnableDiscoveryClient注解,该注解会将autoRegister 属性设置为 true;SpringBoot 就会倒入一个自动配置的 AutoServiceRegistration 类,有一个 AbstractAutoServiceRegistration 类继承了该类,并且实现了如下方法,在 WebServerInitializedEvent 监听服务器初始化事件的发生,然后在 bind() 方法中的 start() 方法中会调用 register(); 进行服务的注册。
  • 对于服务的注册,主要包含两块的内容,本地 Nacos 客户端的操作,以及远程 Nacos 服务器的操作:
    • 本地 Nacos 的客户端会将服务注册的信息封装为一个实例,检查如果需要心跳检测,则会启动一个定时任务,定时向服务端发送心跳检测包。之后,会使用 namingService 对远端 Nacos 服务器发送请求进行服务的注册。
    • 远端服务器中的 InstanceController 会接收到该请求,使用 ServiceManager 进行服务的注册,对服务以及对应的实例进行初始化和添加,对应存储的数据结构是,所有的服务存放在一个 ConcurrentHashMap 里,一个服务的所有实例也存放在一个 ConcurrentHashMap。

2.2 服务地址的发现和动态感知

了解了上面的内容和服务实力的存储方式之后,这个内容就很简单了,对于地址的发现和感知主要用于对服务的调用,获取到对应服务的 IP:PORT 之后进行请求的调用。

地址发现

对于地址的发现还是在 InstanceController 中的 list() 方法,该方法可以获得对应服务的全部地址信息,再通过相关的负载均衡的策略等,进行服务的调用。

对应的方法比较简单就不贴出来了,就是从 servcieMap 中取对应的服务,拿出所有实例,对返回参数进行一个封装之后返回。

动态感知

Nacos 客户端会通过 subscribe() 方法,对Nacos 远端服务进行监听,NamingService 中组合了一个 HostReactor 类,该类初始化的时候,会开启一定定时任务的线程,每十秒钟向服务端 pull 一次最新的地址数据。并且服务端在检测到对应服务下线或者更新之后,也会主动推送消息给 Nacos 客户端,双方采用的是 UDP 协议。

3. 配置中心实现原理

整体架构:

在这里插入图片描述

Nacos 的配置中心主要提供了四种操作以及对应的 open api。

  • 发布配置信息,将配置添加到对应的 Nacos 服务器。
    @PostMapping
    @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
    public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
            @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
            @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
            @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
            @RequestParam(value = "appName", required = false) String appName,
            @RequestParam(value = "src_user", required = false) String srcUser,
            @RequestParam(value = "config_tags", required = false) String configTags,
            @RequestParam(value = "desc", required = false) String desc,
            @RequestParam(value = "use", required = false) String use,
            @RequestParam(value = "effect", required = false) String effect,
            @RequestParam(value = "type", required = false) String type,
            @RequestParam(value = "schema", required = false) String schema) throws NacosException
  • 获取配置信息,从 Nacos 服务器获取配置。
    @GetMapping
    @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
    public void getConfig(HttpServletRequest request, HttpServletResponse response,
            @RequestParam("dataId") String dataId, @RequestParam("group") String group,
            @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
            @RequestParam(value = "tag", required = false) String tag)
            throws IOException, ServletException, NacosException
  • 删除配置信息
    @DeleteMapping
    @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
    public Boolean deleteConfig(HttpServletRequest request, HttpServletResponse response,
            @RequestParam("dataId") String dataId, //
            @RequestParam("group") String group, //
            @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
            @RequestParam(value = "tag", required = false) String tag) throws NacosException
  • 监听配置信息,订阅感兴趣的配置信息,在配置发生变化的时候触发事件。
    @PostMapping("/listener")
    @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
    public void listener(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException

3.1 配置的动态监听

如果 Nacos 服务器上的配置信息发生变化的时候,需要让对应的应用程序对相应的配置感知并且随之改动。所以需要客户端对感兴趣的配置信息进行动态的监听。

对于配置的实时变更,需要服务器和客户端之间的数据交互,而对于服务器和服务器的数据交互无非就是两种形式

  • PULL,客户端从服务器主动拉取数据。
    • 对于 PULL 模式,由于定时任务存在时间间隔,所以不能保证实时性,并且由于更新不是经常发生,所以大部分的 PULL 是一个无效的拉取。
  • PUSH,服务器主动向客户端推送更新数据。
    • 对于 PUSH 模式,需要维护一个服务器端和客户端的连接,并且为了保证连接的可靠性,需要通过心跳机制的检测。这样在客户端数量很大的情况下,会耗费大量的系统资源。

Nacos 采用的是PULL 和 PUSH 相结合的模式,即一种长轮询机制,具体过程如下:

  • Nacos 的客户端向服务器发送一次检查配置更新的请求,服务器端检查配置是否一致,如果不一致会直接返回更新的信息,如果一致,则会将请求保存住,添加到服务器端的 allSubs 队列中,设置一个定时 29.5 s 的定时任务,进行延时操作(该时间可能会由具体情况发生变动)。
    • 如果在这 29.5s 内发生了配置的更新,服务器端则会触发对应时间,遍历 allSubs 队列,找到感兴趣的客户端,将配置信息的更新操作写入到响应中并返回。类似于完成了一次 PUSH 操作,但是不需要建立一个服务器端到客户端的 HTTP 连接。
    • 如果在此时间内没有发生改变,则会进入自动检测机制,直接返回响应给客户端。

3.2 配置的加载

Spring 通过抽象了一个 Environment 来表示整个 Spring 的环境配置信息。Spring 启动的时候会把所有的配置环境信息加载到 Environment 中。之后就可以进行获取。

对于 Nacos 要实现配置的统一管理和动态刷新,主要需要解决两个问题:

  1. 如何将远程的配置加载到本地的 Environment 中?
  2. 配置进行更新的时候,如何将新的配置更新到本地的 Environment 中,并且对使用这些配置的组件进行动态刷新?

首先 SpringBoot 启动的 run 方法中,有一个准备上下文环境信息的方法:

prepareContext(context, environment, listeners, applicationArguments, printedBanner);

在 prepareContext 方法中,会对上下文进行初始化:

applyInitializers(context);
	protected void applyInitializers(ConfigurableApplicationContext context) {
		for (ApplicationContextInitializer initializer : getInitializers()) {
			Class<?> requiredType = GenericTypeResolver.resolveTypeArgument(initializer.getClass(),
					ApplicationContextInitializer.class);
			Assert.isInstanceOf(requiredType, context, "Unable to call initializer.");
			// 使用初始化器进行初始化
			initializer.initialize(context);
		}
	}
  • 在初始化器,执行初始化 initialize() 方法中,会去调用locator 去加载配置信息,这里其实就会去 Nacos 的实现中,加载远程的配置信息。
Collection<PropertySource<?>> source = locator.locateCollection(environment);
PropertySource<?> propertySource = locator.locate(environment);
	@Override
	public PropertySource<?> locate(Environment env) {
		nacosConfigProperties.setEnvironment(env);
		// 就是通过这个 ConfigService 对远程的配置做相关操作
		ConfigService configService = nacosConfigManager.getConfigService();

		if (null == configService) {
			log.warn("no instance of config service found, can't load config from nacos");
			return null;
		}
		long timeout = nacosConfigProperties.getTimeout();
		nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
				timeout);
		String name = nacosConfigProperties.getName();

		String dataIdPrefix = nacosConfigProperties.getPrefix();
		if (StringUtils.isEmpty(dataIdPrefix)) {
			dataIdPrefix = name;
		}

		if (StringUtils.isEmpty(dataIdPrefix)) {
			dataIdPrefix = env.getProperty("spring.application.name");
		}

		CompositePropertySource composite = new CompositePropertySource(
				NACOS_PROPERTY_SOURCE_NAME);

		loadSharedConfiguration(composite);
		loadExtConfiguration(composite);
		loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);

		return composite;
	}

之后,在上下文准备完成之后继续进行应用的启动,应用准备完毕之后会触发一个应用准备完毕的事件,Nacos 的监听器会监听该事件,并注册一个监听器,对配置信息进行监听,从而实现配置文件的刷新,同样,该客户端和服务器之间的通信,也使用的是长轮询的机制。

	@Override
	public void onApplicationEvent(ApplicationReadyEvent event) {
		// many Spring context
		if (this.ready.compareAndSet(false, true)) {
			this.registerNacosListenersForApplications();
		}
	}

客户端会有一个 ClientWorker 类,其中有两个线程池:

  • 执行检查配置信息的定时线程池:作用是检查本地的配置和缓存在磁盘中的配置是否相同,如果不相同则说明发生了变更,需要进行更新,每 10ms 会执行一次(目的是在本地保存一个缓存配置文件,用以故障的容灾)
  • 进行长轮询的定时线程池:和上面 Nacos 的服务动态更新的情况相同,获取服务器的最新数据,每 30s 执行一次。每个长轮询任务最多处理监听 3000 个配置集,超过部分会拆分多个任务进行处理。
    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
            final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;   
        init(properties);        
        this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        
        this.executorService = Executors
                .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                        t.setDaemon(true);
                        return t;
                    }
                });
        
        this.executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

二. Sentinel

对于 Sentinel 主要就是用于微服务的限流和降级,来应对高并发的场景。

1. 限流

对于限流,主要有以下几种的算法实现:

  • 计数器算法
    • 该算法是一种简单实现的限流算法,即在一个时间周期内,每一次的请求访问会给计数器的值加一,如果达到设定的计数阈值,则会触发对应的降级策略。到下一个时间周期的时候,会对计数值进行重置。
    • 该算法实现简单,但是存在临界的问题,即如果第一个周期的末尾和第二个周期的开头出现了大量的请求,则无法进行控制。
  • 滑动窗口算法
    • 该滑动窗口类似于TCP缓冲区的滑动窗口,都是一个抽象的概念,给每个滑动窗口定义了总的流量大小,再给每个滑动窗口划分若干个小窗口,每个小窗口有独立的流量计数器,请求会在对应的时间窗口的计数器累加,如果大窗口内的总流量大于阈值,则会触发限流策略,每当开启一个新的时间段窗口,则会删除第一个小窗口的请求数量,相当于滑动窗口向右移动一格。
    • 该算法解决了临界的问题,Sentinel 也使用了该方法。
  • 令牌桶算法
    • 对于每个请求,要被处理首先要从一个容器中获取令牌,令牌的数量有限,没有获取到令牌的请求会触发限流的策略。
    • 并且会有一个定时任务不断的补充令牌。
  • 漏桶算法
    • 所有的请求会发到一个容器中,容器会以一定的速率放行请求,以此来实现流量的控制。

1.1 Sentinel 限流概述

对于 Sentinel 的使用就不再赘述了,直接对资源使用注解定义,配置资源名和阻塞的处理器就可以通过 DashBoard 控制台配置使用。

    @SentinelResource(value = "limite", blockHandler = "limiteHandler")
    @GetMapping("/limite")
    public String testLimite() {
        return "限流测试接口";
    }

    public String limiteHandler(BlockException e) {
        return "被限流啦。。。。。";
    }

在这里插入图片描述
仅对流控规则相关参数进行说明:

  • 资源名:对应配置的响应资源接口的名称。
  • 针对来源:限流可以针对调用者来进行,比如哪个微服务的服务名等。
  • 阈值类型:
    • QPS:每秒查询的数量。
    • 线程数:并发的线程数量。
  • 流控模式:
    • 直接:就是直接对自己的流量进行限制,到达阈值则限流。
    • 关联:当关联到的资源达到流量阈值的时候,也进行限流策略。
    • 链路:只控制资源入口的流量,如果达到阈值就进行限流。
  • 流控效果:
    • 快速失败:直接限流,调用对应的处理器。
    • Warm Up:进行预热,初始从配置阈值 / 3 的流量开始限流,根据配置的预热时间,逐渐增长到阈值大小。
    • 排队等待:就是将请求串行等待之前的请求完成。会设置等待超时的时间,避免一致等待。

1.2 Sentinel 限流实现

Sentinel 总体架构如下:

在这里插入图片描述
对于一次请求的调用,Sentinel 会通过一连串 Slot 插槽串联成一条链(责任链模式),将功能编排在一起,每个插槽会包含自己的功能,包括链路的构建,即诶单的构建,流量的监控等等。这一块可以参照官网的详细说明。

Sentinel 官网

Sentinel 对于所有请求的拦截,主要得益于 SpringBoot 的 SentinelWebAutoConfiguration 的自动配置:

public class SentinelWebAutoConfiguration implements WebMvcConfigurer {
	// 省略一些代码
	
	@Override
	public void addInterceptors(InterceptorRegistry registry) {
		if (!sentinelWebInterceptorOptional.isPresent()) {
			return;
		}
		SentinelProperties.Filter filterConfig = properties.getFilter();
		registry.addInterceptor(sentinelWebInterceptorOptional.get())
				.order(filterConfig.getOrder())
				.addPathPatterns(filterConfig.getUrlPatterns());
		log.info(
				"[Sentinel Starter] register SentinelWebInterceptor with urlPatterns: {}.",
				filterConfig.getUrlPatterns());
	}
	
	// 省略一些代码

}

很容易看出,该配置为容器加入了一个拦截器,并且通过 /** 对所有请求进行拦截。然后使用 URLClean(如果有的话)进行路径的清洗,也就是路径资源的归类。然后对资源添加限流埋点,对流量进行统计。

通过这样的方式,整个系统的资源也就都变成了 Sentinel 的资源。

private List<String> urlPatterns = Arrays.asList("/**");

源码分析

对于限流具体的实现主要在 CtSph 类(通过 SphU调用)的 entryWithPriority 方法中。

当调用 SphU.entry() 方法的时候,会进入该方法,并返回一个 Entry 代表这个资源可以被继续执行的凭证,类似于 Token。如果该资源需要被限流,则会抛出 BlockException 异常。

该方法主要包括三个步骤:

  • 获取当时运行的上下文并进行检查。
  • 获取资源处理的对应处理槽的链条。
  • 通过处理链,资源和上下文得到 Entry 并检验是否需要被限流。
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
            throws BlockException {
        // TODO 获取上下文并检查
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            return new CtEntry(resourceWrapper, null, context);
        }
        if (context == null) {
            context = CtSph.InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        // TODO 获得资源处理的处理槽链条
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
		// 通过处理链资源和上下文得到 Entry
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            // TODO 通过链条检验是否被限流,没有则返回 Entry 限流则抛出 BlockException
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }

对于上下文的校验,就不多跟源码了,就是得到该资源的运行环境,包括取得的资源,来源,链路等等。

获取处理链通过 lookProcessChain 方法,就是一个单例双重检索的获取过程,有则从 map 中获取,没有则创建。创建的原理和CopyOnWriteArrayList 一样使用写时复制避免在一个线程读取一个线程更新的时候抛出 ConcurrentModificationException。

    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }
					// 创建 chain,通过编排过的 slot 一个一个遍历加入链条
                    chain = SlotChainProvider.newSlotChain();
                    // 和 CopyOnWriteArrayList 一样使用写时复制避免抛出 ConcurrentModificationException
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }

对于 chain 的创建,会使用 newSlotChain 方法,也就是把编排过的 slot 一个一个通过 addLast 加入到链条中,Slot 的具体实现有很多,对于限流相关的主要就是 FlowSlot(流量校验)和 StatisticSlot(指标数据的统计)。接下来就对这两个的具体实现进行分析,Sentinel 如何进行限流。

在这里插入图片描述

FlowSlot

该处理槽用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;对于访问的资源,会先进入 entry 方法,再通过 checkFlow 方法对流控规则进行检测。


@SpiOrder(-2000)// 该注解定义槽位的优先级,也就是加载顺序
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /**
     * 流量规则的检测器
     */
    private final FlowRuleChecker checker;

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
            throws BlockException {
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

    private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
        @Override
        public Collection<FlowRule> apply(String resource) {
            Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
            return flowRules.get(resource);
        }
    };
}

在 checkFlow 方法中封装的比较深,会从 checkFlow ? canPassCheck ? passLocalCheck

passLocalCheck 中有两个比较重要的方法,分别是:对具体流量处理策略的获取,以及是否限流的判断。

    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        // 获取流量的处理策略                            
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }
		// 判断是否需要限流
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }
    static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
        String limitApp = rule.getLimitApp();
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();
        /**
         * 第一种情况:(在 dashboard 或者编程式中具体指定了「针对来源」为某个资源)
         *      如果限流规则设置了具体的流控来源,并且该流量来自该应用,则进入该策略。
         *      该策略的优势是可以保证重要系统的流控区分,将流控规则细化
         */
        if (limitApp.equals(origin) && filterOrigin(origin)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return context.getOriginNode();
            }
            return selectReferenceNode(rule, context, node);
        /**
         * 第二种情况:(限流规则没有指定针对来源,也就是默认 default)
         *      为 default 直接使用该策略,实现简单,不需要对来源进行区分
         */
        } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return node.getClusterNode();
            }
            return selectReferenceNode(rule, context, node);
        /**
         * 第三种情况:(限流规则针对来源设置为 other)
         *      上两种都不匹配则进入该策略,和第一种配合使用
         *      在应用很多不想一个一个配置的情况下命中该策略。
         */
        } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
                && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return context.getOriginNode();
            }
            return selectReferenceNode(rule, context, node);
        }

        return null;
    }

之后对于是否需要限流的判断,也分四种具体的不同实现,也就是以上配置种说明的四种情况:

  • DefaultController:直接失败,也就是限流直接拒绝。
  • WarmUpController:预热,也就是根据设定时间增长流量到设定阈值。
  • WarmUpRateLimiterController:预热 + 排队等待。
  • RateLimiterController:排队等待,被限流的请求放入队列中排队。

在这里插入图片描述

StatisticSlot

该槽是 Sentinel 的核心功能插槽之一,用于统计实时的调用数据。和 FlowSlot 配合使用来进行限流的操作。

@SpiOrder(-7000)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            /**
             * 先触发后面的一些 slot 进行校验,比如 SystemSlot AuthoritySlot 等
             * 因为校验通过的流量,和异常的指标需要不同方式的记录,所以先触发后续 slot
             * 该方法类似于 filter.doFilter() 方法
             */
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // 请求通过的话,增加统计的线程数和通过请求个数
            node.increaseThreadNum();
            node.addPassRequest(count);

            /**
             * 如果指定了特殊的来源节点,给该来源节点添加线程和请求数
             * 这个参数会在指定「针对来源」的流控策略中使用
             */
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            /**
             * 如果请求的资源是入口资源,则添加全局的线程数和请求数
             * 该参数在流控效果设置为「链路」的情况下使用
             */
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // 执行事件的通知来触发一些回调函数
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        // 对于一些异常的处理    
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            context.getCurEntry().setBlockError(e);
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }
            throw e;
        } catch (Throwable e) {
            context.getCurEntry().setError(e);
            throw e;
        }
    }

    // . . . 省略其他方法实现
}

很显然,流量控制的核心,也就是对线程或者请求数的统计,因为 Sentinel 主要就通过这两个参数来进行是否限流的判断,这两个参数基于以下两个方法统计:

node.increaseThreadNum();
node.addPassRequest(count);

该 node 的类型为 DefaultNode(继承于 StatisticNode),但是在 DefaultNode 中又调用了 super.increaseThreadNum(),所以真实的实现,还是在 StatisticNode 中。

public class StatisticNode implements Node {

    /**
     * 每秒钟流量的滑动计数器
     */
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
            IntervalProperty.INTERVAL);

    /**
     * 每分钟流量的滑动计数器
     */
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

    /**
     * 线程计数器,使用 LongAdder 实现
     */
    private LongAdder curThreadNum = new LongAdder();

    /**
     * 增加请求的通过数
     * @param count
     */
    @Override
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }

    /**
     * 增加往返时间和成功请求数
     * @param rt
     * @param successCount
     */
    @Override
    public void addRtAndSuccess(long rt, int successCount) {
        rollingCounterInSecond.addSuccess(successCount);
        rollingCounterInSecond.addRT(rt);

        rollingCounterInMinute.addSuccess(successCount);
        rollingCounterInMinute.addRT(rt);
    }

    // 省略其他数据统计代码

}

很明显可以看出对于窗口的参数计数器都是通过 Metric 对象实现的,StatisticNode 通过组合了 Metric 对象来完成计数。Metric 是记录受保护资源的调用度量,也就是定义了各个指标记录行为的接口,其实现类为 ArrayMetric。

public class ArrayMetric implements Metric {

    /**
     * 「重要!」
     * 指标数据存储的地方
     */
    private final LeapArray<MetricBucket> data;

    // 从构造方法可以看出,每秒的滑动窗口计数器使用的是 OccupiableBucketLeapArray
    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    // 每分钟的传入的最后一个参数为 false 所以使用的是 BucketLeapArray
    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }

    /**
     * 增加成功请求数
     * @param count
     */
    @Override
    public void addSuccess(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addSuccess(count);
    }

    /**
     * 增加通过请求数
     * @param count
     */
    @Override
    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }
    
    // 省略其他统计方法

}

从中又很容易看出对于指标的统计都是对其中 data 属性进行操作,也就是一个 LeapArray 类型的字段,每次会取出当前的窗口 WindowWrap 进行计数。

从对于下标的计算 (int)(timeId % array.length()); 可以看出 LeapArray 是一个环形的结构,初始化的时候会创建固定个窗口对象,并且只保存最近一分钟的,之后新增的数据会覆盖之前的。

public abstract class LeapArray<T> {

    /**
     * 单个小窗口的长度,也就是时间跨度
     */
    protected int windowLengthInMs;

    /**
     * 采样窗口的个数
     * 通过 intervalInMs / windowLengthInMs 得到
     */
    protected int sampleCount;

    /**
     * 大窗口的长度,也就是全部小窗口的总和
     */
    protected int intervalInMs;

    /**
     * 存储所有窗口的原子数组
     */
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    /**
     * 全局锁
     * 在重置覆盖窗口数据的时候使用
     */
    private final ReentrantLock updateLock = new ReentrantLock();


    /**
     * 获取当前窗口的方法
     * @param timeMillis:该参数传入的就是当前的时间戳
     * @return
     */
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // 根据时间戳计算当前窗口的索引
        int idx = calculateTimeIdx(timeMillis);
        // 计算当前窗口的开始时间
        long windowStart = calculateWindowStart(timeMillis);

        /**
         * 从 array 中获取当前窗口有三种情况:
         *
         * (1) 对应索引的窗口不存在,需要自己创建一个并使用 CAS 加入;
         * (2) 当前索引窗口命中,窗口的开始时间相同,直接返回
         * (3) 当前窗口应该的开始时间大于旧的开始时间,对应的窗口过时了,需要覆盖
         */
        // 自旋
        while (true) {
            WindowWrap<T> old = array.get(idx);
            /**
             * 第一种情况:
             *      对应索引的窗口不存在
             */
            if (old == null) {
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // CAS 加入成功,直接返回
                    return window;
                } else {
                    // 加入失败,避免一致占用 CPU 让出线程,大概率下次进入则可以获取对应的窗口
                    Thread.yield();
                }
            /**
             * 第二种情况:
             *      当前窗口的开始时间和获取的开始时间相同,则直接返回当前窗口
             */
            } else if (windowStart == old.windowStart()) {
                return old;
            /**
             * 第三种情况:
             *      当前的窗口开始时间比获取到的窗口开始时间晚,
             *      也就是之前的窗口过期了,需要重置。
             */
            } else if (windowStart > old.windowStart()) {
                // 获取全局锁
                if (updateLock.tryLock()) {
                    try {
                        // 成功重置
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // 没有获取到锁则放出线程
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // 理论上不会进入这种情况
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
}

这个地方我有个疑惑,对于 else if (windowStart < old.windowStart()) 这种情况,给出的注释是理论上不会进入该分支,但是对于 windowStart 的时间是在循环之前计算的,索引也是,但是循环中 get 方法是实时的,我感觉还是有实效性的问题

以上部分就讲清楚了滑动窗口的数据结构以及如何获取对应的当前窗口,之后就要弄明白窗口是什么,又如何进行数据的统计。

首先窗口是什么,是一个 WindowWrap 类型的窗口包装,包含了窗口相关的时间跨度以及开始时间属性,包装的是一个 MetricBucket 对象,用来真正的统计计数。

public class MetricBucket {

    /**
     * 对于各个属性计数的 LongAddr 数组
     *      counters[0]    PASS 通过数
     *      counters[1]    BLOCK 阻塞拒绝数
     *      counters[2]    EXCEPTION 出现异常数
     *      counters[3]    SUCCESS 成功数
     *      counters[4]    RT 响应时间
     *      counters[5]    OCCUPIED_PASS 预分配的通过数
     */
    private final LongAdder[] counters;

    /**
     * 最小的往返时间
     */
    private volatile long minRt;

    // 省略所有方法实现,就是使用对应的累加器累加
}

知道了如何对于流量统计,再回到之前限流判断的方法,以默认直接失败的为例:会通过统计的数据和设置的阈值对比,来进行限流的判断。

public class DefaultController implements TrafficShapingController {
    
    /**
     * 设置限流规则时,设置的阈值
     */
    private double count;

    /**
     * 设置的阈值类型
     *      0   线程数
     *      1   QPS
     */
    private int grade;

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // 获取当前窗口的计数
        int curCount = avgUsedTokens(node);
        // 如果当前计数 + 需要获取的计数 > 阈值,则失败
        if (curCount + acquireCount > count) {
            // 如果 prioritized 为 true 并且为 QPS 的模式,则可以等待
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

    /**
     * 从节点获取线程数或者QPS
     * @param node
     * @return
     */
    private int avgUsedTokens(Node node) {
        // 节点为空,则说明没有统计数据,直接返回默认 0
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
    }
}

1.3 限流总结

  • 首先,Sentinel 整合 SpringBoot 的话,会为容器导入一个 SentinelWebAutoConfigration, 该配置类为容器增加了一个拦截器,会对所有资源进行拦截,进而转化为 Sentinel 的资源。
  • 之后的请求就会进入 SphU.entry() 方法,该方法会对是否限流进行判断,不需要限流则会返回一个 Entry ,类似于通行的凭证。需要限流则会根据具体的策略执行。
  • 该方法又会进入 CtSph 的 entryWithPriority 方法,该方法会得到一条单例的处理槽链,这是 Sentinel 的核心,再通过这条链条进行限流的具体处理。
  • 限流相关的槽是 Statistic Slot 统计数据指标的槽以及 Flow Slot 进行流量控制的槽。
    • Statistic Slot:使用的是滑动窗口的统计算法,划分一个大窗口,内部又分割了很多小窗口,每次会获取对应的小窗口,再使用窗口包装的 MetricBucket 中的 LongAddr 数组对对应的统计指标进行累加。
    • Flow Slot:限流控制的槽,主要用于获取限流的策略,再对流量进行比对处理。

2. 降级

前面对于 Sentinel 的处理链以及槽位和设计思想有了一定的了解之后,降级的实现就变得很好分析,降级主要是通过 DegradeSlot 处理槽来实现的。


@SpiOrder(-1000) // 槽位的优先级
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /**
     * Sentinel 的槽位都是通过 entry 作为方法处理的入口
     * @param context
     * @param resourceWrapper
     * @param node
     * @param count
     * @param prioritized
     * @param args
     * @throws Throwable
     */
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // 执行的检查,也就是查看是否需要限流或者降级
        performChecking(context, resourceWrapper);
        // 放行执行链,类似于 filter.doFilter()
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
        // 通过资源名获取所有断路器
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            return;
        }
        // 遍历所有断路器,查看上下文是否能通过
        for (CircuitBreaker cb : circuitBreakers) {
            if (!cb.tryPass(context)) {
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }

    // 省略其他代码
}

在 DegradeSlot 中的 entry 方法会对降级进行判断,通过资源名获取所有的断路器,循环进行判断,判断逻辑主要在 cb.tryPass(context) 中实现。

该方法的实现很简单,就是检查现在连路的状态,如果是 CLOSED 说明链路关闭政策,则继续通行,如果是 OPEN 说明已经垄断,检查是否重试或者转化为半开。

    @Override
    public boolean tryPass(Context context) {
        if (currentState.get() == State.CLOSED) {
            return true;
        }
        if (currentState.get() == State.OPEN) {
            return retryTimeoutArrived() && fromOpenToHalfOpen(context);
        }
        return false;
    }

那么这个状态是在什么地方统计并且改变的呢,这里只是做了是否垄断的校验。这就得益于它的两个具体实现类。

在这里插入图片描述

  • ExceptionCircuitBreaker:异常的断路器,用于降级对应的异常比例的规则以及异常数的规则。
  • ResponseTimeCircuitBreaker:响应时间的断路器,对应RT时间判断的降级规则。

以上两个类内部都有一个 Counter 的计数类,包含了两个字段,都是 LongAddr 类型的,用于总数的统计以及不满足要求请求(例如异常或者超时)的统计,如果比例大于配置的降级规则,则改变对应的降级状态,之后在 tryPass 中检测从而进行降级。

另外,根据滑动窗口的算法规则,这次 WindowWrap 包装的就是这些 Counter,每次会取出本次窗口的 Counter 进行数量的统计。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-11-17 12:37:37  更:2021-11-17 12:37:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 2:03:02-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码