1.消息中间件
1.1 消息中间件概念
????消息中间件(Message Queue,MQ),又称消息队列,是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
????以前应用之间的远程调用:
????加入MQ后应用之间的调用:
1.2 MQ的优势
1.2.1 应用解耦
????MQ相当于一个中介,生产方通过MQ与消费方交互,它使得应用程序解耦合。
注意:系统的耦合性越高,容错性就越低,可维护性就越低。
使用 MQ 可以让各应用解耦,提升容错性和可维护性。
1.2.2 任务异步处理
????将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
原来处理一个下单操作耗时:20 + 300 + 300 + 300 = 920ms,响应速度太慢!
加入MQ后,处理一个下单操作只需 20 + 5 = 25ms,其他后续耗时业务将通过消费MQ中的数据去完成,提升了用户体验和系统吞吐量(单位数据内处理请求的数目,TPS、QPS都是吞吐量的常用量化指标)。
1.2.3 削峰填谷
????如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000上,这个时候数据库肯定卡死了。
????加入MQ之后,由MQ去承载瞬间增加的多个用户请求并保存消息数据,然后系统就可以按照自己的消费能力去消费MQ中的请求数据,比如每秒1000个消息,这样慢慢写入数据库,这样就不会卡死数据库了。
????但是使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持1000QPS,直到消费完积压的消息,这就叫做“填谷”。
相关概念:
- QPS/TPS:每秒处理请求/事务的数量,因特网上,经常用QPS来衡量域名系统服务器的机器的性能。
- 并发数:系统同时处理请求/事务数。
- QPS/TPS = 并发数/平均响应时间。
1.3 MQ的不足
-
系统可用性降低; 系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。
如何保证MQ的高可用?
-
系统复杂度提高; MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。
如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
-
引出一致性问题。 A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息,如果 B 系统、C 系统处理成功,D 系统处理 失败,就存在一致性问题。
如何保证消息数据处理的一致性?
1.4 常见的MQ产品
????目前,业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。
| RabbitMQ | ActiveMQ | RocketMQ | Kafka |
---|
公司/社区 | Rabbit | Apache | 阿里 | Apache | 开发语言 | Erlang | Java | Java | Scala&Java | 协议支持 | AMQP、XMPP、SMTP、STOMP | OpenWire、STOMP、 REST、XMPP、AMQP | 自定义 | 自定义协议, 社区封装了 http协议支持 | 客户 端支 持语 言 | 官方支持Erlang、Java、Ruby等,社区产出多种API,几乎支持所有语言 | Java、C、C++、Python、PHP、 Perl、.net等 | Java,C++ (不成熟) | 官方支持Java,社区产出 多种API,如 PHP、Python 等 | 单机吞吐量 | 万级(NO.3) | 万级(NO.4) | 十万级(最NO.1) | 十万级(NO.2) | 消息延迟 | 微妙级 | 毫秒级 | 毫秒级 | 毫秒以内 | 功能特性 | 并发能力强,性能极其好, 延时低,社区活跃,管理界面丰富 | 老牌产品,成熟度 高,文档较多 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,毕竟是为大数据领域准备的。 |
阿里的淘宝、天猫就是用的RocketMQ。
1.5 MQ的两种主流实现方式
1.5.1 AMQP
????高级消息队列协议(Advanced Message Queuing Protocol,AMQP),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,遵循此协议,不受客户端和中间件产品和开发语言限制。2006年,AMQP 规范发布。
可类比HTTP。
1.5.2 JMS
????Java 消息服务(Java Message Service,JMS)是一个应用程序接口,是一个 Java 平台中关于面向消息中间件的API。JMS 是 JavaEE 规范中的一种。
可类比JDBC。
????很多消息中间件都实现了JMS规范,例如 ActiveMQ、RabbitMQ 官方虽然没有提供 JMS 的实现包,但是开源社区有。
1.5.3 AMQP 和 JMS 的区别
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式;
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的;
- JMS规定了两种消息模式;而AMQP的消息模式更加丰富。
2. RabbitMQ
RabbitMQ官方地址
2.1 RabbitMQ概念
????2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。
注意:Erlang 语言是专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
2.2 RabbitMQ基础架构
????RabbitMQ 中的相关概念:
-
Broker 接收和分发消息的应用,RabbitMQ Server就是 Message Broker。 -
Virtual host 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等。 -
Connection publisher/consumer 和 broker 之间的 TCP 连接。 -
Channel 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread 创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。
注意:Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
-
Exchange message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。
常用的类型有:direct (point-to-point)、topic (publish-subscribe)、fanout (multicast)。
-
Queue 消息最终被送到这里等待 consumer 取走。 -
Binding exchange 和 queue 之间的虚拟连接,Binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。
2.3 安装并配置RabbitMQ
Windows下安装并配置RabbitMQ、Linux下安装并配置RabbitMQ。
2.4 RabbitMQ入门示例
2.4.1 RabbitMQ工作模式
????RabbitMQ提供了6种工作模式,分别为:
- 简单模式;
- work queues 工作队列模式;
- Publish/Subscribe 发布与订阅模式;
- Routing 路由模式;
- Topics 主题模式(也叫通配符模式);
- RPC 远程调用模式(远程调用,不太算MQ)。
接下来以RabbitMQ的各种模式为基础,进行入门案例的编写演示。
2.4.2 简单模式
一个生产者、一个消费者和一个消息队列。
- P:生产者(publisher),即要发送消息的一方;
- C:消费者(consumer),即要接收消息的一方,会一直等待消息到来;
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
2.4.2.1 工程创建
提前创建一个空工程。
-
在空工程下创建一个子模块rabbitmq-01-simple_mode; -
向该子模块中的POW文件添加依赖。 因为需要使用 AMQP 协议来连接 RabbitMQ 进行通信,所以需要添加其相关依赖。 <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
2.4.2.2 编写连接工具类
public class RabbitMqConnectionUtil {
private static final ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.192.130");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ljh");
connectionFactory.setUsername("lijinghua");
connectionFactory.setPassword("lijinghua");
}
private RabbitMqConnectionUtil() {
}
public static Connection getConnection() throws IOException, TimeoutException {
return connectionFactory.newConnection();
}
}
2.4.2.3 构建生产者
public class Publisher {
public static final String QUEUE_NAME = "simple_mode";
public static void main(String[] args){
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMqConnectionUtil.getConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "简单模式---消息---";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("生产者已发送消息:"+message);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
if (channel != null){
channel.close();
}
if (connection != null){
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
2.4.2.4 构建消费者
public class Consumer {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMqConnectionUtil.getConnection();
channel = connection.createChannel();
channel.queueDeclare(Publisher.QUEUE_NAME, true, false, false, null);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("接收到的消息为:" + new String(body, "utf-8"));
}
};
channel.basicConsume(Publisher.QUEUE_NAME, true, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
注意:这里不要释放资源,因为消费者监听队列的时候是阻塞状态,保持持续监听,当生产者发送消息到队列中时,小消费者会第一时间消费。
2.4.3 工作队列模式
????对于任务过重或任务较多情况,使用工作队列可以提高任务处理的速度。
一个生产者、多个消费者和一个消息队列。
工作队列模式和简单模式的区别仅仅在于消费者的数量增加了,多个消费者共同消费一个队列中的消息。由于这里只需要增加消费者即可,因此,代码参考2.4.2小节即可,下面以2个消费者为例,生产者发送20条消息为例。
注意:工作队列模式下,对于一个队列中的同一个消息而言,多个消费者之间是竞争的关系。
2.4.4 订阅模式
2.4.4.1 订阅模式的概念
注意:相比于简单模式和工作队列模式而言,订阅模式增加了一个exchange角色,生产者不再将消息发送到队列中,而是发送到exchange。
-
P:生产者(publisher),即要发送消息的一方; -
C:消费者(consumer),即要接收消息的一方,会一直等待消息到来; -
X:交换机(exchange),一方面接收来自生产者发送的消息,一方面将消息传递给绑定到此交换机的某个队列、每个队列,又或者是直接丢弃。交换机采用的处理方式取决于exchange类型,常见的有:
- Fanout:广播,将消息交给绑定到此交换机的每个队列;
- Direct:定向,把消息交给符合指定routing key 的队列;
- Topic:通配符,把消息交给符合routing pattern (路由模式)的队列。
注意:交换机只负责消息的转发,不能存储消息,如果没有任何队列与该交换机绑定,或者没有符合路由规则的队列,则消息会丢失!
-
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
2.4.4.2 发布与订阅模式
????发布订阅模式,即exchange类型为广播 Fanout 的订阅模式。在该模式下,每个消费者监听自己的队列,生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
注意:
-
发布/订阅模式需要绑定交换机并设置队列和交换机的绑定,绑定之后,一个消息被多个消费者收到;工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机。
和工作队列模式不同,工作队列模式中的消费者是在竞争消息。
-
发布/订阅模式的生产者是面向交换机发送消息,工作队列模式的生产者是面向队列发送消息(底层使用默认交换机)。
2.4.4.2.1 工程创建及连接工具类
参考2.4.2.1、2.4.2.2小节。
2.4.4.2.2 构建生产者
public class Publisher {
public static final String FANOUT_EXCHANGE = "fanout_exchange";
public static final String FANOUT_QUEUE_1 = "fanout_queue_1";
public static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMqConnectionUtil.getConnection();
channel = connection.createChannel();
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");
String message = "发布/订阅模式---消息---";
for (int i = 0; i < 5; i++) {
channel.basicPublish(FANOUT_EXCHANGE, "", null, (message+i).getBytes());
}
System.out.println("生产者已发送5条消息:" + message);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
2.4.4.2.3 构建消费者1&消费者2
public class Consumer1 {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMqConnectionUtil.getConnection();
channel = connection.createChannel();
channel.queueDeclare(Publisher.FANOUT_QUEUE_1, true, false, false, null);
channel.queueBind(Publisher.FANOUT_QUEUE_1,Publisher.FANOUT_EXCHANGE,"");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
}
};
channel.basicConsume(Publisher.FANOUT_QUEUE_1, true, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
消费者1和消费者2代码类似。
2.4.4.3 路由模式
- P:生产者(publisher),即要发送消息的一方,向Exchange发送消息,发送消息时,会指定一个routing key;
- C1:消费者(consumer),即要接收消息的一方,其所在队列指定了需要routing key 为 insert 的消息;
- C2:消费者(consumer),即要接收消息的一方,其所在队列指定了需要routing key 为 update 的消息;
- X:交换机(exchange),一方面接收来自生产者发送的消息,一方面将消息传递给与routing key完全匹配的队列。
? 路由模式,即exchange类型为定向 direct 的订阅模式,其特点为:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个routing key (路由key);
- 消息的生产者在向 Exchange发送消息时,也必须指定消息的 routing key;
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 routing key 与消息的 routing key 完全一致,才会接收到消息。
2.4.4.3.1 工程创建及连接工具类
参考2.4.2.1、2.4.2.2小节。
2.4.4.3.2 构建生产者
public class Publisher {
public static final String DIRECT_EXCHANGE = "direct_exchange";
public static final String DIRECT_QUEUE_1 = "direct_queue_1";
public static final String DIRECT_QUEUE_2 = "direct_queue_2";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMqConnectionUtil.getConnection();
channel = connection.createChannel();
channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DIRECT_QUEUE_1, true, false, false, null);
channel.queueDeclare(DIRECT_QUEUE_2, true, false, false, null);
channel.queueBind(DIRECT_QUEUE_1, DIRECT_EXCHANGE,"insert");
channel.queueBind(DIRECT_QUEUE_2, DIRECT_EXCHANGE,"update");
String message1 = "路由模式---新增消息---";
channel.basicPublish(DIRECT_EXCHANGE, "insert", null, message1.getBytes());
String message2 = "路由模式---修改消息---";
channel.basicPublish(DIRECT_EXCHANGE, "update", null, message2.getBytes());
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
2.4.4.3.3 构建消费者1&消费者2
public class Consumer1 {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMqConnectionUtil.getConnection();
channel = connection.createChannel();
channel.queueDeclare(Publisher.DIRECT_QUEUE_1, true, false, false, null);
channel.queueBind(Publisher.DIRECT_QUEUE_1,Publisher.DIRECT_EXCHANGE,"insert");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
}
};
channel.basicConsume(Publisher.DIRECT_QUEUE_1, true, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
消费者1和消费者2代码类似,仅声明的队列和routing key不同。
2.4.4.4 通配符模式
????通配符模式,即exchange类型为统配符 Topic 的订阅模式。Topic 类型与 Direct 相比,都是可以根据 routing key 把消息路由到不同的队列;但是 Topic 类型的 Exchange 可以让队列在绑定 routing key 的时候使用通配符!
通配符模式相当于加强版的路由模式。
????routing key 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert。通配符规则:
-
# :匹配一个或多个词;
item.# :能够匹配 item.insert.abc 或者 item.insert。
-
* :匹配不多不少恰好1个词。 -
item.* :只能匹配 item.insert 。
- 红色Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到;
- 黄色Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配。
2.4.4.4.1 工程创建及连接工具类
参考2.4.2.1、2.4.2.2小节。
2.4.4.4.2 构建生产者
public class Publisher {
public static final String TOPIC_EXCHANGE = "topic_exchange";
public static final String TOPIC_QUEUE_1 = "topic_queue_1";
public static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMqConnectionUtil.getConnection();
channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
String message1 = "topic模式---新增消息---";
channel.basicPublish(TOPIC_EXCHANGE, "item.insert", null, message1.getBytes());
String message2 = "topic模式---修改消息---";
channel.basicPublish(TOPIC_EXCHANGE, "item.update", null, message2.getBytes());
String message3 = "topic模式---删除消息---";
channel.basicPublish(TOPIC_EXCHANGE, "item.delete", null, message3.getBytes());
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
2.4.4.4.3 构建消费者1&消费者2
public class Consumer1 {
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMqConnectionUtil.getConnection();
channel = connection.createChannel();
channel.exchangeDeclare(Publisher.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(Publisher.TOPIC_QUEUE_1, true, false, false, null);
channel.queueBind(Publisher.TOPIC_QUEUE_1,Publisher.TOPIC_EXCHANGE,"item.*");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
}
};
channel.basicConsume(Publisher.TOPIC_QUEUE_1, true, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
消费者1和消费者2代码类似,消费者2只需更换绑定以及声明的队列。这里把交换机的声明放到了消费者端,所以先启动消费者。
2.4.4.4.4 测试
2.4.5 模式总结
????RabbitMQ工作模式:
-
简单模式 Hello World 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机); -
工作队列模式 Work Queue 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机); -
发布订阅模式 Publish/Subscribe 需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列; -
路由模式 Routing 需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。 -
通配符模式 Topic 需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到匹配通配符的队列。
2.5 RabbitMQ 整合 Spring
提前准备一个空工程。
2.5.1 生产者
创建一个子工程。
2.5.1.1 添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.lijinghua</groupId>
<artifactId>rabbitmq-06-spring_publisher</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.13.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.2.13.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
2.5.1.2 创建连接RabbitMQ的配置文件
# 主机IP地址
rabbitmq.host=192.168.192.130
# 端口号
rabbitmq.port=5672
# 账户密码
rabbitmq.username=lijinghua
rabbitmq.password=lijinghua
#虚拟主机
rabbitmq.virtual-host=/ljh
2.5.1.3 创建Spring配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1"/>
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
<rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
<rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="ljh.*" queue="spring_topic_queue_star"/>
<rabbit:binding pattern="ljh.#" queue="spring_topic_queue_well"/>
<rabbit:binding pattern="ljh.#" queue="spring_topic_queue_well2"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
2.5.1.4 测试
package com.lijinghua.rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring.xml")
public class PublisherTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void simpleTest() {
rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息。");
}
@Test
public void fanoutTest() {
rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "发送到spring_fanout_exchange交换机的广播消息");
}
@Test
public void topicTest() {
rabbitTemplate.convertAndSend("spring_topic_exchange", "ljh.sh", "发送到spring_topic_exchange交换机ljh.sh的消息");
rabbitTemplate.convertAndSend("spring_topic_exchange", "ljh.sh.1", "发送到spring_topic_exchange交换机ljh.sh.1的消息 ");
rabbitTemplate.convertAndSend("spring_topic_exchange", "ljh.sh.2", "发送到spring_topic_exchange交换机ljh.sh.2的消息 ");
rabbitTemplate.convertAndSend("spring_topic_exchange", "ljh.cn", "发送到spring_topic_exchange交换机ljh.cn的消息");
}
}
2.5.2 消费者
创建一个子工程。
2.5.2.1 添加依赖
参见2.5.1.1小节。
2.5.2.2 创建连接RabbiMQ的配置文件
参见2.5.1.2小节。
2.5.2.3 创建Spring配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<bean id="springQueueListener" class="com.lijinghua.rabbitmq.listener.SpringQueueListener"/>
<bean id="fanoutListener1" class="com.lijinghua.rabbitmq.listener.FanoutListener"/>
<bean id="fanoutListener2" class="com.lijinghua.rabbitmq.listener.FanoutListener"/>
<bean id="topicListenerStar" class="com.lijinghua.rabbitmq.listener.TopicListenerStar"/>
<bean id="topicListenerWell" class="com.lijinghua.rabbitmq.listener.TopicListenerWell"/>
<bean id="topicListenerWell2" class="com.lijinghua.rabbitmq.listener.TopicListenerWell2"/>
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
<rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
<rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
<rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
<rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>
</rabbit:listener-container>
</beans>
2.5.2.4 消息监听器
2.5.2.4.1 队列监听器
public class SpringQueueListener implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.5.2.4.2 广播监听器
public class FanoutListener implements MessageListener {
private static int count = 0;
private int id;
public FanoutListener() {
this.id = ++count;
}
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("广播监听器%d---接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
getId(),
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public int getId() {
return id;
}
}
2.5.2.4.3 星号通配符监听器
package com.lijinghua.rabbitmq.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerStar implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("通配符*监听器:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.5.2.4.4 井号通配符监听器1
package com.lijinghua.rabbitmq.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerWell implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("通配符#监听器1:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.5.2.4.5 井号通配符监听器2
package com.lijinghua.rabbitmq.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerWell2 implements MessageListener {
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("通配符#监听器2:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.5.2.5 测试
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring.xml")
public class ConsumerTest {
@Test
public void test() {
while(true){}
}
}
2.6 RabbitMQ 整合 SpringBoot
创建一个空工程,下面以通配符模式为例。
2.6.1 生产者
创建一个 SpringBoot 子工程。
2.6.1.1 添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.3</version>
<relativePath/>
</parent>
<groupId>com.lijinghua</groupId>
<artifactId>rabbitmq-08-springboot_publisher</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>rabbitmq-08-springboot_publisher</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
2.6.1.2 配置application.yml配置文件
spring:
rabbitmq:
host: 192.168.192.130
port: 5672
virtual-host: /ljh
username: lijinghua
password: lijinghua
2.6.1.3 绑定交换机和队列
????创建通配符类型的交换机和队列,并对其进行绑定。
package com.lijinghua.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
public static final String TOPIC_EXCHANGE = "springboot_topic_exchange";
public static final String TOPIC_QUEUE = "springboot_topic_queue";
@Bean("myTopicExchange")
public Exchange topicExchange() {
return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE).durable(true).build();
}
@Bean("myTopicQueue")
public Queue topicQueue() {
return QueueBuilder.durable(TOPIC_QUEUE).build();
}
@Bean
public Binding bindingQueueToExchange(@Qualifier("myTopicQueue") Queue queue,@Qualifier("myTopicExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
}
}
2.6.1.4 测试
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE,
"item.insert", "商品新增,routing key 为item.insert");
rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE,
"item.update", "商品修改,routing key 为item.update");
rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE,
"item.delete", "商品删除,routing key 为item.delete");
}
2.6.2 消费者
创建一个 SpringBoot 子工程。
2.6.2.1 添加依赖
参见2.6.1.1小节。
2.6.2.2 配置application.yml文件
参见2.6.1.2小节。
2.6.2.3 配置消息监听处理类
@Component
public class TopicListener {
@RabbitListener(queues = "springboot_topic_queue")
public void itemRTopicListener(String message){
System.out.println("消费者接收到的消息为:" + message);
}
}
2.6.2.4 测试
2.7 高级特性
????在使用 RabbitMQ 的时候,消息生产者希望杜绝任何消息丢失或者投递失败场景。因此,RabbitMQ 为我们提供了两种用来控制消息投递可靠性的模式。
注:RabbitMQ 整个消息投递的路径为:producer—>RabbitMQ broker—>exchange—>queue—>consumer
- 消息从 producer 到 exchange 投递失败则会返回一个 confirmCallback 。
- 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递。
2.7.1 确认模式
消息从 producer 到 exchange 投递失败则会返回一个 confirmCallback。
2.7.1.1 开启确认模式
?????如果是RabbitMQ整合Spring,那么可以在Spring的IOC容器中配置ConnectionFactory:
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"/>
????如果是RabbitMQ整合SpringBoot,那么可以在application.yml配置文件中配置:
spring:
rabbitmq:
host: 192.168.192.130
port: 5672
virtual-host: /ljh
username: lijinghua
password: lijinghua
publisher-confirm-type: correlated
2.7.1.2 定义回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了....");
if (ack) {
System.out.println("接收成功消息" + cause);
} else {
System.out.println("接收失败消息" + cause);
}
}
});
可以将回调函数封装到一个类里,通过IOC容器或者是@Component 进行创建。
????修改exchange名称使得消息无法找到这个交换机,模拟发送失败:
2.7.2 退回模式
消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
2.7.2.1 开启退回模式
????如果是RabbitMQ整合Spring,那么可以在Spring的IOC容器中配置ConnectionFactory:
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
publisher-returns="true"/>
????如果是RabbitMQ整合SpringBoot,那么可以在application.yml配置文件中配置:
spring:
rabbitmq:
host: 192.168.192.130
port: 5672
virtual-host: /ljh
username: lijinghua
password: lijinghua
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
2.7.2.2 定义处理模式和回调函数
@Test
public void testReturn() {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
public void returnedMessage(Message message, int replyCode, String
replyText, String exchange, String routingKey) {
System.out.println("return 执行了....");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
}
});
rabbitTemplate.convertAndSend("", "spring_queue",
"the second time :message confirm....");
}
????修改routing key 使得触发退回模式:
2.7.3 消费端 Ack
????前面的确认模式和退回模式在消息生产者的角度上保证了消息从 producer—>RabbitMQ broker—>exchange—>queue 的可靠性投递,而消费端 Ack 表示消费端收到消息后的确认方式(queue—>consumer) 。
????这种方式有三种Ack类型:
2.7.3.1 设置Ack类型
以手动确认为例
????如果是RabbitMQ整合Spring,那么可以在Spring的IOC容器中配置消息监听器(acknowledge=“manual”):
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true" acknowledge="manual">
<rabbit:listener ref="springQueueListener2" queue-names="spring_queue"/>
</rabbit:listener-container>
????如果是RabbitMQ整合SpringBoot,那么可以在application.yml配置文件中配置:
spring:
rabbitmq:
host: 192.168.192.130
port: 5672
virtual-host: /ljh
username: lijinghua
password: lijinghua
listener:
simple:
acknowledge-mode: manual
2.7.3.2 设置消息监听器
public class SpringQueueListener2 implements ChannelAwareMessageListener {
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
System.out.println("处理业务逻辑...");
int i = 3/0;
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
channel.basicNack(deliveryTag,true,true);
}
}
public void onMessage(Message message) {
}
}
假设我先发送三条消息deliveryTag 分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag 为8,multiple 设置为 true,会将5、6、7、8的消息全部进行确认。
@Component
public class MyListener {
@RabbitListener(queues = "springboot_topic_queue")
public void itemTopicListener(String message){
System.out.println("消费者接收到的消息为:" + message);
}
@RabbitListener(queues = "spring_queue")
public void itemSimpleListener(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
System.out.println("处理业务逻辑...");
int i = 3/0;
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
channel.basicNack(deliveryTag,true,true);
}
}
}
2.7.4 消费端限流
之前在讲述MQ作用的时候提到了削峰填谷,设置限流可以避免高峰期大流量严重影响消费者服务。
2.7.4.1 设置手动确认
参考2.7.3小节。
2.7.4.2 配置限流
????如果是RabbitMQ整合Spring,那么可以在Spring的IOC容器中配置消息监听器(prefetch=“1”),表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息:
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true" acknowledge="manual" prefetch="1">
<rabbit:listener ref="springQueueListener2" queue-names="spring_queue"/>
</rabbit:listener-container>
????如果是RabbitMQ整合SpringBoot,那么可以在application.yml配置文件中配置:
spring:
rabbitmq:
host: 192.168.192.130
port: 5672
virtual-host: /ljh
username: lijinghua
password: lijinghua
listener:
simple:
acknowledge-mode: manual
prefetch: 1
注意:
-
prefetch 默认值以前是1,这可能会导致高效消费者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch 值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。 -
消费者端拉取消息可能是非顺序的。
2.7.5 TTL消息过期
????通过设置TTL,使得消息推送到队列后,如果指定时间内没有被消费,则会自动过期。
2.7.5.1 管控台设置TTL
????添加队列时添加ttl参数:
2.7.5.2 代码设置TTL
-
Spring中配置Spring配置文件;
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" valuetype="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
-
SpringBoot中创建队列。
private static final int TTL_EXPIRATION = 10000;
@Bean
public Queue TTlQueue(){
return QueueBuilder.durable(TTL_QUEUE).withArgument("x-message-ttl", TTL_EXPIRATION).build();
}
2.7.5.3 统一过期与单独过期
统一过期,即前两小节对队列统一设定TTL,正常发送消息到queue后经过设置的过期时间后统一移除。下面展示对特定消息单独设置过期时间。
@Test
public void testTtl() {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
return message;
}
};
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....", messagePostProcessor);
for (int i = 0; i < 10; i++) {
if (i == 5) {
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....", messagePostProcessor);
} else {
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
}
}
2.7.6 死信队列
????死信交换机(Dead Letter Exchange,DLX),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
注意:
- 有些MQ产品没有交换机的概念,所以就叫死信队列(Dead Letter Queue,DLQ)。
- 如果没有私信交换机(死信队列),那么消息成为死信后就会被丢弃。
2.7.6.1 消息成为死信的三种情况
-
队列消息长度到达限制;
例如,设置队列参数 x-max-length = 10,第11个消息就是死信。
-
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false; -
原队列存在消息过期设置,消息到达超时时间未被消费。
2.7.6.2 队列绑定死信交换机
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key 即可。
-
管控台绑定方式; -
代码绑定方式。
-
Spring中配置Spring配置文件;
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
<entry key="x-max-length" value="10" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
-
SpringBoot中绑定。
@Bean("myTopicQueue")
public Queue topicQueue() {
Map<String, Object> map = new HashMap<>();
map.put("x-dead-letter-exchange","receive_exchange");
map.put("x-dead-letter-routing-key", "receive_key");
map.put("x-message-ttl", 10000);
map.put("x-max-length",10);
return QueueBuilder.durable(TOPIC_QUEUE).withArguments(map).build();
}
思路基本和上述一致,这里只添加需要的参数,剩余的声明、绑定即可。
2.7.8 延迟队列
????延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
? 需求:
- 下单后,30分钟未支付,取消订单,回滚库存。
- 新用户注册成功7天后,发送短信问候。
? 实现方式:
注意:在RabbitMQ中并未提供延迟队列功能,TTL+死信队列 组合可实现延迟队列的效果。
|