7. RabbitMQ 高级
7.1. 过期时间TTL
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。
- 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
- 第二种方法是对消息进行单独设置,每条消息TTL可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。
7.1.1. 设置队列TTL
在 spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml 文件中添加如下内容:
<!--定义过期队列及其属性,不存在则自动创建-->
<rabbit:queue id="my_ttl_queue" name="my_ttl_queue" auto-declare="true">
<rabbit:queue-arguments>
<!--投递到该队列的消息如果没有消费都将在6秒之后被删除-->
<entry key="x-message-ttl" value-type="long" value="6000"/>
</rabbit:queue-arguments>
</rabbit:queue>
在启动类里加载配置文件:
?然后在测试类 spring-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java 中编写如下方法发送消息到上述定义的队列:
/**
* 过期队列消息
* 投递到该队列的消息如果没有消费都将在6秒之后被删除
*/
@Test
public void ttlQueueTest(){
//路由键与队列同名
rabbitTemplate.convertAndSend("my_ttl_queue", "发送到过期队列my_ttl_queue,6秒内不消费则不能再被消费。");
}
参数 x-message-ttl 的值 必须是非负 32 位整数 (0 <= n <= 2^32-1) ,以毫秒为单位表示 TTL 的值。这样,值 6000 表示存在于 队列 中的当前 消息 将最多只存活 6 秒钟。
如果不设置TTL,则表示此消息不会过期。如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。
7.1.2. 设置消息TTL
消息的过期时间;只需要在发送消息(可以发送到任何队列,不管该队列是否属于某个交换机)的时候设置过期时间即可。在测试类中编写如下方法发送消息并设置过期时间到队列:
/**
* 过期消息
* 该消息投递任何交换机或队列中的时候;如果到了过期时间则将从该队列中删除
*/
@Test
public void ttlMessageTest(){
MessageProperties messageProperties = new MessageProperties();
//设置消息的过期时间,5秒
messageProperties.setExpiration("5000");
Message message = new Message("测试过期消息,5秒钟过期".getBytes(), messageProperties);
//路由键与队列同名
rabbitTemplate.convertAndSend("my_ttl_queue", message);
}
expiration 字段以微秒为单位表示 TTL 值。且与 x-message-ttl 具有相同的约束条件。因为 expiration 字段必须为字符串类型,broker 将只会接受以字符串形式表达的数字。
当同时指定了 queue 和 message 的 TTL 值,则两者中较小的那个才会起作用。
7.2. 死信队列
DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。
具体步骤如下面的章节。
7.2.1. 定义死信交换机
在 spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml 文件中添加如下内容:
<!--定义定向交换机中的持久化死信队列,不存在则自动创建-->
<rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true"/>
<!--定义广播类型交换机;并绑定上述两个队列-->
<rabbit:direct-exchange id="my_dlx_exchange" name="my_dlx_exchange" auto-declare="true">
<rabbit:bindings>
<!--绑定路由键my_ttl_dlx、my_max_dlx,可以将过期的消息转移到my_dlx_queue队列-->
<rabbit:binding key="my_ttl_dlx" queue="my_dlx_queue"/>
<rabbit:binding key="my_max_dlx" queue="my_dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
7.2.2. 队列设置死信交换机
为了测试消息在过期、队列达到最大长度后都将被投递死信交换机上;所以添加配置如下:
在 spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml 文件中添加如下内容:
<!--定义过期队列及其属性,不存在则自动创建-->
<rabbit:queue id="my_ttl_dlx_queue" name="my_ttl_dlx_queue" auto-declare="true">
<rabbit:queue-arguments>
<!--投递到该队列的消息如果没有消费都将在6秒之后被投递到死信交换机-->
<entry key="x-message-ttl" value-type="long" value="6000"/>
<!--设置当消息过期后投递到对应的死信交换机-->
<entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--定义限制长度的队列及其属性,不存在则自动创建-->
<rabbit:queue id="my_max_dlx_queue" name="my_max_dlx_queue" auto-declare="true">
<rabbit:queue-arguments>
<!--投递到该队列的消息最多2个消息,如果超过则最早的消息被删除投递到死信交换机-->
<entry key="x-max-length" value-type="long" value="2"/>
<!--设置当消息过期后投递到对应的死信交换机-->
<entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--定义定向交换机 根据不同的路由key投递消息-->
<rabbit:direct-exchange id="my_normal_exchange" name="my_normal_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding key="my_ttl_dlx" queue="my_ttl_dlx_queue"/>
<rabbit:binding key="my_max_dlx" queue="my_max_dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
7.2.3. 消息过期的死信队列测试
1)发送消息代码
添加 spring-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java 方法
/**
* 过期消息投递到死信队列
* 投递到一个正常的队列,但是该队列有设置过期时间,到过期时间之后消息会被投递到死信交换机(队列)
*/
@Test
public void dlxTTLMessageTest(){
rabbitTemplate.convertAndSend("my_normal_exchange", "my_ttl_dlx", "测试过期消息;6秒过期后会被投递到死信交换机");
}
2)在rabbitMQ管理界面中结果
未过期:
过期后:
3)流程
具体因为队列消息过期而被投递到死信队列的流程:
7.2.4. 消息过长的死信队列测试
1)发送消息代码
添加 spring-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java 方法
/**
* 超过队列长度消息投递到死信队列
* 投递到一个正常的队列,但是该队列有设置最大消息数,到最大消息数之后队列中最早的消息会被投递到死信交换机(队列)
*/
@Test
public void dlxMaxMessageTest(){
rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx",
"队列my_max_dlx_queue的最大长度为2;消息超过后会被投递到死信交换机;这是第1个消息");
rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx",
"队列my_max_dlx_queue的最大长度为2;消息超过后会被投递到死信交换机;这是第2个消息");
rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx",
"队列my_max_dlx_queue的最大长度为2;消息超过后会被投递到死信交换机;这是第3个消息");
}
?
2)在rabbitMQ管理界面中结果
上面发送的3条消息中的第1条消息会被投递到死信队列中(如果启动了消费者,那么队列消息很快会被取走消费掉);
3)消费者接收死信队列消息
与过期消息投递到死信队列的代码和配置是共用的,并不需要重新编写。
4)流程
消息超过队列最大消息长度而被投递到死信队列的流程在前面的图中已包含。
7.3. 延迟队列
延迟队列存储的对象是对应的延迟消息;所谓“延迟消息” 是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
在RabbitMQ中延迟队列可以通过 过期时间 + 死信队列 来实现;具体如下流程图所示:
在上图中;分别设置了两个5秒、10秒的过期队列,然后等到时间到了则会自动将这些消息转移投递到对应的死信队列中,然后消费者再从这些死信队列接收消息就可以实现消息的延迟接收。
延迟队列的应用场景;如:
- 在电商项目中的支付场景;如果在用户下单之后的几十分钟内没有支付成功;那么这个支付的订单算是支付失败,要进行支付失败的异常处理(将库存加回去),这时候可以通过使用延迟队列来处理
- 在系统中如有需要在指定的某个时间之后执行的任务都可以通过延迟队列处理
7.4. 消息确认机制
确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)
在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。
7.4.1 发布确认
有两种方式:消息发送成功确认和消息发送失败回调。
在spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
connectionFactory 中启用消息确认:
<!-- publisher-confirms="true" 表示:启用了消息确认 -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true" />
配置消息确认回调方法如下:
<!-- 消息回调处理类 -->
<bean id="confirmCallback" class="com.itheima.rabbitmq.MsgSendConfirmCallBack"/>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<!-- confirm-callback="confirmCallback" 表示:消息失败回调 -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
confirm-callback="confirmCallback"/>
消息确认回调方法com.itheima.rabbitmq.MsgSendConfirmCallBack如下:
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息确认成功....");
} else {
//处理丢失的消息
System.out.println("消息确认失败," + cause);
}
}
}
功能测试如下:
发送消息
com.itheima.rabbitmq.ProducerTest#queueTest
@Test
public void queueTest(){
//路由键与队列同名
rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息。");
}
管理界面确认消息发送成功
消息确认回调
在spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
connectionFactory 中启用回调:
<!-- publisher-returns="true" 表示:启用了失败回调 -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-returns="true" />
配置消息失败回调方法如下:
注意:同时需配置mandatory="true",否则消息则丢失
<!-- 消息失败回调类 -->
<bean id="sendReturnCallback" class="com.itheima.rabbitmq.MsgSendReturnCallback"/>
<!-- return-callback="sendReturnCallback" 表示:消息失败回调 ,同时需配置mandatory="true",否则消息则丢失-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
confirm-callback="confirmCallback" return-callback="sendReturnCallback"
mandatory="true"/>
消息失败回调方法com.itheima.rabbitmq.MsgSendReturnCallback如下:
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
String msgJson = new String(message.getBody());
System.out.println("Returned Message:"+msgJson);
}
}
功能测试如下:
模拟消息发送失败
com.itheima.rabbitmq.ProducerTest#testFailQueueTest
@Test
public void testFailQueueTest() throws InterruptedException {
//exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE
amqpTemplate.convertAndSend("test_fail_exchange", "", "测试消息发送失败进行确认应答。");
}
失败回调结果如下:
7.4.2 事务支持
场景:业务处理伴随消息的发送,业务处理失败(事务回滚)后要求消息不发送。rabbitmq 使用调用者的外部事务,通常是首选,因为它是非侵入性的(低耦合)。
外部事务的配置:spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
<!-- channel-transacted="true" 表示:支持事务操作 -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
confirm-callback="confirmCallback" return-callback="sendReturnCallback"
channel-transacted="true" />
<!--平台事务管理器-->
<bean id="transactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
测试类或者测试方法上加入@Transactional注解
@Transactional
public class ProducerTest {
@Test
public void queueTest2(){
//路由键与队列同名
rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息--01。");
System.out.println("----------------dosoming:可以是数据库的操作,也可以是其他业务类型的操作---------------");
//模拟业务处理失败
System.out.println(1/0);
rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息--02。");
}
}
7.5. 消息追踪
消息中心的消息追踪需要使用Trace实现,Trace是Rabbitmq用于记录每一次发送的消息,方便使用Rabbitmq的开发者调试、排错。可通过插件形式提供可视化界面。Trace启动后会自动创建系统Exchange:amq.rabbitmq.trace ,每个队列会自动绑定该Exchange,绑定后发送到队列的消息都会记录到Trace日志。
7.5.1 消息追踪启用与查看
以下是trace的相关命令和使用(要使用需要先rabbitmq启用插件,再打开开关才能使用):
命令集 | 描述 |
---|
rabbitmq-plugins list | 查看插件列表 | rabbitmq-plugins enable rabbitmq_tracing | rabbitmq启用trace插件 | rabbitmqctl trace_on | 打开trace的开关 | rabbitmqctl trace_on -p itcast | 打开trace的开关(itcast为需要日志追踪的vhost) | rabbitmqctl trace_off | 关闭trace的开关 | rabbitmq-plugins disable rabbitmq_tracing | rabbitmq关闭Trace插件 | rabbitmqctl set_user_tags heima administrator | 只有administrator的角色才能查看日志界面 |
安装插件并开启 trace_on 之后,会发现多个 exchange:amq.rabbitmq.trace ,类型为:topic。
7.5.2 日志追踪
第一步:发送消息
rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息--01。");
发送成功,web查看多了一条消息
第二步:查看trace
?第三步:点击Tracing查看Trace log files
第四步:点击itcast-trace.log确认消息轨迹正确性
url:http://127.0.0.1:15672/api/trace-files/itcast-trace.log
浏览器截图:
10. RabbitMQ 应用与面试
10.1. 消息堆积
当消息生产的速度长时间,远远大于消费的速度时。就会造成消息堆积。
-
消息堆积的影响
- 可能导致新消息无法进入队列
- 可能导致旧消息无法丢失
- 消息等待消费的时间过长,超出了业务容忍范围。
-
产生堆积的情况
- 生产者突然大量发布消息
- 消费者消费失败
- 消费者??? 出现性能瓶颈。
- 消费者挂掉
-
解决办法
- 排查消费者的消费性能瓶颈
- 增加消费者的多线程处理
- 部署增加多个消费者
-
场景介绍
在用户登录成功之后,会向rabbitmq发送一个登录成功的消息。这个消息可以被多类业务订阅。
登录成功,记录登录日志;登录成功,根据规则送积分。其中登录送积分可以模拟成较为耗时的处理
场景重现:让消息产生堆积
-
生产者大量发送消息:使用Jmeter开启多线程,循环发送消息大量进入队列。 -
消费者消费失败:随机抛出异常,模拟消费者消费失败,没有ack(手动ack的时候)。 -
设置消费者的性能瓶颈:在消费方法中设置休眠时间,模拟性能瓶颈 -
关闭消费者:停掉消费者,模拟消费者挂掉 -
消费者端示例核心代码: public class LoginIntegralComsumer implements MessageListener {
public void onMessage(Message message) {
String jsonString = null;
try {
jsonString = new String(message.getBody(),"UTF8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
if(new Random().nextInt(5)==2){
//模拟发生异常
throw new RuntimeException("模拟处理异常");
}
try {
//模拟耗时的处理过程
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println(Thread.currentThread().getName()+"处理消息:"+jsonString);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} -
如果每1秒钟处理一条消息
1小时处理 60*60=3600条,处理完10万条数据 100000/3600=27.7小时
问题解决:消息已经堆积如何解决
消息队列堆积,想办法把消息转移到一个新的队列,增加服务器慢慢来消费这个消息,这样就可以
使生产环境的队列为可用状态。
1、解决消费者消费异常问题
2、解决消费者的性能瓶颈:改短休眠时间
5.4小时。
3、增加消费线程,增加多台服务器部署消费者。快速消费。增加10个线程
concurrency="10" prefetch="10"
1小时
增加一台服务器
0.5小时
10.2. 消息丢失
在实际的生产环境中有可能出现一条消息因为一些原因丢失,导致消息没有消费成功,从而造成数据不一致等问题,造成严重的影响,比如:在一个商城的下单业务中,需要生成订单信息和扣减库存两个动作,如果使用RabbitMQ来实现该业务,那么在订单服务下单成功后需要发送一条消息到库存服务进行扣减库存,如果在此过程中,一条消息因为某些原因丢失,那么就会出现下单成功但是库存没有扣减,从而导致超卖的情况,也就是库存已经没有了,但是用户还能下单,这个问题对于商城系统来说是致命的。
消息丢失的场景主要分为:消息在生产者丢失,消息在RabbitMQ丢失,消息在消费者丢失。
10.2.1. 消息在生产者丢失
场景介绍
消息生产者发送消息成功,但是MQ没有收到该消息,消息在从生产者传输到MQ的过程中丢失,一般是由于网络不稳定的原因。
解决方案
采用RabbitMQ 发送方消息确认机制,当消息成功被MQ接收到时,会给生产者发送一个确认消息,表示接收成功。RabbitMQ 发送方消息确认模式有以下三种:普通确认模式,批量确认模式,异步监听确认模式。spring整合RabbitMQ后只使用了异步监听确认模式。
说明
异步监听模式,可以实现边发送消息边进行确认,不影响主线程任务执行。
步骤
-
生产者发送3000条消息 -
在发送消息前开启开启发送方确认模式
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true"
/>
3. 在发送消息前添加异步确认监听器
//添加异步确认监听器
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
// 处理ack
System.out.println("已确认消息,标识:" + correlationData.getId());
} else {
// 处理nack, 此时cause包含nack的原因。
System.out.println("未确认消息,标识:" + correlationData.getId());
System.out.println("未确认原因:" + cause);
//重发
}
}
});
10.2.2. 消息在RabbitMQ丢失
场景介绍
消息成功发送到MQ,消息还没被消费却在MQ中丢失,比如MQ服务器宕机或者重启会出现这种情况
解决方案
持久化交换机,队列,消息,确保MQ服务器重启时依然能从磁盘恢复对应的交换机,队列和消息。
spring整合后默认开启了交换机,队列,消息的持久化,所以不修改任何设置就可以保证消息不在RabbitMQ丢失。但是为了以防万一,还是可以申明下。
10.2.3. 消息在消费者丢失
场景介绍
消息费者消费消息时,如果设置为自动回复MQ,消息者端收到消息后会自动回复MQ服务器,MQ则会删除该条消息,如果消息已经在MQ被删除但是消费者的业务处理出现异常或者消费者服务宕机,那么就会导致该消息没有处理成功从而导致该条消息丢失。
解决方案
设置为手动回复MQ服务器,当消费者出现异常或者服务宕机时,MQ服务器不会删除该消息,而是会把消息重发给绑定该队列的消费者,如果该队列只绑定了一个消费者,那么该消息会一直保存在MQ服务器,直到消息者能正常消费为止。本解决方案以一个队列绑定多个消费者为例来说明,一般在生产环境上也会让一个队列绑定多个消费者也就是工作队列模式来减轻压力,提高消息处理效率
MQ重发消息场景:
1.消费者未响应ACK,主动关闭频道或者连接
2.消费者未响应ACK,消费者服务挂掉
10.3. 有序消费消息
10.3.1. 场景介绍
场景1
当RabbitMQ采用work Queue模式,此时只会有一个Queue但是会有多个Consumer,同时多个Consumer直接是竞争关系,此时就会出现MQ消息乱序的问题。
场景2
当RabbitMQ采用简单队列模式的时候,如果消费者采用多线程的方式来加速消息的处理,此时也会出现消息乱序的问题。
10.3.2. 场景1解决
10.3.3. 场景2解决
?
10.4. 重复消费
10.4.1. 场景介绍
为了防止消息在消费者端丢失,会采用手动回复MQ的方式来解决,同时也引出了一个问题,消费者处理消息成功,手动回复MQ时由于网络不稳定,连接断开,导致MQ没有收到消费者回复的消息,那么该条消息还会保存在MQ的消息队列,由于MQ的消息重发机制,会重新把该条消息发给和该队列绑定的消息者处理,这样就会导致消息重复消费。而有些操作是不允许重复消费的,比如下单,减库存,扣款等操作。
MQ重发消息场景:
1.消费者未响应ACK,主动关闭频道或者连接
2.消费者未响应ACK,消费者服务挂掉
10.4.2. 解决方案
如果消费消息的业务是幂等性操作(同一个操作执行多次,结果不变)就算重复消费也没问题,可以不做处理,如果不支持幂等性操作,如:下单,减库存,扣款等,那么可以在消费者端每次消费成功后将该条消息id保存到数据库,每次消费前查询该消息id,如果该条消息id已经存在那么表示已经消费过就不再消费否则就消费。本方案采用redis存储消息id,因为redis是单线程的,并且性能也非常好,提供了很多原子性的命令,本方案使用setnx命令存储消息id。
setnx(key,value):如果key不存在则插入成功且返回1,如果key存在,则不进行任何操作,返回0
|