利用aop、redis、注解来实现超时熔断机制
author:陈镇坤27
创建时间:2022年3月31日
创作不易,转载请注明来源
摘要:利用AOP切面、Reids、自定义注解等知识,实现简单的服务超时熔断。
——————————————————————————————
前言
c项目还有半小时要上线了,临时给我加个紧急任务。c服务调用弹窗广告时,需要用到某个三方api接口,该接口调用在某段时间内会频繁超时, 导致c服务一直不响应,连锁影响到Nginx认定该服务宕机,导致服务压力全部给到了其他机子。其他机子又出现这个问题,又给到别的机子,颇有我方唱罢你登场,你下场来我来唱的感觉。
计划使用超时熔断,但因为是ssm架构的旧项目,没有引入openFegin组件,用不了hystrix的熔断机制。
意外的是,项目里面有个陈年的注解,结合aop、redis进行了一个比较简单的熔断控制。
用了下,结果原始代码里面一堆bug,熔断功能根本不生效。一番操作下来,解决了几个bug,并且进行了一定的优化。
接下来给大家介绍下一个比较简单的,超时熔断控制机制。
核心思路
利用aop环切注解配置的方法,执行方法前后计算时长,若超时则以方法全路径名组合的key增值+1,该key设置存活时长。
当该key的值大小超过指定的大小,则存储一个熔断激活标识,标识设置存活时长。
此时,再存储一个恢复访问标识,该标识会在区间时间之后固定往redis存储一个不断增值的恢复熔断尝试key。
此后,其他的请求,都会向判断是否有开启熔断,有开启恢复访问标识,在标识启动的情况下,会减值访问,记录指定时间区间的非超时的请求次数。
在达到非超时成功次数后,会关闭熔断标识,关闭恢复访问标识。服务正常访问。
正文
废话不多说,贴代码
PS:需要注意,CacheService是项目二次封装的服务,把这些地方代码替换为对应的redis api或对应服务方法即可。
@Component
@Aspect
public class CircuitBreakerInterceptor {
@Autowired
private CacheService cacheService;
private static final int criterionMillOfTimeOut = 30 * 1000;
private static final int millWhenStartRecover = 20 * 60 * 1000;
private static final ReentrantLock lock = new ReentrantLock();
private static final String BREAKER_KEY = "breaker";
private static final String RECOVER_KEY = "recover";
private static final String FAILED_COUNTS_KEY = "failed_counts";
private static final String TIMEOUT_COUNTS_KEY = "timeout_counts";
private static final String RECOVER_COUNTS = "recover_counts";
private static final String SUCCESS_COUNTS = "success_counts";
@Pointcut("@annotation(com.emodor.attendance.annotation.CircuitBreaker)")
private void circuitBreake() {
}
@Around("circuitBreake()")
public void doAroundCircuitBreake(ProceedingJoinPoint pjp) throws Throwable {
MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
Method targetMethod = AopUtils.getMostSpecificMethod(methodSignature.getMethod(), pjp.getTarget().getClass());
String targetName = pjp.getTarget().getClass().getName();
String methodName = pjp.getSignature().getName();
CircuitBreaker circuitBreaker = targetMethod.getAnnotation(CircuitBreaker.class);
long startTime = System.currentTimeMillis();
String redisKey = getRedisKey(targetName, methodName);
String breakerFlag = cacheService.getAsString(redisKey + BREAKER_KEY);
String recoverFlag = cacheService.getAsString(redisKey + RECOVER_KEY);
if (!"on".equals(breakerFlag) || "on".equals(recoverFlag)) {
try {
if ("on".equals(recoverFlag)) {
String recoverCountsStr = cacheService.getAsString(redisKey + RECOVER_COUNTS);
long recoverCounts = 0;
if (StringUtils.isNotBlank(recoverCountsStr)) {
recoverCounts = Long.parseLong(recoverCountsStr);
}
if (recoverCounts > 0) {
recoverCounts = cacheService.decrement(redisKey + RECOVER_COUNTS);
pjp.proceed();
if (System.currentTimeMillis() - startTime <= criterionMillOfTimeOut) {
long successCounts = cacheService.increment(redisKey + SUCCESS_COUNTS, 30 * 60 * 1000L);
if (successCounts >= circuitBreaker.successCount()) {
cacheService.delete(redisKey + BREAKER_KEY);
cacheService.delete(redisKey + FAILED_COUNTS_KEY);
cacheService.delete(redisKey + TIMEOUT_COUNTS_KEY);
cacheService.delete(redisKey + RECOVER_KEY);
cacheService.delete(redisKey + SUCCESS_COUNTS);
logger.info("CircuitBreaker,服务器已经关闭熔断,熔断的方法为:" + redisKey);
}
}
}
} else {
pjp.proceed();
}
} catch (Exception ex) {
logger.info("调用失败" + redisKey,ex);
long errorCounts = cacheService.increment(redisKey + FAILED_COUNTS_KEY, circuitBreaker.timeRange() * 1000);
logger.info("调用失败次数" + redisKey + "," + errorCounts);
if (errorCounts >= circuitBreaker.failCount()) {
cacheService.put(redisKey + BREAKER_KEY, "on", 24 * 60 * 60 * 1000L);
logger.info("CircuitBreaker,服务器已经开启熔断,熔断的方法为:" + redisKey);
}
throw ex;
}
}
long endTime = System.currentTimeMillis();
if (endTime - startTime > criterionMillOfTimeOut) {
long timeoutCounts = cacheService.increment(redisKey + TIMEOUT_COUNTS_KEY, circuitBreaker.timeRange() * 1000);
if (timeoutCounts >= circuitBreaker.failCount() && !"on".equals(breakerFlag)) {
cacheService.put(redisKey + BREAKER_KEY, "on", 24 * 60 * 60 * 1000L);
logger.info("CircuitBreaker,服务器已经开启熔断,熔断的方法为:" + redisKey);
}
}
if ("on".equals(breakerFlag) && !"on".equals(recoverFlag)) {
lock.lock();
if (!"on".equals(recoverFlag)) {
cacheService.put(redisKey + RECOVER_KEY, "on",24 * 60 * 60 * 1000L);
new Thread() {
@Override
public void run() {
try {
Thread.sleep(millWhenStartRecover);
logger.info("开启半熔断恢复----------------------------------------");
}catch(Exception e) {
}
String breakerFlag = cacheService.getAsString(redisKey + BREAKER_KEY);
while ("on".equals(breakerFlag)) {
cacheService.delete(redisKey + RECOVER_COUNTS);
for (int i=0;i<10;i++) {
cacheService.increment(redisKey + RECOVER_COUNTS, 60 * 1000L);
}
try {
Thread.sleep(60 * 1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
breakerFlag = cacheService.getAsString(redisKey + BREAKER_KEY);
}
}
}.start();
}
lock.unlock();
}
}
private String getRedisKey(String targetName, String methodName) {
StringBuilder sb = new StringBuilder("");
sb.append("emodor.circuit.breaker.").append(targetName).append(".").append(methodName).append("_");
return sb.toString();
}
}
注解如下:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
public @interface CircuitBreaker {
long failCount() default Long.MAX_VALUE;
long successCount() default 20;
long timeRange() default 1;
}
逻辑演示
环切指定的方法;
熔断标识和恢复访问标识初始为null,直接执行pjp.proceed()方法。
计算执行时长,大于30分钟则累加超时次数。
仅开启熔断标识时刻,会启动恢复访问标识,并在配置时间后创建一个线程,区间时间循环累加测试访问标识次数。
其他请求访问时,若有测试次数,则再执行访问,并记录非超时访问次数。若无测试次数可用,则继续熔断。
遗留的问题
该超时机制最大的问题,在于没有强制控制该方法超时断开连接。
举个例子:
设置100分钟内,超时100个的话,进行熔断。
认为大于30s即为超时。
此时若超时时长是2分钟,则Nginx的宕机判定仍然会被触发。
这个不足暂时没想到思路解决,后面如果学习到的话,会跟大家分享。
|