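In Sentinel, every resource has a resource name and an Entry. An Entry can be created automatically through the adapters for mainstream frameworks, or explicitly through annotations or API calls. Whenever an Entry is created, a series of functional slots (the slot chain) is created as well. Each slot has its own responsibility, for example:
- NodeSelectorSlot collects the call paths of resources and stores them as a tree, so that flow control and degradation can be applied per call path;
- ClusterBuilderSlot stores a resource's statistics and caller information, such as RT, QPS and thread count, which serve as the basis for multi-dimensional flow control and degradation;
- StatisticSlot records and aggregates runtime metrics along different dimensions;
- FlowSlot performs flow control based on the preset flow rules and the state collected by the earlier slots;
- AuthoritySlot enforces blacklist and whitelist control based on the configured lists and the caller origin;
- DegradeSlot performs circuit breaking and degradation based on the statistics and the preset rules;
- SystemSlot controls the total inbound traffic based on system state, such as load1;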
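The slot list above is a chain of responsibility. The following is a minimal sketch of that idea, not Sentinel's implementation: every class here (`Slot`, `StatSlot`, `LimitSlot`, `BlockedException`) is a hypothetical stand-in. It shows a statistics slot that lets the later checking slot decide first and then records the outcome.

```java
import java.util.List;

// Hypothetical stand-ins for illustration; these are not Sentinel's real classes.
class SlotChainSketch {
    /** Thrown by a checking slot to reject the request. */
    static class BlockedException extends RuntimeException {
        BlockedException(String msg) { super(msg); }
    }

    /** One link of the chain: do some work, then hand over to the next slot. */
    abstract static class Slot {
        Slot next;
        abstract void entry(String resource, List<String> trace);
        void fireEntry(String resource, List<String> trace) {
            if (next != null) {
                next.entry(resource, trace);
            }
        }
    }

    /** Counts passed and blocked requests around the rest of the chain. */
    static class StatSlot extends Slot {
        int pass;
        int block;
        @Override
        void entry(String resource, List<String> trace) {
            trace.add("stat");
            try {
                fireEntry(resource, trace); // let the checking slots decide first
                pass++;
            } catch (BlockedException e) {
                block++;
                throw e;
            }
        }
    }

    /** Rejects once a fixed number of requests has been admitted. */
    static class LimitSlot extends Slot {
        final int limit;
        int used;
        LimitSlot(int limit) { this.limit = limit; }
        @Override
        void entry(String resource, List<String> trace) {
            trace.add("limit");
            if (used + 1 > limit) {
                throw new BlockedException("limit reached for " + resource);
            }
            used++;
            fireEntry(resource, trace);
        }
    }

    /** Links the slots in the given order and returns the head. */
    static Slot link(Slot... slots) {
        for (int i = 0; i + 1 < slots.length; i++) {
            slots[i].next = slots[i + 1];
        }
        return slots[0];
    }
}
```

With `link(new StatSlot(), new LimitSlot(1))`, the first call passes and the second is rejected, and the statistics slot sees both outcomes.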
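[Figure: overall framework of the slot chain]
The description above comes from the official Sentinel site. Next, let's see how this logic is implemented in the Sentinel source code. Flow control with Sentinel usually starts with: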
Entry entry = SphU.entry("entryName");
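This Entry acts like a token: if it can be acquired, the request is allowed through and may access the resource. Sentinel has a few important concepts:
- Entry represents a token; if the request passes, the returned entry is non-null
- Context represents the context of one invocation
- Node holds the recorded call statistics of a request, a resource, or a cluster of nodes
When SphU.entry is executed, the following code runs: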
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
return entryWithPriority(resourceWrapper, count, false, args);
}
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
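Here:
- first, the execution context (Context) of the current thread is obtained;
- then the ProcessorSlot chain is looked up and ProcessorSlot.entry is executed;
- if the previous step succeeds, access is allowed and a CtEntry is returned; otherwise an exception is thrown.
Note also that Sentinel abstracts a resource as ResourceWrapper, which overrides equals and hashCode: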
@Override
public int hashCode() {
return getName().hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof ResourceWrapper) {
ResourceWrapper rw = (ResourceWrapper)obj;
return rw.getName().equals(getName());
}
return false;
}
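The effect of the two overrides above on map lookups can be seen in a small sketch. The `Resource` class and the `Type` enum here are hypothetical stand-ins rather than Sentinel's `ResourceWrapper` and `EntryType`; the only assumption carried over is that identity depends on the name alone.

```java
import java.util.HashMap;
import java.util.Map;

class ResourceKeySketch {
    enum Type { IN, OUT }

    /** Hypothetical resource key: identity is the name only, the type is ignored. */
    static final class Resource {
        final String name;
        final Type type;
        Resource(String name, Type type) {
            this.name = name;
            this.type = type;
        }
        @Override
        public int hashCode() {
            return name.hashCode();
        }
        @Override
        public boolean equals(Object o) {
            return o instanceof Resource && ((Resource) o).name.equals(name);
        }
    }

    static String lookup() {
        Map<Resource, String> chains = new HashMap<>();
        chains.put(new Resource("orderService", Type.IN), "chain-1");
        // Same name, different type: still resolves to the entry stored above.
        return chains.get(new Resource("orderService", Type.OUT));
    }
}
```

This is why a map keyed by the resource yields one slot chain per resource name, regardless of the entry type used by the caller.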
As long as the resource names are equal, they are the same resource.
Let's first look at how the Context is obtained:
public static Context getContext() {
return contextHolder.get();
}
Here contextHolder is a ThreadLocal<Context>. It is empty the first time a thread enters, so execution initially goes through context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);, which ends up in trueEnter:
protected static Context trueEnter(String name, String origin) {
Context context = contextHolder.get();
if (context == null) {
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
LOCK.lock();
try {
node = contextNameNodeMap.get(name);
if (node == null) {
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
Constants.ROOT.addChild(node);
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
context = new Context(node, name);
context.setOrigin(origin);
contextHolder.set(context);
}
return context;
}
Note that when the ContextUtil class is loaded, a piece of static initialization code runs:
private static void initDefaultContext() {
String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
Constants.ROOT.addChild(node);
contextNameNodeMap.put(defaultContextName, node);
}
The name argument here is Constants.CONTEXT_DEFAULT_NAME, so even though the Context is null at first, node = contextNameNodeMap.get(name); is not null: it is an EntranceNode. In other words, by default the node of every new Context is an EntranceNode.
That gives us the Context.
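trueEnter combines a lock-free read, a re-check under a lock, and replacing the whole map instead of mutating it. Below is a minimal sketch of that pattern under stated assumptions: the class name, the `MAX_SIZE` cap of 2 and the `Object` node type are all made up for the demo, and the cap check uses `>=` for simplicity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

class CopyOnWriteRegistrySketch {
    static final int MAX_SIZE = 2; // hypothetical cap, kept tiny for the demo

    // Readers only ever see a fully built map because the reference is swapped, never mutated.
    private volatile Map<String, Object> nodes = new HashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    /** Returns the node for the name, creating it once; null when the cap is hit. */
    Object getOrCreate(String name) {
        Object node = nodes.get(name); // lock-free fast path
        if (node != null) {
            return node;
        }
        lock.lock();
        try {
            node = nodes.get(name); // re-check under the lock
            if (node == null) {
                if (nodes.size() >= MAX_SIZE) {
                    return null; // too many names: the caller falls back
                }
                node = new Object();
                Map<String, Object> copy = new HashMap<>(nodes);
                copy.put(name, node);
                nodes = copy; // publish the new map in one write
            }
            return node;
        } finally {
            lock.unlock();
        }
    }
}
```

The design trades a full map copy on each insertion for reads that never block, which suits a map that is written rarely and read on every request.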
Next comes looking up the ProcessorSlot chain:
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}
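The builder is also obtained through the SPI mechanism, which uses several files under META-INF/services.
First a SlotChainBuilder is obtained; by default this is DefaultSlotChainBuilder. DefaultSlotChainBuilder then loads the classes listed in com.alibaba.csp.sentinel.slotchain.ProcessorSlot. Sentinel ships the following implementations by default: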
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
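Once loaded, the ProcessorSlots are sorted by the order attribute of their annotation in ascending order (a smaller value runs earlier), which is why NodeSelectorSlot comes first. The default order values are listed here for reference: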
public static final int ORDER_NODE_SELECTOR_SLOT = -10000;
public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;
public static final int ORDER_LOG_SLOT = -8000;
public static final int ORDER_STATISTIC_SLOT = -7000;
public static final int ORDER_AUTHORITY_SLOT = -6000;
public static final int ORDER_SYSTEM_SLOT = -5000;
public static final int ORDER_FLOW_SLOT = -2000;
public static final int ORDER_DEGRADE_SLOT = -1000;
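To make the ordering concrete, here is a small sketch that sorts slot names by the order constants quoted above. It is an illustration only: the class and method names are hypothetical, and it does not reproduce how Sentinel's SPI loader performs the sort internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SlotOrderSketch {
    /** Returns slot names sorted by ascending order value (smaller runs first). */
    static List<String> sortByOrder(Map<String, Integer> orders) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(orders.entrySet());
        entries.sort(Map.Entry.comparingByValue());
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Integer> e : entries) {
            names.add(e.getKey());
        }
        return names;
    }

    static List<String> demo() {
        Map<String, Integer> orders = new LinkedHashMap<>();
        // Deliberately registered out of order; the values are the constants quoted above.
        orders.put("DegradeSlot", -1000);
        orders.put("StatisticSlot", -7000);
        orders.put("NodeSelectorSlot", -10000);
        orders.put("FlowSlot", -2000);
        return sortByOrder(orders);
    }
}
```

`demo()` yields NodeSelectorSlot, StatisticSlot, FlowSlot, DegradeSlot, matching the execution order described in the rest of the walkthrough.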
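One more thing to note: in Sentinel each resource has its own set of ProcessorSlots. Many of the instance variables in these slots record information for that resource only, while others are global and belong to the whole node.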
NodeSelectorSlot
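Next, chain.entry(context, resourceWrapper, null, count, prioritized, args); is executed. The chain here is a DefaultProcessorSlotChain, which simply holds a linked list of the ProcessorSlots loaded above. Execution starts from the first ProcessorSlot and proceeds down the chain, so NodeSelectorSlot runs first: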
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
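On the first call node is null, so a new DefaultNode is created. Note that NodeSelectorSlot is not a singleton: each resource has its own instance. The node created here is what gets passed on to the following slots. Also, unless a context name is specified explicitly, context.getName() always returns Constants.CONTEXT_DEFAULT_NAME.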
ClusterBuilderSlot
The main logic of ClusterBuilderSlot is as follows:
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
Likewise, there is exactly one clusterNode per resource. Execution then continues down the chain, still passing along the DefaultNode created in NodeSelectorSlot.
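The difference in scope between the two kinds of node can be sketched as follows: one statistics object per (resource, context name) pair, all forwarding to a single shared object per resource. The classes below are hypothetical stand-ins for DefaultNode and ClusterNode, and the sketch is single-threaded for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

class NodeScopeSketch {
    /** Shared per resource, across all contexts. */
    static class ClusterStats {
        final LongAdder pass = new LongAdder();
    }

    /** One per (resource, context name); forwards to the shared stats as well. */
    static class ContextStats {
        final LongAdder pass = new LongAdder();
        final ClusterStats cluster;
        ContextStats(ClusterStats cluster) {
            this.cluster = cluster;
        }
        void addPass(int n) {
            pass.add(n);
            cluster.pass.add(n);
        }
    }

    /** The per-resource state that a resource's own slot chain would hold. */
    static class ResourceSlots {
        final ClusterStats cluster = new ClusterStats();
        final Map<String, ContextStats> byContext = new HashMap<>();
        ContextStats nodeFor(String contextName) {
            return byContext.computeIfAbsent(contextName, k -> new ContextStats(cluster));
        }
    }
}
```

Recording 2 passes under one context name and 3 under another leaves each per-context counter at its own value while the shared counter reads 5.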
StatisticSlot
StatisticSlot records the request statistics of each resource.
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
node.increaseThreadNum();
node.addPassRequest(count);
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
context.getCurEntry().setBlockError(e);
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseBlockQps(count);
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
context.getCurEntry().setError(e);
throw e;
}
}
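The first thing this slot does is hand the request to the subsequent ProcessorSlots to see whether it can pass. If it passes, the relevant statistics are recorded, mainly two things: the thread count (node.increaseThreadNum()) and the number of passed requests (node.addPassRequest(count)).
Both are implemented in DefaultNode. This part matters because it shows how Sentinel records its statistics, so let's look at how the request count is recorded first: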
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
Both calls end up in rollingCounterInSecond and rollingCounterInMinute, which are defined as follows:
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
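Both are ArrayMetric instances that aggregate over one second and one minute respectively. Let's see how the one-second statistics work, since they are the key to QPS-based flow control: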
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
data.currentWindow() returns a window, whose data is then updated. Here data is an OccupiableBucketLeapArray, which extends LeapArray:
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
The arguments passed in here are sampleCount=2 and intervalInMs=1000:
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
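Several parameters matter in this initialization. windowLengthInMs is the size of each time window; as you can see, the default window size is 1000 / 2 = 500ms. So in Sentinel the second-level window slides in steps of 500ms by default, and the array has 2 elements. Back to addPass:
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
Let's see how currentWindow is implemented in LeapArray: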
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
First, calculateTimeIdx works out which element of the array, that is, which window, the current time belongs to:
private int calculateTimeIdx(long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
return (int)(timeId % array.length());
}
Then calculateWindowStart computes the start time of the window that the current time falls into:
protected long calculateWindowStart(long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
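The two helpers above can be exercised with a few concrete timestamps. This sketch assumes the default second-level settings discussed in the text (two windows of 500ms each); the class name is hypothetical and the arithmetic is copied from the two methods.

```java
class WindowMathSketch {
    static final int WINDOW_LENGTH_MS = 500; // 1000ms interval / 2 samples
    static final int SAMPLE_COUNT = 2;

    /** Index of the array slot that the timestamp maps to. */
    static int timeIdx(long timeMillis) {
        return (int) ((timeMillis / WINDOW_LENGTH_MS) % SAMPLE_COUNT);
    }

    /** Start time of the window that contains the timestamp. */
    static long windowStart(long timeMillis) {
        return timeMillis - timeMillis % WINDOW_LENGTH_MS;
    }

    public static void main(String[] args) {
        for (long t : new long[] {0, 499, 500, 1200, 1700}) {
            System.out.println(t + " -> idx " + timeIdx(t) + ", start " + windowStart(t));
        }
    }
}
```

For example, 1200 maps to slot 0 with start 1000, and 1700 maps to slot 1 with start 1500. Slot 0 is therefore reused for the windows starting at 0, 1000, 2000 and so on, which is why the stored window start must be compared before a slot is trusted.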
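Put simply, every timestamp inside the same 500ms span maps to the same start position. Next, if the slot for the current window is empty, a new window is created and installed in the array via CAS. If the windowStart computed for the current time equals the window's windowStart, they are the same window and it is returned. If the computed windowStart is greater than the window's windowStart, the window has expired, so the window slides: the slot is reset to the new window start. Once the window has been obtained, the count is added through wrap.value().addPass(count):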
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
private final LongAdder[] counters;
As you can see, the counting is ultimately done with LongAdder.
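Putting the pieces together, a much reduced sliding window counter might look like the sketch below. It is a simplification under stated assumptions: it uses `synchronized` where LeapArray uses CAS plus a lock, it takes the current time as a parameter so the behaviour is deterministic, and the class name is hypothetical.

```java
import java.util.concurrent.atomic.LongAdder;

class MiniLeapArraySketch {
    private final int windowLengthMs;
    private final int intervalMs;
    private final long[] starts;      // start time of the window held in each slot
    private final LongAdder[] counts; // pass counter of each slot

    MiniLeapArraySketch(int sampleCount, int intervalMs) {
        this.windowLengthMs = intervalMs / sampleCount;
        this.intervalMs = intervalMs;
        this.starts = new long[sampleCount];
        this.counts = new LongAdder[sampleCount];
        for (int i = 0; i < sampleCount; i++) {
            starts[i] = -1; // -1 marks a slot that was never used
            counts[i] = new LongAdder();
        }
    }

    /** Records n passes at the given time, recycling the slot if its window expired. */
    synchronized void addPass(long now, int n) {
        int idx = (int) ((now / windowLengthMs) % starts.length);
        long start = now - now % windowLengthMs;
        if (starts[idx] != start) {
            starts[idx] = start;
            counts[idx].reset();
        }
        counts[idx].add(n);
    }

    /** Sums the windows that still fall inside the interval ending at now. */
    synchronized long pass(long now) {
        long sum = 0;
        for (int i = 0; i < starts.length; i++) {
            if (starts[i] >= 0 && now - starts[i] <= intervalMs) {
                sum += counts[i].sum();
            }
        }
        return sum;
    }
}
```

With two 500ms windows, passes recorded at 100ms and 600ms are both visible at 900ms; a pass recorded at 1200ms recycles slot 0 and drops the counts from the window that started at 0.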
FlowSlot
FlowSlot is the core of the flow control implementation. StatisticSlot records the request statistics, and FlowSlot uses them to decide whether flow control should apply:
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
Each FlowSlot has a FlowRuleChecker that performs the flow control check. FlowRuleChecker obtains all FlowRules of the current resource through FlowRuleManager and then checks them one by one:
public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
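In non-cluster mode (cluster mode is covered later), rule.getRater() decides whether flow control should kick in. getRater returns a TrafficShapingController, which has several implementations. Let's see how the default DefaultController implements QPS-based flow control: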
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
It first determines the current request count of the resource:
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
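Here node.passQps sums the two windows covering the last second (the windows analyzed above) and divides by 1, the interval in seconds, to get the QPS. The check is whether this QPS plus the requested count exceeds the configured threshold. Since prioritized is false by default, canPass simply returns false when the threshold would be exceeded, and FlowSlot then throws a FlowException, which extends BlockException. That is how flow control is enforced.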
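The arithmetic of this check is small enough to show on its own. The sketch below mirrors only the non-prioritized QPS branch of canPass; the class and method names are hypothetical and the occupy/wait logic is left out.

```java
class QpsCheckSketch {
    /** QPS is the number of passes in the interval divided by the interval in seconds. */
    static double passQps(long passInInterval, double intervalSec) {
        return passInInterval / intervalSec;
    }

    /** Rejects when current usage plus the requested count exceeds the threshold. */
    static boolean canPass(double currentQps, int acquireCount, double threshold) {
        int curCount = (int) currentQps;
        return curCount + acquireCount <= threshold;
    }
}
```

With a threshold of 10, a request arriving when 9 passes are already counted in the last second is admitted (9 + 1 is not greater than 10), while one arriving at 10 passes is rejected.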
DegradeSlot
DegradeSlot implements service degradation. Let's look at its logic:
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void performChecking(Context context, ResourceWrapper r) throws BlockException {
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}
public boolean tryPass(Context context) {
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
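Likewise, DegradeSlot obtains the CircuitBreakers and delegates the decision to them. Sentinel has two kinds of circuit breaker:
- ExceptionCircuitBreaker degrades based on exception statistics
- ResponseTimeCircuitBreaker degrades based on response time statistics
Both extend AbstractCircuitBreaker, which maintains the breaker state in one place and checks the current state as follows:
- If the breaker is State.CLOSED, the request passes directly.
- If the breaker is State.OPEN, it checks whether the configured recovery timeout has elapsed since the breaker opened. If so, the state is changed from State.OPEN to State.HALF_OPEN via CAS and this request is let through, which is the usual "half-open lets some requests through" behaviour. If that request still ends up throwing an exception, the state is changed back from State.HALF_OPEN to State.OPEN via CAS. In every other case the request is rejected.
- If the breaker is State.HALF_OPEN, the request is rejected.
ExceptionCircuitBreaker and ResponseTimeCircuitBreaker each set the breaker state using their own statistics and decision logic. In addition, ProcessorSlot.exit is called at the end of every request, and the two breakers record their statistics at that point. DegradeSlot handles it as follows: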
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
Entry curEntry = context.getCurEntry();
if (curEntry.getBlockError() != null) {
fireExit(context, r, count, args);
return;
}
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
fireExit(context, r, count, args);
return;
}
if (curEntry.getBlockError() == null) {
for (CircuitBreaker circuitBreaker : circuitBreakers) {
circuitBreaker.onRequestComplete(context);
}
}
fireExit(context, r, count, args);
}
This ends up calling circuitBreaker.onRequestComplete(context). Let's see how ExceptionCircuitBreaker implements it:
public ExceptionCircuitBreaker(DegradeRule rule) {
this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));
}
public void onRequestComplete(Context context) {
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
Throwable error = entry.getError();
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null) {
counter.getErrorCount().add(1);
}
counter.getTotalCount().add(1);
handleStateChangeWhenThresholdExceeded(error);
}
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
if (currentState.get() == State.OPEN) {
return;
}
if (currentState.get() == State.HALF_OPEN) {
if (error == null) {
fromHalfOpenToClose();
} else {
fromHalfOpenToOpen(1.0d);
}
return;
}
List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
if (totalCount < minRequestAmount) {
return;
}
double curCount = errCount;
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
curCount = errCount * 1.0d / totalCount;
}
if (curCount > threshold) {
transformToOpen(curCount);
}
}
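Exceptions and call counts are tracked with a SimpleErrorCounterLeapArray. As we saw with StatisticSlot, it relies on WindowWrap for the statistics. ExceptionCircuitBreaker passes sampleCount=1 to SimpleErrorCounterLeapArray together with the configured statistics interval, so the window size equals the configured interval. For example, with a 10s interval, once one 10s window is complete a fresh 10s window takes over; in effect there is no sliding. When a request finishes, the total count is incremented; if the call threw an exception, the error count is incremented as well. Then the breaker state is updated:
1. If the breaker is open, nothing is done, even if this call threw an exception.
2. If the breaker is half-open: when the call succeeded (no exception), the breaker is closed; when it threw an exception, the breaker is opened again.
3. If the breaker is closed, it decides whether to open. If the total number of requests in the current window is below the configured minimum request amount, it returns immediately. Otherwise, for an exception-ratio rule it compares errors / total against the threshold, and for an exception-count rule it compares the error count directly. If the threshold is exceeded, the breaker is opened.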
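The closed-state decision in step 3 can be isolated into a pure function. This is a sketch with hypothetical names (`shouldOpen`, `Strategy`); it follows the comparisons in handleStateChangeWhenThresholdExceeded but is not the Sentinel code itself.

```java
class BreakerThresholdSketch {
    enum Strategy { EXCEPTION_RATIO, EXCEPTION_COUNT }

    /** Returns true when a closed breaker should open for the given window totals. */
    static boolean shouldOpen(long errCount, long totalCount, int minRequestAmount,
                              Strategy strategy, double threshold) {
        if (totalCount < minRequestAmount) {
            return false; // too few samples to judge
        }
        double cur = errCount;
        if (strategy == Strategy.EXCEPTION_RATIO) {
            cur = errCount * 1.0d / totalCount;
        }
        return cur > threshold; // strictly greater, as in the code above
    }
}
```

With a ratio threshold of 0.5 and a minimum of 5 requests, 3 errors out of 4 calls do not open the breaker (too few samples), 3 out of 5 do (0.6), and 5 out of 10 do not (0.5 is not greater than 0.5).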
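ResponseTimeCircuitBreaker instead counts each request whose response time reaches the configured limit, and then checks the ratio.
From this analysis, a Sentinel breaker goes straight from closed to open once the condition is met, and spends very little time half-open. While the breaker is open, once the configured recovery timeout has elapsed it is set to half-open and the current request is let through. If that request succeeds the breaker closes; otherwise it opens again.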
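The state transitions summarized above can be sketched as a small state machine built on an AtomicReference. The class and its methods (`trip`, `tryPass`, `onProbeComplete`) are hypothetical, and time is passed in explicitly so the behaviour is deterministic; it is an illustration of the CLOSED, OPEN, HALF_OPEN cycle, not AbstractCircuitBreaker.

```java
import java.util.concurrent.atomic.AtomicReference;

class BreakerStateSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
    private final long recoveryTimeoutMs;
    private volatile long nextRetryAt;

    BreakerStateSketch(long recoveryTimeoutMs) {
        this.recoveryTimeoutMs = recoveryTimeoutMs;
    }

    State state() {
        return state.get();
    }

    /** Called when the error threshold is exceeded while closed. */
    void trip(long now) {
        if (state.compareAndSet(State.CLOSED, State.OPEN)) {
            nextRetryAt = now + recoveryTimeoutMs;
        }
    }

    /** Decides whether a request arriving at time now may pass. */
    boolean tryPass(long now) {
        if (state.get() == State.CLOSED) {
            return true;
        }
        if (state.get() == State.OPEN) {
            // Only the caller that wins the CAS becomes the probe request.
            return now >= nextRetryAt && state.compareAndSet(State.OPEN, State.HALF_OPEN);
        }
        return false; // HALF_OPEN: a probe is already in flight
    }

    /** Called when the probe request completes. */
    void onProbeComplete(long now, boolean failed) {
        if (failed) {
            if (state.compareAndSet(State.HALF_OPEN, State.OPEN)) {
                nextRetryAt = now + recoveryTimeoutMs;
            }
        } else {
            state.compareAndSet(State.HALF_OPEN, State.CLOSED);
        }
    }
}
```

Because the OPEN to HALF_OPEN transition is a single compare-and-set, exactly one concurrent caller is admitted as the probe, which matches the observation that the half-open state is short-lived.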
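ParamFlowSlot (hot parameter flow control)
ParamFlowSlot applies flow control to hot parameters. Sometimes, for example during a promotion, the volume of requests carrying one particular parameter value surges. ParamFlowSlot throttles exactly these hot requests. When hot parameter rules are used, the entry must be made as follows: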
SphU.entry(String name, EntryType trafficType, int batchCount, Object... args)
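Hot parameter flow control in Sentinel is flow control keyed on one of the arguments in the list passed to entry.
[Figure: hot parameter rule configuration page in Sentinel-dashboard]
ParamFlowSlot processes a request as follows: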
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
if (args == null) {
return;
}
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
return;
}
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
for (ParamFlowRule rule : rules) {
applyRealParamIdx(rule, args.length);
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
String triggeredParam = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
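First, all ParamFlowRules for the current resource are fetched by resource name and checked one by one; each rule may read its parameter from a different index. If ParameterMetricStorage has no ParameterMetric (per-parameter statistics) for the current resource and rule, one is initialized; if it already exists, nothing happens. The check itself is delegated to ParamFlowChecker, and a ParamFlowException is thrown if it fails. ParamFlowChecker works as follows: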
public static boolean passCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object... args) {
if (args == null) {
return true;
}
int paramIdx = rule.getParamIdx();
if (args.length <= paramIdx) {
return true;
}
Object value = args[paramIdx];
if (value instanceof ParamFlowArgument) {
value = ((ParamFlowArgument) value).paramFlowKey();
}
if (value == null) {
return true;
}
if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
return passClusterCheck(resourceWrapper, rule, count, value);
}
return passLocalCheck(resourceWrapper, rule, count, value);
}
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object value) {
try {
if (Collection.class.isAssignableFrom(value.getClass())) {
for (Object param : ((Collection)value)) {
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else if (value.getClass().isArray()) {
int length = Array.getLength(value);
for (int i = 0; i < length; i++) {
Object param = Array.get(value, i);
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else {
return passSingleValueCheck(resourceWrapper, rule, count, value);
}
} catch (Throwable e) {
RecordLog.warn("[ParamFlowChecker] Unexpected error", e);
}
return true;
}
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
} else {
return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
}
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
if (exclusionItems.contains(value)) {
int itemThreshold = rule.getParsedHotItems().get(value);
return ++threadCount <= itemThreshold;
}
long threshold = (long)rule.getCount();
return ++threadCount <= threshold;
}
return true;
}
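The hot key is read from the argument list at the configured parameter index. Note that if the argument is a ParamFlowArgument, the value checked is ParamFlowArgument.paramFlowKey; otherwise it is the argument itself. If the value is a collection or an array, every element is checked. Let's take the default mode, local QPS limiting, as the example: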
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
if (tokenCounters == null || timeCounters == null) {
return true;
}
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
long maxCount = tokenCount + rule.getBurstCount();
if (acquireCount > maxCount) {
return false;
}
while (true) {
long currentTime = TimeUtil.currentTimeMillis();
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}
long passTime = currentTime - lastAddTokenTime.get();
if (passTime > rule.getDurationInSec() * 1000) {
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
lastAddTokenTime.set(currentTime);
return true;
} else {
long restQps = oldQps.get();
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
if (newQps < 0) {
return false;
}
if (oldQps.compareAndSet(restQps, newQps)) {
lastAddTokenTime.set(currentTime);
return true;
}
Thread.yield();
}
} else {
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
long oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
return true;
}
} else {
return false;
}
}
Thread.yield();
}
}
}
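In this mode Sentinel does not use the window statistics seen earlier. Instead, the window is implemented with a ConcurrentLinkedHashMapWrapper. During the check, putIfAbsent is called with the key and the current time, yielding an AtomicLong whose value is a timestamp. If the gap between that timestamp and the current time exceeds the configured duration, the AtomicLong is reset to the current timestamp and the QPS for that period is evaluated; if it exceeds the preset value, the request is rejected.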
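The check above behaves like a token bucket kept per parameter value. The following sketch reproduces that idea in a simplified form: it uses plain HashMaps under `synchronized` instead of CAS loops over a bounded cache, takes the current time as an argument, and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class ParamTokenBucketSketch {
    private final long tokensPerWindow; // threshold per duration
    private final long burst;           // extra tokens allowed on top
    private final long durationMs;
    private final Map<Object, Long> tokens = new HashMap<>();
    private final Map<Object, Long> lastRefill = new HashMap<>();

    ParamTokenBucketSketch(long tokensPerWindow, long burst, long durationMs) {
        this.tokensPerWindow = tokensPerWindow;
        this.burst = burst;
        this.durationMs = durationMs;
    }

    /** Tries to take acquire tokens from the bucket of the given parameter value. */
    synchronized boolean tryAcquire(Object value, int acquire, long now) {
        long max = tokensPerWindow + burst;
        if (tokensPerWindow == 0 || acquire > max) {
            return false;
        }
        Long last = lastRefill.get(value);
        if (last == null) { // first time this value is seen
            lastRefill.put(value, now);
            tokens.put(value, max - acquire);
            return true;
        }
        long rest = tokens.get(value);
        long elapsed = now - last;
        if (elapsed > durationMs) { // refill in proportion to the elapsed time
            long toAdd = elapsed * tokensPerWindow / durationMs;
            long refilled = Math.min(max, rest + toAdd);
            if (refilled - acquire < 0) {
                return false;
            }
            tokens.put(value, refilled - acquire);
            lastRefill.put(value, now);
            return true;
        }
        if (rest - acquire < 0) { // bucket empty within the current window
            return false;
        }
        tokens.put(value, rest - acquire);
        return true;
    }
}
```

With 2 tokens per 1000ms and no burst, the third request for the same value inside the window is rejected, another value still has its own full bucket, and the first value is admitted again once more than 1000ms have elapsed. Note that this sketch keeps one map entry per distinct value without any eviction.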
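Hot parameter flow control also brings a problem: if the parameter has too many distinct values, it may lead to OO
AuthoritySlot (authorization control)
[Figure: typical authorization rule configuration page]
Authorization control is fairly simple: rules are configured per caller. One thing to note is that the check is based on context.getOrigin, that is, the calling service, and by default this origin is empty. It can be set before the call like this: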
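Context context = ContextUtil.getContext();
if (context == null) {
// trueEnter is protected, so application code goes through the public ContextUtil.enter;
// "my_context" is a placeholder context name and "caller-name" a placeholder origin
context = ContextUtil.enter("my_context", "caller-name");
} else {
context.setOrigin("caller-name");
}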
The implementation of AuthoritySlot is roughly as follows:
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
checkBlackWhiteAuthority(resourceWrapper, context);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
for (AuthorityRule rule : rules) {
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
static boolean passCheck(AuthorityRule rule, Context context) {
String requester = context.getOrigin();
if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
return true;
}
int pos = rule.getLimitApp().indexOf(requester);
boolean contain = pos > -1;
if (contain) {
boolean exactlyMatch = false;
String[] appArray = rule.getLimitApp().split(",");
for (String app : appArray) {
if (requester.equals(app)) {
exactlyMatch = true;
break;
}
}
contain = exactlyMatch;
}
int strategy = rule.getStrategy();
if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {
return false;
}
if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {
return false;
}
return true;
}
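AuthoritySlot's logic is relatively simple: it checks whether the context's origin matches one of the configured callers and then applies the rule type. With a whitelist, a matching origin passes; with a blacklist, a matching origin is rejected.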
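The matching rule can be tried out in isolation. The sketch below follows the logic of passCheck with hypothetical names; it keeps the exact-match comparison against the comma-separated list, which is what prevents an origin such as "app" from matching an entry such as "appA".

```java
class AuthorityCheckSketch {
    enum Strategy { WHITE, BLACK }

    /** Returns true when the origin may pass under the given rule. */
    static boolean passCheck(String origin, String limitApp, Strategy strategy) {
        if (origin == null || origin.isEmpty() || limitApp == null || limitApp.isEmpty()) {
            return true; // nothing to compare against
        }
        boolean contain = false;
        for (String app : limitApp.split(",")) {
            if (origin.equals(app)) { // exact match, so "app" does not match "appA"
                contain = true;
                break;
            }
        }
        if (strategy == Strategy.BLACK && contain) {
            return false;
        }
        if (strategy == Strategy.WHITE && !contain) {
            return false;
        }
        return true;
    }
}
```

An empty origin always passes, which is why the origin has to be set on the context before authorization rules have any effect.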
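If the request gets through all of the checks above without an exception, a CtEntry is returned and the real method can run. After the method finishes, Entry.exit must be called.
That is roughly how Sentinel flow control works as a whole.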