这里简单地说一下延时队列的实现方式和应用场景。
实现方式
JDK自带的DelayQueue、Redis的Zset集合、Spring框架集成的Quartz任务调度器、RabbitMQ的TTL和DLX(死信交换机)、以及Netty的HashedWheelTimer都可以实现延时任务。
应用场景
订单超时未支付、取消订单、恢复库存、短信延时发送、定时任务以及服务端定时消息推送等。
下面通过一个案例介绍如何使用Redis+Redisson+注解的方式来实现延时队列和定时任务,具体代码如下。
1.定义一个接口类
/**
 * Contract for publishing {@link Message}s to a delayed queue.
 *
 * <p>Only {@link #convertAndSend(Message, Long, TimeUnit)} is abstract, which is what
 * keeps this a functional interface. The remaining operations are optional: their
 * default implementations do nothing, so an implementation that does not override
 * them silently ignores the call.
 *
 * @param <M> type of the payload carried by the message
 */
@FunctionalInterface
public interface RedisDelayedQueue<M> {

    /**
     * Publishes a message after the given delay.
     *
     * @param data the message to publish
     * @param time length of the delay, expressed in {@code type} units
     * @param type unit of {@code time}
     */
    void convertAndSend(Message<M> data, Long time, TimeUnit type);

    /**
     * Publishes a message without an explicit delay. The default implementation is a no-op.
     *
     * @param data the message to publish
     */
    default void convertAndSend(Message<M> data) {
    }

    /**
     * Publishes a message using a textual time specification; how the text is
     * interpreted is up to the implementation. The default implementation is a no-op.
     *
     * @param data the message to publish
     * @param time textual time specification
     */
    default void convertAndSend(Message<M> data, String time) {
    }

    /**
     * Withdraws a previously published message. The default implementation is a no-op.
     *
     * @param data the message to withdraw
     */
    default void remove(Message<M> data) {
    }
}
2.定义注解
/**
 * Marks a method as a listener, either for keyed queue messages or for a periodic timer.
 *
 * <p>The annotation is retained at runtime and may only be placed on methods.
 */
@Documented
@Retention(RUNTIME)
@Target(METHOD)
@Inherited
public @interface RedisListener {

    /** Kind of listener a method represents. */
    enum TYPE {
        TIMER,
        QUEUE
    }

    /** Keys associated with the listener; empty by default. */
    String[] value() default {};

    /** Listener kind; {@link TYPE#QUEUE} by default. */
    TYPE type() default TYPE.QUEUE;

    /** Amount of time, expressed in {@link #unit()}; {@code 0} by default. */
    long time() default 0;

    /** Unit in which {@link #time()} is expressed; seconds by default. */
    TimeUnit unit() default TimeUnit.SECONDS;
}
3.定义消息类
/**
 * Envelope pairing a payload with a string key.
 *
 * <p>All accessors, the constructors, the builder and {@code toString} are generated
 * by Lombok; setters return {@code this} because of {@code @Accessors(chain = true)}.
 * The field order below is significant: it determines the parameter order of the
 * generated all-args constructor.
 *
 * @param <M> type of the payload
 */
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
@Data
@ToString
public final class Message<M> {

    /** The payload carried by this message. */
    private M data;

    /** Key identifying the message. */
    private String key;
}
4.定义接口实现类
/**
 * {@link RedisDelayedQueue} implementation backed by a Redisson delayed queue.
 *
 * <p>When the class is loaded it:
 * <ol>
 *   <li>scans every bean in {@code HttpServer.bean_list} for methods annotated with
 *       {@link RedisListener};</li>
 *   <li>schedules each {@code TYPE.TIMER} listener at a fixed rate on a shared
 *       scheduler, using the annotation's {@code time()} as both initial delay and
 *       period;</li>
 *   <li>connects to Redis, creates the blocking queue named {@code "redis"} with its
 *       delayed-queue wrapper, and starts a consumer thread that hands each message
 *       to the first registered listener whose {@code value()} contains the message
 *       key.</li>
 * </ol>
 *
 * <p>Any failure during this setup is printed and otherwise ignored; the send and
 * remove methods then fail with {@link IllegalStateException}.
 *
 * @param <M> type of the payload carried by the messages
 */
public final class RedisDelayedQueueImpl<M> implements RedisDelayedQueue<M> {

    /** Seconds in one day; used to roll an already-passed time of day over to tomorrow. */
    private static final long SECONDS_PER_DAY = 24L * 60L * 60L;

    /** Format of the time of day accepted by {@link #convertAndSend(Message, String)}. */
    private static final DateTimeFormatter TIME_OF_DAY = DateTimeFormatter.ofPattern("HH:mm:ss");

    /**
     * Listener registrations in discovery order.
     *
     * <p>A list is used on purpose: annotation instances compare by member values,
     * so a map keyed by the annotation would silently drop one of two methods that
     * carry identical annotation values.
     */
    private static final java.util.List<Registration> REGISTRATIONS =
            new java.util.ArrayList<Registration>();

    /** Delayed view of the Redis queue; stays {@code null} if initialization failed. */
    private static RDelayedQueue<Message<?>> delayed;

    /** Scheduler that runs the {@code TYPE.TIMER} listeners. */
    private static ScheduledThreadPoolExecutor executors = new ScheduledThreadPoolExecutor(10);

    static {
        try {
            registerListeners();
            startTimers();
            startQueueConsumer();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** One annotated listener method together with the bean it is invoked on. */
    private static final class Registration {
        private final RedisListener listener;
        private final Method method;
        private final Object bean;

        Registration(RedisListener listener, Method method, Object bean) {
            this.listener = listener;
            this.method = method;
            this.bean = bean;
        }
    }

    /** Collects every {@link RedisListener}-annotated method declared by the known beans. */
    private static void registerListeners() {
        for (Object bean : HttpServer.bean_list) {
            for (Method method : bean.getClass().getDeclaredMethods()) {
                RedisListener listener = method.getAnnotation(RedisListener.class);
                if (listener != null) {
                    REGISTRATIONS.add(new Registration(listener, method, bean));
                }
            }
        }
    }

    /** Schedules every {@code TYPE.TIMER} registration at a fixed rate. */
    private static void startTimers() {
        for (Registration registration : REGISTRATIONS) {
            RedisListener listener = registration.listener;
            if (listener.type() != TYPE.TIMER) {
                continue;
            }
            if (listener.time() <= 0) {
                // scheduleAtFixedRate rejects a non-positive period; skip this one
                // timer instead of letting the exception abort the rest of the setup.
                System.err.println("Ignoring timer listener " + registration.method
                        + ": time() must be positive but was " + listener.time());
                continue;
            }
            executors.scheduleAtFixedRate(() -> invokeTimer(registration),
                    listener.time(), listener.time(), listener.unit());
        }
    }

    /** Runs one timer listener, printing (never propagating) any failure. */
    private static void invokeTimer(Registration registration) {
        try {
            // A timer has no message to deliver. Supply null for every declared
            // parameter so that listeners written with a Message parameter still run
            // instead of failing with "wrong number of arguments".
            Object[] args = new Object[registration.method.getParameterCount()];
            registration.method.invoke(registration.bean, args);
        } catch (Exception e) {
            // Swallowing is deliberate: an exception escaping the task would cancel
            // all of its future executions.
            e.printStackTrace();
        }
    }

    /** Connects to Redis, publishes the delayed queue and starts the consumer thread. */
    private static void startQueueConsumer() {
        Config config = new Config();
        config.useSingleServer().setAddress(String.format("redis://%s:%s", "127.0.0.1", 6379)).setDatabase(0);
        config.setCodec(new JsonJacksonCodec());
        RedissonClient redissonClient = Redisson.create(config);
        RBlockingQueue<Message<?>> queue = redissonClient.getBlockingQueue("redis");
        delayed = redissonClient.getDelayedQueue(queue);
        new Thread(() -> consume(queue)).start();
    }

    /** Takes messages from the queue and dispatches them until the thread is interrupted. */
    private static void consume(RBlockingQueue<Message<?>> queue) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                dispatch(queue.take());
            } catch (InterruptedException e) {
                // Restore the flag and stop; retrying would spin on a dead thread.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Invokes the first registered listener whose {@code value()} contains the
     * message key. Messages without a matching listener are dropped.
     */
    private static void dispatch(Message<?> message) throws Exception {
        for (Registration registration : REGISTRATIONS) {
            if (Arrays.asList(registration.listener.value()).contains(message.getKey())) {
                registration.method.invoke(registration.bean, message);
                return;
            }
        }
    }

    /**
     * Returns the delayed queue.
     *
     * @throws IllegalStateException if class initialization failed before the queue was created
     */
    private static RDelayedQueue<Message<?>> requireDelayed() {
        if (delayed == null) {
            throw new IllegalStateException(
                    "Redis delayed queue is not initialized; see the startup error output");
        }
        return delayed;
    }

    /**
     * Publishes {@code data} after the given delay.
     *
     * @param data the message to publish
     * @param time length of the delay, in {@code type} units
     * @param type unit of {@code time}
     * @throws IllegalStateException if the queue could not be initialized
     */
    @Override
    public void convertAndSend(Message<M> data, Long time, TimeUnit type) {
        requireDelayed().offerAsync(data, time, type);
    }

    /**
     * Publishes {@code data} with a delay of zero seconds.
     *
     * @param data the message to publish
     * @throws IllegalStateException if the queue could not be initialized
     */
    @Override
    public void convertAndSend(Message<M> data) {
        requireDelayed().offerAsync(data, 0L, TimeUnit.SECONDS);
    }

    /**
     * Publishes {@code data} at the next occurrence of the given time of day.
     *
     * <p>The delay is the number of whole seconds between the current local time and
     * {@code time}. If that time has already passed today the delay would be negative,
     * so the message is scheduled for the same time tomorrow.
     *
     * @param data the message to publish
     * @param time time of day in {@code HH:mm:ss} format
     * @throws IllegalArgumentException if {@code time} is {@code null} or empty
     * @throws java.time.format.DateTimeParseException if {@code time} is not in {@code HH:mm:ss} format
     * @throws IllegalStateException if the queue could not be initialized
     */
    @Override
    public void convertAndSend(Message<M> data, String time) {
        if (time == null || time.length() < 1) {
            throw new IllegalArgumentException("time is Empty");
        }
        long seconds = ChronoUnit.SECONDS.between(LocalTime.now(), LocalTime.parse(time, TIME_OF_DAY));
        if (seconds < 0) {
            seconds += SECONDS_PER_DAY;
        }
        System.out.println("SECONDS " + seconds);
        requireDelayed().offerAsync(data, seconds, TimeUnit.SECONDS);
    }

    /**
     * Removes a pending message from the delayed queue.
     *
     * @param data the message to remove
     * @throws IllegalStateException if the queue could not be initialized
     */
    @Override
    public void remove(Message<M> data) {
        requireDelayed().removeAsync(data);
    }
}
5.定义测试类
/**
 * Example listener bean showing both uses of {@link RedisListener}: methods bound to
 * a message key, and a method run periodically as a timer.
 */
public final class OrderListener {

    private final Mappers<OrderInfo> orders = new Mappers<OrderInfo>();
    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Saves the order carried by a message with key {@code "create"}.
     *
     * @param data message whose payload is cast to {@code OrderInfo}
     */
    @RedisListener(value = "create")
    public void create(Message<?> data) {
        logger.info("创建订单.....................");
        orders.saveOne((OrderInfo) data.getData());
    }

    /**
     * Deletes the order carried by a message with key {@code "delete"}.
     *
     * @param data message whose payload is cast to {@code OrderInfo}
     */
    @RedisListener(value = "delete")
    public void delete(Message<?> data) {
        logger.info("删除订单.....................");
        orders.delete((OrderInfo) data.getData());
    }

    /**
     * Sets the status of the order carried by a message with key {@code "cancel"} to 2.
     *
     * @param data message whose payload is cast to {@code OrderInfo}
     */
    @RedisListener(value = "cancel")
    public void cancel(Message<?> data) {
        logger.info("取消订单.....................");
        OrderInfo orderInfo = (OrderInfo) data.getData();
        OrderInfo where = new OrderInfo().setOid(orderInfo.getOid());
        // The new values go in their own object. Chaining setStatus(2) onto `where`
        // mutates and returns that same instance, which would put the status into the
        // match criteria as well.
        // NOTE(review): assumes updateByWhere(criteria, newValues) - confirm against Mappers.
        OrderInfo update = new OrderInfo().setStatus(2);
        orders.updateByWhere(where, update);
    }

    /**
     * Implements a scheduled task through the queue: runs when a message with key
     * {@code "time"} is delivered.
     *
     * @param data the delivered message (unused)
     */
    @RedisListener(value = "time")
    public void timer01(Message<?> data) {
        logger.info("通过队列实现定时任务.....................");
    }

    /**
     * Implements a scheduled task through the thread pool: runs every 10 minutes.
     *
     * <p>Declared without parameters because a timer has no message to deliver; the
     * dispatcher invokes timer listeners with no arguments.
     */
    @RedisListener(type = TYPE.TIMER, time = 10, unit = TimeUnit.MINUTES)
    public void timer02() {
        logger.info("每10分钟执行一次.....................");
    }

    /**
     * Overload retained so that existing direct callers keep compiling; it is not
     * registered as a listener and simply delegates to {@link #timer02()}.
     *
     * @param data ignored
     */
    public void timer02(Message<?> data) {
        timer02();
    }
}
|