准备工作
1.了解lua语言的基本类型、语法、简单函数等(学习一下)
2.搭建redis集群环境redis-cli 3.2.11
3.自定义注解以及切面配置拦截器
1.项目配置
<!-- spring切面 -->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.6</version>
</dependency>
<!-- jedisCluster -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.1.1</version>
</dependency>
2.限流注解配置
限流注解支持配置时间段(限流需要统计的时间片段)、水流速率(漏桶算法中的水流速、令牌桶算法中的生成令牌速率)、最大访问数(限流阀值)、限流方式(目前支持自定义key和IP,部分代码没贴出来,我把git地址贴在文章最后,有需要可以看一下)、自定义key(需要限流的key,支持spel表达式)、限流算法(本篇文章主要讲市面上主流的几种算法:滑动窗口、漏桶算法、令牌桶算法)
package com.dubbo.core.flowcontrol;
import com.dubbo.core.flowcontrol.enums.AlgorithmEnum;
import com.dubbo.core.flowcontrol.enums.FlowControlEnum;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 限流
* @author lizixiang
* @since 2022/4/8
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface FlowControl {
/**
* 时间段(s)
*/
long duration() default 60;
/**
* 水流速率(/s)
*/
int flowRate() default 1;
/**
* 最大访问数
*/
int max() default 1000;
/**
* 限流方式
*/
FlowControlEnum[] method() default FlowControlEnum.IP;
/**
* 自定义key
*/
String key() default "" ;
/**
* 限流算法
*/
AlgorithmEnum algorithm() default AlgorithmEnum.SLIDE_WINDOW;
}
3.配置切面
注解的参数不是所有都用到,后面会仔细讲解,切面主要就是对注解参数的解析再封装,我这里只写了自定义key,获取IP的方式各个公司都不一样,这里就不贴出来了。最后一句代码用到了自定义工厂类和策略模式的组合,不同的算法写到了指定的策略类当中,看不懂的同学自行百度学习一下。
strategy.flowControl(dto);
package com.dubbo.core.flowcontrol;
import com.dubbo.core.flowcontrol.enums.AlgorithmEnum;
import com.dubbo.core.flowcontrol.enums.FlowControlEnum;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
/**
* @author lizixiang
* @since 2022/4/8
*/
@Aspect
@Component
public class FlowControlAspect {
private static final Logger logger = LoggerFactory.getLogger(FlowControlAspect.class);
@Pointcut("@annotation(com.dubbo.core.flowcontrol.FlowControl)")
public void flowControl() {
}
@Before(value = "flowControl()")
public void before(JoinPoint pjp) {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
FlowControl flowControl = method.getAnnotation(FlowControl.class);
String key = flowControl.key();
AlgorithmEnum algorithm = flowControl.algorithm();
FlowControlEnum[] enums = flowControl.method();
long duration = flowControl.duration();
int flowRate = flowControl.flowRate();
int max = flowControl.max();
for (FlowControlEnum controlEnum :enums){
FlowControlStrategy strategy = FlowControlStrategyFactory.getFlowControlStrategy(algorithm);
FlowControlDto dto = FlowControlDto.builder().key(key).algorithm(algorithm).method(controlEnum).duration(duration).flowRate(flowRate).max(max).build();
switch (controlEnum) {
case KEY:
if (pjp.getArgs() != null && pjp.getArgs().length > 0) {
StandardEvaluationContext context = new StandardEvaluationContext();
LocalVariableTableParameterNameDiscoverer discover = new LocalVariableTableParameterNameDiscoverer();
String[] parameterNames = discover.getParameterNames(method);
Object[] args = pjp.getArgs();
for (int i = 0; i < parameterNames.length; i++) {
context.setVariable(parameterNames[i], args[i]);
}
SpelExpressionParser parser = new SpelExpressionParser();
String parseKey = parser.parseExpression(key).getValue(context, String.class);
dto.setKey(parseKey);
}
break;
case IP:
break;
}
strategy.flowControl(dto);
}
}
}
4.滑动窗口
具体逻辑:
定义S秒内最大请求数为M,当前请求数N
当N<M,缓存push最新请求以及时间
当N>M,取最早的请求及时间,当前时间-最早的时间<S,则认为在S秒内请求数超过了M(关键点),触发限流,否则继续缓存push最新请求以及时间,并清掉缓存最早的那一条请求
package com.dubbo.core.flowcontrol;
import com.dubbo.core.exception.ErrorCode;
import com.dubbo.core.exception.ServiceException;
import com.dubbo.core.flowcontrol.enums.AlgorithmEnum;
import com.dubbo.core.util.RedisUtils;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* 滑动窗口
* @author lizixiang
* @since 2022/4/8
*/
@Component
public class SlideWindowStrategy extends AbstractFlowControlStrategy {
private static final Logger logger = LoggerFactory.getLogger(SlideWindowStrategy.class);
@Autowired
private RedisUtils redisUtils;
@Override
protected void doControl(FlowControlDto dto) {
StringBuilder script = new StringBuilder();
script.append("redis.replicate_commands()");
script.append("\n local count = redis.call('llen', KEYS[1])");
script.append("\n if count and tonumber(count) < tonumber(ARGV[1]) then");
script.append("\n local now = redis.call('time')");
script.append("\n redis.call('lpush', KEYS[1], now[1] * 1000000 + now[2])");
script.append("\n redis.call('expire', KEYS[1], tonumber(ARGV[2]) + 1)");
script.append("\n else");
script.append("\n local now = redis.call('time')");
script.append("\n local time = redis.call('lindex', KEYS[1], -1)");
script.append("\n if now[1] * 1000000 + now[2] - time < tonumber(ARGV[2]) * 1000000 then");
script.append("\n return -1");
script.append("\n else");
script.append("\n redis.call('lpush', KEYS[1], now[1] * 1000000 + now[2])");
script.append("\n redis.call('expire', KEYS[1], tonumber(ARGV[2]) + 1)");
script.append("\n redis.call('ltrim', KEYS[1], 0, tonumber(ARGV[1]) - 1)");
script.append("\n end");
script.append("\n end");
script.append("\n return 1");
long r = (long) redisUtils.eval(script.toString(), Lists.newArrayList(dto.getKey()), Lists.newArrayList(dto.getMax() + "", dto.getDuration() + ""));
if (r == -1L) {
logger.error("资源:"+ dto.getKey() +",在"+ dto.getDuration() +"秒内调用超过"+ dto.getMax() +"次,触发限流");
throw new ServiceException("在"+ dto.getDuration() +"秒内调用超过"+ dto.getMax() +"次,触发限流", ErrorCode.FLOW_CONTROL);
}
}
@PostConstruct
public void init() {
FlowControlStrategyFactory.register(AlgorithmEnum.SLIDE_WINDOW, this);
}
}
?
优点: 1.滑动窗口解决了计数器算法的临界值问题(不懂自行百度),实现简单。 2.可以解决少量流量突发的业务场景,但是流量突发多见于微博、直播、电商等行业内,且令牌桶算法更被行业认可 缺点: 1.滑动窗口的时间复杂度为O(n),因为这里采用list数据结构,查询、修改元素需要逐个查询链表指针,效率要比令牌桶算法低一些。 2.滑动窗口需要存储更多的数据,存储N(请求阀值)个数据,过期时间为S(S为一个滑动窗口的计算周期),随着窗口细粒度越高,流量峰值设的越高,存储的数据也就越大
5.漏桶算法
具体逻辑: 定义水流速S、水桶容量L、当前水量C 当C>L,返回-1,触发限流 当C<L,首先缓存取上一次请求的时间,(当前时间(秒)-上一次请求时间(秒))*S = 流水量 将redis list通过ltrim裁剪已流出的水量,并将当前请求push到缓存当中 L/S代表漏桶流空所需的时间,并将其设为过期时间
?
package com.dubbo.core.flowcontrol;
import com.dubbo.core.exception.ErrorCode;
import com.dubbo.core.exception.ServiceException;
import com.dubbo.core.flowcontrol.enums.AlgorithmEnum;
import com.dubbo.core.util.RedisUtils;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* 漏桶算法
* @author lizixiang
* @since 2022/4/8
*/
@Component
public class LeakyBucketStrategy extends AbstractFlowControlStrategy {
private static final Logger logger = LoggerFactory.getLogger(LeakyBucketStrategy.class);
@Autowired
private RedisUtils redisUtils;
@Override
protected void doControl(FlowControlDto dto) {
StringBuilder script = new StringBuilder();
script.append("redis.replicate_commands()");
script.append("\n local count = redis.call('llen', KEYS[1])");
script.append("\n if count and tonumber(count) < tonumber(ARGV[1]) then");
script.append("\n local now = redis.call('time')");
script.append("\n if count and tonumber(count) > 0 then");
script.append("\n local time = redis.call('lindex', KEYS[1], tonumber(count) - 1)");
script.append("\n local t = ((now[1] - tonumber(time)) * ARGV[2])");
script.append("\n redis.call('ltrim', KEYS[1], tonumber(t), tonumber(count) - 1)");
script.append("\n end");
script.append("\n redis.call('lpush', KEYS[1], now[1])");
script.append("\n redis.call('expire', KEYS[1], (ARGV[1] / tonumber(ARGV[2])) + 1)");
script.append("\n else");
script.append("\n return -1");
script.append("\n end");
script.append("\n return 1");
long r = (long) redisUtils.eval(script.toString(), Lists.newArrayList(dto.getKey()), Lists.newArrayList(dto.getMax() + "", dto.getFlowRate() + ""));
if (r == -1L) {
logger.error("资源:"+ dto.getKey() +",调用超过"+ dto.getMax() +"次,触发限流");
throw new ServiceException("资源:"+ dto.getKey() +",调用超过"+ dto.getMax() +"次,触发限流", ErrorCode.FLOW_CONTROL);
}
}
@PostConstruct
public void init() {
FlowControlStrategyFactory.register(AlgorithmEnum.LEAKY_BUCKET, this);
}
}
优点: 1.漏桶算法不允许流量突发,一般适用于银行金融项目或者调取第三方api 缺点: 1.漏桶算法的时间复杂度为O(n),因为这里采用list数据结构,查询、修改元素需要逐个查询链表指针,效率要比令牌桶算法低一些。 2.漏桶算法是以一个恒定的速率出水,瞬时进来一批请求,如果进水量大于漏桶容量则溢出。多余的请求触发限流。 3.漏桶算法需要存储更多的数据,存储N(请求阀值)个数据,过期时间为水桶容量L/水流速S
6.令牌桶算法
具体逻辑: 定义桶容量M、当前令牌量C、令牌增速S 单独起线程C每秒增加S,当C=M,不再继续增加 当C>0,获取令牌,否则触发限流
package com.dubbo.core.flowcontrol;
import com.dubbo.core.exception.ErrorCode;
import com.dubbo.core.exception.ServiceException;
import com.dubbo.core.flowcontrol.enums.AlgorithmEnum;
import com.dubbo.core.util.LuaUtils;
import com.dubbo.core.util.RedisUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 令牌桶算法
* @author lizixiang
* @since 2022/4/24
* @return -1:触发限流 0:代表首次请求
*/
@Component
public class TokenBucketStrategy extends AbstractFlowControlStrategy{
private static final Logger logger = LoggerFactory.getLogger(TokenBucketStrategy.class);
private static ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
private static final String TOKEN_BUCKETS = "tokenbuckets-{wl}";
@Autowired
private RedisUtils redisUtils;
@Autowired
private LuaUtils luaUtils;
@PostConstruct
public void init() {
FlowControlStrategyFactory.register(AlgorithmEnum.TOKEN_BUCKET, this);
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
StringBuilder script = new StringBuilder();
script.append("redis.replicate_commands()");
script.append(luaUtils.buildSplitFunc());
script.append("\n local map = redis.call('hgetall', KEYS[1])");
script.append("\n for i = 1, #map, 2 do");
script.append("\n local field = map[i]");
script.append("\n local t = field:split('-')");
script.append("\n local max = t[2]");
script.append("\n local flowRate = t[3]");
script.append("\n local count = map[i+1]");
script.append("\n if count and tonumber(count) <= (tonumber(max) - tonumber(flowRate)) then");
script.append("\n redis.call('hincrBy', KEYS[1], field, tonumber(flowRate))");
script.append("\n end");
script.append("\n end");
script.append("\n return map");
String fields = redisUtils.eval(script.toString(), 1, TOKEN_BUCKETS).toString();
logger.info("current token buckets:{}", fields);
}
}, 10, 1, TimeUnit.SECONDS);
}
@Override
protected void doControl(FlowControlDto dto) {
StringBuilder script = new StringBuilder();
script.append("redis.replicate_commands()");
script.append("\n local count = redis.call('hget', KEYS[1], KEYS[2])");
script.append("\n if count then");
script.append("\n if tonumber(count) > 0 then");
script.append("\n local incr = redis.call('hincrBy', KEYS[1], KEYS[2], -1)");
script.append("\n return incr");
script.append("\n else");
script.append("\n return -1");
script.append("\n end");
script.append("\n else");
script.append("\n redis.call('hset', KEYS[1], KEYS[2], 0)");
script.append("\n return 0");
script.append("\n end");
long r = (long) redisUtils.eval(script.toString(), 2, new String[]{TOKEN_BUCKETS, (dto.getKey() + "-" + dto.getMax() + "-" + dto.getFlowRate() + "-{wl}")});
logger.info("current token buckets count:{}", r);
if (r == -1L) {
logger.warn("资源:"+ dto.getKey() +",调用超过"+ dto.getMax() +"次,触发限流");
throw new ServiceException("资源:"+ dto.getKey() +",调用超过"+ dto.getMax() +"次,触发限流", ErrorCode.FLOW_CONTROL);
}
}
}
?优点: 1.为了支持可配置多个令牌key,这里我们采用哈希结构,查找,修改元素只需要找到对应的key即可,时间复杂度为O(1)。 2.令牌桶算法存储所需要的空间更小,存储令牌key以及当前令牌数 3.令牌桶算法允许流量突发,因为拿令牌不需要耗费时间,适用于电商、微博、直播等业务 缺点: 1.令牌桶算法是以一个恒定的速率发放令牌,达到阀值不再继续发放,如果在短时间内一批请求进来,可能会出现有的请求拿不到令牌,即触发了限流。
测试demo:
@FlowControl(key = "testSlideWindow", method = FlowControlEnum.KEY, max = 10, duration = 60)
@GetMapping("/testSlideWindow")
public void testSlideWindow() {
System.out.println(10);
}
@FlowControl(key = "testLeakyBucket", method = FlowControlEnum.KEY, algorithm = AlgorithmEnum.LEAKY_BUCKET, max = 10, flowRate = 1)
@GetMapping("/testLeakyBucket")
public void testLeakyBucket() {
System.out.println(11);
}
@FlowControl(key = "testTokenBucket", method = FlowControlEnum.KEY, algorithm = AlgorithmEnum.TOKEN_BUCKET, max = 10, flowRate = 1)
@GetMapping("/testTokenBucket")
public String testTokenBucket() {
return "success";
}
如您有建议和想法,欢迎吐槽!
git地址:dubbo-zk/dubbo-core/src/main/java/com/dubbo/core/flowcontrol at main · Lizixiang/dubbo-zk · GitHub基于dubbo+zk脚手架. Contribute to Lizixiang/dubbo-zk development by creating an account on GitHub.https://github.com/Lizixiang/dubbo-zk/tree/main/dubbo-core/src/main/java/com/dubbo/core/flowcontrol
|