IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ-实现延迟队列 -> 正文阅读

[大数据]RabbitMQ-实现延迟队列

RabbitMQ-实现延迟队列

1、为什么需要延迟队列

延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

**场景:**在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。

因为我们项目中本身就使用到了Rabbitmq,所以基于方便开发和维护的原则,我们使用了Rabbitmq延迟队列来实现定时任务,

Rabbitmq延迟队列

Rabbitmq本身是没有延迟队列的,只能通过Rabbitmq本身队列的特性来实现,想要Rabbitmq实现延迟队列,需要使用Rabbitmq的死信交换机(Exchange)和消息的存活时间TTL(Time To Live)

死信交换机
一个消息在满足如下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机可以对应很多队列。

一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

上面的消息的TTL到了,消息过期了。

队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

死信交换机就是普通的交换机,只是因为我们把过期的消息扔进去,所以叫死信交换机,并不是说死信交换机是某种特定的交换机

消息TTL(消息存活时间)
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

2、创建交换机(Exchanges)和队列(Queues)

上篇文章教大家使用代码创建交换机和队列,所以本文使用的管理平台来操作!
创建死信交换机
在这里插入图片描述

创建自动过期消息队列

创建一个一个名为delay_queue1的自动过期的队列,当然图片上面的参数并不会让消息自动过期,因为我们并没有设置x-message-ttl参数,如果整个队列的消息有消息都是相同的,可以设置,这里为了灵活,所以并没有设置
在这里插入图片描述

  • x-message-ttl:队列中消息存活时间, 单位ms, 超时则认为是死信
  • x-dead-letter-exchange:出现死信时路由到该参数指定的Exchange(dlx)
  • x-dead-letter-routing-key: dlx路由到其他队列使用的routingKey

创建消息处理队列
这个队列才是真正处理消息的队列,所有进入这个队列的消息都会被处理!
在这里插入图片描述
注意两个队列的区别!
在这里插入图片描述
消息队列绑定到交换机

进入交换机详情页面,将创建的2个队列(delay_queue1和delay_queue2)绑定到交换机上面

点击我们建立的这个交换机
在这里插入图片描述
在这里插入图片描述
绑定成功后如图:
在这里插入图片描述

在这里插入图片描述

代码创建:

@Configuration
public class DelayRabbitConfig {
    @Bean
    public Queue delayQueue01(){
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl",6000);
        map.put("x-dead-letter-exchange","delay_Exchange");
        map.put("x-dead-letter-routing-key","delay2");
        return new Queue("delay_queue1",true,false,false,map);
    }
    @Bean
    public Queue delayQueue02(){
        return new Queue("delay_queue2");
    }

    /**
     * 死信交换机
     * @return
     */
    @Bean
    DirectExchange delay_Exchange() {
        return new DirectExchange("delay_Exchange",true,false);
    }

    @Bean
    Binding bindingDirect01() {
        return BindingBuilder.bind(delayQueue01()).to(delay_Exchange()).with("delay1");
    }
    @Bean
    Binding bindingDirect02() {
        return BindingBuilder.bind(delayQueue02()).to(delay_Exchange()).with("delay2");
    }
}

实现延迟队列的流程

在这里插入图片描述

3、编写生产者代码

@RestController
@RequestMapping("/rabbit")
public class DelayQueueController {

    @Autowired
    private RabbitTemplate template;

    @GetMapping("/delay")
    public String TestMessageAck() {

        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
                            +" 延迟6秒的消息";
        MessageProperties messageProperties = new MessageProperties();
        //设置延迟时间
        messageProperties.setExpiration("6000");
        messageProperties.setCorrelationId(UUID.randomUUID().toString());

        Message message = new Message(createTime.getBytes(),messageProperties);

        template.convertAndSend("delay_Exchange", "delay1",message);
        return "ok";
    }
}

3、编写消费者代码

注意消费者绑定的队列是:delay_queue2

@Configuration
public class MessageListenerConfig {
    @Autowired
    private CachingConnectionFactory connectionFactory;
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
        //设置一个队列
        container.setQueueNames("delay_queue2");
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("消费延迟6秒队列中的消息======================");
                System.out.println("消费者收到消息:" + new String(body)+",当前时间:"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
            }
        });

        return container;
    }

}

代码运行结果如下:

在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-23 10:51:49  更:2021-07-23 10:52:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/15 13:49:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码