一、消息队列
1.1、MQ 的相关概念
1.1.1、什么是 MQ
MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
1.1.2、为什么要用 MQ
- 流量消峰
举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。 - 应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。 - 异步处理
有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api,B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息。
1.1.3、MQ 的分类
1.ActiveMQ
- 优点:
单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据 - 缺点:
官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。
2.Kafka 大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件,以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。目前已经被 LinkedIn,Uber, Twitter, Netflix 等大公司所采纳。
- 优点:
性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms 级可用性非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方Kafka Web 管理界面 Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 - 缺点:
Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢;
3.RocketMQ RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。
- 优点:
单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅读源码,定制自己公司的 MQ - 缺点:
支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在 MQ核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码
4.RabbitMQ 2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
- 优点:
由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高https://www.rabbitmq.com/news.html - 缺点:
商业版需要收费,学习成本较高
1.1.4、MQ 的选择
- Kafka
Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选 kafka 了。尚硅谷官网 kafka 视频连接http://www.gulixueyuan.com/course/330/tasks - RocketMQ
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。 - RabbitMQ
结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。
1.2、RabbitMQ
1.2.1、RabbitMQ 的概念
RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
1.2.2、四大核心概念
- 生产者
产生数据发送消息的程序是生产者 - 交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定 - 队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 - 消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
1.2.3、RabbitMQ 核心部分
1.2.4、各个名词介绍
- Broker:
接收和分发消息的应用,RabbitMQ Server 就是 Message Broker - Virtual host:
出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等 - Connection:
publisher/consumer 和 broker 之间的 TCP 连接 - Channel:
如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销 - Exchange:
message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast) - Queue:
消息最终被送到这里等待 consumer 取走 - Binding:
exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
二、RabbitMQ入门
2.1、创建生产者和消费者模块
创建生产者rabbitmq-producer和消费者rabbitmq-consumer模块
2.2、添加maven依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.0</version>
</dependency>
2.3、编写连接工具类
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
return connection;
}
}
2.4、编写生产者
public class RabbitmqProducer {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送完成");
channel.close();
connection.close();
}
}
2.5、编写消费者
public class RabbitmqConsumer {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("接收到的消息为:" + new String(body,"UTF-8"));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
2.6、小结
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分,代表消费者保留的消息缓冲区;生产者向其中投递消息,消费者从其中取出消息。生产者将消息发送到队列。消费者从该队列接收消息。
三、AMQP
3.1、概念介绍
AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
AMQP是一个二进制协议,拥有一些现代化特点:多信道、协商式,异步,安全,扩平台,中立,高效。
RabbitMQ是AMQP协议的Erlang的实现。
概念 | 说明 |
---|
连接Connection | 一个网络连接,比如TCP/IP套接字连接。 | 会话Session | 端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。 | 信道Channel | 多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。 | 客户端Client | AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。 | 服务节点Broker | 消息中间件的服务节点;一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。 | 端点 | AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。 | 消费者Consumer | 一个从消息队列里请求消息的客户端程序。 | 生产者Producer | 一个向交换机发布消息的客户端应用程序。 |
3.2、RabbitMQ运转流程
在RabbitMQ入门案例中:
生产者发送消息
- 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
- 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;
- 将路由键(空字符串)与队列绑定起来;
- 发送消息至RabbitMQ Broker;
- 关闭信道;
- 关闭连接;
消费者接收消息
- 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker
- 向Broker 请求消费相应队列中的消息,设置相应的回调函数;
- 等待Broker回应闭关投递响应队列中的消息,消费者接收消息;
- 确认(ack,自动确认)接收到的消息;
- RabbitMQ从队列中删除相应已经被确认的消息;
- 关闭信道;
- 关闭连接;
3.3、生产者流转过程说明
- 客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQPO-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
- 客户端调用connection.createChannel方法。此方法开启信道,其包装的channel.open命令发送给Broker,等待channel.basicPublish方法,对应的AMQP命令为Basic.Publish,这个命令包含了content Header 和content Body()。content Header 包含了消息体的属性,例如:投递模式,优先级等,content Body 包含了消息体本身。
- 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。
3.4、消费者流转过程说明
- 消费者客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQPO-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
- 消费者客户端调用connection.createChannel方法。和生产者客户端一样,协议涉及Channel .Open/Open-Ok命令。
- 在真正消费之前,消费者客户端需要向Broker 发送Basic.Consume 命令(即调用channel.basicConsume方法〉将Channel 置为接收模式,之后Broker 回执Basic . Consume - Ok 以告诉消费者客户端准备好消费消息。
- Broker 向消费者客户端推送(Push) 消息,即Basic.Deliver 命令,这个命令和Basic.Publish 命令一样会携带Content Header 和Content Body。
- 消费者接收到消息并正确消费之后,向Broker 发送确认,即Basic.Ack 命令。
- 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。
四、RabbitMQ五种消息模式
消息分类
- 工作队列模式
- Hello World(简单模式)
- Work queues(工作队列模式)
- 发布/订阅模式
- Publish/Subscribe(广播模式)
- Routing(路由模式)
- Topics(通配符模式)
4.1、hello world简单模式
代码和入门程序一致
4.2、Work queues工作队列模式
默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。 Work Queues 与入门程序的 简单模式 相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
Work Queues 与入门程序的 简单模式 的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。
1、生产者
public class WorkQueueProducer {
private final static String QUEUE_NAME = "work_queues";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 1; i <= 10; i++) {
String message = "生产者生产消息----" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("发送完成");
channel.close();
connection.close();
}
}
2、消费者1
public class WorkQueueConsumer1 {
private final static String QUEUE_NAME = "work_queues";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("接收到的消息为:" + new String(body,"UTF-8"));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
3、消费者2
public class WorkQueueConsumer2 {
private final static String QUEUE_NAME = "work_queues";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("接收到的消息为:" + new String(body,"UTF-8"));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
4.3、发布/订阅
在发布/订阅模型中,多了一个exchange角色,而且过程略有变化
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。
RabbitMQ交换机类型
- Direct exchange(直连交换机)
- Fanout exchange(扇型交换机)
- Topic exchange(主题交换机)
- Headers exchange(头交换机)
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
4.4、广播模式(Fanout)
在广播模式下,消息发送流程是这样的:
- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
1、生产者
public class FanoutProducer {
static final String FANOUT_EXCHAGE = "fanout_exchange";
static final String FANOUT_QUEUE1 = "fanout_queue1";
static final String FANOUT_QUEUE2 = "fanout_queue2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(FANOUT_QUEUE1, true, false, false, null);
channel.queueDeclare(FANOUT_QUEUE2, true, false, false, null);
channel.queueBind(FANOUT_QUEUE1, FANOUT_EXCHAGE, "");
channel.queueBind(FANOUT_QUEUE2, FANOUT_EXCHAGE, "");
for (int i = 1; i <= 10; i++) {
String message = "生产者生产消息----发布订阅模式----" + i;
channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());
}
System.out.println("发送完成");
channel.close();
connection.close();
}
}
2、消费者1
public class FanoutConsumer1 {
static final String FANOUT_EXCHAGE = "fanout_exchange";
static final String FANOUT_QUEUE1 = "fanout_queue1";
static final String FANOUT_QUEUE2 = "fanout_queue2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(FANOUT_QUEUE1, true, false, false, null);
channel.queueBind(FANOUT_QUEUE1, FANOUT_EXCHAGE, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("接收到的消息为:" + new String(body,"UTF-8"));
}
};
channel.basicConsume(FANOUT_QUEUE1, true, consumer);
}
}
3、消费者2
public class FanoutConsumer2 {
static final String FANOUT_EXCHAGE = "fanout_exchange";
static final String FANOUT_QUEUE1 = "fanout_queue1";
static final String FANOUT_QUEUE2 = "fanout_queue2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(FANOUT_QUEUE2, true, false, false, null);
channel.queueBind(FANOUT_QUEUE1, FANOUT_EXCHAGE, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("接收到的消息为:" + new String(body,"UTF-8"));
}
};
channel.basicConsume(FANOUT_QUEUE2, true, consumer);
}
}
4.5、路由模式(Direct)
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。 在Direct模型下
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey (路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey 。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
Routing模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。
生产者
public class DirectProducer {
static final String DIRECT_EXCHAGE = "direct_exchange";
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");
channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");
String message = "新增了商品。路由模式;routing key 为 insert ";
channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());
message = "修改了商品。路由模式;routing key 为 update" ;
channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());
System.out.println("发送完成");
channel.close();
connection.close();
}
}
消费者1
public class DirectConsumer1 {
static final String DIRECT_EXCHAGE = "direct_exchange";
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("接收到的消息为:" + new String(body,"UTF-8"));
}
};
channel.basicConsume(DIRECT_QUEUE_INSERT, true, consumer);
}
}
消费者2
public class DirectConsumer2 {
static final String DIRECT_EXCHAGE = "direct_exchange";
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "insert");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("接收到的消息为:" + new String(body,"UTF-8"));
}
};
channel.basicConsume(DIRECT_QUEUE_UPDATE, true, consumer);
}
}
4.6、通配符模式(Topic)
Topic 类型的Exchange 与Direct 相比,都是可以根据RoutingKey 把消息路由到不同的队列。只不过Topic 类型Exchange 可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: user.insert 通配符规则:
# :匹配一个或多个词 * :匹配不多不少恰好1个词 解释
- Queue1:绑定的是
china.# ,因此凡是以 china. 开头的routing key 都会被匹配到。包括china.news和china.weather - Queue2:绑定的是
#.news ,因此凡是以 .news 结尾的 routing key 都会被匹配。包括china.news和japan.news
生产者
public class TopictProducer {
static final String TOPIC_EXCHAGE = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
String message = "中国新闻。Topic模式;routing key 为 china.news";
channel.basicPublish(TOPIC_EXCHAGE, "china.news", null, message.getBytes());
System.out.println("已发送消息:" + message);
message = "中国天气。Topic模式;routing key 为 china.weather";
channel.basicPublish(TOPIC_EXCHAGE, "china.weather", null, message.getBytes());
System.out.println("已发送消息:" + message);
message = "日本新闻。Topic模式;routing key 为 japan.news";
channel.basicPublish(TOPIC_EXCHAGE, "japan.news", null, message.getBytes());
System.out.println("已发送消息:" + message);
message = "日本天气。Topic模式;routing key 为 japan.weather";
channel.basicPublish(TOPIC_EXCHAGE, "japan.weather", null, message.getBytes());
System.out.println("已发送消息:" + message);
channel.close();
connection.close();
}
}
消费者1
public class TopicConsumer1 {
static final String TOPIC_EXCHAGE = "topic_exchange";
static final String TOPIC_QUEUE1 = "topic_queuq1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(TOPIC_QUEUE1, true, false, false, null);
channel.queueBind(TOPIC_QUEUE1, TOPIC_EXCHAGE, "china.#");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("接收到的消息为:" + new String(body, "UTF-8"));
System.out.println("-------------------");
}
};
channel.basicConsume(TOPIC_QUEUE1, true, consumer);
}
}
消费者2
public class TopicConsumer2 {
static final String TOPIC_EXCHAGE = "topic_exchange";
static final String TOPIC_QUEUE2 = "topic_queuq2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(TOPIC_QUEUE2, true, false, false, null);
channel.queueBind(TOPIC_QUEUE2, TOPIC_EXCHAGE, "japan.#");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("接收到的消息为:" + new String(body,"UTF-8"));
System.out.println("-------------------");
}
};
channel.basicConsume(TOPIC_QUEUE2, true, consumer);
}
}
消费者3
public class TopicConsumer3 {
static final String TOPIC_EXCHAGE = "topic_exchange";
static final String TOPIC_QUEUE3 = "topic_queuq3";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(TOPIC_QUEUE3, true, false, false, null);
channel.queueBind(TOPIC_QUEUE3, TOPIC_EXCHAGE, "#.news");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("接收到的消息为:" + new String(body,"UTF-8"));
System.out.println("-------------------");
}
};
channel.basicConsume(TOPIC_QUEUE3, true, consumer);
}
}
消费者4
public class TopicConsumer4 {
static final String TOPIC_EXCHAGE = "topic_exchange";
static final String TOPIC_QUEUE4 = "topic_queuq4";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(TOPIC_QUEUE4, true, false, false, null);
channel.queueBind(TOPIC_QUEUE4, TOPIC_EXCHAGE, "#.weather");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("接收到的消息为:" + new String(body,"UTF-8"));
System.out.println("-------------------");
}
};
channel.basicConsume(TOPIC_QUEUE4, true, consumer);
}
}
五、Spring整合RabbitMQ
5.1、搭建生产者工程
5.1.1、创建工程
创建生产者工程 spring-rabbitmq-producer
5.1.2、添加依赖
添加依赖
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.13</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.3.13</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
5.1.3、配置整合
创建 rabbitmq.properties 连接参数配置文件
rabbitmq.host=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/
创建spring-rabbitmq.xml 整合配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<rabbit:queue id="spring_word_queue" name="spring_word_queue" auto-declare="true"/>
<rabbit:queue id="spring_fanout_queue1" name="spring_fanout_queue1" auto-declare="true"/>
<rabbit:queue id="spring_fanout_queue2" name="spring_fanout_queue2" auto-declare="true"/>
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue1"></rabbit:binding>
<rabbit:binding queue="spring_fanout_queue2"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<rabbit:queue id="spring_direct_queue1" name="spring_direct_queue1" auto-declare="true"/>
<rabbit:queue id="spring_direct_queue2" name="spring_direct_queue2" auto-declare="true"/>
<rabbit:direct-exchange name="spring_direct_exchange">
<rabbit:bindings>
<rabbit:binding queue="spring_direct_queue1" key="red"></rabbit:binding>
<rabbit:binding queue="spring_direct_queue2" key="blue"></rabbit:binding>
<rabbit:binding queue="spring_direct_queue2" key="yellow"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:queue id="spring_topic_queue1" name="spring_topic_queue1" auto-declare="true"/>
<rabbit:queue id="spring_topic_queue2" name="spring_topic_queue2" auto-declare="true"/>
<rabbit:queue id="spring_topic_queue3" name="spring_topic_queue3" auto-declare="true"/>
<rabbit:queue id="spring_topic_queue4" name="spring_topic_queue4" auto-declare="true"/>
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange">
<rabbit:bindings>
<rabbit:binding pattern="china.*" queue="spring_topic_queue1"></rabbit:binding>
<rabbit:binding pattern="japan.#" queue="spring_topic_queue2"></rabbit:binding>
<rabbit:binding pattern="#.news" queue="spring_topic_queue3"></rabbit:binding>
<rabbit:binding pattern="#.weather" queue="spring_topic_queue4"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
5.1.4、发送消息
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class SpringProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void queueTest() {
rabbitTemplate.convertAndSend("spring_queue", "hello spring rabbitmq");
}
@Test
public void workQueueTest() {
rabbitTemplate.convertAndSend("spring_word_queue", "hello spring rabbitmq");
rabbitTemplate.convertAndSend("spring_word_queue", "hello spring rabbitmq");
}
@Test
public void fanoutTest() {
rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "发送到 spring_fanout_exchange交换机的广播消息1");
rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "发送到 spring_fanout_exchange交换机的广播消息2");
}
@Test
public void directTest() {
rabbitTemplate.convertAndSend("spring_direct_exchange", "red", "发送到 spring_direct_exchange交换机的red路由消息");
rabbitTemplate.convertAndSend("spring_direct_exchange", "blue", "发送到 spring_direct_exchange交换机的blue路由消息");
rabbitTemplate.convertAndSend("spring_direct_exchange", "yellow", "发送到 spring_direct_exchange交换机的yellow路由消息");
}
@Test
public void topicTest(){
rabbitTemplate.convertAndSend("spring_topic_exchange", "china.news", "发送到spring_topic_exchange交换机china.new的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "china.weather", "发送到spring_topic_exchange交换机china.weather的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "japan.news", "发送到spring_topic_exchange交换机japan.news的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "japan.weather", "发送到spring_topic_exchange交换机japan.weather的消息");
}
}
5.2、搭建消费者工程
5.2.1、创建工程
创建消费者工程spring-rabbitmq-consumer
5.2.2、添加依赖
添加依赖
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.13</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.3.13</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
5.2.3、配置整合
创建 rabbitmq.properties 连接参数配置文件
rabbitmq.host=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/
创建spring-rabbitmq.xml 整合配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<rabbit:admin connection-factory="connectionFactory"/>
<bean id="springQueueListener" class="com.java521.listener.SpringQueueListener"></bean>
<bean id="springWorkQueueListener1" class="com.java521.listener.SpringWorkQueueListener1"></bean>
<bean id="springWorkQueueListener2" class="com.java521.listener.SpringWorkQueueListener2"></bean>
<bean id="fanoutListener1" class="com.java521.listener.FanoutListener1"></bean>
<bean id="fanoutListener2" class="com.java521.listener.FanoutListener2"></bean>
<bean id="directListener1" class="com.java521.listener.DirectListener1"></bean>
<bean id="directListener2" class="com.java521.listener.DirectListener2"></bean>
<bean id="topicListener1" class="com.java521.listener.TopicListener1"></bean>
<bean id="topicListener2" class="com.java521.listener.TopicListener2"></bean>
<bean id="topicListener3" class="com.java521.listener.TopicListener3"></bean>
<bean id="topicListener4" class="com.java521.listener.TopicListener4"></bean>
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<rabbit:listener ref="springWorkQueueListener1" queue-names="spring_word_queue"/>
<rabbit:listener ref="springWorkQueueListener2" queue-names="spring_word_queue"/>
<rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue1"/>
<rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue2"/>
<rabbit:listener ref="directListener1" queue-names="spring_direct_queue1"/>
<rabbit:listener ref="directListener2" queue-names="spring_direct_queue2"/>
<rabbit:listener ref="topicListener1" queue-names="spring_topic_queue1"/>
<rabbit:listener ref="topicListener2" queue-names="spring_topic_queue2"/>
<rabbit:listener ref="topicListener3" queue-names="spring_topic_queue3"/>
<rabbit:listener ref="topicListener4" queue-names="spring_topic_queue4"/>
</rabbit:listener-container>
</beans
5.2.4、消息监听器
1、简单队列监听器
public class SpringQueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
2、工作队列监听器1
public class SpringWorkQueueListener1 implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("spring_queue1: 接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
3、工作队列监听器2
public class SpringWorkQueueListener2 implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("spring_queue2: 接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
4、广播监听器1
public class FanoutListener1 implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("广播监听器1:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
5、广播监听器2
public class FanoutListener2 implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("广播监听器2:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
6、路由监听器1
public class DirectListener1 implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("路由监听器1:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
7、路由监听器2
public class DirectListener2 implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("路由监听器2:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
8、配符监听器1
public class TopicListener1 implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("通配监听器1:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
9、配符监听器2
public class TopicListener2 implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("通配监听器2:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
10、配符监听器3
public class TopicListener3 implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("通配监听器3:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
11、配符监听器4
public class TopicListener4 implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("通配监听器4:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
六、SpringBoot整合RabbitMQ
6.1、简介
在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ:https://github.com/spring-projects/spring-amqp
尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。
一般在开发过程中:
- 生产者工程:
1.application.yml文件配置RabbitMQ相关信息; 2.在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定 3.注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机 - 消费者工程:
1.application.yml文件配置RabbitMQ相关信息 2.创建消息处理类,用于接收队列中的消息并进行处理
6.2、搭建生产者工程
6.2.1、创建工程
创建springboot项目生产者工程springboot-rabbitmq-producer
6.2.2、添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
6.2.3、配置文件
创建application.yml ,内容如下
server:
port: 9090
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
2)绑定交换机和队列 创建RabbitMQ队列与交换机绑定的配置类RabbitMQConfig
@Configuration
public class RabbitMQConfig {
public static final String SPRING_BOOT_SIMPLE_QUEUE = "spring_boot_simple_queue";
public static final String SPRING_BOOT_WORK_QUEUE = "spring_boot_work_queue";
public static final String SPRING_BOOT_FANOUT_QUEUE1 = "spring_boot_fanout_queue1";
public static final String SPRING_BOOT_FANOUT_QUEUE2 = "spring_boot_fanout_queue2";
public static final String SPRING_BOOT_FANOUT_EXCHANGE = "spring_boot_fanout_exchange";
public static final String SPRING_BOOT_DIRECT_QUEUE1 = "spring_boot_direct_queue1";
public static final String SPRING_BOOT_DIRECT_QUEUE2 = "spring_boot_direct_queue2";
public static final String SPRING_BOOT_DIRECT_EXCHANGE = "spring_boot_direct_exchange";
public static final String SPRING_BOOT_TOPIC_QUEUE1 = "spring_boot_topic_queue1";
public static final String SPRING_BOOT_TOPIC_QUEUE2 = "spring_boot_topic_queue2";
public static final String SPRING_BOOT_TOPIC_EXCHANGE = "spring_boot_topic_exchange";
@Bean
public Queue simpleQueue() {
return new Queue(SPRING_BOOT_SIMPLE_QUEUE);
}
@Bean
public Queue workQueue() {
return new Queue(SPRING_BOOT_WORK_QUEUE);
}
@Bean
public FanoutExchange springBootFanoutExchange() {
return new FanoutExchange(SPRING_BOOT_FANOUT_EXCHANGE);
}
@Bean
public Queue springBootFanoutQueue1() {
return new Queue(SPRING_BOOT_FANOUT_QUEUE1);
}
@Bean
public Queue springBootFanoutQueue2() {
return new Queue(SPRING_BOOT_FANOUT_QUEUE2);
}
@Bean
public Binding bindingQueue1(Queue springBootFanoutQueue1, FanoutExchange springBootFanoutExchange) {
return BindingBuilder.bind(springBootFanoutQueue1).to(springBootFanoutExchange);
}
@Bean
public Binding bindingQueue2(Queue springBootFanoutQueue2, FanoutExchange springBootFanoutExchange) {
return BindingBuilder.bind(springBootFanoutQueue2).to(springBootFanoutExchange);
}
@Bean
public DirectExchange springBootDirectExchange() {
return new DirectExchange(SPRING_BOOT_DIRECT_EXCHANGE);
}
@Bean
public Queue springBootDirectQueue1() {
return new Queue(SPRING_BOOT_DIRECT_QUEUE1);
}
@Bean
public Queue springBootDirectQueue2() {
return new Queue(SPRING_BOOT_DIRECT_QUEUE2);
}
@Bean
public Binding bindingQueue3(Queue springBootDirectQueue1, DirectExchange springBootDirectExchange) {
return BindingBuilder.bind(springBootDirectQueue1).to(springBootDirectExchange).with("debug");
}
@Bean
public Binding bindingQueue4(Queue springBootDirectQueue2, DirectExchange springBootDirectExchange) {
return BindingBuilder.bind(springBootDirectQueue2).to(springBootDirectExchange).with("info");
}
@Bean
public TopicExchange springBootTopicExchange() {
return new TopicExchange(SPRING_BOOT_TOPIC_EXCHANGE);
}
@Bean
public Queue springBootTopicQueue1() {
return new Queue(SPRING_BOOT_TOPIC_QUEUE1);
}
@Bean
public Queue springBootTopicQueue2() {
return new Queue(SPRING_BOOT_TOPIC_QUEUE2);
}
@Bean
public Binding bindingQueue5(Queue springBootTopicQueue1, TopicExchange springBootTopicExchange) {
return BindingBuilder.bind(springBootTopicQueue1).to(springBootTopicExchange).with("java.*");
}
@Bean
public Binding bindingQueue6(Queue springBootTopicQueue2, TopicExchange springBootTopicExchange) {
return BindingBuilder.bind(springBootTopicQueue2).to(springBootTopicExchange).with("java.#");
}
}
6.3、搭建消费者工程
6.3.1、创建工程
创建消费者工程springboot-rabbitmq-consumer
6.3.2、添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
6.2.3、配置文件
创建application.yml ,内容如下
server:
port: 9090
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
6.2.4、消息监听处理类
编写消息监听器 MyListener
@Component
public class MyListener {
@RabbitListener(queues = "spring_boot_simple_queue")
public void simpleQueueListener(String message) {
System.out.println("simpleQueue队列: 消费者接收到的消息为:" + message);
}
@RabbitListener(queues = "spring_boot_work_queue")
public void workQueueListener1(String message) {
System.out.println("workQueue队列1: 消费者接收到的消息为:" + message);
}
@RabbitListener(queues = "spring_boot_work_queue")
public void workQueueListener2(String message) {
System.out.println("workQueue: 消费者接收到的消息为:" + message);
}
@RabbitListener(queues = "spring_boot_fanout_queue1")
public void fanoutQueueListener1(String message) {
System.out.println("fanout队列1: 消费者接收到的消息为:" + message);
}
@RabbitListener(queues = "spring_boot_fanout_queue2")
public void fanoutQueueListener2(String message) {
System.out.println("fanout队列2: 消费者接收到的消息为:" + message);
}
@RabbitListener(queues = "spring_boot_direct_queue1")
public void listenDirectQueue1(String message) {
System.out.println("消费者1接收到 debug 和 info 消息:" + message);
}
@RabbitListener(queues = "spring_boot_direct_queue2")
public void listenDirectQueue2(String message) {
System.out.println("消费者2接收到 debug 消息:" + message);
}
@RabbitListener(queues = "spring_boot_topic_queue1")
public void listenTopicQueue1(String message) {
System.out.println("消费者1接收到 java 或者java.x java.xx...消息:" + message);
}
@RabbitListener(queues = "spring_boot_topic_queue2")
public void listenTopicQueue2(String message) {
System.out.println("消费者2接收到 java.x 消息:" + message);
}
}
6.4、测试
@SpringBootTest
class SpringbootRabbitmqProducerApplicationTests {
public static final String SPRING_BOOT_SIMPLE_QUEUE = "spring_boot_simple_queue";
public static final String SPRING_BOOT_WORK_QUEUE = "spring_boot_work_queue";
public static final String SPRING_BOOT_FANOUT_EXCHANGE = "spring_boot_fanout_exchange";
public static final String SPRING_BOOT_DIRECT_EXCHANGE = "spring_boot_direct_exchange";
public static final String SPRING_BOOT_TOPIC_EXCHANGE = "spring_boot_topic_exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(SPRING_BOOT_SIMPLE_QUEUE, message);
}
@Test
public void testWorkQueue() {
String message = "hello, spring amqp!";
for (int i = 1; i < 20; i++) {
rabbitTemplate.convertAndSend(SPRING_BOOT_WORK_QUEUE, message + "======" + i);
}
}
@Test
public void testFanoutExchange() {
String message = "hello, spring boot, 我是fanout交换机";
for (int i = 1; i <= 20; i++) {
rabbitTemplate.convertAndSend(SPRING_BOOT_FANOUT_EXCHANGE, "", message + "======" + i);
}
}
@Test
public void testDirectExchange() {
String message = "hello, spring boot, 我是direct交换机";
for (int i = 1; i <= 20; i++) {
if (i % 2 == 0) {
rabbitTemplate.convertAndSend(SPRING_BOOT_DIRECT_EXCHANGE, "debug", message + i);
} else {
rabbitTemplate.convertAndSend(SPRING_BOOT_DIRECT_EXCHANGE, "info", message + i);
}
}
}
@Test
public void testTopicExchange() {
String message = "hello, spring boot, 我是topic交换机";
for (int i = 1; i <= 20; i++) {
if (i % 2 == 0) {
rabbitTemplate.convertAndSend(SPRING_BOOT_TOPIC_EXCHANGE, "java.php", message + i);
} else {
rabbitTemplate.convertAndSend(SPRING_BOOT_TOPIC_EXCHANGE, "java.go.python", message + i);
}
}
}
}
第一次启动,应先启动生产者,创建队列和交换机,如先启动消费者则找不到队列
七、RabbitMQ 高级特性
7.1、消息可靠性投递
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
rabbitmq 整个消息投递的路径为: producer —> rabbitmq broker —> exchange —> queue —> consumer
- 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
- 消息从 exchange 到 queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递
7.1.1、confirm确认模式代码实现
1、创建maven工程,消息的生产者工程,项目模块名称:rabbitmq-producer-spring 2、添加依赖
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.13</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.3.13</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
3、在 resources 目录下创建 rabbitmq.properties 配置文件,添加链接RabbitMQ相关信息
rabbitmq.host=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/
4、在 resources 目录下创建 spring-rabbitmq-producer.xml 配置文件,添加以下配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbitmq="http://www.springframework.org/schema/rabbit"
xmlns:rabbitm="http://www.springframework.org/schema/rabbit"
xmlns:rabbit="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbitm:connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"/>
<rabbitmq:admin connection-factory="connectionFactory"/>
<rabbitmq:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<rabbitmq:queue id="queue_confirm" name="queue_confirm"/>
<rabbitmq:direct-exchange name="exchange_confirm">
<rabbitmq:bindings>
<rabbitmq:binding queue="queue_confirm" key="confirm"></rabbitmq:binding>
</rabbitmq:bindings>
</rabbitmq:direct-exchange>
</beans>
5、编写测试代码
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testConfirm() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了....");
if (ack) {
System.out.println("接收成功消息" + cause);
} else {
System.out.println("接收失败消息" + cause);
}
}
});
rabbitTemplate.convertAndSend("exchange_confirm2222", "confirm", "测试confirm消息......");
}
}
6、测试结果
7.1.2、return退回模式代码实现
回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败是 才会执行 ReturnCallBack,具体实现如下:
- 在 spring-rabbitmq-producer.xml 配置文件,在 rabbit:connection-factory节点 添加配置:
<rabbitm:connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-returns="true"
publisher-confirms="true"/>
- 编写测试方法
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
...
@Test
public void testReturn() {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了....");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
}
});
rabbitTemplate.convertAndSend("exchange_confirm", "confirm222", "message confirm....");
}
}
7.1.3、小结
对于确认模式:
- 设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式。
- 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
对于退回模式
- 设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。
- 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函returnedMessage。
在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。 使用channel列方法,完成事务控制: txSelect(), 用于将当前channel设置成transaction模式 txCommit(),用于提交事务 txRollback(),用于回滚事务
7.2、Consumer ACK
ack指 Acknowledge,确认。 表示消费端收到消息后的确认方式。 有三种确认方式:
- 自动确认:acknowledge=“none”
- 手动确认:acknowledge=“manual”
- 根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦,不作讲解)
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
7.2.1、代码实现
1、创建maven工程,消息的消费者工程,项目模块名称:rabbitmq-consumer-spring 2、添加依赖
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.13</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.3.13</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
3、在 resources 目录下创建 rabbitmq.properties 配置文件,添加链接RabbitMQ相关信息
rabbitmq.host=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/
4、在 resources 目录下创建 spring-rabbitmq-consumer.xml 配置文件,添加以下配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<context:component-scan base-package="com.java521.listener"/>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener ref="ackListener" queue-names="queue_confirm"></rabbit:listener>
</rabbit:listener-container>
</beans>
5、编写ackListener 监听类实现ChannelAwareMessageListener接口
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(new String(message.getBody()));
System.out.println("处理逻辑业务");
int i = 1 / 0;
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
channel.basicNack(deliveryTag, true, true);
}
}
}
7.2.2、小结
在rabbit:listener-container 标签中设置acknowledge 属性,设置ack方式
如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false); 方法确认签收消息 如果出现异常,则在catch中调用 basicNack 或 basicReject ,拒绝消息,让MQ重新发送消息。
如何保证消息的高可靠性传输
- 持久化
exchange要持久化 queue要持久化 message要持久化 - 生产方确认Confirm
- 消费方确认Ack
- Broker高可用
7.3、消费端限流
如上图所示:如果在A系统中需要维护相关的业务功能,可能需要将A系统的服务停止,那么这个时候消息的生产者还是一直会向MQ中发送待处理的消息,消费者此时服务已经关闭,导致大量的消息都会在MQ中累积。如果当A系统成功启动后,默认情况下消息的消费者会一次性将MQ中累积的大量的消息全部拉取到自己的服务,导致服务在短时间内会处理大量的业务,可能会导致系统服务的崩溃。 所以消费端限流是非常有必要的。 可以通过MQ中的 listener-container 配置属性 perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
7.3.1、代码实现
- 编写 QosListener 监听类,保证当前的监听类消息处理机制是 ACK 为手动方式
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
System.out.println(new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
}
- 在配置文件的 listener-container 配置属性中添加配置
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="qosListener" queue-names="queue_confirm"></rabbit:listener>
</rabbit:listener-container>
配置说明: perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
7.3.2、小结
在rabbit:listener-container 中配置 prefetch属性设置消费端一次拉取多少消息消费端的确认模式一定为手动确认。acknowledge=“manual”
7.4、TTL
TTL 全称 Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清除。 RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。 可以在RabbitMQ管理控制台设置过期时间。
7.4.1、代码实现
7.4.1.1、设置队列的过期时间
- 在消息的生产方中,在 spring-rabbitmq-producer.xml 配置文件中,添加如下配置
<rabbitmq:queue name="queue_ttl" id="queue_ttl">
<rabbitmq:queue-arguments>
<entry key="x-message-ttl" value="30000" value-type="java.lang.Integer"></entry>
</rabbitmq:queue-arguments>
</rabbitmq:queue>
<rabbitmq:topic-exchange name="exchange_ttl">
<rabbitmq:bindings>
<rabbitmq:binding pattern="ttl.#" queue="queue_ttl"></rabbitmq:binding>
</rabbitmq:bindings>
</rabbitmq:topic-exchange>
- 编写发送消息测试方法
@Test
public void testTtl() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("exchange_ttl", "ttl.hehe", "message ttl...." + i);
}
}
测试结果:当消息发送成功后,过30s后在RabbitMQ的管理控制台会看到消息会自动删除。
7.4.1.2、设置单个消息的过期时间
编写代码测试,并且设置队列的过期时间为30s, 单个消息的过期时间为3s
@Test
public void testTtl2() {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("3000");
System.out.println("进入");
return message;
}
};
for (int i = 1; i <= 10; i++) {
if (i == 5) {
System.out.println("执行...");
rabbitTemplate.convertAndSend("exchange_ttl", "ttl.hehe", "message ttl...." + i, messagePostProcessor);
} else {
rabbitTemplate.convertAndSend("exchange_ttl", "ttl.hehe" + i, "message ttl...." + i);
}
}
}
如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
- 队列过期后,会将队列所有消息全部移除。
- 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
7.4.2、小结
- 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
- 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
- 如果两者都进行了设置,以时间短的为准。
7.5、死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。 消息成为死信的三种情况
- 队列消息长度到达限制(要投递的队列消息满了,无法投递)
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
如果这个包含死信的队列配置了dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。
队列绑定死信交换机: 给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
队列将死信投递给死信交换机时,必须知道两个信息:
- 死信交换机名称
- 死信交换机与死信队列绑定的RoutingKey
这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。
7.5.1、代码实现
1、在消息的生产方中,在 spring-rabbitmq-producer.xml 配置文件中,添加如下配置:
声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
<rabbitmq:queue name="test_ueue_dlx" id="test_queue_dlx"></rabbitmq:queue>
<rabbitmq:topic-exchange name="test_exchange_dlx">
<rabbitmq:bindings>
<rabbitmq:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbitmq:binding>
</rabbitmq:bindings>
</rabbitmq:topic-exchange>
声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
<rabbitmq:queue name="queue_dlx" id="queue_dlx"></rabbitmq:queue>
<rabbitmq:topic-exchange name="exchange_dlx">
<rabbitmq:bindings>
<rabbitmq:binding pattern="dlx.#" queue="queue_dlx"></rabbitmq:binding>
</rabbitmq:bindings>
</rabbitmq:topic-exchange>
正常队列绑定死信交换机,并设置相关参数信息
<rabbitmq:queue name="test_queue_dlx" id="test_queue_dlx">
<rabbitmq:queue-arguments>
<entry key="x-dead-letter-exchange" value="exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
<entry key="x-max-length" value="10" value-type="java.lang.Integer" />
</rabbitmq:queue-arguments>
</rabbitmq:queue>
<rabbitmq:topic-exchange name="test_exchange_dlx">
<rabbitmq:bindings>
<rabbitmq:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbitmq:binding>
</rabbitmq:bindings>
</rabbitmq:topic-exchange>
<rabbitmq:queue name="queue_dlx" id="queue_dlx"></rabbitmq:queue>
<rabbitmq:topic-exchange name="exchange_dlx">
<rabbitmq:bindings>
<rabbitmq:binding pattern="dlx.#" queue="queue_dlx"></rabbitmq:binding>
</rabbitmq:bindings>
</rabbitmq:topic-exchange>
2、编写测试方法
@Test
public void testDlx() {
rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "我是一条消息,我会死吗?");
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "我是一条消息,我会死吗?");
}
rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "我是一条消息,我会死吗?");
}
7.5.2、小结
- 死信交换机和死信队列和普通的没有区别
- 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
- 消息成为死信的三种情况:
队列消息长度到达限制; 消费者拒接消费消息,并且不重回队列; 原队列存在消息过期设置,消息到达超时时间未被消费;
7.6、延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。 提出需求:
- 下单后,30分钟未支付,取消订单,回滚库存。
- 新用户注册成功7天后,发送短信问候。
实现方式: - 定时器
- 延迟队列
注意:在RabbitMQ中并未提供延迟队列功能。 但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
7.6.1、代码实现
1、在消息的生产方中,在 spring-rabbitmq-producer.xml 配置文件中,添加如下配置:
<rabbitmq:queue id="order_queue" name="order_queue">
<rabbitmq:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx"/>
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel"/>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
</rabbitmq:queue-arguments>
</rabbitmq:queue>
<rabbitmq:topic-exchange name="order_exchange">
<rabbitmq:bindings>
<rabbitmq:binding pattern="order.#" queue="order_queue"></rabbitmq:binding>
</rabbitmq:bindings>
</rabbitmq:topic-exchange>
<rabbitmq:queue id="order_queue_dlx" name="order_queue_dlx"></rabbitmq:queue>
<rabbitmq:topic-exchange name="order_exchange_dlx">
<rabbitmq:bindings>
<rabbitmq:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbitmq:binding>
</rabbitmq:bindings>
</rabbitmq:topic-exchange>
2、编写测试方法
@Test
public void testDelay() throws InterruptedException {
rabbitTemplate.convertAndSend("order_exchange", "order.msg", "订单信息:id=1,time=2021年");
for (int i = 10; i > 0; i--) {
System.out.println(i + "...");
Thread.sleep(1000);
}
}
7.6.2、小结
- 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
- RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。
7.7、日志与监控
7.7.1、RabbitMQ日志
RabbitMQ默认日志存放路径:/var/log/rabbitmq/rabbit@xxx.log 或者安装目录/var/log/rabbitmq/rabbit@xxx.log
RabbitMQ日志详细信息: 日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等。
7.7.2、web管控台监控
直接访问当前的IP:15672,输入用户名和密码(默认是 guest),就可以查看RabbitMQ的管理控制台。当然也可通过命令的形式来查看。如下:
rabbitmqctl list_queues
rabbitmqctl list_users
rabbitmqctl list_connections
rabbitmqctl list_exchanges
rabbitmqctl list_consumers
rabbitmqctl environment
rabbitmqctl list_queues name messages_unacknowledged
rabbitmqctl list_queues name memory
rabbitmqctl list_queues name messages_ready
7.8、消息追踪
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。
7.8.1、消息追踪-Firehose
firehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为 amq.rabbitmq.trace,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。 注意:打开 trace 会影响消息写入功能,适当打开后请关闭。
rabbitmqctl trace_on
消息追踪验证: 1、创建一个队列test_trace ,并将当前的队列绑定到 amq.rabbitmq.trace 交换机上,设置RoutingKey为:# 2、未开启消息追踪之前,我们发送一个消息 当前消息发送成功后,在控制台我们可以看到当前消息的具体信息 3、设置开启消息追踪,在发送一条消息 我们发现当前消息也正常存在,并且开启消息追踪后,会多出一条消息是 amq.rabbitmq.trace 交换机发给当前队列的消息,消息中的内容是比较完整的。
建议:在开发阶段我们可以开启消息追踪,在实际生产环境建议将其关闭 关闭Firehose命令:rabbitmqctl trace_off
7.8.2、消息追踪-rabbitmq_tracing
rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。
rabbitmq-plugins enable rabbitmq_tracing
创建trace 发送消息成功后,我们点击日志文件,要求输入RabbitMQ的登录用户名和密码。
建议:在开发阶段我们可以开启消息追踪插件,在实际生产环境不建议建议开启,除非是非常特殊的业务场景,根据实际情况选择开启即可。
7.8.3、RabbitMQ应用问题
7.8.3.1、消息可靠性保障
提出需求:如何能够保证消息的 100% 发送成功? 首先大家要明确任何一个系统都不能保证消息的 100% 投递成功,我们是可以保证消息以最高最可靠的发送给目标方。 在RabbitMQ中采用 消息补充机制 来保证消息的可靠性 步骤分析: 参与部分:消息生产者、消息消费者、数据库、三个队列(Q1、Q2、Q3)、交换机、回调检查服务、定时检查服务
- 消息的生产者将业务数据存到数据库中
- 发送消息给 队列Q1
- 消息的生产者等待一定的时间后,在发送一个延迟消息给队列 Q3
- 消息的消费方监听 Q1 队列消息,成功接收后
- 消息的消费方会 发送 一条确认消息给 队列Q2
- 回调检查服务监听 队列Q2 发送的确认消息
- 回调检查服务接收到确认消息后,将消息写入到 消息的数据库表中
- 回调检查服务同时也会监听 队列Q3延迟消息, 如果接收到消息会和数据库比对消息的唯一标识
- 如果发现没有接收到确认消息,那么回调检查服务就会远程调用 消息生产者,重新发送消息
- 重新执行 2-7 步骤,保证消息的可靠性传输
- 如果发送消息和延迟消息都出现异常,定时检查服务会监控 消息库中的消息数据,如果发现不一致的消息然后远程调用消息的生产者重新发送消息。
7.8.3.2、消息幂等性处理
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。 在本教程中使用 乐观锁机制 保证消息的幂等操作
|