IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> SpringBoot整合RabbitMQ实现延时队列 -> 正文阅读

[Java知识库]SpringBoot整合RabbitMQ实现延时队列

组件的版本

SpringBoot : V2.5.5
RabbitMQ: 3.9.9

延时队列

延时队列的典型应用场景,例如购买火车票,下单占座后20分钟内未支付的订单会被强制取消,避免在余票紧张的情况下,车票一直被占用,其他人无法购买。还有电商平台,客户下单后,订单进入购物车,如果购物车内的订单超过特定时间未支付,则会失效,回滚库存。

RabbitMQ实现延时队列

利用 RabbitMQ 做延时队列是比较常见的一种方式,而实际上RabbitMQ 自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTL和 DXL这两个属性间接实现的。

Time To Live(TTL) :

TTL 指的是消息的存活时间,RabbitMQ可以通过x-message-ttl参数来设置指定Queue上消息的存活时间,它的值是一个非负整数,单位为微秒。

RabbitMQ 可以从两种维度设置消息过期时间,分别为队列和消息本身。
设置队列过期时间,那么队列中所有消息都具有相同的过期时间。
设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。
注1:如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。
注2:队列过期后,队列内的所有消息全部变为死信。
注3:消息过期后,只有消息位于队列的顶端,才会判断其是否过期,过期的消息变为死信Dead Letter

Dead Letter Exchanges(DLX)

DLX即死信交换机,绑定在死信交换机上的即死信队列。RabbitMQ的 Queue(队列)可以配置两个参数x-dead-letter-exchange 和 x-dead-letter-routing-key,一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费。
在这里插入图片描述

示例代码

1.消息生产方的配置类

@Configuration
public class RabbitMQConf {
    //正常交换机的名称
    public static final String ITEM_TOPIC_EXCHANGE = "springboot_item_topic_exchange";
    //死信交换机的名称
    public static final String DLX_TOPIC_EXCHANGE = "dlx_topic_exchange";
    //正常队列的名称
    public static final String ITEM_QUEUE = "springboot_item_queue";
    //死信队列的名称
    public static final String DLX_QUEUE = "dlx_queue";
    
    //构建正常交换机
    @Bean("itemTopicExchange")
    public Exchange topicExchange(){
        return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
    }
    //构建死信交换机
    @Bean("dlxExchange")
    public Exchange dlxExchange(){
        return ExchangeBuilder.topicExchange(DLX_TOPIC_EXCHANGE).durable(true).build();
    }
    //构建正常队列
    @Bean("itemQueue")
    public Queue itemQueue(){
        return QueueBuilder
                .durable(ITEM_QUEUE)
                .deadLetterExchange(DLX_TOPIC_EXCHANGE)//设置死信交换机
                .deadLetterRoutingKey("dlx.hello")//设置路由规则
                .ttl(10000)//设置队列TTL=10s
                .maxLength(10)//设置队列队列最大容量=10
                .build();
    }
    //构建死信队列
    @Bean("dlxQueue")
    public Queue dlxQueue(){
        return QueueBuilder.durable(DLX_QUEUE).build();
    }
    //绑定正常队列和正常交换机
    @Bean
    public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
                                     @Qualifier("itemTopicExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
    }
    //绑定死信队列和死信交换机
    @Bean
    public Binding dlxQueueExchange(@Qualifier("dlxQueue") Queue queue,
                                    @Qualifier("dlxExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
    }
}
  1. 正常消息消费方
@Component
public class DlxListener implements ChannelAwareMessageListener {
    @RabbitListener(queues = "springboot_item_queue")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("接收到的消息是:"+new String(message.getBody()));
        try {
            System.out.println("开始处理业务逻辑………………");
           //int i = 3/0;//业务处理过程中发生异常
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("出现异常,拒绝接收");
            //拒绝签收,消息不重回队列,requeue设置为false
            channel.basicNack(deliveryTag,true,false);
        }
    }
}
  1. 死信消息消费方
@Component
public class DLListener implements ChannelAwareMessageListener {
    @RabbitListener(queues = "dlx_queue")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("接收到的消息是:"+new String(message.getBody()));
        try {
            System.out.println("开始处理业务逻辑………………");
            System.out.println("根据订单id查询其状态...");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单,回滚库存....");
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            e.printStackTrace();         
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

延时任务运行结果

在这里插入图片描述

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-12-13 12:40:04  更:2021-12-13 12:42:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 5:26:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码