RabbitMQ如何保证消息不丢失
RabbitMQ消息流程模型

消息的可靠投递
- RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性模式
1.confirm 确认模式
当生产者发送消息后,消息到达broker后就会进行confim回调,在回到中根据投递标签进行消息的唯一确定。根据ack结果分为两种:
- 标识消息正常投递,被broker接受(true)
- 消息发送失败,返回进行处理(false)
2.return 退回模式
当消息未找到exchange或routingkey不正确消息最终路由错误,这两种情况都会导致消息不可达,最终执行return回调
3.Consumer ACK
ACK(Acknowledge)拥有确认的含义,是消费端收到消息的一种确认机制,消息确认有三种方式:
- 自动确认:acknowledge=“none”
- 手动确认:acknowledge=“manual”
- 根据异常情况确认:acknowledge=“auto”
可靠性解决方案
1.可靠性投递方式1,即confirm确认机制
当消息从生产者发送到交换机时,为确保过程中消息不丢失
实现:
- connectionFactory配置中,启用confirm确认
- 在rabbitTemplate中调用setConfirmCallback
代码实现方式:
@Test
public void testConfirm(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b)
System.out.println("消息投递到交换机成功");
else
System.out.println("消息投递到交换机失败");
}
});
rabbitTemplate.convertAndSend("fanout_exchange","","测试投递消息");
}
2.可靠性投递方式2,即return退回机制
当消息从交换机路由到队列时,为确保过程中消息不丢失
实现:
- connectionFactory配置中,启用return退回
- 在rabbitTemplate中调用setReturnCallback
- 开启路由失败的处理机制
代码实现方式:
@Test
public void testReturnCallBack() {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("投递到队列的消息内容:" + new String(message.getBody()));
System.out.println("错误码是:" + i);
}
});
rabbitTemplate.convertAndSend("topic_exchange", "apple.aa.bb", "进入队列");
}
3.消费端确认,并接受数据
-
如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag, true);方法接受数据。
-
如果出现异常,则在cath中调用basicNack或basicReject,拒绝消息,让MQ重新发送消息
实现:
代码实现方式:
public class SimpleQueueListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("接收到的消息是:" + new String(message.getBody()));
System.out.println("处理业务");
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
e.printStackTrace();
channel.basicNack(deliveryTag, true, true);
}
}
消息总结
通过上述三点解决方案,了解了消息如何不丢失。我们需要保证存在服务器中的message持久化,此外exchange和queue也要持久化。在最后,消费者在消费信息时,我们通过手动ACK确认服务器是否将消息队列退回。
|