IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ 其他知识点 -> 正文阅读

[大数据]RabbitMQ 其他知识点

一、幂等性

1.1 概念

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入到事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等。

在 MQ 中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

1.2 消息重复消费

消费者在消费 MQ 中的消息时,MQ 把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认消息,该条消息会重新发给其他的消费者,后者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

1.3 解决思路

MQ 消费者的幂等性解决思路是:通过使用全局 ID 或者唯一标识,比如时间戳或者 UUID,也可以按照自己的规则生成一个全局唯一 id,每次消费消息时先通过该消息的 id 来判断当前消息是否已经消费过。

1.4 消费端的幂等性保障

在海量订单生成的业务高峰期,生产者有可能会重发发送消息,这时候消费者就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:a.唯一ID + 指纹码机制,利用数据库主键去重,b.利用 Redis 的原子性去实现。

1.5 唯一 ID + 指纹码机制

指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后利用查询语句进行判断这个 id 是否存在数据库中。如果不存在,则正常消费该消息,消费完后将数据写入数据库。如果存在,则说明该消息已经消费过,直接丢弃,不作处理。
优势:实现简单,就一个拼接,然后查询判断是否重复
劣势:在高并发时,如果是单个数据库就会有写入性能瓶颈,当然,也可以采用分库分表提升性能。
并不推荐使用这种方式。

1.6 Redis 原子性

利用 Redis 执行 SETNX 命令,天然具有幂等性。从而实现不重复消费。
步骤:

  1. 先获取到全局唯一 ID
  2. 消费者获取到消息后,先根据全局唯一 ID 去 Redis 中查询是否存在该消息
  3. 如果不存在(即 SETNX 命令返回结果为 0),则正常消费该消息。SETNX 命令在执行时,除了可以判断当前消息是否被消费过,还可以自动将数据保存至 Redis 中,表明该消息已经被消费过。
  4. 如果 Redis 中存在(即 SETNX 命令返回结果为 1),则说明该消息已经消费过,直接丢弃,不作处理

二、优先级队列

不同优先级的队列其执行顺序不同,优先级越高的队列,其内部消息则会被优先消费。

2.1 如何添加

1、控制台页面添加
在这里插入图片描述
2、队列中代码添加优先级

Map<String, Object> params = new HashMap<>(1);
params.put("x-max-priority", 10);
channel.queueDeclare("hello", false, false, false, params);
        

3、消息中代码添加优先级

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("exchange", "routingKey", properties, message.getBytes());

4、SpringBoot 整合 RabbitMQ:队列中代码添加优先级

Map<String, Object> arguments = new HashMap<>(1);
arguments.put("x-max-priority", 5);
Queue queue =  new Queue("queueName", true, false, false, arguments);

5、SpringBoot 整合 RabbitMQ:消息中代码添加优先级

MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setPriority(6);
        return message;
    }
};
rabbitTemplate.convertAndSend("exchangeName", "routingKey", "message", messagePostProcessor);

2.2 SpringBoot 整合 RabbitMQ 方式

2.2.1 添加配置类

@Configuration
public class PriorityConfig {

    private static final String PRIORITY_EXCHANGE_NAME = "priority.exchange";
    private static final String PRIORITY_QUEUE_NAME = "priority.queue";
    private static final String PROPRITY_ROUTING_KEY = "priority";

    /**
     * 声明 priorityExchange
     */
    @Bean("priorityExchange")
    public DirectExchange priorityExchange(){
        return new DirectExchange(PRIORITY_EXCHANGE_NAME);
    }

    /**
     * 声明队列priorityQueue
     */
    @Bean("priorityQueue")
    public Queue priorityQueue(){
        Map<String, Object> arguments = new HashMap<>(1);
        arguments.put("x-max-priority", 5);
        return new Queue(PRIORITY_QUEUE_NAME, true, false, false, arguments);
    }

    @Bean
    public Binding priorityQueueBindingPriorityExchange(
            @Qualifier("priorityQueue") Queue priorityQueue,
            @Qualifier("priorityExchange") DirectExchange priorityExchange){
        return BindingBuilder.bind(priorityQueue).to(priorityExchange).with(PROPRITY_ROUTING_KEY);
    }

}

2.2.2 消息生产者

@Slf4j
@RestController
@RequestMapping("/priority")
public class PriorityController {

    private static final String PRIORITY_EXCHANGE_NAME = "priority.exchange";
    private static final String PRIORITY_QUEUE_NAME = "priority.queue";
    private static final String PROPRITY_ROUTING_KEY = "priority";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public String sendMsg(@PathVariable String message){
        for (int i = 1; i < 11; i++) {
            CorrelationData correlationData = new CorrelationData(i + "");
            if(i == 6){
                log.info("当前时间:{},发送消息:{}给队列 priority.queue,优先级:6", new Date(), message+i, correlationData);
                MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setPriority(6);
                        return message;
                    }
                };
                rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE_NAME, PROPRITY_ROUTING_KEY, message+i, messagePostProcessor);
            } else {
                log.info("当前时间:{},发送消息:{}给队列 priority.queue,优先级:1", new Date(), message+i, correlationData);
                MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setPriority(1);
                        return message;
                    }
                };
                rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE_NAME, PROPRITY_ROUTING_KEY, message+i, messagePostProcessor);
            }
        }

        return "发送成功";
    }


}

2.2.3 消息消费者

@Slf4j
@Component
public class PriorityConsumer {

    @RabbitListener(queues = "priority.queue")
    public void receiveWarningMessage(Message message, CorrelationData correlationData){
        String msg = new String(message.getBody());
        log.info("接收到队列 priority.queue 的消息内容:{},消息 ID:{},优先级为:{}",
                msg, correlationData.getId(),message.getMessageProperties().getPriority());
    }
}

2.3 测试结果分析

先将消费者代码注释掉,启动服务后访问:http://localhost:8080/priority/sendMsg/你好啊
在这里插入图片描述
在这里插入图片描述
接着将消费者代码的注释取消掉,重启服务。
在这里插入图片描述
可以看到优先级为 6 的消息在服务重启后被优先消费掉。这说明,数字越大,优先级越高,消息就会被优先消费。优先级最大可以设置为 255,但是一般推荐 1~10,队列在接收到消息后会在内部进行排序,如果设置太高,那么需要进行排队的消息以及次数就会变多,增加了内存压力。

三、惰性队列

3.1 使用场景

RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入到磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目的是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而长时间不能消费消息,造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。

3.2 两种模式

队列具备两种模式:default 和 lazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置。如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。

在队列声明的时候,可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:

Map<String, Object> args = new HashMap<String,Object>();
args.put("x-queue-mode","lazy");
channel.queueDeclare("queueName",false,false,false,args);

SpringBoot 方式:

Map<String, Object> arguments = new HashMap<>(1);
arguments.put("x-queue-mode", "lazy");
Queue queue =  new Queue("queueName", false, false, false, arguments);

3.3 内存开销对比

在这里插入图片描述

在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2 GB,而惰性队列仅仅占用 1.5MB。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-14 22:40:05  更:2022-06-14 22:41:50 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 3:59:05-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码