1. 背景
本节讲述 Java 使用 RabbitMQ 的示例,和 发送者确认回调,消费者回执的内容。
2.知识
高级消息队列协议 (AMQP) 是面向消息的中间件的平台中立的协议。Spring AMQP 项目将 Spring 的概念应用于 AMQP,形成解决方案的开发。
AMQP 的一些基本概念:
开始之前, 要使用 RabbitMQ 首先要了解 AMQP 协议的基本概念,更多可阅读我的另一篇文章。
- 生产者:一个发送消息的程序,它产生消息并发送到队列。这里是用Go写的发送端示程序例。
- 消息队列:即 RabbitMQ 内部的队列,它安装在一个服务器中。做为消息中间件,它与具体开发语言无关,支持 Go,Java等接入连接。
- 消费者:消费者是一个等待消息,接收消息的接收端程序示例
- 交换机(Exchange)可以理解成邮局,交换机将收到的消息根据路由规则分发给绑定的队列(Queue)
安装 RabbitMQ
参考我的另一篇文章:https://www.jianshu.com/p/53ba4fbd0d03
我们使用 Spring AMQP 框架来 操作 RabbitMQ 收发消息。
Spring AMQP 框架
Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。它提供了一个“模板”作为发送和接收消息的高级抽象。
该项目由两部分组成;spring-amqp 是基础抽象,spring-rabbit 是 RabbitMQ 实现。
Spring AMQP 的特征
- 用于异步处理入站消息的侦听器容器
- RabbitTemplate 用于发送和接收消息
- RabbitAdmin 用于自动声明队列、交换和绑定
3. 示例
下面通过一个示例看下基本的收发消息的操作。
3.1 编写程序“生产者”
第一步:配置Rabbit的数据连接
编辑 application.yml, 指定 rabbitmq 的服务器地址,端口号,账户名密码等。
spring:
application:
name: producer
rabbitmq:
host: localhost
virtual-host: /
port: 5672
username: admin
password: admin
第二步:配置好 队列,交换机,和绑定(queue,exchange,binding)
队列里存储了消息,交换机类似邮局,而“绑定”是个“ 队列+交换机”关联关系。通过“绑定” binding 将 交换机和 队列连线在一起。
@Configuration
public class RabbitConfig {
// 路由的 key
public static final String ROUTING_KEY = "hello_routing_key";
public static final String EXCHANGE_NAME = "zyf_direct_exchange";
public static final String QUEUE_NAME = "first_queue";
// 一个队列
@Bean
public Queue getFirstQueue() {
return new Queue(QUEUE_NAME);
}
// 一个 直接交换机
@Bean
public DirectExchange getDirectExchange() {
return new DirectExchange(EXCHANGE_NAME);
}
// 进行绑定
@Bean
public Binding getBinding() {
return BindingBuilder.bind(getFirstQueue()).to(getDirectExchange()).with(ROUTING_KEY);
}
}
第三步:发消息
RabbitTemplate 是操作发送消息的 “模板方法”,springboot 已帮忙配置好注入关系,直接拿来用就可以了。调用: rabbitTemplate.convertAndSend(...) 来发消息,需要指明 交换机的名称,和路由key。
@Service
public class BusinessService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String msg) {
System.out.println("# sendMessage,msg=" + msg);
String routingKey = RabbitConfig.ROUTING_KEY;
String exchangeName = RabbitConfig.EXCHANGE_NAME;
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}
}
总结:
- 先配置好从 “交换机”到“队列”的连线。
- 发送者 就可以通过 交换机名称和路由 key 来发送消息。
3.2 编写程序“消费者”
然后就是准备接收消息了。
第一步:配置好 rabbitmq 的数据连接。
和上面的 发送者一样,编辑 application.yml, 指定 rabbitmq 的服务器地址,端口号,账户名密码等。
第二步:配置 异步消息的监听器
接收消息配置一个回调即可。使用 @RabbitMessageListener 注解标注。
@Component
public class RabbitMessageListener {
public static final String QUEUE_NAME = "first_queue";
@RabbitListener(queues = QUEUE_NAME)
public void receive(String msg) {
System.out.println(msg);
}
}
至此就完成了收发消息。我的代码示例见:https://github.com/vir56k/java_demo/tree/master/rabbitmq_demo1
4. 更多扩展
4.1 生产者发送时的结果回调(确认模式)
发布是异步的——如何检测成功和失败?
发布消息是一种异步机制,默认情况下,"无法路由的消息" 会被 RabbitMQ 丢弃。为了成功发布,您可以收到异步确认,如相关发布者确认和返回 中所述。
考虑两种失败情况:
- 发消息到不存在的交换机。
- 发消息到交换机,但没有匹配的队列。
第一种情况的场景是 指定了 错误的交换机名称。
第二种情况的场景是 “发送者的退货” 。
(1)发送者发送消息后的 “消息确认” 回调事件
对于发布者确认 ,RabbitTemplate 需要 设置:
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
和
rabbitTemplate.setConfirmCallback(confirmCallback());
然后注册回调的实习,示例:
// 用来确认生产者 producer 将消息发送到 broker 的回调
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback() {
return new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData == null ? "" : correlationData.getId();
if (ack) {
// log.info(String.format("# [%s] 投递到 broker 成功!, cause=%s", id, cause));
} else {
log.error(String.format("# [%s] 投递到 broker 失败!, cause=%s", id, cause));
}
}
};
}
上面的 ack 参数 指示了是否投递(到交换机)成功。
注意:一个 ConfirmCallback 仅支持 一个RabbitTemplate。
**(2)发送者的 “退货” 回调事件
对于返回的消息,模板的 mandatory 属性必须设置为true 。也需要将 CachingConnectionFactory 其 publisherReturns属性设置为true
即:
connectionFactory.setPublisherReturns(true);
...
rabbitTemplate.setReturnsCallback(returnsCallback());
// 强制标志,当 setReturnsCallback 被设置时,这里要设置为 true
rabbitTemplate.setMandatory(true);
示例:
// 发布到交换机,但没有匹配的目标队列 时,退货
@Bean
public RabbitTemplate.ReturnsCallback returnsCallback() {
return new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
int replyCode = returnedMessage.getReplyCode();
String replyText = returnedMessage.getReplyText();
String exchange = returnedMessage.getExchange();
System.out.println(String.format("# 退货消息:原因=%s, replyCode=%s, exchange=%s", replyText, replyCode, exchange));
}
};
}
上面方法里的 ReturnedMessage 具有以下属性:
- message - 返回的消息本身
- replyCode - 指示退货原因的代码
- replyText - 退货的文字原因 - 例如 NO_ROUTE
- exchange - 消息发送到的交换
- routingKey - 使用的路由密钥
每个 ReturnsCallback 仅支持一个RabbitTemplate。
4.2消费者回执(确认模式)
消息接收回执是指 消息接收者 收到消息后 向 “broker” 消息代理 回复的“ 确认消息 ”
注意:这里的回执和 发送者 “没有任何关系” 。它通知到 rabbitmq ,rabbitmq 根据回执决定是 重复,或者放弃。
有三种回执模式:
- NONE:不发送确认。RabbitMQ 将此称为“自动确认”,因为代理假定所有消息都已确认,而消费者没有采取任何行动。
- MANUAL:侦听器必须通过调用来确认所有消息Channel.basicAck()。
- AUTO:容器自动确认消息,除非MessageListener抛出异常。
实现手动回执时,注入 ackMode 就可以了。
@RabbitListener(queues = QUEUE_NAME, ackMode = "MANUAL")
示例:
int i = 0;
// 异步 接收消息
@RabbitListener(queues = QUEUE_NAME, ackMode = "MANUAL")
public void receiveForManual(String msg, Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
System.out.println(msg);
String returned_message_correlation = message.getMessageProperties().getHeader("spring_returned_message_correlation");
log.info(String.format("# 触发 receiveForManual msg =%s ,correlationId = %s", msg, returned_message_correlation));
i++;
if (i % 3 == 0) {
log.info("# 接收消息...");
channel.basicAck(tag, false);
} else if (i % 3 == 1) {
log.error("# 消息已重复处理失败,拒绝再次接收...");
channel.basicReject(tag, false);
} else if (i % 3 == 2) {
log.error("# 消息即将再次返回队列处理...");
channel.basicNack(tag, false, true);
}
}
我的代码示例见:https://github.com/vir56k/java_demo/tree/master/rabbitmq_demo2
5.参考:
Spring AMQP 文档https://spring.io/projects/spring-amqp
https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.amqp
https://github.com/spring-projects/spring-amqp-samples
https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#scheduling