这个问题在消息队列中是一个很常见的问题,而消息的丢失又分为在哪几个阶段丢失:
- 发送者发送消息到broker时丢失消息;
- 交换机投递消息到队列时丢失消息;
- 消费者端丢失消息。
准备工作
以下基于springboot项目演示RabbitMq。
创建一个交换机、一个队列,并将它们绑定。
@Configuration
@Slf4j
public class RabbitMqConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
public Queue queue01() {
return new Queue("queue01", true, false, false);
}
@Bean
public Exchange exchange01() {
return new DirectExchange("exchange01", true, false);
}
@Bean
public Binding binding() {
return new Binding("queue01", Binding.DestinationType.QUEUE, "exchange01", "send.msg", null);
}
}
发送端消息丢失
情况1:消息抵达broker之前丢失
解决:可以通过开启RabbitMQ的confirm机制来避免这种情况,当消息达到broker的时候,就回去触发ConfirmCallback中的confirm 函数。
步骤:
- 在配置文件中添加配置:
# 开启发送者confirm机制
spring.rabbitmq.publisher-confirm-type=correlated
注意,在版本较低的RabbitMq中,spring.rabbitmq.publisher-confirm-type=correlated ====> spring.rabbitmq.publisher-confirms=true ,高版本这种方式会提示已过时。
- 在配置类中实现ConfirmCallback对象,重写其
confirm 方法,并且将他注入到RabbitTemplate对象中
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData.getId();
if (!ack) {
log.info("id为: {}的消息,未能抵达broker, 原因: {}", id, cause);
} else {
log.info("broker已接收id为: {}的消息", id);
}
}
});
}
- CorrelationData:封装了当前消息的一些信息,包括当前消息的id;
- ack:消息是否成功被投递到broker中;
- cause:投递失败的原因。
- 测试,向队列中发送一条消息
@GetMapping("/send")
public String sendMsg(@RequestParam("msg") String msg) {
rabbitTemplate.convertAndSend("exchange01", "send.msg", msg, new CorrelationData(UUID.randomUUID().toString()));
return "success";
}
这里发送消息携带了CorrelationData对象,如果未携带,则confirm方法是接收不到这个参数的。
confirm机制是异步的,在等待信道返回确认的同时,你还可以继续发送下一条消息。
情况2:消息抵达队列之前丢失
还有一种情况,也就是broker接收到消息了,但是在交换机向队列投递消息的过程中,由于某种原因,消息未能抵达队列,造成的消息丢失。
解决:创建消息抵达队列失败的回调函数
步骤:
- 配置文件中添加配置:
# 开启发送者消息抵达队列的确认机制
spring.rabbitmq.publisher-returns=true
# 只要消息未能抵达队列 就触发失败回调
spring.rabbitmq.template.mandatory=true
- 在配置类中实现returnedMessage中的
returnedMessage 方法,并且注入到RabbitTemplate对象中。
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData.getId();
if (!ack) {
log.info("id为: {}的消息,未能抵达broker, 原因: {}", id, cause);
} else {
log.info("broker已接收id为: {}的消息", id);
}
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
String exchange = returned.getExchange();
Message message = returned.getMessage();
int replyCode = returned.getReplyCode();
String routingKey = returned.getRoutingKey();
String replyText = returned.getReplyText();
log.info(
"exchange: {}, message: {}, replyCode: {}, routingKey: {}, replyText: {}",
exchange, message, replyCode, routingKey, replyText
);
}
});
}
注意,该方法只有消息抵达队列失败时才会调用。
接收端消息丢失
这种情况是在接收者接收到队列中的消息之后,因为某种情况导致该消息并没有被处理到,但是在队列中已经将该消息删除了,导致了消息丢失。
解决:开启手动ack模式,确保我们在处理完该消息后,手动ack告诉队列,这条消息处理完了,你可以删除了,否则如果出现故障导致该消息并没有被处理,那么还会保留在队列中,不会被丢失。
步骤:
- 配置文件添加配置:
# 开启手动ack模式 默认:auto
spring.rabbitmq.listener.direct.acknowledge-mode=manual
- 在接收到消息时,处理完该消息后,进行手动ack
@Component
public class MyRabbitListener {
@RabbitListener(queues = "queue01")
@RabbitHandler
public void receiveMsg(String msg, Channel channel, Message message) throws IOException {
System.out.println("received msg:" + msg);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("处理消息:" + msg);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true);
}
}
}
void basicAck(long deliveryTag , boolean multiple) throws IOException :
- deliveryTag:一个自增的消息唯一标识,当消息被拒绝重写投递到队列后,该消息对应的deliveryTag就会+1;
- multiple:是否批量确认,将比当前消息deliveryTag值小的所有消息都进行确认。比如,我们当前存在deliveryTag值分别为:1、2、3的消息,当前消息的deliveryTag为4,如果开启当前消息的批量确认,那么会将deliveryTag为1,2,3,4这些消息都批量确认。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) :
- deliveryTag:同上;
- multiple:是否批量;
- requeue:是否重写入队。
void basicReject(long deliveryTag, boolean requeue)
basicNack和basicReject的区别:前者支持批量,将之前为拒绝的消息都批量拒绝;后者只拒绝当前消息。
测试一下,在处理消息之前手动添加一个异常,发送一条消息:
该消息会一直被重新投递到队列中。
除了以上的这些方式,也是可以使用RabbitMQ事务机制来解决消息丢失的问题,但是这种方式不推荐使用。
|