本文从本人博客搬运,原文格式更加美观,可以移步原文阅读:RabbitMQ实现消费端限流与非公平分配
Qos机制概述
默认情况下,rabbitmq在分发消息给消费者时,处理方式是将所有消息按照消费者的数量平均分配,一次性发送给所有消费者,然后等待消费者的响应:
- 如果消费者响应
ack ,代表消费成功,rabbitmq会从队列中删除该条消息。响应ack 分为两种情况:
- 自动响应:这是默认方式。当消费者处理消息的方法正常执行完成时自动回复
ack 给rabbitmq - 手动确认:需要在配置文件中开启。在代码中手动控制回复
ack 的时机 - 如果消费者响应
nack ,则代表消费失败,rabbitmq不会删除该消息,并且会尝试重新发送消息(默认重发的次数无限制)。响应nack 同样分为两种情况:
- 自动响应:这是默认方式。当消费者处理消息的方法执行抛出异常时自动回复
nack 给rabbitmq - 手动确认:需要在配置文件中开启。在代码中手动控制回复
nack 的时机,并且可以控制回复nack 的同时是否要求该消息重新入队,如果不要求重新入队,那么rabbitmq会直接删除该消息而不是尝试重发
上述rabbitmq分发消息的默认策略会存在2个问题:
- 如果rabbitmq中积压的消息非常多,那么一次性发送给消费者,可能导致消费者内存等资源被占满,无法正常处理消息
- 如果多个同时在线的消费者处理消息的能力差距很大,那么默认的平均分配消息的策略将会导致能力强的消费者很快处理完所有消息,能力差的消费者却仍然在处理,不利于消息的处理效率
在rabbitmq中可以用Qos 机制解决以上问题。Qos 机制的原理是当消费者有一定数量prefetchCount (可手动配置)的消息未被ack 确认时,rabbitmq不会给消费者发送新的消息。这样就很好地解决了上述2个问题:
- 消费端限流:rabbitmq刚开始只会一次性发送
prefetchCount 数量的消息给消费者,而不是发送所有消息,此时未被确认的消息数量就是prefetchCount 。消费者每处理完1条消息并回复ack 时,rabbitmq在收到ack 后,此时未被确认的消息数量为prefetchCount-1 ,这时rabbitmq才会再发送1条消息给消费者。如此直到mq发送完所有消息 - 非公平分配消息:能力强的消费者处理消息速度快,即回复
ack 的速度快,那么就会促使rabbitmq将剩余的消息更多地发给它,达到一种能者多劳的效果
整合SpringBoot测试
首先在yml中添加rabbitmq的配置,其中关键配置是prefetch ,代表多少消息未被ack 时,rabbitmq不会给消费者发送新的消息
spring:
rabbitmq:
host: 192.168.153.130
port: 5672
username: guest
password: guest
listener:
simple:
prefetch: 2
先解释一下rabbitmq控制台队列标签中,针对消息的几个状态的说明:
Ready :代表保存在rabbitmq本地,未发送给消费者的消息数量Unack :代表已经发送给消费者,但是还未收到消费者ack 或者nack 响应的消息数量Total :队列中消息总数量,Ready + Unack 之和
然后我们先用下列代码给rabbitmq发送5条消息
@Test
public void testSendMessage() {
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend("test.direct", "queue", new User("baobao" + i, 18, new Date()));
}
}
此时控制台的初始状态如下
然后我们创建消费者
@Component
@Slf4j
public class UserConsumer {
@RabbitListener(queues = "test.queue")
public void handleUserMessage(User user) throws InterruptedException {
log.info("开始处理消息");
log.info(user.toString());
TimeUnit.SECONDS.sleep(23);
log.info("消息处理结束");
}
}
启动消费者后观察日志打印和控制台消息数量的变化:
- 由于
prefetch 为2,所以rabbitmq刚开始会发送2条消息给消费者,消费者开始顺序处理消息
- 当第1条消息处理正常处理完以后,自动回复了
ack 给rabbitmq,此时Unacked 消息数量由2减少为1,小于了prefetch ,rabbitmq就会再发送一条消息给消费者,发送后Ready 消息数量减1,Unacked 消息数量又增加为2,达到prefetch ,此时rabbitmq又会停止继续发送消息给消费者
- 当第2条消息也处理完以后,同理rabbitmq会继续发送1条消息给消费者
- 当第3条消息也处理完之后,继续发送mq中剩余的最后一条消息给消费者
- 最后当消费者处理完所有消息,mq的消息数量全部归0
|