一. Nacos
1. 概述
对于 Nacos 是什么,有什么用,怎么用,就不再赘述了。
官网地址:https://nacos.io/zh-cn/
Nacos 系统架构图: 主要包括:
Provider APP :提供可复用可调用服务的应用。Consumer APP :会对某个服务发起调用的应用方。Name Server :分布式系统中通过 DNS、VIP(Vritual IP)或者地址来实现集群的服务路由。Nacos Server :Nacos 服务的提供者。
- OpenAPI:提供给外部的功能访问入口。
- Config Service:配置服务。
- Naming Service:提供分布式系统中所有对象(Object)、实体(Entity)的“名字”到关联的元数据之间的映射管理服务。
- Nacos Core:Nacos 的核心源码部分。
- Consistency Protocol:保证分布式共识的协议算法。
Nacos Console :Nacos 的控制台中心。
2. 服务注册发现实现原理
对于 Nacos 作为注册中心的源码分析主要包括三个方面:
2.1 服务注册
Spring Cloud 提供了服务注册的接口标准,集成到 Spring Cloud 中的注册中心会实现这些方法。
public interface ServiceRegistry<R extends Registration> {
void register(R registration);
void deregister(R registration);
void close();
void setStatus(R registration, String status);
<T> T getStatus(R registration);
}
Nacos 通过实现类 NacosServiceRegistry 实现了这些方法完成对 Nacos 的操作。
并且在SpringCloud 整合的时候,为容器导入了一个 AutoServiceRegistration 类,有一个 AbstractAutoServiceRegistration 类继承了该类,并且实现了如下方法,在 WebServerInitializedEvent 监听服务器初始化事件的发生,然后在 bind() 方法中的 start() 方法中会调用 register(); 进行服务的注册。
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
- register()方法:将传入的注册信息实体注册到 Nacos 中。
@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
Instance instance = getNacosInstanceFromRegistration(registration);
try {
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
rethrowRuntimeException(e);
}
}
- registerInstance():注册实例的方法,包括了对心跳检测的集成和真实服务的注册。
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
serverProxy.registerService(groupedServiceName, groupName, instance);
}
心跳检测
心跳检测的实现,将心跳检测的信息放入到一个Map中,使用定时任务发送心跳检测。
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
服务注册
真正进行服务的注册,封装参数之后,通过 Nacos 的客户端发送请求注册服务。
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
客户端调用的 Api 在 nacos-naming 模块下的 InstanceController 中。
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
. . .
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request);
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
. . .
}
Controller 中使用 serviceManager 对服务实例进行真正的注册。
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
总结
- 在SpringBoot 项目启动的时候,如果倒入了相关依赖,并且开启了@EnableDiscoveryClient注解,该注解会将autoRegister 属性设置为 true;SpringBoot 就会倒入一个自动配置的 AutoServiceRegistration 类,有一个 AbstractAutoServiceRegistration 类继承了该类,并且实现了如下方法,在 WebServerInitializedEvent 监听服务器初始化事件的发生,然后在 bind() 方法中的 start() 方法中会调用 register(); 进行服务的注册。
- 对于服务的注册,主要包含两块的内容,本地 Nacos 客户端的操作,以及远程 Nacos 服务器的操作:
- 本地 Nacos 的客户端会将服务注册的信息封装为一个实例,检查如果需要心跳检测,则会启动一个定时任务,定时向服务端发送心跳检测包。之后,会使用 namingService 对远端 Nacos 服务器发送请求进行服务的注册。
- 远端服务器中的 InstanceController 会接收到该请求,使用 ServiceManager 进行服务的注册,对服务以及对应的实例进行初始化和添加,对应存储的数据结构是,所有的服务存放在一个 ConcurrentHashMap 里,一个服务的所有实例也存放在一个 ConcurrentHashMap。
2.2 服务地址的发现和动态感知
了解了上面的内容和服务实力的存储方式之后,这个内容就很简单了,对于地址的发现和感知主要用于对服务的调用,获取到对应服务的 IP:PORT 之后进行请求的调用。
地址发现
对于地址的发现还是在 InstanceController 中的 list() 方法,该方法可以获得对应服务的全部地址信息,再通过相关的负载均衡的策略等,进行服务的调用。
对应的方法比较简单就不贴出来了,就是从 servcieMap 中取对应的服务,拿出所有实例,对返回参数进行一个封装之后返回。
动态感知
Nacos 客户端会通过 subscribe() 方法,对Nacos 远端服务进行监听,NamingService 中组合了一个 HostReactor 类,该类初始化的时候,会开启一定定时任务的线程,每十秒钟向服务端 pull 一次最新的地址数据。并且服务端在检测到对应服务下线或者更新之后,也会主动推送消息给 Nacos 客户端,双方采用的是 UDP 协议。
3. 配置中心实现原理
整体架构:
Nacos 的配置中心主要提供了四种操作以及对应的 open api。
- 发布配置信息,将配置添加到对应的 Nacos 服务器。
@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema) throws NacosException
@GetMapping
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void getConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "tag", required = false) String tag)
throws IOException, ServletException, NacosException
@DeleteMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean deleteConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId,
@RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "tag", required = false) String tag) throws NacosException
- 监听配置信息,订阅感兴趣的配置信息,在配置发生变化的时候触发事件。
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException
3.1 配置的动态监听
如果 Nacos 服务器上的配置信息发生变化的时候,需要让对应的应用程序对相应的配置感知并且随之改动。所以需要客户端对感兴趣的配置信息进行动态的监听。
对于配置的实时变更,需要服务器和客户端之间的数据交互,而对于服务器和服务器的数据交互无非就是两种形式:
- PULL,客户端从服务器主动拉取数据。
- 对于 PULL 模式,由于定时任务存在时间间隔,所以不能保证实时性,并且由于更新不是经常发生,所以大部分的 PULL 是一个无效的拉取。
- PUSH,服务器主动向客户端推送更新数据。
- 对于 PUSH 模式,需要维护一个服务器端和客户端的连接,并且为了保证连接的可靠性,需要通过心跳机制的检测。这样在客户端数量很大的情况下,会耗费大量的系统资源。
Nacos 采用的是PULL 和 PUSH 相结合的模式,即一种长轮询机制,具体过程如下:
- Nacos 的客户端向服务器发送一次检查配置更新的请求,服务器端检查配置是否一致,如果不一致会直接返回更新的信息,如果一致,则会将请求保存住,添加到服务器端的 allSubs 队列中,设置一个定时 29.5 s 的定时任务,进行延时操作(该时间可能会由具体情况发生变动)。
- 如果在这 29.5s 内发生了配置的更新,服务器端则会触发对应时间,遍历 allSubs 队列,找到感兴趣的客户端,将配置信息的更新操作写入到响应中并返回。类似于完成了一次 PUSH 操作,但是不需要建立一个服务器端到客户端的 HTTP 连接。
- 如果在此时间内没有发生改变,则会进入自动检测机制,直接返回响应给客户端。
3.2 配置的加载
Spring 通过抽象了一个 Environment 来表示整个 Spring 的环境配置信息。Spring 启动的时候会把所有的配置环境信息加载到 Environment 中。之后就可以进行获取。
对于 Nacos 要实现配置的统一管理和动态刷新,主要需要解决两个问题:
- 如何将远程的配置加载到本地的 Environment 中?
- 配置进行更新的时候,如何将新的配置更新到本地的 Environment 中,并且对使用这些配置的组件进行动态刷新?
首先 SpringBoot 启动的 run 方法中,有一个准备上下文环境信息的方法:
prepareContext(context, environment, listeners, applicationArguments, printedBanner);
在 prepareContext 方法中,会对上下文进行初始化:
applyInitializers(context);
protected void applyInitializers(ConfigurableApplicationContext context) {
for (ApplicationContextInitializer initializer : getInitializers()) {
Class<?> requiredType = GenericTypeResolver.resolveTypeArgument(initializer.getClass(),
ApplicationContextInitializer.class);
Assert.isInstanceOf(requiredType, context, "Unable to call initializer.");
initializer.initialize(context);
}
}
- 在初始化器,执行初始化 initialize() 方法中,会去调用locator 去加载配置信息,这里其实就会去 Nacos 的实现中,加载远程的配置信息。
Collection<PropertySource<?>> source = locator.locateCollection(environment);
PropertySource<?> propertySource = locator.locate(environment);
@Override
public PropertySource<?> locate(Environment env) {
nacosConfigProperties.setEnvironment(env);
ConfigService configService = nacosConfigManager.getConfigService();
if (null == configService) {
log.warn("no instance of config service found, can't load config from nacos");
return null;
}
long timeout = nacosConfigProperties.getTimeout();
nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
timeout);
String name = nacosConfigProperties.getName();
String dataIdPrefix = nacosConfigProperties.getPrefix();
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = name;
}
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = env.getProperty("spring.application.name");
}
CompositePropertySource composite = new CompositePropertySource(
NACOS_PROPERTY_SOURCE_NAME);
loadSharedConfiguration(composite);
loadExtConfiguration(composite);
loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
return composite;
}
之后,在上下文准备完成之后继续进行应用的启动,应用准备完毕之后会触发一个应用准备完毕的事件,Nacos 的监听器会监听该事件,并注册一个监听器,对配置信息进行监听,从而实现配置文件的刷新,同样,该客户端和服务器之间的通信,也使用的是长轮询的机制。
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
if (this.ready.compareAndSet(false, true)) {
this.registerNacosListenersForApplications();
}
}
客户端会有一个 ClientWorker 类,其中有两个线程池:
- 执行检查配置信息的定时线程池:作用是检查本地的配置和缓存在磁盘中的配置是否相同,如果不相同则说明发生了变更,需要进行更新,每 10ms 会执行一次(目的是在本地保存一个缓存配置文件,用以故障的容灾)
- 进行长轮询的定时线程池:和上面 Nacos 的服务动态更新的情况相同,获取服务器的最新数据,每 30s 执行一次。每个长轮询任务最多处理监听 3000 个配置集,超过部分会拆分多个任务进行处理。
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
init(properties);
this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
this.executorService = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
this.executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
二. Sentinel
对于 Sentinel 主要就是用于微服务的限流和降级,来应对高并发的场景。
1. 限流
对于限流,主要有以下几种的算法实现:
计数器算法 :
- 该算法是一种简单实现的限流算法,即在一个时间周期内,每一次的请求访问会给计数器的值加一,如果达到设定的计数阈值,则会触发对应的降级策略。到下一个时间周期的时候,会对计数值进行重置。
- 该算法实现简单,但是存在临界的问题,即如果第一个周期的末尾和第二个周期的开头出现了大量的请求,则无法进行控制。
滑动窗口算法 :
- 该滑动窗口类似于TCP缓冲区的滑动窗口,都是一个抽象的概念,给每个滑动窗口定义了总的流量大小,再给每个滑动窗口划分若干个小窗口,每个小窗口有独立的流量计数器,请求会在对应的时间窗口的计数器累加,如果大窗口内的总流量大于阈值,则会触发限流策略,每当开启一个新的时间段窗口,则会删除第一个小窗口的请求数量,相当于滑动窗口向右移动一格。
- 该算法解决了临界的问题,Sentinel 也使用了该方法。
令牌桶算法 :
- 对于每个请求,要被处理首先要从一个容器中获取令牌,令牌的数量有限,没有获取到令牌的请求会触发限流的策略。
- 并且会有一个定时任务不断的补充令牌。
漏桶算法 :
- 所有的请求会发到一个容器中,容器会以一定的速率放行请求,以此来实现流量的控制。
1.1 Sentinel 限流概述
对于 Sentinel 的使用就不再赘述了,直接对资源使用注解定义,配置资源名和阻塞的处理器就可以通过 DashBoard 控制台配置使用。
@SentinelResource(value = "limite", blockHandler = "limiteHandler")
@GetMapping("/limite")
public String testLimite() {
return "限流测试接口";
}
public String limiteHandler(BlockException e) {
return "被限流啦。。。。。";
}
仅对流控规则相关参数进行说明:
- 资源名:对应配置的响应资源接口的名称。
- 针对来源:限流可以针对调用者来进行,比如哪个微服务的服务名等。
- 阈值类型:
- 流控模式:
- 直接:就是直接对自己的流量进行限制,到达阈值则限流。
- 关联:当关联到的资源达到流量阈值的时候,也进行限流策略。
- 链路:只控制资源入口的流量,如果达到阈值就进行限流。
- 流控效果:
- 快速失败:直接限流,调用对应的处理器。
- Warm Up:进行预热,初始从配置阈值 / 3 的流量开始限流,根据配置的预热时间,逐渐增长到阈值大小。
- 排队等待:就是将请求串行等待之前的请求完成。会设置等待超时的时间,避免一致等待。
1.2 Sentinel 限流实现
Sentinel 总体架构如下:
对于一次请求的调用,Sentinel 会通过一连串 Slot 插槽串联成一条链(责任链模式),将功能编排在一起,每个插槽会包含自己的功能,包括链路的构建,即诶单的构建,流量的监控等等。这一块可以参照官网的详细说明。
Sentinel 官网
Sentinel 对于所有请求的拦截,主要得益于 SpringBoot 的 SentinelWebAutoConfiguration 的自动配置:
public class SentinelWebAutoConfiguration implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
if (!sentinelWebInterceptorOptional.isPresent()) {
return;
}
SentinelProperties.Filter filterConfig = properties.getFilter();
registry.addInterceptor(sentinelWebInterceptorOptional.get())
.order(filterConfig.getOrder())
.addPathPatterns(filterConfig.getUrlPatterns());
log.info(
"[Sentinel Starter] register SentinelWebInterceptor with urlPatterns: {}.",
filterConfig.getUrlPatterns());
}
}
很容易看出,该配置为容器加入了一个拦截器,并且通过 /** 对所有请求进行拦截。然后使用 URLClean(如果有的话)进行路径的清洗,也就是路径资源的归类。然后对资源添加限流埋点,对流量进行统计。
通过这样的方式,整个系统的资源也就都变成了 Sentinel 的资源。
private List<String> urlPatterns = Arrays.asList("/**");
源码分析
对于限流具体的实现主要在 CtSph 类(通过 SphU调用)的 entryWithPriority 方法中。
当调用 SphU.entry() 方法的时候,会进入该方法,并返回一个 Entry 代表这个资源可以被继续执行的凭证,类似于 Token。如果该资源需要被限流,则会抛出 BlockException 异常。
该方法主要包括三个步骤:
- 获取当时运行的上下文并进行检查。
- 获取资源处理的对应处理槽的链条。
- 通过处理链,资源和上下文得到 Entry 并检验是否需要被限流。
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
context = CtSph.InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
对于上下文的校验,就不多跟源码了,就是得到该资源的运行环境,包括取得的资源,来源,链路等等。
获取处理链通过 lookProcessChain 方法,就是一个单例双重检索的获取过程,有则从 map 中获取,没有则创建。创建的原理和CopyOnWriteArrayList 一样使用写时复制避免在一个线程读取一个线程更新的时候抛出 ConcurrentModificationException。
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
对于 chain 的创建,会使用 newSlotChain 方法,也就是把编排过的 slot 一个一个通过 addLast 加入到链条中,Slot 的具体实现有很多,对于限流相关的主要就是 FlowSlot(流量校验)和 StatisticSlot(指标数据的统计)。接下来就对这两个的具体实现进行分析,Sentinel 如何进行限流。
FlowSlot
该处理槽用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;对于访问的资源,会先进入 entry 方法,再通过 checkFlow 方法对流控规则进行检测。
@SpiOrder(-2000)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final FlowRuleChecker checker;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
}
在 checkFlow 方法中封装的比较深,会从 checkFlow ? canPassCheck ? passLocalCheck
passLocalCheck 中有两个比较重要的方法,分别是:对具体流量处理策略的获取,以及是否限流的判断。
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
static Node selectNodeByRequesterAndStrategy( FlowRule rule, Context context, DefaultNode node) {
String limitApp = rule.getLimitApp();
int strategy = rule.getStrategy();
String origin = context.getOrigin();
if (limitApp.equals(origin) && filterOrigin(origin)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return node.getClusterNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
}
return null;
}
之后对于是否需要限流的判断,也分四种具体的不同实现,也就是以上配置种说明的四种情况:
- DefaultController:直接失败,也就是限流直接拒绝。
- WarmUpController:预热,也就是根据设定时间增长流量到设定阈值。
- WarmUpRateLimiterController:预热 + 排队等待。
- RateLimiterController:排队等待,被限流的请求放入队列中排队。
StatisticSlot
该槽是 Sentinel 的核心功能插槽之一,用于统计实时的调用数据。和 FlowSlot 配合使用来进行限流的操作。
@SpiOrder(-7000)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
context.getCurEntry().setBlockError(e);
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseBlockQps(count);
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
context.getCurEntry().setError(e);
throw e;
}
}
}
很显然,流量控制的核心,也就是对线程或者请求数的统计,因为 Sentinel 主要就通过这两个参数来进行是否限流的判断,这两个参数基于以下两个方法统计:
node.increaseThreadNum();
node.addPassRequest(count);
该 node 的类型为 DefaultNode(继承于 StatisticNode),但是在 DefaultNode 中又调用了 super.increaseThreadNum(),所以真实的实现,还是在 StatisticNode 中。
public class StatisticNode implements Node {
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
private LongAdder curThreadNum = new LongAdder();
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
@Override
public void addRtAndSuccess(long rt, int successCount) {
rollingCounterInSecond.addSuccess(successCount);
rollingCounterInSecond.addRT(rt);
rollingCounterInMinute.addSuccess(successCount);
rollingCounterInMinute.addRT(rt);
}
}
很明显可以看出对于窗口的参数计数器都是通过 Metric 对象实现的,StatisticNode 通过组合了 Metric 对象来完成计数。Metric 是记录受保护资源的调用度量,也就是定义了各个指标记录行为的接口,其实现类为 ArrayMetric。
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
@Override
public void addSuccess(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addSuccess(count);
}
@Override
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
}
从中又很容易看出对于指标的统计都是对其中 data 属性进行操作,也就是一个 LeapArray 类型的字段,每次会取出当前的窗口 WindowWrap 进行计数。
从对于下标的计算 (int)(timeId % array.length()); 可以看出 LeapArray 是一个环形的结构,初始化的时候会创建固定个窗口对象,并且只保存最近一分钟的,之后新增的数据会覆盖之前的。
public abstract class LeapArray<T> {
protected int windowLengthInMs;
protected int sampleCount;
protected int intervalInMs;
protected final AtomicReferenceArray<WindowWrap<T>> array;
private final ReentrantLock updateLock = new ReentrantLock();
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
}
这个地方我有个疑惑,对于 else if (windowStart < old.windowStart()) 这种情况,给出的注释是理论上不会进入该分支,但是对于 windowStart 的时间是在循环之前计算的,索引也是,但是循环中 get 方法是实时的,我感觉还是有实效性的问题
以上部分就讲清楚了滑动窗口的数据结构以及如何获取对应的当前窗口,之后就要弄明白窗口是什么,又如何进行数据的统计。
首先窗口是什么,是一个 WindowWrap 类型的窗口包装,包含了窗口相关的时间跨度以及开始时间属性,包装的是一个 MetricBucket 对象,用来真正的统计计数。
public class MetricBucket {
private final LongAdder[] counters;
private volatile long minRt;
}
知道了如何对于流量统计,再回到之前限流判断的方法,以默认直接失败的为例:会通过统计的数据和设置的阈值对比,来进行限流的判断。
public class DefaultController implements TrafficShapingController {
private double count;
private int grade;
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
}
1.3 限流总结
- 首先,Sentinel 整合 SpringBoot 的话,会为容器导入一个 SentinelWebAutoConfigration, 该配置类为容器增加了一个拦截器,会对所有资源进行拦截,进而转化为 Sentinel 的资源。
- 之后的请求就会进入 SphU.entry() 方法,该方法会对是否限流进行判断,不需要限流则会返回一个 Entry ,类似于通行的凭证。需要限流则会根据具体的策略执行。
- 该方法又会进入 CtSph 的 entryWithPriority 方法,该方法会得到一条单例的处理槽链,这是 Sentinel 的核心,再通过这条链条进行限流的具体处理。
- 限流相关的槽是 Statistic Slot 统计数据指标的槽以及 Flow Slot 进行流量控制的槽。
- Statistic Slot:使用的是滑动窗口的统计算法,划分一个大窗口,内部又分割了很多小窗口,每次会获取对应的小窗口,再使用窗口包装的 MetricBucket 中的 LongAddr 数组对对应的统计指标进行累加。
- Flow Slot:限流控制的槽,主要用于获取限流的策略,再对流量进行比对处理。
2. 降级
前面对于 Sentinel 的处理链以及槽位和设计思想有了一定的了解之后,降级的实现就变得很好分析,降级主要是通过 DegradeSlot 处理槽来实现的。
@SpiOrder(-1000)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void performChecking(Context context, ResourceWrapper r) throws BlockException {
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}
}
在 DegradeSlot 中的 entry 方法会对降级进行判断,通过资源名获取所有的断路器,循环进行判断,判断逻辑主要在 cb.tryPass(context) 中实现。
该方法的实现很简单,就是检查现在连路的状态,如果是 CLOSED 说明链路关闭政策,则继续通行,如果是 OPEN 说明已经垄断,检查是否重试或者转化为半开。
@Override
public boolean tryPass(Context context) {
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
那么这个状态是在什么地方统计并且改变的呢,这里只是做了是否垄断的校验。这就得益于它的两个具体实现类。
ExceptionCircuitBreaker :异常的断路器,用于降级对应的异常比例的规则以及异常数的规则。ResponseTimeCircuitBreaker :响应时间的断路器,对应RT时间判断的降级规则。
以上两个类内部都有一个 Counter 的计数类,包含了两个字段,都是 LongAddr 类型的,用于总数的统计以及不满足要求请求(例如异常或者超时)的统计,如果比例大于配置的降级规则,则改变对应的降级状态,之后在 tryPass 中检测从而进行降级。
另外,根据滑动窗口的算法规则,这次 WindowWrap 包装的就是这些 Counter,每次会取出本次窗口的 Counter 进行数量的统计。
|