使用RabbitMq做中间件时,生产者如何保证消息不丢失?(基于rabbitTemplate)
一. 生产者confirm机制
配置文件中新增
spring.publisher-confirm-type: correlated
这个配置是意思是: 在发送消息时,带上CorrelationData 对象,这样MQ在收到消息后就可以回调我们的服务,从而可以使用CorrelationData.get 以及 CorrelationData.get(time,TimeUnit) 两个方法监听消息妥投。 这两个方法返回是CorrelationData.Confirm,它有个isAck方法:
- 如果isAck为true,则表示妥投。
- 如果isAck为false,则表示消息未妥投,可能的原因是交换机不存在。
- 如果CorrelationData.get(time,TimeUnit) 超时未返回,则可能消息未妥投。当然也可能消息妥投但是回调失败,这个待会讨论。
代码如下:
CorrelationData.Confirm confirm = correlationData.getFuture().get(300L, TimeUnit.SECONDS);
if(confirm.isAck()){
log.info("妥投");
}else{
log.error("不妥投");
}
当然,这种同步的监听方式,并不能满足所有场景,rabbitTemplate还提供了异步的监听方式
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
log.info("妥投");
}else{
log.error("不妥投");
}
}
});
注意:异步监听如果消息回调失败,是监听不到的。这个待会讨论。
二.消息持久化
上面消息妥投仅仅指的是消息到达RabbitMq的交换机,但是依旧不能保持消息100%不丢失。 因为这个时间消息只是存放于内存中,如果这时rabbitMq的服务宕机,内存中的消息就可以能丢失。 这时就该用上消息持久化。 在定义交换机与队列时,可以手动设置durable来支持消息持久化
DirectExchange exchange = ExchangeBuilder
.directExchange(testExchange)
.durable(true)
.autoDelete()
.build();
Queue queue = QueueBuilder
.durable(testQueue)
.autoDelete()
.build();
当然,消息持久化会导致吞吐量下降,因为rabbitmq需要在持久化完成才回调我们的服务完成confirm操作。
三.消息重发机制
未完待续
|