RabbitMQ 手动确认模式(日常项目开发常用模式)
借鉴导言
此文借鉴多名CSDN用户博客,并将其博文中关于MQ常用的点,进行了归纳整理 借鉴博文来源:[小目标青年][爱吃烤面筋的鱼][dreamboycs][交换机]
架构及工作原理
MQ架构图
名词解释
Producer/Consumer
producer 消息生产者
consumer 消息消费者
Queue(消息队列)
消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。
设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失
设置为临时队列,queue中的数据在系统重启之后就会丢失
设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除
Exchange(交换机)
Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。交换机存在4种类型:
Direct :直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue
fanout:广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。
topic:主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)
headers:消息体的header匹配(ignore)
从示意图可以看出消息生产者并没有直接将消息发送给消息队列,而是通过建立与Exchange的Channel,将消息发送给Exchange,Exchange根据规则,将消息转发给指定的消息队列。消费者通过建立与消息队列相连的Channel,从消息队列中获取消息。
Channel可以理解为建立在生产者/消费者和RabbitMQ服务器之间的TCP连接上的虚拟连接,一个TCP连接上可以建立多个Channel。 RabbitMQ服务器的Exchange对象可以理解为生产者发送消息的邮局,消息队列可以理解为消费者的邮箱。Exchange对象根据它定义的规则和消息包含的routing key以及header信息将消息转发到消息队列。
根据转发消息的规则不同,RabbitMQ服务器中使用的Exchange对象有四种,Direct Exchange, Fanout Exchange, Topic Exchange, Header Exchange,如果定义Exchange时没有指定类型和名称, RabbitMQ将会为每个消息队列设定一个Default Exchange,它的Routing Key是消息队列名称。
任务分发机制 Message durability消息持久化 要持久化队列queue的持久化需要在声明时指定
durable=True;
注意:队列的名字一定要是Broker中不存在的,不然不能改变此队列的任何属性. 队列和交换机有一个创建时候指定的标志 durable , durable 的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列中的消息会在重启后恢复 消息持久化包括3部分
- exchange持久化,在声明时指定durable => true
channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
- queue持久化,在声明时指定durable => true
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
- 消息持久化,在投递时指定delivery_mode => 2(1是非持久化)
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的,如果exchange和queue两者之间有一个持久化,一个非持久化,则不允许建立绑定. 注意:一旦创建了队列和交换机,就不能修改其标志了,例如,创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。
总体来看,我们所关注业务实现是: 1)消息怎么投递的。 2)消费者怎么消费消息。 3)消息是否是可靠投递。 4)消息投递方式。 5)消息的生命周期。 6)消息队列生命周期 消息是怎么投递的?(记住一点,生产者消息投递都是面向交换机的)
- RabbitMQ 是面向交换机投递消息的。交换机可能绑定有许多队列,交换机如何将消息投递给这些队列呢?
首先说一下面向交换机的设计的优势:
- 明显借助了数据链路层那个交换机的设计思想。除了层级分明以外,还能从分提高链路利用率(可能有点抽像)。
- 从代码层面来看:如果没有交换机,你至少得维护一个十分庞大的路由表,然后从路由表正确投递消息,有了交互机,这里路由表就会被拆分到多个交换机里面,效果不必多说。
- 然后就是高度的解耦,不同的交换机可有不同的路由规则,要是没有交换机。。。。。。
Exchange 交换机有4种投递方式,就是枚举类 BuiltinExchangeType 的4个枚举变量:
- DIRECT:会将所有消息先取消息的ROUTE_KEY,然后投递到与ROUTE_KEY绑定的队列里面。
if(msg.routekey.equals(queue.routekey))
-
FANOUT:此种模式下,根本不检查消息的ROUTE_KEY,直接投送到交换机所拥有的所有队列里面。 -
TOPIC,HEADERS自行看一下官网怎么说的,不想码字了_||
总结起来就一个函数就把消息发出去了,可以去官网查一下这个API
channel.basicPublish(excange_name,route_key,false,bs,"test".getBytes());
消费者怎么消费消息(记住一点,消费者消费消息是面向消息队列的,这与生成者有点不一样)
- 还不是就是TCP长连接心跳的那些事,就是这么一个API
channel.basicConsume(QUEUE_AUTODELETE, true, consumer);
- consumer是Consumer类的一个实例,直接去处理回调接口就ok了
消息传递是否可靠
- 很明显是可靠的,除非你将消息队列,声明成非持久模式,这事你又重启了机器。这会丢失消息的。还有就是他有应答机制,你可以通过设置消费者消费消息的模式,去手动应答。channel.basicConsume(?,autoACk,?)的autoAck参数设置
消息的生命周期
- 一旦受到消费者应答,标识消息已被消费,则消息被回收掉。
队列生命周期
channel.queueDeclare(QUEUE_NAME,false,false,true,null);
第二个参数设置为true,会将消息持久化到磁盘,第四个参数设置为true表示没有消息并且没有连接则删除改队列,详情可以查一下API
项目实战
RabbitMq的安装,暂不介绍了(也可以参考我的其他博文:linux安装RabbitMq) pom依赖
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml / properties配置文件
spring:
rabbitmq:
# ip地址
host: 10.0.0.0
# 端口
port: 5672
# 用户名
username: guest
# 密码
password: guest
publisher-confirms: true
publisher-returns: true
listener:
simple:
# 表示消费者消费成功消息以后需要手工的进行签收(ack确认),默认为 auto
acknowledge-mode: manual
concurrency: 5
max-concurrency: 10
prefetch: 1
retry:
initial-interval: 5000ms
enabled: true
max-attempts: 5
default-requeue-rejected: true
注意:手动模式配置 acknowledge-mode: manual 默认:auto RabbitConfig配置监听类
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("ConfirmCallback: " + "相关数据:" + correlationData);
log.info("ConfirmCallback: " + "确认情况:" + ack);
log.info("ConfirmCallback: " + "原因:" + cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("ReturnCallback: " + "消息:" + message);
log.info("ReturnCallback: " + "回应码:" + replyCode);
log.info("ReturnCallback: " + "回应信息:" + replyText);
log.info("ReturnCallback: " + "交换机:" + exchange);
log.info("ReturnCallback: " + "路由键:" + routingKey);
}
});
return rabbitTemplate;
}
public static final int DEFAULT_CONCURRENT = 10;
public static final int DEFAULT_PREFETCH_COUNT = 50;
@Bean("pointTaskContainerFactory")
public SimpleRabbitListenerContainerFactory pointTaskContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setPrefetchCount(DEFAULT_PREFETCH_COUNT);
factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
configurer.configure(factory, connectionFactory);
return factory;
}
}
主题交换机
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicRabbitConfig {
public final static String csp = "topic.csp";
public final static String lsp = "topic.lsp";
@Bean
public Queue cspQueue() {
return new Queue(TopicRabbitConfig.csp);
}
@Bean
public Queue lspQueue() {
Map<String, Object> args= new HashMap<>();
args.put("x-max-priority", 100);
return new Queue(QUEUE, false, false, false, args);
}
@Bean
public Queue sendcspQueue() {
return new Queue(TopicRabbitConfig.sendcsp);
}
@Bean
public Queue sendlspQueue() {
return new Queue(TopicRabbitConfig.sendlsp);
}
@Bean
TopicExchange spExchange() {
return new TopicExchange("OkExchange");
}
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(cspQueue()).to(spExchange()).with(csp);
}
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(lspQueue()).to(spExchange()).with(lsp);
}
}
生产者:
@Autowired
private RabbitTemplate rabbitTemplate;
private void sendMsg(){
Map<String, Object> param = new HashMap<>();
param.put("messageId", UUID.randomUUID().toString());
param.put("createTime", new Date());
param.put("message", "");
param.put("Data", "");
String msg = RandomStringUtils.randomAlphanumeric(6);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
messageProperties.setContentType(CONTENT_TYPE_TEXT_PLAIN);
messageProperties.setContentEncoding("utf8");
messageProperties.setHeader("param",param);
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("OkExchange",TopicRabbitConfig.CSP,message);
}
发送一条消息到mq查看(不经意多点了一下,所以有两条消息) 到此,生产者推送消息的消息确认调用回调函数已经完毕。 可以看到 RabbitConfig配置监听类 写了两个回调函数, ConfirmCallback 、RetrunCallback; 那么以上这两种回调函数都是在什么情况会触发呢?
先从总体的情况分析,推送消息存在四种情况: ①消息推送到server,但是在server里找不到交换机 ②消息推送到server,找到交换机了,但是没找到队列 ③消息推送到sever,交换机和队列啥都没找到 ④消息推送成功
手动模式消费者(监听类)注意三个方法: 手动确认 , 这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。 消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。
着重讲下reject,因为有时候一些场景是需要重新入列的。channel.basicReject(deliveryTag, true); 拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。 使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。 但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。
channel.basicAck(deliverTag, true); 消费成功,确认消息 channel.basicNack(deliverTag, false, true); nack返回false,出现异常并重新回到队列,重新消费 channel.basicReject(deliverTag, false); 为false则拒绝消息,丢掉该消息;为true会重新放回队列,重新消费
介绍告一段落,接下来一起看看消息接收 手动确认是如何消费的 消费者代码:
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Component
public class TopicCspConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = TopicRabbitConfig.CSP, durable = "true"),
exchange = @Exchange(name = "OKExchange", type = "topic"),
key = TopicRabbitConfig.CSP
))
@RabbitHandler
public void process(Message msg, Channel channel) throws IOException {
log.info("TopicCspConsumer 消费者收到消息:{}" , JSONObject.toJSONString(msg));
Map<String,Object> message = msg.getMessageProperties().getHeader("param");
long deliverTag = msg.getMessageProperties().getDeliveryTag();
try {
channel.basicAck(deliverTag, true);
}catch (Exception e){
try {
channel.basicNack(deliverTag, false, true);
} catch (IOException ioException) {
log.error("重新放入队列失败,失败原因:{}",e.getMessage(),e);
}
log.error("TopicConsumer消费者出错,mq参数:{},错误信息:{}",message,e.getMessage(),e);
}
}
}
至此 博主的rabbitMQ手动模式,已经完成了,but,看到借鉴的博客 需要这个消费者项目里面,监听的好几个队列都想变成手动确认模式,而且处理的消息业务逻辑不一样(实际上,上述代码已经可以处理多个队列的手动模式)。为防止你们拷贝后运行无效(可能是环境问题),贴上该段代码配置
import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private MyAckReceiver myAckReceiver;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames("CSP","LSP");
container.setMessageListener(myAckReceiver);
return container;
}
}
至此,完成…
力拔山兮气盖世,时不利兮骓不逝…
|