概述
rabbitMQ 会做一个系列,包括:安装、基础使用、高级队列、集群。
使用环境: jdk 8 、springboot 2.4.10
常见概念:
- AMQP:高级消息队列协议,这是一个消息应用的规范。
- Broker: 接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。
- Channel:Channel 作为轻量级 Connection 极大减少了操作系统建立 TCP connection 的开销。
- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout(multicast)
- Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。
- Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
docker 安装
使用 docker 安装测试环境,在 dockerHub 可以查找版本:https://hub.docker.com/
选择带有控制界面的 management 版本(包含web管理页面):
docker pull rabbitmq:3.9.5-management
rabbit mq 默认两个端口:
- 5672 是默认应用访问端口
- 15672 是默认控制台 Web 端口号
docker run -d --name rabbitMqDocker -p 52365:5672 -p 32512:15672 -v /usr/local/docker/rabbit:/var/lib/rabbitmq --hostname rabbitMq -e RABBITMQ_DEFAULT_VHOST=mqDocker76 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.9.5-management
后续的控制台新增用户、权限细节略过。
Exchange 不同模式
准备环境,先添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application 配置:
spring:
rabbitmq:
host: 127.0.0.1
port: 52365
username: admin
password: admin
virtual-host: mqDocker76
RabbitMQ 基本架构如下,然后开始分别测试三种模式。
Direct Exchange
直连模式基础用法,配置文件、生产、消费代码如下:
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue directQueue() {
return new Queue("directQueue", true, false, false);
}
@Bean
DirectExchange directExchange() {
return new DirectExchange("directExchange", true, false);
}
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRouting");
}
}
@RestController
@RequestMapping
public class DirectProducer {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/directMsg")
public String directMsg() {
Map<String, Object> map = new HashMap<>();
map.put("messageId", String.valueOf(UUID.randomUUID()));
map.put("data", "发送数据体" + System.currentTimeMillis());
map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
rabbitTemplate.convertAndSend("directExchange", "directRouting", map);
return JSONObject.toJSONString(map);
}
}
@Component
@RabbitListener(queues = "directQueue")
public class DirectConsumer {
@RabbitHandler
public void process(Map<String, Object> message) {
System.out.println("DirectReceiver 消费者收到消息 : " + message.toString());
}
}
运行请求,控制台输出:
DirectReceiver 消费者收到消息 : {data=发送数据体1630769307037, createTime=2021-09-04 23:28:27, messageId=c09bbfc8-5018-4f9f-b8fc-678cb42348d2}
上面是一对一的生产消费模式。
实际业务中,对消息生产者没有多少限制,只需要生产发送就可以,但是 消息消费需要保证不能出现重复消费 。
而消费端也不是一个服务在进行,工作队列就是这种情况:一个生产者,多个消费者。
spring:
rabbitmq:
listener:
simple:
prefetch: 1
再新建一个消费者:
@Component
@RabbitListener(queues = "directQueue")
public class DirectConsumerTwo {
@RabbitHandler
public void process(Map<String, Object> message) {
System.out.println("消费者2,DirectReceiver 消费者收到消息 : " + message.toString());
}
}
其实就是 轮询模式 。
Fanout Exchange
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。
这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
代码基本一样:
@Configuration
public class FanoutConfig {
@Bean
public Queue queueA() {
return new Queue("fanout.A", true, false, false);
}
@Bean
public Queue queueB() {
return new Queue("fanout.B", true, false, false);
}
@Bean
public Queue queueC() {
return new Queue("fanout.C", true, false, false);
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
@GetMapping("/fanoutMsg")
public String confirmMsg() {
Map<String, Object> map = new HashMap<>();
map.put("messageId", String.valueOf(UUID.randomUUID()));
map.put("data", "发送数据体" + System.currentTimeMillis());
map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
rabbitTemplate.convertAndSend("fanoutExchange", null, map);
return JSONObject.toJSONString(map);
}
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(Map<String, Object> message) {
System.out.println("fanout.A 收到消息 : " + message.toString());
}
}
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(Map<String, Object> message) {
System.out.println("fanout.B 收到消息 : " + message.toString());
}
}
Topic Exchange
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是:
在它的路由键和绑定键之间是有规则的,大致如下:
- 路由键必须是一串字符,用小数点(.) 隔开
- 通配符 * ,代表一个占位符,或者说一个单词 ,比如路由为 user.*,那么 user.email 可以匹配,但是 user.aaa.email 就匹配不了
- 通配符 # ,代表一个或多个占位符,或者说一个或多个单词 ,比如路由为 user.#,那么 user.email 可以匹配,user.aaa.email 也可以匹配
代码相似:
@Configuration
public class TopicConfig {
@Bean
public Queue queue1() {
return new Queue("topic.queue1", true, false, false);
}
@Bean
public Queue queue2() {
return new Queue("topic.queue2", true, false, false);
}
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topicExchange", true, false);
}
@Bean
Binding bindingQueue1() {
return BindingBuilder.bind(queue1()).to(topicExchange()).with("topic.queue1");
}
@Bean
Binding bindingQueue2() {
return BindingBuilder.bind(queue2()).to(topicExchange()).with("topic.#");
}
}
@RestController
@RequestMapping
public class TopicProducer {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/queue1")
public String queue1() {
Map<String, Object> map = new HashMap<>();
map.put("messageId", String.valueOf(UUID.randomUUID()));
map.put("data", "发送数据体" + System.currentTimeMillis());
map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
rabbitTemplate.convertAndSend("topicExchange", "topic.queue1", map);
return JSONObject.toJSONString(map);
}
@GetMapping("/queue2")
public String queue2() {
Map<String, Object> map = new HashMap<>();
map.put("messageId", String.valueOf(UUID.randomUUID()));
map.put("data", "发送数据体" + System.currentTimeMillis());
map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
rabbitTemplate.convertAndSend("topicExchange", "topic.queue2", map);
return JSONObject.toJSONString(map);
}
}
@Component
@RabbitListener(queues = "topic.queue1")
public class TopicReceiver1 {
@RabbitHandler
public void process(Map<String, Object> message) {
System.out.println("topic.queue1 收到消息 : " + message.toString());
}
}
@Component
@RabbitListener(queues = "topic.queue2")
public class TopicReceiver2 {
@RabbitHandler
public void process(Map<String, Object> message) {
System.out.println("topic.queue2 收到消息 : " + message.toString());
}
}
向 topic.queue1 发送消息,打印:
topic.queue1 收到消息 : {data=发送数据体1631024400445, createTime=2021-09-07 22:20:00, messageId=b4dc33cc-42d6-42e7-a828-4a5a3d2e1678}
topic.queue2 收到消息 : {data=发送数据体1631024400445, createTime=2021-09-07 22:20:00, messageId=b4dc33cc-42d6-42e7-a828-4a5a3d2e1678}
向 topic.queue2 发送消息,打印:
topic.queue2 收到消息 : {data=发送数据体1631024437342, createTime=2021-09-07 22:20:37, messageId=c92d5dab-84b7-4530-86f3-0bc31e5c4036}
消息可靠性
使用了 RabbitMQ 以后,我们的业务链路明显变长了,但造成消息丢失的场景也增加了。
主要存在以下三个关键环节:
- 消息生产者 - rabbitmq服务器,发送消息失败
- rabbitmq服务器自身故障导致消息丢失
- 消息消费者 - rabbitmq服务,消息消费失败
针对这三个环节分别有对应的解决方案。
发送确认
发送确认分为两步,第一步是消息到达 exchange 交换机,第二步是从交换机路由到队列。两步同时成功则消息发送成功。
先添加配置:
spring:
rabbitmq:
publisher-returns: true
publisher-confirm-type: correlated
rabbitMQ 有以下两个接口供实现:
- ConfirmCallback:通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
- ReturnsCallback:通过实现 ReturnsCallback 接口,启动消息失败返回,如果正确到达队列不执行。比如路由不到队列时触发回调
PS: RabbitTemplate.ReturnCallback 已经过时了,改用上面那个,加个 s ;
配置文件:
@Slf4j
@Component
public class RabbitTemplateConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(returnedMessage -> {
System.out.println("消息主体 message : " + returnedMessage.getMessage());
System.out.println("消息主体 message : " + returnedMessage.getReplyCode());
System.out.println("描述:" + returnedMessage.getReplyText());
System.out.println("消息使用的交换器 exchange : " + returnedMessage.getExchange());
System.out.println("消息使用的路由键 routing : " + returnedMessage.getRoutingKey());
});
rabbitTemplate.setConfirmCallback((correlationData, arrival, cause) -> {
assert correlationData != null;
if (arrival) {
log.info("消息已发送到交换机,MessageId:{}", correlationData.getId());
} else {
log.info("消息发送失败,MessageId:{},失败原因:{}", correlationData.getId(), cause);
}
});
}
失败测试只需要写错路由,或者队列就行了。测试发送:
rabbitTemplate.convertAndSend("directExchange", "queue", map, new CorrelationData());
打印日志:
消息主体 message : (Body:'{data=发送数据体1631111311090, createTime=2021-09-08 22:28:31, messageId=206749b5-d3bd-4ebe-8acb-3070d99a40a2}' MessageProperties [headers={spring_returned_message_correlation=8b124ce9-f8a1-4196-bb2a-ca1170842e05}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
消息主体 message : 312
描述:NO_ROUTE
消息使用的交换器 exchange : directExchange
消息使用的路由键 routing : queue
2021-09-08 22:28:31.104 INFO 10536 --- [nectionFactory1] c.d.shiva.confirm.RabbitTemplateConfig : 消息已发送到交换机,MessageId:8b124ce9-f8a1-4196-bb2a-ca1170842e05
消息持久化
消息持久化,需要把 queue ,exchange 都持久化。
上面创建交换机和队列时,已经使用了以下参数进行持久化:
durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
消息签收
rabbitMQ有个 ack 签收机制,简单来说就是三种模式:
AcknowledgeMode.NONE:默认推送的所有消息都已经消费成功,会不断地向消费端推送消息。所以推送出去的消息不会暂存在server 端
AcknowledgeMode.AUTO: 由 spring-rabbit 依据消息处理逻辑是否抛出异常自动发送 ack (无异常)或 nack (异常)到 server 端。
AcknowledgeMode.MANUAL:模式需要人为地获取到 channel 之后调用方法向 server 发送 ack (或消费失败时的 nack )信息
总结就是:无 ack 模式:效率高,存在丢失大量消息的风险。有 ack 模式:效率低,不会丢消息。
在配置文件添加:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: manual
在消费结果方面,也有三种结果:
消费结果 | 结果 | 批量操作 |
---|
ack | 表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除
void basicAck(long deliveryTag, boolean multiple) | 允许 | nack | 表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) | 允许 | reject | 拒绝消息,与 basicNack 区别在于不能进行批量操作,其他用法很相似。
void basicReject(long deliveryTag, boolean requeue) | 不允许 |
- deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag 都会递增。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
- multiple:为了减少网络流量,手动确认可以被批处理,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
举个栗子: 假设我先发送三条消息deliveryTag 分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag 为8,multiple 设置为 true,会将5、6、7、8的消息全部进行确认。
下面看代码:
@Slf4j
@Component
@RabbitListener(queues = "directQueue")
public class ConfigDirectConsumer {
@RabbitHandler
public void process(Map<String, Object> message, Channel channel, Message mqMsg) throws IOException {
try {
System.out.println("消费者收到消息 : " + message.toString());
channel.basicAck(mqMsg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (mqMsg.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,不在返回队列...");
channel.basicReject(mqMsg.getMessageProperties().getDeliveryTag(), false);
} else {
log.error("消息即将再次返回队列处理...");
channel.basicNack(mqMsg.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
循环消费
如果对一条异常处理的消息,进行重新入队,就会无限循环重复消费,用确认处理然后返回队尾可以稍微缓减:
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(msg));
参考文章
docker 安装rabbitMQ - 风止雨歇 - 博客园 (cnblogs.com)
尚硅谷2021最新版RabbitMQ教程丨快速掌握MQ消息中间件_哔哩哔哩_bilibili
Springboot 整合RabbitMq ,用心看完这一篇就够了_默默不代表沉默-CSDN博客
RabbitMQ的应用场景以及基本原理介绍_杨龙飞的博客-CSDN博客_rabbitmq使用场景
SpringBoot+RabbitMQ 实现"工作队列"_Felix-CSDN博客
springboot + rabbitmq 消息确认机制_不忘初心 砥砺前行-CSDN博客
spring-rabbit消费过程解析及AcknowledgeMode选择_JinchaoLv的博客-CSDN博客_acknowledge-mode
RabbitMQ:消息发送确认 与 消息接收确认(ACK) - 简书 (jianshu.com)
Springboot中整合RabbitMq之Topic模式(单个springboot项目)_我的博客-CSDN博客_rabbitmq topic模式
RabbitMq从入门到精通-ConfirmCallback ReturnCallback 区别及使用_wxb880114的专栏-CSDN博客
rabbitTemplate.setReturnCallback()显示过时_kano_2525的博客-CSDN博客
|