IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Redis延迟队列手写实现 -> 正文阅读

[大数据]Redis延迟队列手写实现

延迟消息队列在我们的日常工作中经常会被用到,比如支付系统中超过 30 分钟未支付的订单,将会被取消,这样就可以保证此商品库存可以释放给其他人购买,还有外卖系统如果商家超过 5 分钟未接单的订单,将会被自动取消,以此来保证用户可以更及时的吃到自己点的外卖,等等诸如此类的业务场景都需要使用到延迟消息队列,又因为它在业务中比较常见,因此这个知识点在面试中也会经常被问到。

使用 Redis 如何实现延迟消息队列?

延迟消息队列的常见实现方式是通过 ZSet 的存储于查询来实现,它的核心思想是在程序中开启一个一直循环的延迟任务的检测器,用于检测和调用延迟任务的执行,如下图所示:

在这里插入图片描述

延时队列的实现

我们可以使用 ZSet这个命令,将延迟任务用设置好的当前时间戳加上延迟时间作为score进行排序放入ZSet中,使用 zadd score1 value1 …命令就可以一直往内存中生产消息。

再利用 zrangebysocre 查询符合条件的所有待处理的任务,通过循环执行队列任务即可。也可以通过 zrangebyscore key 0 当前时间戳 limit 0 1 查询队列中已经到期的一条任务,来进行消费

Redis延时队列优势

1.Redis zset支持高性能的 score 排序。

2.Redis是在内存上进行操作的,速度非常快。

3.Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。

4.Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性

Redis延时队列劣势

使用 Redis 实现的延时消息队列也存在数据持久化, 消息可靠性的问题

没有重试机制 - 处理消息出现异常没有重试机制, 这些需要自己去实现, 包括重试次数的实现等

没有 ACK 机制 - 例如在获取消息并已经删除了消息情况下, 正在处理消息的时候客户端崩溃了, 这条正在处理的这些消息就会丢失, MQ 是需要明确的返回一个值给 MQ 才会认为这个消息是被正确的消费了

如果对消息可靠性要求较高, 推荐使用 MQ 来实现

手写代码实现

依赖

<!--引入jedis连接依赖-->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

代码

/**
 * @author wangdi
 * @date 22-6-21
 */
@Slf4j
public class DelayQueueRedisTest {

    private Jedis jedis;

    private static final String KEY = "delay_queue";

    @Before
    public void before() {
        jedis = new Jedis("127.0.0.1", 6379);
    }


    @Test
    public void test() {

        // 生产延时消息两个 延时5秒
        product(UUID.randomUUID().toString(), System.currentTimeMillis() + 5000);
        product(UUID.randomUUID().toString(), System.currentTimeMillis() + 5000);


        consumer();

        try {
            Thread.sleep(9999999);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }


    /**
     * 生产者 -> 生产消息
     *
     * @param taskId  任务id  实际开发建议使用业务 ID 和随机生成的唯一 ID 作为 value, 随机生成的唯一 ID 可以保证消息的唯一性, 业务 ID 可以避免 value 携带的信息过多
     * @param exeTime 延迟时间
     */
    public void product(String taskId, long exeTime) {
        System.out.println("加入任务, taskId: " + taskId + ", exeTime: " + exeTime + ", 当前时间:" + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date()));
        jedis.zadd(this.KEY, exeTime, String.valueOf(taskId));
    }


    /**
     * 消费者 -> 消费消息
     */
    public void consumer() {
        Executors.newSingleThreadExecutor().execute(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    Set<String> taskIdSet = jedis.zrangeByScore(KEY, 0, System.currentTimeMillis(), 0, 1);
                    if (taskIdSet == null || taskIdSet.isEmpty()) {
                        log.info("没有任务");

                    } else {
                        taskIdSet.forEach(id -> {
                            long result = jedis.zrem(KEY, id);
                            if (result == 1L) {
//                                log.info("从延时队列中获取到任务,taskId: {}" + id + " , 当前时间:" + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date()));
                                log.info("从延时队列中获取到任务,taskId: {}, 当前时间:{}", id, new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date()));
                                handleMsg(id);
                            }
                        });
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });



    }


    /**
     * 消息处理业务
     *
     * @param msg
     */
    private void handleMsg(String msg) {
        log.info("消息处理: {}", msg);
    }


    @After
    public void after() {
        jedis.close();
    }
}

SpringBoot集成Redisson实现延迟队列

下一篇 SpringBoot集成Redisson实现延迟队列

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-25 18:11:43  更:2022-06-25 18:14:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:35:44-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码