一、什么是死信、死信队列?
1.死信
一条消息经历过下面4种情况任意一种时,会变成死信
- 1.消费者端配置自动ack
acknowledge-mode: auto 在重试几次后(配置文件配置默认是3)后进入到死信队列 - 2.消费者端使用basic.reject和basic.nack拒绝签收消息,并且配置requeue参数是false(即不重回原来的队列)
- 3.消息在队列存活的时间大于TTL
- 4.消息队列中消息的数量超过最大长度
2.死信队列
一条消息变为死信时会被重新发布的交换机叫死信交换机,与死信交换机绑定的队列叫做死信队列 。跟普通的队列没有多大的区别,多了几个参数而已。如果没有为队列配置死信交换机,则原有的消息被抛弃。用下面的图来展示一下
二、如何配置死信队列?
项目版本:springboot:2.3.6.RELEASE。 引入的依赖:spring-boot-starter-amqp 和spring-rabbit
1.yml配置文件
spring:
rabbitmq:
host: 192.168.184.128
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: auto
prefetch: 1
default-requeue-rejected: false
retry:
enabled: true
max-attempts: 3
max-interval: 1000ms
2.声明业务队列和死信队列
通过下面的配置将业务队列和死信队列绑定起来,当一条消息从业务队列中变成死信后会重发到死信交换机上,重新消费。
@Bean
public Queue queue(){
Map<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key",DEAD_LETTER_ROUTING_KEY);
args.put("x-max-length",3);
args.put("x-message-ttl",new Long(3000));
Queue queue = new Queue(QUEUE, true,false,false,args);
return queue;
}
上面的配置中两个条件会让消息变为死信
- args.put(“x-max-length”,3):队列中消息总数大于3
- args.put(“x-message-ttl”,new Long(3000)):消息超过3s还未被消费
3.生产者
正常使用RabbitTemplate提供的api发送消息即可
rabbitTemplate.convertAndSend(RabbitMQStudentCourseTopicConfig.EXCHANGE, RabbitMQStudentCourseTopicConfig.ROUTING_KEY_NAME,new MqMessage(studentId,courseId));
4.消费者
消费者分为两类,业务消费者和死信消费者
业务消费者
会报除数不能0的异常
int count=0;
@RabbitListener(queues = RabbitMQStudentCourseTopicConfig.QUEUE)
public void doChooseCourse(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
MqMessage mqMessage = MessageHelper.msgToObj(message, MqMessage.class)
log.info("重试次数:{}",++count);
int i = 5/0;
log.info("消费成功");
}
死信队列监听者
@RabbitListener(queues = RabbitMQStudentCourseTopicConfig.DEAD_LETTER_QUEUE)
public void doChooseCourse(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException {
System.out.println("收到死信消息:" + new String(message.getBody()));
channel.basicAck(deliveryTag,false);
}
5.RabbitMQ控制台检验
已经创建好了两个队列
三、测试死信及如何处理死信
1.自动ack超过重试次数进入死信队列
从上图的重试时间及次数来看也符合配置max-interval: 1000ms 、max-attempts: 3
2.手动签收失败不重回队列进入死信队列
yml文件配置:acknowledge-mode: manual 。
消费者代码修改如下
int count=0;
@RabbitListener(queues = RabbitMQStudentCourseTopicConfig.QUEUE)
public void doChooseCourse(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
MqMessage mqMessage = MessageHelper.msgToObj(message, MqMessage.class);
log.info("重试次数:{}",++count);
try{
int i = 5/0;
}catch (Exception e){
channel.basicNack(deliveryTag,false,false);
log.info("消费失败");
throw e;
}
log.info("消费成功");
}
注意:当yml文件配置消费方签收方式为手动时,重试次数有时候起作用,有时候不起作用。但是起作用时不是失败多少次才会进死信队列,而是只要执行channel.basicNack(deliveryTag,false,false); 这行代码就会进死信队列
那么如何实现手动签收时,失败3次才进入死信队列呢?我们放到下一篇文章中说。
3.消息在队列存活的时间大于TTL进入死信队列
这个测试只要将业务消费者注释掉即可,设置的TTL是3s,测试结果符合预期
4.消息队列中消息的数量超过最大长度进入死信队列
还是把消费者端代码注释,然后运行,看结果
生产者投递消息时间:2021-10-05 18:30:51,id:2904,
生产者投递消息时间:2021-10-05 18:31:08,id:430,
生产者投递消息时间:2021-10-05 18:31:15,id:4646,
生产者投递消息时间:2021-10-05 18:31:29,id:395,
收到死信消息:{"id":2904}
因为队列长度设置的是3,当投放第4个消息即id为395时,最早进入队列id2904被弹出进入死信队列 ,可见rabbitmq是一个先进先出的队列。
配置踩的坑
问题1
原因1:在为队列配置死信交换机前已经声明了该队列,rabbitmq不允许重复,所以把之前的队列删掉就可以了 原因2:如果在消费端使用了@RabbitListener的queuesToDeclare属性一定要配置好,这个属性就是显示创建queue。否则也会发生上面的错误
解决办法:通过管理控制台将原来的queue删掉,使用queues属性指定要创建的queue就可以。
|