IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 利用aop、redis、注解等实现简单的熔断 -> 正文阅读

[大数据]利用aop、redis、注解等实现简单的熔断

利用aop、redis、注解来实现超时熔断机制

author:陈镇坤27

创建时间:2022年3月31日

创作不易,转载请注明来源

摘要:利用AOP切面、Reids、自定义注解等知识,实现简单的服务超时熔断。


——————————————————————————————

前言

c项目还有半小时要上线了,临时给我加个紧急任务。c服务调用弹窗广告时,需要用到某个三方api接口,该接口调用在某段时间内会频繁超时, 导致c服务一直不响应,连锁影响到Nginx认定该服务宕机,导致服务压力全部给到了其他机子。其他机子又出现这个问题,又给到别的机子,颇有我方唱罢你登场,你下场来我来唱的感觉。

计划使用超时熔断,但因为是ssm架构的旧项目,没有引入openFegin组件,用不了hystrix的熔断机制。

意外的是,项目里面有个陈年的注解,结合aop、redis进行了一个比较简单的熔断控制。

用了下,结果原始代码里面一堆bug,熔断功能根本不生效。一番操作下来,解决了几个bug,并且进行了一定的优化。

接下来给大家介绍下一个比较简单的,超时熔断控制机制。

核心思路

利用aop环切注解配置的方法,执行方法前后计算时长,若超时则以方法全路径名组合的key增值+1,该key设置存活时长。

当该key的值大小超过指定的大小,则存储一个熔断激活标识,标识设置存活时长。

此时,再存储一个恢复访问标识,该标识会在区间时间之后固定往redis存储一个不断增值的恢复熔断尝试key。

此后,其他的请求,都会向判断是否有开启熔断,有开启恢复访问标识,在标识启动的情况下,会减值访问,记录指定时间区间的非超时的请求次数。

在达到非超时成功次数后,会关闭熔断标识,关闭恢复访问标识。服务正常访问。

正文

废话不多说,贴代码

PS:需要注意,CacheService是项目二次封装的服务,把这些地方代码替换为对应的redis api或对应服务方法即可。

@Component
@Aspect
public class CircuitBreakerInterceptor {


	@Autowired
	private CacheService cacheService;

	private static final int criterionMillOfTimeOut =  30 * 1000;
	private static final int millWhenStartRecover = 20 * 60 * 1000;

	private static final ReentrantLock lock = new ReentrantLock();
	
	private static final String BREAKER_KEY = "breaker";
	private static final String RECOVER_KEY = "recover";
	private static final String FAILED_COUNTS_KEY = "failed_counts";
	private static final String TIMEOUT_COUNTS_KEY = "timeout_counts";
	private static final String RECOVER_COUNTS = "recover_counts";
	private static final String SUCCESS_COUNTS = "success_counts";

	@Pointcut("@annotation(com.emodor.attendance.annotation.CircuitBreaker)")
	private void circuitBreake() {
	}

	// 声明环绕通知
	@Around("circuitBreake()")
	public void doAroundCircuitBreake(ProceedingJoinPoint pjp) throws Throwable {

		MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
		Method targetMethod = AopUtils.getMostSpecificMethod(methodSignature.getMethod(), pjp.getTarget().getClass());
		String targetName = pjp.getTarget().getClass().getName();
		String methodName = pjp.getSignature().getName();

		CircuitBreaker circuitBreaker = targetMethod.getAnnotation(CircuitBreaker.class);

		long startTime = System.currentTimeMillis();

		String redisKey = getRedisKey(targetName, methodName);
		String breakerFlag = cacheService.getAsString(redisKey + BREAKER_KEY);
		String recoverFlag = cacheService.getAsString(redisKey + RECOVER_KEY);
		// 如果开启了熔断
		if (!"on".equals(breakerFlag) || "on".equals(recoverFlag)) {
			try {
				if ("on".equals(recoverFlag)) {
					//	这里应该加锁,保证查询与减值是原子操作。留待各位优化
					String recoverCountsStr = cacheService.getAsString(redisKey + RECOVER_COUNTS);
					long recoverCounts = 0;
					if (StringUtils.isNotBlank(recoverCountsStr)) {
						recoverCounts = Long.parseLong(recoverCountsStr);
					}
					if (recoverCounts > 0) {
						recoverCounts = cacheService.decrement(redisKey + RECOVER_COUNTS);
						pjp.proceed();
						if (System.currentTimeMillis() - startTime <= criterionMillOfTimeOut) {
							long successCounts = cacheService.increment(redisKey + SUCCESS_COUNTS, 30 * 60 * 1000L);
							// 30分钟内成功次数达到规定值可以关闭熔断
							if (successCounts >= circuitBreaker.successCount()) {
								// 关闭熔断,清除相关key
								cacheService.delete(redisKey + BREAKER_KEY);
								cacheService.delete(redisKey + FAILED_COUNTS_KEY);
								cacheService.delete(redisKey + TIMEOUT_COUNTS_KEY);
								cacheService.delete(redisKey + RECOVER_KEY);
								cacheService.delete(redisKey + SUCCESS_COUNTS);
								logger.info("CircuitBreaker,服务器已经关闭熔断,熔断的方法为:"  + redisKey);
							}
						}
					}
				} else {
					pjp.proceed();
				}

			} catch (Exception ex) {
				logger.info("调用失败" + redisKey,ex);
				long errorCounts = cacheService.increment(redisKey + FAILED_COUNTS_KEY, circuitBreaker.timeRange() * 1000);
				logger.info("调用失败次数" + redisKey + "," + errorCounts);
				// 如果错误超过阀值则进行熔断降级
				if (errorCounts >= circuitBreaker.failCount()) {
					cacheService.put(redisKey + BREAKER_KEY, "on", 24 * 60 * 60 * 1000L);
					logger.info("CircuitBreaker,服务器已经开启熔断,熔断的方法为:"  + redisKey);
				}
				throw ex;
			}
		}
		long endTime = System.currentTimeMillis();

		// 如果执行超过0.5分钟
		if (endTime - startTime > criterionMillOfTimeOut) {
			long timeoutCounts = cacheService.increment(redisKey + TIMEOUT_COUNTS_KEY, circuitBreaker.timeRange() * 1000);
			// 如果超时超过阀值则进行熔断降级
			if (timeoutCounts >= circuitBreaker.failCount() && !"on".equals(breakerFlag)) {
				//熔断有效时间24小时
				cacheService.put(redisKey + BREAKER_KEY, "on", 24 * 60 * 60 * 1000L);
				logger.info("CircuitBreaker,服务器已经开启熔断,熔断的方法为:"  + redisKey);
			}
		}
		
		// 熔断进行恢复half_open
		if ("on".equals(breakerFlag) && !"on".equals(recoverFlag)) {
			lock.lock();
			if (!"on".equals(recoverFlag)) {
				cacheService.put(redisKey + RECOVER_KEY, "on",24 * 60 * 60 * 1000L);
				new Thread() {
					@Override
					public void run() {
						//20分钟之后开启熔断恢复halfOpen
						try {
							Thread.sleep(millWhenStartRecover);
							logger.info("开启半熔断恢复----------------------------------------");
						}catch(Exception e) {
						}
						String breakerFlag = cacheService.getAsString(redisKey + BREAKER_KEY);
						while ("on".equals(breakerFlag)) {
							cacheService.delete(redisKey + RECOVER_COUNTS);
							// 每60秒放10个进去
							for (int i=0;i<10;i++) {
								cacheService.increment(redisKey + RECOVER_COUNTS, 60 * 1000L);
							}
							try {
								Thread.sleep(60 * 1000L);
							} catch (InterruptedException e) {
								e.printStackTrace();
							}
							breakerFlag = cacheService.getAsString(redisKey + BREAKER_KEY);
						}

					}
				}.start();
			}
			lock.unlock();
		}
	}

	/**
	 * 获取缓存的key值
	 */
	private String getRedisKey(String targetName, String methodName) {
		StringBuilder sb = new StringBuilder("");
		sb.append("emodor.circuit.breaker.").append(targetName).append(".").append(methodName).append("_");
		return sb.toString();
	}
}

注解如下:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
public @interface CircuitBreaker {
	/**
     * 失败次数进行熔断,默认值MAX_VALUE
     */
    long failCount() default Long.MAX_VALUE;
    /**
     * 成功次数关闭熔断 默认值 20
     * @return
     */
    long successCount() default 20;
    /**
     * 时间段,单位秒,默认每秒限流大小
     */
    long timeRange() default 1;
}

逻辑演示

环切指定的方法;

熔断标识和恢复访问标识初始为null,直接执行pjp.proceed()方法。

计算执行时长,大于30分钟则累加超时次数。

仅开启熔断标识时刻,会启动恢复访问标识,并在配置时间后创建一个线程,区间时间循环累加测试访问标识次数。

其他请求访问时,若有测试次数,则再执行访问,并记录非超时访问次数。若无测试次数可用,则继续熔断。

遗留的问题

该超时机制最大的问题,在于没有强制控制该方法超时断开连接。

举个例子:

设置100分钟内,超时100个的话,进行熔断。

认为大于30s即为超时。

此时若超时时长是2分钟,则Nginx的宕机判定仍然会被触发。

这个不足暂时没想到思路解决,后面如果学习到的话,会跟大家分享。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-01 23:28:20  更:2022-04-01 23:28:24 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 14:56:29-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码