一、RabbitMq消息可靠实践
参考:https://zhuanlan.zhihu.com/p/139688703 安装:https://www.cnblogs.com/saryli/p/9729591.html
1、确认机制
当连接出现问题的时候,在客户端和服务端之间的消息可能正在投递中,还没有被Broker接收,它们可能正在被编码或者解码,或者一些其他的情况。 在这种场景下,消息并没有被投递,那么它们是需要被重新投递以保障业务稳定性。
确认机制能被用在两个方向:允许消费者告诉服务器(Broker)它已经收到了消息,也允许服务器告诉生产者它接收到了消息。前者就是我们常说的消费者Ack,后者就是我们常说的生产者Confirm。
2、生产者
生成者确认机制, [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EwWpcijB-1629790957998)(生产者确认机制.png)]
// 选择确认机制
channel.confirmSelect();
// 确认消息监听,//都是服务端正常,服务端告诉我们异常,还有极端情况,ack和nack都没获取到,比如网络闪断,这时候需要用定时任务去抓取做后续处理。
channel.addConfirmListener(new ConfirmListener() {
/**
**消息投递成功
*/
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息已经ack,tag: " + deliveryTag);
}
/**
* rabbitmq 中间件服务 告诉我们他异常额
* 比如MQ异常,queue容量达到上限,磁盘写满了等。
*/
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// 对于消费者没有ack的消息,可以做一些特殊处理
System.out.println("消息被拒签,tag: " + deliveryTag);
}
});
// push message
- 1、RabbitMQ还有事务机制(txSelect、txCommit、txRollback),也能保障消息的发送。不过事务机制是「同步阻塞」的,所以不推荐使用,而confirm模式是「异步」机制。
- 2、还有极端情况,ack和nack都没获取到,比如网络闪断,这时候需要用定时任务去抓取做后续处理。(客户端自行重试,或则抓取异常自行处理)
- 3、使用spring的rabbitTemplate可以支持重试,加强一下重试,如果发送出现网络异常,
3、消费者
消费者确认机制,只有消费者确认的消息,RabbitMQ才会删除它,不确认就不会被删除,关闭自动确认机制
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
//手动确认
getChannel().basicNack(envelope.getDeliveryTag(),false, false);
}
});
4、持久化和队列镜像
- 1、为了防止在Broker中丢失消息,交换器、队列和消息都应该设置为持久化
如过将queue的持久化标识durable设置为true,则代表是一个持久的队列
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//关键的是第二个参数设置为true,即durable=true.,
channel.queueDeclare("queue.persistent.name", true, false, false, null);
消息持久化,推送的时候加上配置项
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
- 2、镜像处理
为了防止在Broker中丢失消息,交换器、队列和消息都应该设置为持久化。除此以外,队列和消息还应该被复制,为了应对操作系统未及时fsync刷盘、Broker重启、Broker服务器硬件故障、或者Broker crash故障等问题。
5、告警
## 设置节点可使用RAM百分比,超过这个百分比就会告警
vm_memory_high_watermark.relative = 0.4
## 设置节点可使用RAM的上限,单位为byte即字节
vm_memory_high_watermark.absolute = 1073741824
## RabbitMQ 3.6.0+,设置节点可使用RAM的绝对大小,它的优先级低于relative
vm_memory_high_watermark.absolute = 2GB
## 设置磁盘剩余空间阈值,当可用空间低于这个值就会触发告警
disk_free_limit.absolute = 51200
## with RabbitMQ 3.6.0+.
disk_free_limit.absolute = 500KB
disk_free_limit.absolute = 50MB
disk_free_limit.absolute = 5GB
## 设置磁盘剩余空间为有效RAM的多少倍,比如当前可用RAM为2G,且设定这个参数为2。那么当磁盘可用空间低于2*2=4G时就会触发告警
disk_free_limit.relative = 2.0
6、监控和Metrics
1、CPU统计情况; 2、内存使用率; 3、虚拟内存统计; 4、RabbitMQ节点数据目录下磁盘剩余可用空间; 5、磁盘IO情况; 6、网络吞吐量(接收量、发送量、最大网络吞吐量等); 7、网络延迟情况(RabbitMQ集群所有节点以及客户端之间的网络延迟); 8、文件描述符;
7、健康检查
|