RabbitMQ学习(下)——发布确认高级、幂等性、优先级、惰性和RabbitMQ集群
一、发布确认高级
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
rabbitmq 整个消息投递的路径为:producer—>rabbitmq broker—>exchange—>queue—>consumer
- 消息从 producer 到 exchange 则会返回一个
confirmCallback 。 - 消息从 exchange–>queue 投递失败则会返回一个
returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递
消息的可靠投递小结:
-
通过spring.rabbitmq.publisher-confirm-type 属性设置发布确认的类型。
NONE 值是禁用发布确认模式,是默认值CORRELATED 值是发布消息成功到交换器后会触发回调方法SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;
使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。 -
设置ConnectionFactory的publisher-returns=“true” 开启退回模式 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
1.1 confirm 确认模式
代码架构图:

-
配置文件,设置spring.rabbitmq.publisher-confirm-type=correlated spring.rabbitmq.host=192.168.2.4
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
# 发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type=correlated
-
添加配置类
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String CONFIRM_ROUTING_KEY = "key1";
@Bean
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
@Bean
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding queueBindingExchange(
@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
-
消息生产者
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable("message") String message) {
CorrelationData correlationData = new CorrelationData("1");
String routingKey1 = "key1";
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
routingKey1,message,correlationData);
log.info("发送消息内容:{},routingKey:{}", message,routingKey1);
CorrelationData correlationData2 = new CorrelationData("2");
String routingKey2 = "key2";
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
routingKey2,message,correlationData2);
log.info("发送消息内容:{},routingKey:{}", message,routingKey2);
}
}
-
消息生产者的回调接口(重点) @Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if(ack){
log.info("交换机已经收到 id 为:{}的消息",id);
}else{
log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
}
}
}
-
执行测试 访问:http://localhost:8080/confirm/sendMessage/你好 可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key1”,第二条消息的 RoutingKey 为 “key2”,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。 丢弃的消息交换机是不知道的,需要解决告诉生产者消息传送失败 
将生产者的代码改成下面的样子:(设置发送到不存在的交换机)
@RequestMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable("message") String message) {
CorrelationData correlationData = new CorrelationData("1");
String routingKey1 = "key1";
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"1",
routingKey1,message,correlationData);
log.info("发送消息内容:{},routingKey:{}", message,routingKey1);
}
访问:http://localhost:8080/confirm/sendMessage/你好  可以看到当发送到错误的交换机时回调方法还是执行的。
1.2 return 退回模式
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。
所以需要通过设置ConnectionFactory的publisher-returns=“true” 开启退回模式,就可以在当消息传递过程中不可达目的地时将消息返回给生产者。
-
配置文件,设置spring.rabbitmq.publisher-returns=true spring.rabbitmq.host=192.168.2.4
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
# 发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type=correlated
# 开启时,当消息发送不出去的时候会回退消息
spring.rabbitmq.publisher-returns=true
-
修改回调接口 @Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if(ack){
log.info("交换机已经收到 id 为:{}的消息",id);
}else{
log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由 key:{}",
new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyText(),
returnedMessage.getExchange(),
returnedMessage.getRoutingKey());
}
}
-
测试执行 访问:http://localhost:8080/confirm/sendMessage/你好 
1.3 备份交换机
-
设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢? -
在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。 -
备份交换器是为了实现没有路由到队列的消息,声明交换机的时候添加属性alternate-exchange ,声明一个备用交换机,为了方便使用一般声明为fanout类型,这样交换机收到路由不到队列的消息就会发送到备用交换机,进而发送到绑定的备份队列中。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
代码架构图: 
-
在上面代码的基础上,修改配置类 @Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
public static final String CONFIRM_ROUTING_KEY = "key1";
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
public static final String BACKUP_QUEUE_NAME = "backup_queue";
public static final String WARNING_QUEUE_NAME = "warning_queue";
@Bean
public DirectExchange confirmExchange() {
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
.build();
}
@Bean
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding queueBindingExchange(
@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
@Bean
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
@Bean
public Queue backupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
@Bean
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
@Bean
public Binding backupQueueBindingBackupExchange(
@Qualifier("backupQueue") Queue backupQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
@Bean
public Binding warningQueueBindingBackupExchange(
@Qualifier("warningQueue") Queue warningQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
}
-
报警消费者
@Slf4j
@Component
public class WarningConsumer {
@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message) {
String msg = new String(message.getBody());
log.error("报警发现不可路由消息:{}", msg);
}
}
-
执行测试
访问: http://localhost:8080/confirm/sendMessage/你好 ,可以看到没有被路由到队列的消息被报警消费者消费了。 
上面的代码mandatory 参数与备份交换机同时开启了。但最后的结果是备份交换机发现了不可路由的消息,而回退方法没有被调用。 即:备份交换机优先级高
二、幂等性、优先级、惰性
2.1 幂等性
- 幂等性的实质是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复相同的请求而对该资源重复造成影响。注意关注的是请求操作对资源本身造成的影响,而不是请求资源返回的结果。就是保证同一条消息不会重复或者重复消费了也不会对系统数据造成异常。
RabbitMQ的幂等性
- 拿RabbitMQ来说的话,消费者在消费完成一条消息之后会向MQ回复一个ACK(可以配置自动ACK或者手动ACK) 来告诉MQ这条消息已经消费了。假如当消费者消费完数据后,准备回执ACK时,系统挂掉了,MQ是不知道该条消息已经被消费了。所以重启之后MQ会再次发送该条消息,导致消息被重复消费,如果此时没有做幂等性处理,可能就会导致数据错误等问题。
如何避免消息的重复消费问题?
全局唯一ID + Redis
-
生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令,将messageId作为key放到redis中:setnx(messageId,1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃。 -
setnx命令,若给定的key不存在,执行set操作,返回1,若给定的Key已存在,不做任何操作,返回0。
生产者代码:
public void sendMessageIde() {
MessageProperties properties = new MessageProperties();
properties.setMessageId(UUID.randomUUID().toString());
Message message = new Message("Hello RabbitMQ".getBytes(), properties);
rabbitTemplate.convertAndSend("durable-exchange", "rabbit.long.yuan", message);
}
消费者代码:
@RabbitListener(queues = "durable-queue")
@RabbitHandler
public void processIde(Message message, Channel channel) throws IOException {
if (stringRedisTemplate.opsForValue().setIfAbsent(message.getMessageProperties().getMessageId(),"1")){
System.out.println("消费消息:"+ new String(message.getBody(), "UTF-8"));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
2.2 优先级
注意:实现优先级队列,队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费。
设置优先级队列:
-
通过RabbitMQ管理界面配置队列的优先级属性  -
通过代码去实现 Map<String,Object> args = new HashMap<String,Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("queue_priority", true, false, false, args);
配置了队列优先级的属性之后,可以在管理页面看到Pri的标记: 
发送的消息中设置消息本身的优先级:
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("exchange_priority","rk_priority",properties,("messages").getBytes());
实战:发送10条消息,其中第5条为高优先级消息
- 生产者
public class PriorityProducer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-priority",10);
channel.queueDeclare(QUEUE_NAME,false,false,false,arguments);
for (int i = 1;i < 11 ;i++) {
String message = "info" + i;
if(i == 5) {
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("",QUEUE_NAME,properties,message.getBytes());
} else {
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
}
}
}
}
- 消费者
public class PriorityConsumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
- 执行测试

2.3 惰性
使用场景:
-
RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。 -
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。
两种模式:
队列具备两种模式:default 和 lazy。默认的为default 模式,在3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。 如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
内存开销对比:

在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅 占用 1.5MB
三、RabbitMQ集群
使用集群的原因:
- 如果 RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台 RabbitMQ 服务器可以满足每秒 1000 条消息的吞吐量,那么如果应用需要 RabbitMQ 服务满足每秒 10 万条消息的吞吐量呢?购买昂贵的服务器来增强单机 RabbitMQ 务的性能显得捉襟见肘,搭建一个 RabbitMQ 集群才是 解决实际问题的关键
3.1 RabbitMQ集群的搭建
所搭建的集群实例架构图:

-
准备三台安装RabbitMQ的服务器,并设置主机名为node1,node2,node3
修改hostname文件 
-
配置各个节点的 hosts 文件( vim /etc/hosts),让各个节点都能互相识别对方 vim /etc/hosts
# 加上下面的代码
192.168.2.4 node1
192.168.2.5 node2
192.168.2.6 node3
 -
确保各个节点的 cookie 文件使用的是同一个值,在node1上执行远程操作命令,将node1的cookie复制给node2和node3 scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
 -
启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和 RbbitMQ 应用服务(在三台节点上分别执行以下命令) rabbitmq-server -detached
 -
建立集群 将node2加入到node1 rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
将node3加入到node2 rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app
-
查看集群的状态 rabbitmqctl cluster_status
 -
重新设置用户 此时想要登录集群必须重新创建账户为集群创建账户
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

此时登录管理界面可以看到集群已经搭建好了 
如果想解除集群 可以按照下面操作:(两台机器分别执行)
在默认集群模式集群默认上在node1节点建立一个队列:  问题:如果此时node1节点宕机,则该队列将无法使用。此时需要使用镜像队列解决 
3.2 RabbitMQ之镜像队列
RabbitMQ默认集群模式,并不包管队列的高可用性,尽管队列信息,交换机、绑定这些可以复制到集群里的任何一个节点,然则队列内容不会复制,固然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能守候重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,须要创建镜像队列。
镜像队列搭建步骤:
-
添加 policy策略  -
执行测试 在 node1 上创建一个队列发送一条消息,队列存在镜像队列  停掉 node1之后发现 node3成为镜像队列  就算整个集群只剩下一台机器了 依然能消费队列里面的消息,说明队列里面的消息被镜像队列传递到相应机器里面了
问题:如果node1节点宕机的话,而连接node1节点的此时还需要改变成node2或者node3的IP。 解决:通过Nginx实现高可用负载均衡
|