IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ实现消费端限流与非公平分配 -> 正文阅读

[大数据]RabbitMQ实现消费端限流与非公平分配

本文从本人博客搬运,原文格式更加美观,可以移步原文阅读:RabbitMQ实现消费端限流与非公平分配

Qos机制概述

默认情况下,rabbitmq在分发消息给消费者时,处理方式是将所有消息按照消费者的数量平均分配,一次性发送给所有消费者,然后等待消费者的响应:

  • 如果消费者响应ack,代表消费成功,rabbitmq会从队列中删除该条消息。响应ack分为两种情况:
    • 自动响应:这是默认方式。当消费者处理消息的方法正常执行完成时自动回复ack给rabbitmq
    • 手动确认:需要在配置文件中开启。在代码中手动控制回复ack的时机
  • 如果消费者响应nack,则代表消费失败,rabbitmq不会删除该消息,并且会尝试重新发送消息(默认重发的次数无限制)。响应nack同样分为两种情况:
    • 自动响应:这是默认方式。当消费者处理消息的方法执行抛出异常时自动回复nack给rabbitmq
    • 手动确认:需要在配置文件中开启。在代码中手动控制回复nack的时机,并且可以控制回复nack的同时是否要求该消息重新入队,如果不要求重新入队,那么rabbitmq会直接删除该消息而不是尝试重发

上述rabbitmq分发消息的默认策略会存在2个问题:

  1. 如果rabbitmq中积压的消息非常多,那么一次性发送给消费者,可能导致消费者内存等资源被占满,无法正常处理消息
  2. 如果多个同时在线的消费者处理消息的能力差距很大,那么默认的平均分配消息的策略将会导致能力强的消费者很快处理完所有消息,能力差的消费者却仍然在处理,不利于消息的处理效率

在rabbitmq中可以用Qos机制解决以上问题。Qos机制的原理是当消费者有一定数量prefetchCount(可手动配置)的消息未被ack确认时,rabbitmq不会给消费者发送新的消息。这样就很好地解决了上述2个问题:

  1. 消费端限流:rabbitmq刚开始只会一次性发送prefetchCount数量的消息给消费者,而不是发送所有消息,此时未被确认的消息数量就是prefetchCount。消费者每处理完1条消息并回复ack时,rabbitmq在收到ack后,此时未被确认的消息数量为prefetchCount-1,这时rabbitmq才会再发送1条消息给消费者。如此直到mq发送完所有消息
  2. 非公平分配消息:能力强的消费者处理消息速度快,即回复ack的速度快,那么就会促使rabbitmq将剩余的消息更多地发给它,达到一种能者多劳的效果

整合SpringBoot测试

首先在yml中添加rabbitmq的配置,其中关键配置是prefetch,代表多少消息未被ack时,rabbitmq不会给消费者发送新的消息

spring:
  rabbitmq:
    host: 192.168.153.130
    port: 5672
    username: guest
    password: guest
    #virtual-host:
    listener:
      simple:
        prefetch: 2  # 代表多少消息未被ack时,rabbitmq不会给消费者发送新的消息

先解释一下rabbitmq控制台队列标签中,针对消息的几个状态的说明:

  • Ready:代表保存在rabbitmq本地,未发送给消费者的消息数量
  • Unack:代表已经发送给消费者,但是还未收到消费者ack或者nack响应的消息数量
  • Total:队列中消息总数量,Ready + Unack之和

然后我们先用下列代码给rabbitmq发送5条消息

@Test
public void testSendMessage() {
    for (int i = 0; i < 5; i++) {
        rabbitTemplate.convertAndSend("test.direct", "queue", new User("baobao" + i, 18, new Date()));
    }
}

此时控制台的初始状态如下

然后我们创建消费者

@Component
@Slf4j
public class UserConsumer {
    @RabbitListener(queues = "test.queue")
    public void handleUserMessage(User user) throws InterruptedException {
        log.info("开始处理消息");
        log.info(user.toString());
        // 休眠23秒,模拟消息处理的时间很长
        TimeUnit.SECONDS.sleep(23);
        log.info("消息处理结束");
    }
}

启动消费者后观察日志打印和控制台消息数量的变化:

  • 由于prefetch为2,所以rabbitmq刚开始会发送2条消息给消费者,消费者开始顺序处理消息

  • 当第1条消息处理正常处理完以后,自动回复了ack给rabbitmq,此时Unacked消息数量由2减少为1,小于了prefetch,rabbitmq就会再发送一条消息给消费者,发送后Ready消息数量减1,Unacked消息数量又增加为2,达到prefetch,此时rabbitmq又会停止继续发送消息给消费者

  • 当第2条消息也处理完以后,同理rabbitmq会继续发送1条消息给消费者

  • 当第3条消息也处理完之后,继续发送mq中剩余的最后一条消息给消费者

  • 最后当消费者处理完所有消息,mq的消息数量全部归0

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-11 16:42:24  更:2021-07-11 16:42:38 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/28 9:50:49-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码