消息可靠性投递

RabbitMQ发送消息的几个环节
- 生产者发送到Broker(MQ服务器), 如何保证Broker成功接收
- Exchage路由到Queue, 如何保证路由到正确的队列
- 消息存在Queue中, 如果没有消费者来消费, 那么如何保证消息稳定的存储
- 一条消息成功的被消费者接收, Broker是怎么知道消费者接收到了消息
消息发送到RabbitMQ服务器
RabbitMQ提供了两种服务端确认机制, 一种是Transaction事务模式, 一种是Confirm确认模式
spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual-host: /
# 打开消息确认机制
publisher-confirm-type: correlated
事务模式
通过一个 channel.txSelect()的方法把信道设置成事务模式,然后就可以发布消息给 RabbitMQ 了,如果 channel.txCommit();的方法调用成功,就说明事务提交成功,则消息一定到达了 RabbitMQ 中。
但是事务模式有一个缺点,它是阻塞的,一条消息没有发送完毕,不能发送下一条消息
确认模式
- 普通确认模式
- 消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者,也就是调用 channel.waitForConfirms()返回 true,这样生产者就知道消息被服务端接收了。
- 发送一条确认一条的效率不高
- 批量确认
- 一次发送多个消息, 成功之后返回给生产者一个确认
- 如果1000 条消息才确认一次,如果前面 999 条消息都被服务端接收了,如果第 1000 条消息被拒绝了,那么前面所有的消息都要重发。
- 异步确认模式
spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual-host: /
# 手动确认消息
listener:
simple:
acknowledge-mode: manual
消息到交换机 交换机到队列
如果路由键错误或者队列不存在, RabbitMQ提供了两种方法来处理此类问题
spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual-host: /
# 打开消息确认机制
publisher-confirm-type: correlated
# 既然要做可靠性,当然是设置为返回到客户端 打开消息返回
publisher-returns: true
template:
mandatory: true
消息在队列稳定存储
如果发生宕机, 重启 队列中的消息可能会丢失, RabbitMQ也提供了持久化机制
- 队列持久化
- queue的持久化是通过durable=true来实现的。
- 交换机持久化
- channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true)
- 消息持久化
- MessageProperties.PERSISTENT_TEXT_PLAIN
消息什么时候刷到磁盘?
- 写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。
- 有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每个25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘。
- 每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。
如果在消息刷盘的过程中宕机怎么办
消费者成功消费消息
如果消费者收到消息后没来得及处理即发生异常,或者处理过程中发生异常, 服务端应该以某种方式得知消费者对消息的接收情况,并决定是否重新投递这条消息给其他消费者。
- RabbitMQ 提供了消费者的消息确认机制(message acknowledgement),消费者可以自动或者手动地发送 ACK 给服务端。
- 没有收到 ACK 的消息,消费者断开连接后,RabbitMQ 会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。
|