1、死信队列
1.1、概念
- 死信,顾名思义就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或者直接到queue里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
- 应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。 - 图解:
- 死信的来源
1)消息 TTL 过期 2)队列达到最大长度(队列满了,无法再添加数据到 mq 中) 3)消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false。
1.2、死信队列实战
1.2.1、环境搭建:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!-- 真正实现延迟队列需要导入的-->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
spring.rabbitmq.host=192.168.0.102
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq02SpringbootApplication {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq02SpringbootApplication.class, args);
}
}
1.2.2、死信队列的几种情况
- 死信队列通常会有
消息TTL过期、消息队列达到最大长度、消息被拒 三种情况。 - 死信队列 之 消费者1
消费者 Consumer01 代码(启动之后关闭该消费者 模拟其接收不到消息)
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xyl.utils.RabbitMqConnectUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer01 {
private static final String Normal_Exchange = "normal_exchange";
private static final String Normal_Queue = "normal_queue";
private static final String Dead_Exchange = "dead_exchange";
private static final String Dead_Queue = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqConnectUtils.getChannel();
channel.exchangeDeclare(Normal_Exchange, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(Dead_Exchange,BuiltinExchangeType.DIRECT);
Map<String, Object> argumentsMap = new HashMap<>();
argumentsMap.put("x-dead-letter-exchange",Dead_Exchange);
argumentsMap.put("x-dead-letter-routing-key","lisi");
channel.queueDeclare(Normal_Queue,false,false,false,argumentsMap);
channel.queueDeclare(Dead_Queue,false,false,false,null);
channel.queueBind(Normal_Queue,Normal_Exchange,"zhangsan");
channel.queueBind(Dead_Queue,Dead_Exchange,"lisi");
System.out.println("Consumer01正在接收消息.....");
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("绑定的路由键:" + message.getEnvelope().getRoutingKey());
System.out.println("Consumer01 接收到的消息是:" + new String(message.getBody()));
};
channel.basicConsume(Normal_Queue,true,deliverCallback,consumerTag -> {});
}
}
1)消息 TTL 过期
- TTL(Time to live即消息存活时间),设expireTime为消息最长的保存时间,为int类型,过了expireTime,消息就会经由死信交换机到达死信队列。
AMQP.BasicProperties buildProps = new AMQP.BasicProperties().
builder().expiration("expireTime").build();
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQBasicProperties;
import com.xyl.utils.RabbitMqConnectUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer01 {
private static final String Normal_Exchange = "normal_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqConnectUtils.getChannel();
AMQP.BasicProperties buildProps = new AMQP.BasicProperties().
builder().expiration("10000").build();
for (int i = 0; i < 10; i++) {
String msg = "info" + i;
channel.basicPublish(Normal_Exchange,"zhangsan",buildProps,msg.getBytes("UTF-8"));
System.out.println("生产者发送的消息是:" + msg);
}
}
}
- 消费者 Consumer02 代码(以上步骤完成后,启动 Consumer02 消费者,它消费死信队列里面的消息)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xyl.utils.RabbitMqConnectUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer02 {
private static final String Dead_Queue = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqConnectUtils.getChannel();
System.out.println("Consumer02正在接收消息.....");
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("绑定的路由键:" + message.getEnvelope().getRoutingKey());
System.out.println("接收到的消息是:" + new String(message.getBody()));
};
channel.basicConsume(Dead_Queue,true,deliverCallback,consumerTag -> {});
}
}
2)队列达到最大长度
- 消息生产者 去掉 TTL 属性,channel.basicPublish() 方法中BasicProperties类型的参数props置为null。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQBasicProperties;
import com.xyl.utils.RabbitMqConnectUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer01 {
private static final String Normal_Exchange = "normal_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqConnectUtils.getChannel();
for (int i = 1; i <= 10; i++) {
String msg = "info" + i;
channel.basicPublish(Normal_Exchange,"zhangsan",null,msg.getBytes("UTF-8"));
System.out.println("生产者发送的消息是:" + msg);
}
}
}
- 消费者 Producer01 声明队列的方法
channel.queueDeclare( ) 中Map类型的参数argumentsMap设置正常队列的长度(此处我设置为6), argumentsMap.put("x-max-length",6);
Map<String, Object> argumentsMap = new HashMap<>();
argumentsMap.put("x-dead-letter-exchange",Dead_Exchange);
argumentsMap.put("x-dead-letter-routing-key","lisi");
argumentsMap.put("x-max-length",6);
channel.queueDeclare(Normal_Queue,false,false,false,argumentsMap);
- 测试
先启动Consumer01,启动完成后再关闭Consumer01,再启动生产者Producer01,效果如下图:在dead-queue死信队列中消息有四个,normal_queue正常队列中消息有6个, 如果要消费死信队列中的四个消息,需要再启动Consumer02, 如果要消费正常队列中的六个消息,需要再启动Consumer01。
3)消息被拒
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xyl.utils.RabbitMqConnectUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer01 {
private static final String Normal_Exchange = "normal_exchange";
private static final String Normal_Queue = "normal_queue";
private static final String Dead_Exchange = "dead_exchange";
private static final String Dead_Queue = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqConnectUtils.getChannel();
channel.exchangeDeclare(Normal_Exchange, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(Dead_Exchange,BuiltinExchangeType.DIRECT);
Map<String, Object> argumentsMap = new HashMap<>();
argumentsMap.put("x-dead-letter-exchange",Dead_Exchange);
argumentsMap.put("x-dead-letter-routing-key","lisi");
argumentsMap.put("x-max-length",6);
channel.queueDeclare(Normal_Queue,false,false,false,argumentsMap);
channel.queueDeclare(Dead_Queue,false,false,false,null);
channel.queueBind(Normal_Queue,Normal_Exchange,"zhangsan");
channel.queueBind(Dead_Queue,Dead_Exchange,"lisi");
System.out.println("Consumer01正在接收消息.....");
DeliverCallback deliverCallback = (consumerTag, message)->{
String msg = new String(message.getBody(), "UTF-8");
if (msg.equals("info5")){
System.out.println("Consumer01 接收到消息" + msg + ",该消息被拒绝");
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else{
System.out.println("Consumer01 接收到消息"+msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
};
channel.basicConsume(Normal_Queue,false,deliverCallback,consumerTag -> {});
}
}
- 测试:
消费者Consumer01 启动之后关闭,模拟其接收不到消息, 消费者Consumer02代码不变,先启动Consumer01 ,再启动Consumer02。 - 效果如下:
选中Queues列表中的dead-queue,点击Get Messages 可以看到队列中消息的具体信息。
2、延迟队列
2.1、概念
- 延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列,即死信队列中消息的TTL过期的情况。
2.2、延迟队列使用场景
1)订单在十分钟之内未支付则自动取消 2)新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。 3)用户注册成功后,如果三天内没有登陆则进行短信提醒。 4)用户发起退款,如果三天内没有得到处理则通知相关运营人员。 5)预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
- 这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?
- 如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。
- 但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
2.3、RabbitMQ 中的 TTL
- TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
- 1)消息设置 TTL
另一种方式便是针对每条消息设置 TTL。 - 2) 队列设置 TTL
第一种是在创建队列的时候设置队列的“x-message-ttl”属性。 - 3)队列过期和消息过期
如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
2.4、整合SpringBoot
2.4.1、环境搭建
<dependencies>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
spring.rabbitmq.host=192.168.0.104
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket webApiConfig() {
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}
private ApiInfo webApiInfo() {
return new ApiInfoBuilder()
.title("rabbitmq 接口文档")
.description("本文档描述了 rabbitmq 微服务接口定义")
.version("1.0")
.contact(new Contact("enjoy6288", "http://atguigu.com",
"1234512@qq.com"))
.build();
}
}
2.4.2、队列TTL
- 创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
- 配置文件类代码(定义普通的交换机、队列,死信交换机和死信队列,绑定关系):
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlQueueConfig {
private static final String XExchange = "X";
private static final String QueueA = "queueA";
private static final String QueueB = "queueB";
private static final String Dead_Letter_YExchange = "Y";
private static final String Dead_Letter_QueueD = "queueD";
@Bean("XExchange")
public DirectExchange getXExchange(){
return new DirectExchange(XExchange);
}
@Bean("YExchange")
public DirectExchange getYExchange(){
return new DirectExchange(Dead_Letter_YExchange);
}
@Bean("queueA")
public Queue getQueueA(){
HashMap<String, Object> argsMap = new HashMap<>(3);
argsMap.put("x-message-ttl",10_000);
argsMap.put("x-dead-letter-exchange",Dead_Letter_YExchange);
argsMap.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(QueueA).withArguments(argsMap).build();
}
@Bean("queueB")
public Queue getQueueB(){
Map<String, Object> argsMap = new HashMap<>(3);
argsMap.put("x-message-ttl",40_000);
argsMap.put("x-dead-letter-exchange",Dead_Letter_YExchange);
argsMap.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(QueueB).withArguments(argsMap).build();
}
@Bean("queueD")
public Queue getQueueD(){
return new Queue(Dead_Letter_QueueD);
}
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("XExchange") DirectExchange XExchange ){
return BindingBuilder.bind(queueA).to(XExchange).with("XA");
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("XExchange") DirectExchange XExchange){
return BindingBuilder.bind(queueB).to(XExchange).with("XB");
}
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
@Qualifier("YExchange") DirectExchange YExchange){
return BindingBuilder.bind(queueD).to(YExchange).with("YD");
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMessage(@PathVariable("message") String msg){
log.info("当前时间为:{},给两个ttl队列发送的消息为:{}",new Date().toString(),msg);
rabbitTemplate.convertAndSend("X","XA", "来自 ttl 为 10S 的队列消息:"+msg);
rabbitTemplate.convertAndSend("X","XB", "来自 ttl 为 40S 的队列消息:"+msg);
}
}
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "queueD")
public void receiveQDMsg(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("当前时间:{},接收到的消息:{}",new Date().toString(),msg);
}
}
避坑说明
注意:这里容易导错包, DeadLetterQueueConsumer 类 导入的Message是在org.springframework.amqp.core中, 导入的Channel是在com.rabbitmq.client中, 即import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message;
- 测试
发起请求:http://localhost:8080/ttl/sendMsg/qualification
2.4.3、延时队列优化
- 优化图解:
新增一个队列 QC,绑定关系如下,该队列不设置 TTL 时间,由消息生产者来设置TTL时间。 - 修改配置类TtlQueueConfig,添加QC和交换机X到QC的绑定关系。
private static final String QueueC = "queueC";
@Bean("queueC")
public Queue getQueueC(){
HashMap<String, Object> argsMap = new HashMap<>();
argsMap.put("x-dead-letter-exchange",Dead_Letter_YExchange);
argsMap.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(QueueC).withArguments(argsMap).build();
}
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("XExchange") DirectExchange XExchange){
return BindingBuilder.bind(queueC).to(XExchange).with("XC");
}
- 测试:
发送下面两个请求: http://localhost:8080/ttl/sendExpireMsg/hello1/20000 http://localhost:8080/ttl/sendExpireMsg/hello2/2000 在控制台输出效果如下: - 可见,在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
下面说下,使用Rabbitmq插件的方式解决上面说的问题。
2.4.4、Rabbitmq 插件实现延迟队列
1)插件安装
在官网上下载 rabbitmq_delayed_message_exchange 插件,下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases,
然后解压放置到 RabbitMQ 的插件目录(/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins), 进入plugins 目录,执行命令让该插件生效,然后重启 RabbitMQ
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
访问RabbitMQ的前端管理界面,在Exchange一栏下,可以看到,添加插件后,可以增加x-delayed-message类型的交换机。
2)代码实现
- 消息延迟的实现由队列变为交换机,新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
- 配置类代码
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedQueueConfig {
public static final String DelayedQueue_Name = "delayed.queue";
public static final String DelayedExchange_Name = "delayed.exchange";
public static final String Delayed_Routingkey = "delayed.routingkey";
@Bean("delayedQueue")
public Queue delayeQueue(){
return new Queue(DelayedQueue_Name);
}
@Bean("delayedExchange")
public CustomExchange delayedExchange(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type","direct");
return new CustomExchange(DelayedExchange_Name,"x-delayed-message",true,false,arguments);
}
@Bean
public Binding bindingDelayedQueueAndExchange(@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange){
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(Delayed_Routingkey).noargs();
}
}
避坑说明:
1、注意,使用到RabbitMQ的代码中,许多地方需要用到Queue、Channel、Exchange、Message,需要导入的包基本都在org.springframework.amqp.core 中,编码时可能会到错包,比如,Queue可能会导入java.util.Queue,Message可能会导入import sun.plugin2.message.Message; 2、声明延迟交换机的方法delayedExchange( )的返回类型是CustomExchange ,在方法中通过HashMap集合来设置交换机的类型 arguments.put("x-delayed-type","direct");
package com.xyl.rabbitmq02_springboot.consumer;
import com.xyl.rabbitmq02_springboot.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class DelayedQueueConsumer {
@RabbitListener(queues = DelayedQueueConfig.DelayedQueue_Name)
public void receiveDelayedQueue(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}",new Date().toString(),msg);
}
}
- package com.xyl.rabbitmq02_springboot.controller;
import com.xyl.rabbitmq02_springboot.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendDelayedMsg/{message}/{delayTime}")
public void sendDelayedMessage(@PathVariable String message,@PathVariable Integer delayTime){
log.info("当前时间:{},发送时长为{}毫秒的信息给延迟队列:{}",new Date().toString(),delayTime,message);
rabbitTemplate.convertAndSend(DelayedQueueConfig.DelayedExchange_Name,DelayedQueueConfig.Delayed_Routingkey,message,msg ->{
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
}
3)测试
http://localhost:8080/ttl/sendDelayedMsg/delay_Info1/20000
http://localhost:8080/ttl/sendDelayedMsg/delay_Info2/20000
http://localhost:8080/ttl/sendDelayedMsg/delay_Info3/2000
http://localhost:8080/ttl/sendDelayedMsg/delay_Info4/2000
- 测试结果如上图,延迟队列没有因为消息 delay_Info1和delay_Info2先发送,delay_Info3和delay_Info4后发送,就先处理消息1,2,而是根据延迟时间的长短,先处理能处理的消息,让暂时不能被处理的消息在队列中等待处理。
3、总结
- 延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
- 当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景
|