一、幂等性
1.1 概念
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入到事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等。
在 MQ 中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
1.2 消息重复消费
消费者在消费 MQ 中的消息时,MQ 把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认消息,该条消息会重新发给其他的消费者,后者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
1.3 解决思路
MQ 消费者的幂等性解决思路是:通过使用全局 ID 或者唯一标识,比如时间戳或者 UUID,也可以按照自己的规则生成一个全局唯一 id,每次消费消息时先通过该消息的 id 来判断当前消息是否已经消费过。
1.4 消费端的幂等性保障
在海量订单生成的业务高峰期,生产者有可能会重发发送消息,这时候消费者就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:a.唯一ID + 指纹码机制,利用数据库主键去重,b.利用 Redis 的原子性去实现。
1.5 唯一 ID + 指纹码机制
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后利用查询语句进行判断这个 id 是否存在数据库中。如果不存在,则正常消费该消息,消费完后将数据写入数据库。如果存在,则说明该消息已经消费过,直接丢弃,不作处理。 优势:实现简单,就一个拼接,然后查询判断是否重复 劣势:在高并发时,如果是单个数据库就会有写入性能瓶颈,当然,也可以采用分库分表提升性能。 并不推荐使用这种方式。
1.6 Redis 原子性
利用 Redis 执行 SETNX 命令,天然具有幂等性。从而实现不重复消费。 步骤:
- 先获取到全局唯一 ID
- 消费者获取到消息后,先根据全局唯一 ID 去 Redis 中查询是否存在该消息
- 如果不存在(即 SETNX 命令返回结果为 0),则正常消费该消息。SETNX 命令在执行时,除了可以判断当前消息是否被消费过,还可以自动将数据保存至 Redis 中,表明该消息已经被消费过。
- 如果 Redis 中存在(即 SETNX 命令返回结果为 1),则说明该消息已经消费过,直接丢弃,不作处理
二、优先级队列
不同优先级的队列其执行顺序不同,优先级越高的队列,其内部消息则会被优先消费。
2.1 如何添加
1、控制台页面添加 2、队列中代码添加优先级
Map<String, Object> params = new HashMap<>(1);
params.put("x-max-priority", 10);
channel.queueDeclare("hello", false, false, false, params);
3、消息中代码添加优先级
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("exchange", "routingKey", properties, message.getBytes());
4、SpringBoot 整合 RabbitMQ:队列中代码添加优先级
Map<String, Object> arguments = new HashMap<>(1);
arguments.put("x-max-priority", 5);
Queue queue = new Queue("queueName", true, false, false, arguments);
5、SpringBoot 整合 RabbitMQ:消息中代码添加优先级
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(6);
return message;
}
};
rabbitTemplate.convertAndSend("exchangeName", "routingKey", "message", messagePostProcessor);
2.2 SpringBoot 整合 RabbitMQ 方式
2.2.1 添加配置类
@Configuration
public class PriorityConfig {
private static final String PRIORITY_EXCHANGE_NAME = "priority.exchange";
private static final String PRIORITY_QUEUE_NAME = "priority.queue";
private static final String PROPRITY_ROUTING_KEY = "priority";
@Bean("priorityExchange")
public DirectExchange priorityExchange(){
return new DirectExchange(PRIORITY_EXCHANGE_NAME);
}
@Bean("priorityQueue")
public Queue priorityQueue(){
Map<String, Object> arguments = new HashMap<>(1);
arguments.put("x-max-priority", 5);
return new Queue(PRIORITY_QUEUE_NAME, true, false, false, arguments);
}
@Bean
public Binding priorityQueueBindingPriorityExchange(
@Qualifier("priorityQueue") Queue priorityQueue,
@Qualifier("priorityExchange") DirectExchange priorityExchange){
return BindingBuilder.bind(priorityQueue).to(priorityExchange).with(PROPRITY_ROUTING_KEY);
}
}
2.2.2 消息生产者
@Slf4j
@RestController
@RequestMapping("/priority")
public class PriorityController {
private static final String PRIORITY_EXCHANGE_NAME = "priority.exchange";
private static final String PRIORITY_QUEUE_NAME = "priority.queue";
private static final String PROPRITY_ROUTING_KEY = "priority";
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public String sendMsg(@PathVariable String message){
for (int i = 1; i < 11; i++) {
CorrelationData correlationData = new CorrelationData(i + "");
if(i == 6){
log.info("当前时间:{},发送消息:{}给队列 priority.queue,优先级:6", new Date(), message+i, correlationData);
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(6);
return message;
}
};
rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE_NAME, PROPRITY_ROUTING_KEY, message+i, messagePostProcessor);
} else {
log.info("当前时间:{},发送消息:{}给队列 priority.queue,优先级:1", new Date(), message+i, correlationData);
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(1);
return message;
}
};
rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE_NAME, PROPRITY_ROUTING_KEY, message+i, messagePostProcessor);
}
}
return "发送成功";
}
}
2.2.3 消息消费者
@Slf4j
@Component
public class PriorityConsumer {
@RabbitListener(queues = "priority.queue")
public void receiveWarningMessage(Message message, CorrelationData correlationData){
String msg = new String(message.getBody());
log.info("接收到队列 priority.queue 的消息内容:{},消息 ID:{},优先级为:{}",
msg, correlationData.getId(),message.getMessageProperties().getPriority());
}
}
2.3 测试结果分析
先将消费者代码注释掉,启动服务后访问:http://localhost:8080/priority/sendMsg/你好啊 接着将消费者代码的注释取消掉,重启服务。 可以看到优先级为 6 的消息在服务重启后被优先消费掉。这说明,数字越大,优先级越高,消息就会被优先消费。优先级最大可以设置为 255,但是一般推荐 1~10,队列在接收到消息后会在内部进行排序,如果设置太高,那么需要进行排队的消息以及次数就会变多,增加了内存压力。
三、惰性队列
3.1 使用场景
RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入到磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目的是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而长时间不能消费消息,造成堆积时,惰性队列就很有必要了。 默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。
3.2 两种模式
队列具备两种模式:default 和 lazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置。如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
在队列声明的时候,可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:
Map<String, Object> args = new HashMap<String,Object>();
args.put("x-queue-mode","lazy");
channel.queueDeclare("queueName",false,false,false,args);
SpringBoot 方式:
Map<String, Object> arguments = new HashMap<>(1);
arguments.put("x-queue-mode", "lazy");
Queue queue = new Queue("queueName", false, false, false, arguments);
3.3 内存开销对比
在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2 GB,而惰性队列仅仅占用 1.5MB。
|