IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMQ的过期时间与死信队列 -> 正文阅读

[Java知识库]RabbitMQ的过期时间与死信队列

📢📢📢📣📣📣

哈喽!大家好,我是【一心同学】,一位上进心十足的【Java领域博主】!😜😜😜

?【一心同学】的写作风格:喜欢用【通俗易懂】的文笔去讲解每一个知识点,而不喜欢用【高大上】的官方陈述。

?【一心同学】博客的领域是【面向后端技术】的学习,未来会持续更新更多的【后端技术】以及【学习心得】。

?如果有对【后端技术】感兴趣的【小可爱】,欢迎关注一心同学】💞💞💞

??????感谢各位大可爱小可爱!???????


目录

准备工作

一、过期时间TTL

🚀 介绍

🌴 队列过期

🌴 消息过期

二、死信队列

🚀 介绍

🌴 死信队列的创建

🌴 连接死信队列

🌵 消息数量限制

小结


准备工作

(1)创建一个Spring Boot项目对应生产者

(2)导入依赖。

<dependency>
? ? <groupId>org.springframework.boot</groupId>
? ? <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
? ? <groupId>org.springframework.boot</groupId>
? ? <artifactId>spring-boot-starter-web</artifactId>
</dependency>

(3)定义生产者的配置文件。

application.yml:

server:
  port: 8021
spring:
  #给项目来个名字
  application:
    name: rabbitmq-provider
  #配置rabbitMq 服务器
  rabbitmq:
    host: 服务器地址
    port: 5672
    username: yixin
    password: 123456
    #虚拟host 可以不设置,使用server默认host
    virtual-host: /

一、过期时间TTL

🚀 介绍

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除

RabbitMQ可以对消息队列设置TTL,目前有两种方法可以设置:

第一种方法:通过队列属性设置,队列中所有消息都有相同的过期时间。

第二种方法:对消息进行单独设置,每条消息TTL可以不同。

注意:

如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。

🌴 队列过期

(1)编写配置类

在我们的配置类中定义队列时配置如下:

    @Bean
    public Queue queueTTl() {
        Map<String,Object> args=new HashMap<>();
        args.put("x-message-ttl",5000);//设置过期时间
        return new Queue("ttl_queue",true,false,false,args);
    }

整个配置类如下:

package com.yixin.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TTLRabbitMQ {
    @Bean
    public Queue queueTTl() {
        Map<String,Object> args=new HashMap<>();
        args.put("x-message-ttl",5000);//设置过期时间为5秒
        return new Queue("ttl_queue",true,false,false,args);
    }

    //定义交换机
    @Bean
    FanoutExchange ttlExchange() {
        return new FanoutExchange("TTLExchange");
    }

    //绑定队列和交换机
    @Bean
    Binding bindingExchangeTTL() {
        return BindingBuilder.bind(queueTTl()).to(ttlExchange());
    }
}

(2)编写Controller类,进行消息推送。

package com.yixin.controller;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RestController
public class TTLController {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

    @GetMapping("/sendTTLMessage")
    public String sentMessage(){
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "用户成功下单了!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);

        rabbitTemplate.convertAndSend("TTLExchange", null, map);
        return "消息发送成功!";
    }
}

启动我们的生产者项目,到控制台进行查看:

?可以发现,已经显示了TTL标志,代表我们给这个队列设置了过期时间。

我们在浏览器输入 http://localhost:8021/sendTTLMessage 进行推送消息:

?我们到控制台进行查看:

发现里面存在了一条消息,但过了5秒之后,这条消息就过期了:

🌴 消息过期

(1)编写配置类。

package com.yixin.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TTLMesageRabbitMQ {

    @Bean
    public Queue queueTTlMessage() {

        return new Queue("ttl_message_queue",true);
    }

    //定义交换机
    @Bean
    FanoutExchange ttlExchangeMessage() {
        return new FanoutExchange("TTLMessageExchange");
    }

    //绑定队列和交换机
    @Bean
    Binding bindingExchangeTTLMessage() {
        return BindingBuilder.bind(queueTTlMessage()).to(ttlExchangeMessage());
    }
}

(2)编写Controller类进行推送消息。

我们设置的是消息本身的过期时间,所以需要编写以下代码:

        MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");//设置这条消息5秒就过期,注意这里是字符串
                message.getMessageProperties().setContentEncoding("UTF-8");//设置编码
                return message;
            }
        };

整个Controller类如下:

package com.yixin.controller;


import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RestController
public class TTLMessage {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

    @GetMapping("/sendTTLMessage2")
    public String sentMessage2(){
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "用户成功下单了!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);

        MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");//设置这条消息5秒就过期,注意这里是字符串
                message.getMessageProperties().setContentEncoding("UTF-8");//设置编码
                
                return message;
            }
        };
        rabbitTemplate.convertAndSend("TTLMessageExchange", null,map,messagePostProcessor);
        return "消息发送成功!";
    }
}

启动项目,并在浏览器输入?http://localhost:8021/sendTTLMessage2 进行推送消息:

?去查看我们的控制台页面:

?然后过了5秒之后,这条消息就过期了。

二、死信队列

🚀 介绍

DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。


与普通交换机和队列有何区别?

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数?x-dead-letter-exchange?指定交换机即可。

消息变成死信,可能是由于以下的原因:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

🌴 死信队列的创建

死信队列的创建跟创建一个普通的队列和交换机没什么区别,如下:

package com.yixin.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class DeadRabbitMQ {

    @Bean
    public Queue queueDead() {

        return new Queue("dead_queue",true);
    }

    //定义交换机
    @Bean
    FanoutExchange deadMessage() {
        return new FanoutExchange("DeadExchange");
    }

    //绑定队列和交换机
    @Bean
    Binding bindingExchangeDead() {
        return BindingBuilder.bind(queueDead()).to(deadMessage());
    }
}

🌴 连接死信队列

(1)编写配置类。

TTL过期的配置类中,我们只需要为过期后的消息指定一个死信队列即可。

注意:由于刚刚我们在测试TTL的时候已经创建过ttl_queue,由于我们改动到了其参数,只需先去控制台将ttl_queue删除即可或者重写编写一个队列即可,否则启动会报错。

package com.yixin.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TTLRabbitMQ {
    @Bean
    public Queue queueTTl() {
        Map<String,Object> args=new HashMap<>();
        args.put("x-message-ttl",5000);//设置过期时间
        args.put("x-dead-letter-exchange","DeadExchange");//过期后即将前往的队列名(死信队列)
        //注意:如果是direct或其他需要routingkey的模式那么还需要设置  args.put("x-dead-letter-routing-key","dead");
        return new Queue("ttl_queue",true,false,false,args);
    }

    //定义交换机
    @Bean
    FanoutExchange ttlExchange() {
        return new FanoutExchange("TTLExchange");
    }

    //绑定队列和交换机
    @Bean
    Binding bindingExchangeTTL() {
        return BindingBuilder.bind(queueTTl()).to(ttlExchange());
    }
}

(2)编写Controller进行推送消息。

package com.yixin.controller;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RestController
public class TTLController {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

    @GetMapping("/sendTTLMessage")
    public String sentMessage(){
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "用户成功下单了!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);

        rabbitTemplate.convertAndSend("TTLExchange", null, map);
        return "消息发送成功!";
    }
}

启动项目,并在浏览器输入?http://localhost:8021/sendTTLMessage?进行推送消息:

?查看我们的控制台:

🌵 消息数量限制

?我们也可以设置队列达到最大长度时额外的消息则进入死信队列,设置如下:

    @Bean
    public Queue queueTTl() {
        Map<String,Object> args=new HashMap<>();
        args.put("x-message-ttl",5000);//设置过期时间
        args.put("x-max-length",5);//当我们的长度超过5条数据时,额外的数据将会被放入死信队列中
        args.put("x-dead-letter-exchange","DeadExchange");
       
        //注意:如果是direct或其他需要routingkey的模式那么还需要设置  args.put("x-dead-letter-routing-key","dead");
        return new Queue("ttl_queue",true,false,false,args);
    }

分析:结合“x-message-ttl”和"x-max-length"之后,现在放入死信队列的规则就是:超过5条数据的部分放入死信队列,并且如果过了5秒之后原本队列里的数据没有被消费也会被放到死信队列中。

编写Controller类进行发放10条数据:

package com.yixin.controller;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RestController
public class TTLController {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

    @GetMapping("/sendTTLMessage")
    public String sentMessage(){
        
        for(int i=0;i<10;i++) {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "用户成功下单了!";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

            Map<String, Object> map = new HashMap<>();
            map.put("messageId", messageId);
            map.put("messageData", messageData);
            map.put("createTime", createTime);

            rabbitTemplate.convertAndSend("TTLExchange", null, map);
        }
        return "消息发送成功!";
    }
}

启动项目,并在浏览器输入?http://localhost:8021/sendTTLMessage?进行推送消息:

?我们到控制台进行查看:

?过了5秒之后,在ttl_queue队列中的消息也过期了:

?至此,测试成功!


小结

以上就是【一心同学】讲解的如何在【Spring Boot】中操作【RabbitMQ】的【消息过期】机制以及对【死信队列】如何进行操作,这些知识都非常有用,例如我们在购物的时候,是先进行提交订单,后支付,而此时我们就可以设置消息过期,如果提交订单后24小时内没有支付,那么我们就将其加入死信队列中。

如果这篇【文章】有帮助到你,希望可以给【一心同学】点个👍,创作不易,相比官方的陈述,我更喜欢用【通俗易懂】的文笔去讲解每一个知识点,如果有对【后端技术】感兴趣的小可爱,也欢迎关注???????【一心同学】??????,我将会给你带来巨大的【收获与惊喜】💕💕!

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-04-06 16:04:04  更:2022-04-06 16:04:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 7:17:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码