1、消费者消费消息
如果我们设置的是自动确认的话,那么就会有当接收到消息,去处理消息的时候生成异常,那么就会有消息丢失的情况。
所以我们可以设置手动确认
2、springBoot设置手动确认消息
2.1引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
2.2application.yml配置开启自动手动确认
spring:
application:
name: rabbitmq-springboot
rabbitmq:
host: 127.0.0.1
port: 5672
username: ems
password: 123
virtual-host: /ems
# 发送者开启 confirm 确认机制
publisher-confirm-type: simple
# 发送者开启 return 确认机制
publisher-returns: true
listener:
simple:
# 设置消费端手动 ack
acknowledge-mode: manual
# 是否支持重试
retry:
enabled: true
server:
port: 7071
2.3消费者代码实现
package com.lst.hello;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class WorkCustomer {
@RabbitListener(queues = "TestQueue")
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
System.out.println("收到消息"+msg);
int i = 1/0;
System.out.println("确认消息");
//手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//是否是重新回到队列的消息
//这里有一个问题就是,代码错误的话回无限的重新接受消息,从而是cup急剧上升
System.out.println("消息即将再次返回队列处理...");
//b:是否允许多条处理 b1:是否重新回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
2.4生产者代码?(上一篇博客)
rabbitMQ-生产者可靠发送-回退模式-return
2.5测试结果
3、问题
业务代码一旦出现BUG,多数情况不会自动修复,一条消息会被无限投递进队列,消费端一直接受处理--->然后处理失败--->重新回到队列,导致了死循环。
解决思路
1、 将失败的消息拒绝之后,重新发送到队列的尾部,这样可以保证其他的正常的消息是可以被处理的。
//确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//channel对象重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(msg));
2、设置消息重试次数,达到了重试上限以后,队列删除此消息,并将消息持久化到数据库并推送报警,然后人工处理和定时任务补偿。
|