📢📢📢📣📣📣
哈喽!大家好,我是【一心同学】,一位上进心十足的【Java领域博主】!😜😜😜
?【一心同学】的写作风格:喜欢用【通俗易懂】的文笔去讲解每一个知识点,而不喜欢用【高大上】的官方陈述。
?【一心同学】博客的领域是【面向后端技术】的学习,未来会持续更新更多的【后端技术】以及【学习心得】。
?如果有对【后端技术】感兴趣的【小可爱】,欢迎关注【一心同学】💞💞💞
??????感谢各位大可爱小可爱!???????
目录
准备工作
一、过期时间TTL
🚀 介绍
🌴 队列过期
🌴 消息过期
二、死信队列
🚀 介绍
🌴 死信队列的创建
🌴 连接死信队列
🌵 消息数量限制
小结
准备工作
(1)创建一个Spring Boot项目对应生产者。
(2)导入依赖。
<dependency>
? ? <groupId>org.springframework.boot</groupId>
? ? <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
? ? <groupId>org.springframework.boot</groupId>
? ? <artifactId>spring-boot-starter-web</artifactId>
</dependency>
(3)定义生产者的配置文件。
application.yml:
server:
port: 8021
spring:
#给项目来个名字
application:
name: rabbitmq-provider
#配置rabbitMq 服务器
rabbitmq:
host: 服务器地址
port: 5672
username: yixin
password: 123456
#虚拟host 可以不设置,使用server默认host
virtual-host: /
一、过期时间TTL
🚀 介绍
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。
RabbitMQ可以对消息和队列设置TTL,目前有两种方法可以设置:
第一种方法:通过队列属性设置,队列中所有消息都有相同的过期时间。
第二种方法:对消息进行单独设置,每条消息TTL可以不同。
注意:
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。
🌴 队列过期
(1)编写配置类
在我们的配置类中定义队列时配置如下:
@Bean
public Queue queueTTl() {
Map<String,Object> args=new HashMap<>();
args.put("x-message-ttl",5000);//设置过期时间
return new Queue("ttl_queue",true,false,false,args);
}
整个配置类如下:
package com.yixin.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TTLRabbitMQ {
@Bean
public Queue queueTTl() {
Map<String,Object> args=new HashMap<>();
args.put("x-message-ttl",5000);//设置过期时间为5秒
return new Queue("ttl_queue",true,false,false,args);
}
//定义交换机
@Bean
FanoutExchange ttlExchange() {
return new FanoutExchange("TTLExchange");
}
//绑定队列和交换机
@Bean
Binding bindingExchangeTTL() {
return BindingBuilder.bind(queueTTl()).to(ttlExchange());
}
}
(2)编写Controller类,进行消息推送。
package com.yixin.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RestController
public class TTLController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendTTLMessage")
public String sentMessage(){
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "用户成功下单了!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("TTLExchange", null, map);
return "消息发送成功!";
}
}
启动我们的生产者项目,到控制台进行查看:
?可以发现,已经显示了TTL标志,代表我们给这个队列设置了过期时间。
我们在浏览器输入 http://localhost:8021/sendTTLMessage 进行推送消息:
?我们到控制台进行查看:
发现里面存在了一条消息,但过了5秒之后,这条消息就过期了:
🌴 消息过期
(1)编写配置类。
package com.yixin.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TTLMesageRabbitMQ {
@Bean
public Queue queueTTlMessage() {
return new Queue("ttl_message_queue",true);
}
//定义交换机
@Bean
FanoutExchange ttlExchangeMessage() {
return new FanoutExchange("TTLMessageExchange");
}
//绑定队列和交换机
@Bean
Binding bindingExchangeTTLMessage() {
return BindingBuilder.bind(queueTTlMessage()).to(ttlExchangeMessage());
}
}
(2)编写Controller类进行推送消息。
我们设置的是消息本身的过期时间,所以需要编写以下代码:
MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");//设置这条消息5秒就过期,注意这里是字符串
message.getMessageProperties().setContentEncoding("UTF-8");//设置编码
return message;
}
};
整个Controller类如下:
package com.yixin.controller;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RestController
public class TTLMessage {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendTTLMessage2")
public String sentMessage2(){
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "用户成功下单了!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");//设置这条消息5秒就过期,注意这里是字符串
message.getMessageProperties().setContentEncoding("UTF-8");//设置编码
return message;
}
};
rabbitTemplate.convertAndSend("TTLMessageExchange", null,map,messagePostProcessor);
return "消息发送成功!";
}
}
启动项目,并在浏览器输入?http://localhost:8021/sendTTLMessage2 进行推送消息:
?去查看我们的控制台页面:
?然后过了5秒之后,这条消息就过期了。
二、死信队列
🚀 介绍
DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
与普通交换机和队列有何区别?
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。 要想使用死信队列,只需要在定义队列的时候设置队列参数?x-dead-letter-exchange ?指定交换机即可。
消息变成死信,可能是由于以下的原因:
🌴 死信队列的创建
死信队列的创建跟创建一个普通的队列和交换机没什么区别,如下:
package com.yixin.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeadRabbitMQ {
@Bean
public Queue queueDead() {
return new Queue("dead_queue",true);
}
//定义交换机
@Bean
FanoutExchange deadMessage() {
return new FanoutExchange("DeadExchange");
}
//绑定队列和交换机
@Bean
Binding bindingExchangeDead() {
return BindingBuilder.bind(queueDead()).to(deadMessage());
}
}
🌴 连接死信队列
(1)编写配置类。
在TTL过期的配置类中,我们只需要为过期后的消息指定一个死信队列即可。
注意:由于刚刚我们在测试TTL的时候已经创建过ttl_queue,由于我们改动到了其参数,只需先去控制台将ttl_queue删除即可或者重写编写一个队列即可,否则启动会报错。
package com.yixin.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TTLRabbitMQ {
@Bean
public Queue queueTTl() {
Map<String,Object> args=new HashMap<>();
args.put("x-message-ttl",5000);//设置过期时间
args.put("x-dead-letter-exchange","DeadExchange");//过期后即将前往的队列名(死信队列)
//注意:如果是direct或其他需要routingkey的模式那么还需要设置 args.put("x-dead-letter-routing-key","dead");
return new Queue("ttl_queue",true,false,false,args);
}
//定义交换机
@Bean
FanoutExchange ttlExchange() {
return new FanoutExchange("TTLExchange");
}
//绑定队列和交换机
@Bean
Binding bindingExchangeTTL() {
return BindingBuilder.bind(queueTTl()).to(ttlExchange());
}
}
(2)编写Controller进行推送消息。
package com.yixin.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RestController
public class TTLController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendTTLMessage")
public String sentMessage(){
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "用户成功下单了!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("TTLExchange", null, map);
return "消息发送成功!";
}
}
启动项目,并在浏览器输入?http://localhost:8021/sendTTLMessage?进行推送消息:
?查看我们的控制台:
🌵 消息数量限制
?我们也可以设置队列达到最大长度时额外的消息则进入死信队列,设置如下:
@Bean
public Queue queueTTl() {
Map<String,Object> args=new HashMap<>();
args.put("x-message-ttl",5000);//设置过期时间
args.put("x-max-length",5);//当我们的长度超过5条数据时,额外的数据将会被放入死信队列中
args.put("x-dead-letter-exchange","DeadExchange");
//注意:如果是direct或其他需要routingkey的模式那么还需要设置 args.put("x-dead-letter-routing-key","dead");
return new Queue("ttl_queue",true,false,false,args);
}
分析:结合“x-message-ttl”和"x-max-length"之后,现在放入死信队列的规则就是:超过5条数据的部分放入死信队列,并且如果过了5秒之后原本队列里的数据没有被消费也会被放到死信队列中。
编写Controller类进行发放10条数据:
package com.yixin.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RestController
public class TTLController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendTTLMessage")
public String sentMessage(){
for(int i=0;i<10;i++) {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "用户成功下单了!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("TTLExchange", null, map);
}
return "消息发送成功!";
}
}
启动项目,并在浏览器输入?http://localhost:8021/sendTTLMessage?进行推送消息:
?我们到控制台进行查看:
?过了5秒之后,在ttl_queue队列中的消息也过期了:
?至此,测试成功!
小结
以上就是【一心同学】讲解的如何在【Spring Boot】中操作【RabbitMQ】的【消息过期】机制以及对【死信队列】如何进行操作,这些知识都非常有用,例如我们在购物的时候,是先进行提交订单,后支付,而此时我们就可以设置消息过期,如果提交订单后24小时内没有支付,那么我们就将其加入死信队列中。
如果这篇【文章】有帮助到你,希望可以给【一心同学】点个赞👍,创作不易,相比官方的陈述,我更喜欢用【通俗易懂】的文笔去讲解每一个知识点,如果有对【后端技术】感兴趣的小可爱,也欢迎关注???????【一心同学】??????,我将会给你带来巨大的【收获与惊喜】💕💕!
|