延迟消息队列在我们的日常工作中经常会被用到,比如支付系统中超过 30 分钟未支付的订单,将会被取消,这样就可以保证此商品库存可以释放给其他人购买,还有外卖系统如果商家超过 5 分钟未接单的订单,将会被自动取消,以此来保证用户可以更及时的吃到自己点的外卖,等等诸如此类的业务场景都需要使用到延迟消息队列,又因为它在业务中比较常见,因此这个知识点在面试中也会经常被问到。
使用 Redis 如何实现延迟消息队列?
延迟消息队列的常见实现方式是通过 ZSet 的存储于查询来实现,它的核心思想是在程序中开启一个一直循环的延迟任务的检测器,用于检测和调用延迟任务的执行,如下图所示:
延时队列的实现
我们可以使用 ZSet这个命令,将延迟任务用设置好的当前时间戳加上延迟时间作为score进行排序放入ZSet中,使用 zadd score1 value1 …命令就可以一直往内存中生产消息。
再利用 zrangebysocre 查询符合条件的所有待处理的任务,通过循环执行队列任务即可。也可以通过 zrangebyscore key 0 当前时间戳 limit 0 1 查询队列中已经到期的一条任务,来进行消费
Redis延时队列优势
1.Redis zset支持高性能的 score 排序。
2.Redis是在内存上进行操作的,速度非常快。
3.Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。
4.Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性
Redis延时队列劣势
使用 Redis 实现的延时消息队列也存在数据持久化, 消息可靠性的问题
没有重试机制 - 处理消息出现异常没有重试机制, 这些需要自己去实现, 包括重试次数的实现等
没有 ACK 机制 - 例如在获取消息并已经删除了消息情况下, 正在处理消息的时候客户端崩溃了, 这条正在处理的这些消息就会丢失, MQ 是需要明确的返回一个值给 MQ 才会认为这个消息是被正确的消费了
如果对消息可靠性要求较高, 推荐使用 MQ 来实现
手写代码实现
依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
代码
@Slf4j
public class DelayQueueRedisTest {
private Jedis jedis;
private static final String KEY = "delay_queue";
@Before
public void before() {
jedis = new Jedis("127.0.0.1", 6379);
}
@Test
public void test() {
product(UUID.randomUUID().toString(), System.currentTimeMillis() + 5000);
product(UUID.randomUUID().toString(), System.currentTimeMillis() + 5000);
consumer();
try {
Thread.sleep(9999999);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void product(String taskId, long exeTime) {
System.out.println("加入任务, taskId: " + taskId + ", exeTime: " + exeTime + ", 当前时间:" + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date()));
jedis.zadd(this.KEY, exeTime, String.valueOf(taskId));
}
public void consumer() {
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
while (true) {
Set<String> taskIdSet = jedis.zrangeByScore(KEY, 0, System.currentTimeMillis(), 0, 1);
if (taskIdSet == null || taskIdSet.isEmpty()) {
log.info("没有任务");
} else {
taskIdSet.forEach(id -> {
long result = jedis.zrem(KEY, id);
if (result == 1L) {
log.info("从延时队列中获取到任务,taskId: {}, 当前时间:{}", id, new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date()));
handleMsg(id);
}
});
}
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
private void handleMsg(String msg) {
log.info("消息处理: {}", msg);
}
@After
public void after() {
jedis.close();
}
}
SpringBoot集成Redisson实现延迟队列
下一篇 SpringBoot集成Redisson实现延迟队列
|