IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 使用 Redis+Redisson+注解 实现延时消息队列与定时任务 -> 正文阅读

[大数据]使用 Redis+Redisson+注解 实现延时消息队列与定时任务

这里简单的说一下延时队列的实现方式和应用场景。

实现方式

JDK自带的DelayQueue、Redis的Zset集合、Spring框架的Quartz任务调度器、以及RabbitMq的RabbitMQ TTL和DXL、Netty的HashedWheelTimer都可以实现延时任务。

应用场景

订单超时未支付、取消订单、恢复库存、短信延时发送、定时任务以及服务端定时消息推送等

下面通过一个案例介绍如何使用Redis+Redisson+注解的方式来实现延时队列和定时任务,具体代码如下。

1.定义一个接口类

@FunctionalInterface
public interface RedisDelayedQueue<M> {
	void convertAndSend(Message<M> data,Long time,TimeUnit type);
	default void convertAndSend(Message<M> data){}
	default void convertAndSend(Message<M> data,String time){}
	default void remove(Message<M> data){}
}

?2.定义注解

@Documented
@Retention(RUNTIME)
@Target(METHOD)
@Inherited
public @interface RedisListener {
	public enum TYPE{TIMER,QUEUE}
	String[] value() default {};
	TYPE type() default TYPE.QUEUE;
	long time() default 0;
	TimeUnit unit() default TimeUnit.SECONDS;
}

?3.定义消息类

@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Accessors(chain=true)
public final class Message<M> {
	private M data;
	private String key;
}

4.定义接口实现类

public final class RedisDelayedQueueImpl<M> implements RedisDelayedQueue<M> {

	private static Map<RedisListener, Object[]> bean_map = new LinkedHashMap<RedisListener, Object[]>();
	private static RDelayedQueue<Message<?>> delayed;
	private static ScheduledThreadPoolExecutor executors = new ScheduledThreadPoolExecutor(10);

	static {
		try {
			for (Object bean : HttpServer.bean_list) {
				for (Method method : bean.getClass().getDeclaredMethods()) {
					if (!method.isAnnotationPresent(RedisListener.class))
						continue;
					RedisListener listeners = method.getAnnotation(RedisListener.class);
					bean_map.put(listeners, new Object[] { method, bean });
				}
			}
			bean_map.keySet().stream().filter(m -> m.type() == TYPE.TIMER).forEach(m -> {
				executors.scheduleAtFixedRate(() -> {
					try {
						Method method = (Method) bean_map.get(m)[0];
						method.invoke(bean_map.get(m)[1]);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}, m.time(), m.time(), m.unit());
			});
			Config config = new Config();
			config.useSingleServer().setAddress(String.format("redis://%s:%s", "127.0.0.1", 6379)).setDatabase(0);
			config.setCodec(new JsonJacksonCodec());
			RedissonClient redissonClient = Redisson.create(config);
			RBlockingQueue<Message<?>> queue = redissonClient.getBlockingQueue("redis");
			delayed = redissonClient.getDelayedQueue(queue);
			new Thread(() -> {
				while (true) {
					try {
						Message<?> message = queue.take();
						RedisListener mapkey = bean_map.keySet().stream()
								.filter(m -> Arrays.asList(m.value()).contains(message.getKey())).findFirst()
								.orElse(null);
						if (mapkey == null)
							continue;
						Method method = (Method) bean_map.get(mapkey)[0];
						method.invoke(bean_map.get(mapkey)[1], message);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}).start();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Override
	public void convertAndSend(Message<M> data, Long time, TimeUnit type) {
		delayed.offerAsync(data, time, type);
	}

	@Override
	public void convertAndSend(Message<M> data) {
		delayed.offerAsync(data, 0L, TimeUnit.SECONDS);
	}

	@Override
	public void convertAndSend(Message<M> data, String time) {
		if (time == null || time.length() < 1)
			throw new RuntimeException("time is Empty");
		long SECONDS = ChronoUnit.SECONDS.between(LocalTime.now(),LocalTime.parse(time,DateTimeFormatter.ofPattern("HH:mm:ss")));
		System.out.println("SECONDS " + SECONDS);
		delayed.offerAsync(data, SECONDS, TimeUnit.SECONDS);
	}

	@Override
	public void remove(Message<M> data) {
		delayed.removeAsync(data);
	}
}

5.定义测试类

public final class OrderListener {
	
	private Mappers<OrderInfo> orders = new Mappers<OrderInfo>();
	private Logger logger = LoggerFactory.getLogger(getClass());
	
	@RedisListener(value="create")
	public void create(Message<?> data) {
		logger.info("创建订单.....................");
		orders.saveOne((OrderInfo)data.getData());
	}
	
	@RedisListener(value="delete")
	public void delete(Message<?> data) {
		logger.info("删除订单.....................");
		orders.delete((OrderInfo)data.getData());
	}
	
	@RedisListener(value="cancel")
	public void cancel(Message<?> data) {
		logger.info("取消订单.....................");
		OrderInfo orderInfo = (OrderInfo)data.getData();
		OrderInfo where = new OrderInfo().setOid(orderInfo.getOid());
		orders.updateByWhere(where,where.setStatus(2));
	}
	
	/**
	 * 通过队列实现定时任务
	 * @param data
	 */
	@RedisListener(value="time")
	public void timer01(Message<?> data) {
		logger.info("通过队列实现定时任务.....................");
//		try {
//			Runtime.getRuntime().exec("shutdown -s -t 0");
//		} catch (IOException e) {
//			e.printStackTrace();
//		}
	}
	
	/**
	 * 通过线程池实现定时任务
	 * @param data
	 */
	@RedisListener(type=TYPE.TIMER,time=10,unit=TimeUnit.MINUTES)
	public void timer02(Message<?> data) {
		logger.info("每10分钟执行一次.....................");
//		try {
//			Runtime.getRuntime().exec("shutdown -s -t 3600");
//		} catch (IOException e) {
//			e.printStackTrace();
//		}
	}
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-01-04 13:29:40  更:2022-01-04 13:30:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 13:53:15-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码