一,消息可靠性投递
在使用
RabbitMQ
的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。
RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
- confifirm 确认模式
- return 退回模式
rabbitmq
整个消息投递的路径为:
producer ---> rabbitmq broker ---> exchange ---> queue ---> consumer
rabbitmq broker:简单来说就是消息队列服务器实体。中文意思:中间件。接受客户端连接,实现AMQP消息队列和路由功能的进程。一个broker里可以开设多个vhost,用作不同用户的权限分离。
- 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
- 消息从 exchange 到 queue 投递失败则会返回一个 returnCallback 。
通过 confirmCallback 判定 消息是否成功到达exchange
因为returnCallback 是?启动消息失败返回,比如路由不到队列时触发回调?
?我们将利用这两个 callback 控制消息的可靠性投递
1,confirm确认模式代码实现
代码实现:
? ? ? ? 1,创建maven 工程,消息的生产者工程,项目模块名称:rabbitmq-producer-spring
? ? ? ? 2,添加依赖
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring‐context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring‐rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring‐test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven‐compiler‐plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
3,在 resources 目录下创建 rabbitmq.properties 配置文件,添加链接RabbitMQ相关信息
rabbitmq.host=172.16.98.133
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual‐host=/
4.
在
resources
目录下创建
spring-rabbitmq-producer.xml
配置文件,添加以下配置
<?xml version="1.0" encoding="UTF‐8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema‐instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring‐beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring‐context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring‐rabbit.xsd">
<!‐‐加载配置文件‐‐>
<context:property‐placeholder location="classpath:rabbitmq.properties"/>
<!‐‐ 定义rabbitmq connectionFactory 1. 设置 publisher‐confirms="true" ‐‐> <rabbit:connection‐factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual‐host="${rabbitmq.virtual‐host}"
publisher‐confirms="true" />
<!‐‐定义管理交换机、队列‐‐>
<rabbit:admin connection‐factory="connectionFactory"/>
<!‐‐定义rabbitTemplate对象操作可以在代码中方便发送消息‐‐>
<rabbit:template id="rabbitTemplate" connection‐factory="connectionFactory"/>
<!‐‐2. 消息可靠性投递(生产端)‐‐>
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
<rabbit:direct‐exchange name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm">
</rabbit:binding>
</rabbit:bindings>
</rabbit:direct‐exchange>
</beans>
或者配置yml 文件

5, 编写测试代码
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring‐rabbitmq‐producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 确认模式:
* 步骤:
* 1. 确认模式开启:ConnectionFactory中开启publisher‐confirms="true"
* 2. 在rabbitTemplate定义ConfirmCallBack回调函数
*/
@Test
public void testConfirm() {
//2. 定义回调 **
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/***
* @param correlationData 相关配置信息
* @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
* @param cause 失败原因
*/
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了....");
if (ack) {
//接收成功
System.out.println("接收成功消息" + cause);
} else {
//接收失败
System.out.println("接收失败消息" + cause);
//做一些处理,让消息再次发送。
}
}
});
//3. 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm111", "confirm", "message confirm....");
}
}
或
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class confirmTest4 implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 确认模式:
* 步骤:
* 1,确认模式开启:在yml 配置文件里配置 publisher-confirms: true
* 2, 在rabbitTemplate 定义ConfirmCallBack 回调函数
*/
@Test
public void testConfirms() {
rabbitTemplate.setConfirmCallback(new confirmTest4());
Map<String, String> msg = new HashMap<>();
msg.put("pageId", "5abefd525b05aa293098fca6");
//转成json串
String jsonString = JSON.toJSONString(msg);
rabbitTemplate.convertAndSend("ex_routing_cms_postpage", "5abefd525b05aa293098fca6", jsonString);
}
/**
*
* @param correlationData 相关配置信息
* @param ack exchange交换机 是否成功收到了消息,true成功,false代表失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm 方法开始执行..............");
if (ack) {
//接收成功
System.out.println("接收成功的消息:" + cause);
} else {
//接收失败
System.out.println("接收失败消息:" + cause);
//接收失败后,我们做一些处理,让消息再次发送,达到消息可靠性传递
}
}
其中重写confirm 方法里的参数,
? ? ? ? correlationData:消息唯一标识
? ? ? ? ack:确认结果
????????cause:失败原因

?这是发送消息到queue 成功后,展示的内容,
但是如果发送失败是这样的

?当发送失败是可以配置return 退回模式代码。
2,?return退回模式代码实现
回退模式:? 当消息发送给EXchange 后,Exchange 路由到Queue 失败是才会执行ReturnCallBack,具体实现如下:
? ? ? ? 1,在Spring-rabbitmq-producer.xml 配置文件,在rabbit:connection-factory 节点添加配置
publisher‐returns="true"
或者 在application.yml 中设置

设置交换机处理失败消息的模式,两种方法,一种写在代码里,一种配置文件
1, 配置文件

?2,代码方式:
/**
* 步骤:
* 1. 开启回退模式:publisher‐returns="true"
* 2. 设置ReturnCallBack
* 3. 设置Exchange处理消息的模式:
* 1. 如果消息没有路由到Queue,则丢弃消息(默认)
* 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
*/
@Test
public void testReturn() {
//设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);
//2.设置ReturnCallBack
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.out.println("return 执行了....");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
//处理
}
});
//3. 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message
confirm....");
}
或
package com.xuecheng.manage_cms.returns;
import com.alibaba.fastjson.JSON;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.HashMap;
import java.util.Map;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ReturnTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testReturn() {
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了..........");
System.out.println("message = " + message);
System.out.println("replyCode = " + replyCode);
System.out.println("replyText = " + replyText);
System.out.println("exchange = " + exchange);
System.out.println("routingKey = " + routingKey);
}
});
Map<String, String> msg = new HashMap<>();
msg.put("pageId", "5abefd525b05aa293098fca6");
//转成json串
String jsonString = JSON.toJSONString(msg);
rabbitTemplate.convertAndSend("ex_routing_cms_postpage", "5abefd525b05aa293098fca61", jsonString);
}
}

设置routingKey为一个不符合规则的key,观察控制台打印结果。
returnedMessage方法中参数
????????消息主体message : message ? ? ? ? 消息主体 message : replyCode ? ? ? ? 描述:replyText ? ? ? ? 消息使用的交换器 exchange : exchange ? ? ? ? 消息使用的路由键 routing : routingKey
总结
对于确认模式
设置
ConnectionFactory
的
publisher-confifirms="true"
开启 确认模式。
使用
rabbitTemplate.setConfifirmCallback
设置回调函数。当消息发送到
exchange
后回调
confiirm方法。在方法中判断
ack
,如果为
true
,则发送成功,如果为
false
,则发送失败,需要处理。
对于退回模式
设置
ConnectionFactory
的
publisher-returns="true"
开启 退回模式。
使用
rabbitTemplate.setReturnCallback
设置退回函数,当消息从
exchange
路由到
queue
失败后,如果设置
了
rabbitTemplate.setMandatory(true)
参数,则会将消息退回给
producer
。并执行回调函数
returnedMessage
。
在
RabbitMQ
中也提供了事务机制,但是性能较差,此处不做讲解。
使用
channel
列方法,完成事务控制:
txSelect(),
用于将当前
channel
设置成
transaction
模式
txCommit()
,用于提交事务
txRollback(),
用于回滚事务
?二,Consumer ACK(消息接收确认)
消息消费者如何通知 Rabbit 消息消费成功?
ack指 Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
?
自动确认:
acknowledge="
none
"
?
手动确认:
acknowledge="
manual
"
?
根据异常情况确认:
acknowledge="
auto
"
,(这种方式使用麻烦,不作讲解)
其中自动确认是指,当消息一旦被
Consumer
接收到,则自动确认收到,并将相应
message
从
RabbitMQ 的消息 缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认方式,则需要在业务处理成功后,调用
channel.basicAck(),手动签收,如果出现异常,则调 用
channel.basicNack()
方法,让其自动重新发送消息
代码实现
1.
创建
maven
工程,消息的消费者工程,项目模块名称:
rabbitmq-consumer-spring
2.
添加依赖
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring‐context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring‐rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring‐test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven‐compiler‐plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
3.
在
resources
目录下创建
rabbitmq.properties
配置文件,添加链接
RabbitMQ
相关信息
rabbitmq.host
=
172.16.98.133
rabbitmq.port
=
5672
rabbitmq.username
=
guest
rabbitmq.password
=
guest
rabbitmq.virtual‐host
=
/
4.
在
resources
目录下创建
spring-rabbitmq-consumer.xml
配置文件,添加以下配置
<?xml version="1.0" encoding="UTF‐8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema‐instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring‐beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring‐context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring‐rabbit.xsd">
<!‐‐加载配置文件‐‐>
<context:property‐placeholder location="classpath:rabbitmq.properties"/>
<!‐‐ 定义rabbitmq connectionFactory ‐‐>
<rabbit:connection‐factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual‐host="${rabbitmq.virtual‐host}"/>
<context:component‐scan base‐package="com.itheima.listener" />
<!‐‐定义监听器容器 添加 acknowledge="manual" 手动‐‐>
<rabbit:listener‐container connection‐factory="connectionFactory" acknowledge="manual"
>
<rabbit:listener ref="ackListener" queue‐names="test_queue_confirm">
</rabbit:listener>
</rabbit:listener‐container>
</beans>
或
5.
编写
ackListener
监听类实现
ChannelAwareMessageListener
接口
/**
* Consumer ACK机制:
* 1. 设置手动签收。acknowledge="manual"
* 2. 让监听器类实现ChannelAwareMessageListener接口
* 3. 如果消息成功处理,则调用channel的 basicAck()签收
* 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
*/
@Component
public class ackListener implements ChannelAwareMessageListener {
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "ex_routing_cms_postpage",type = "direct"),
value = @Queue(value = "queue_cms_postpage_01",durable = "true"),
key = "#.#"
))
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1, 接收转换消息
System.out.println("接收消息内容:"+new String(message.getBody()));
//2, 处理业务逻辑
System.out.println("处理业务逻辑");
//int i = 2/0 ; // 代码运行错误
//3,手动签收
channel.basicAck(deliveryTag,true);
}catch (Exception e){
e.printStackTrace();
System.out.println("代码执行错误了。。。。。。");
//4,代码出现错误后,设置拒绝签收,重新发送
//第三个参数:requeue: 消息重回队列,如何设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
channel.basicNack(deliveryTag,true,true);
//了解
//channel.basicReject(deliveryTag,true);
}
}
}
上面的
? ? @RabbitListener(bindings = @QueueBinding(
? ? ? ? ? ? exchange = @Exchange(value = "ex_routing_cms_postpage",type = "direct"),
? ? ? ? ? ? value = @Queue(value = "queue_cms_postpage_01",durable = "true"),
? ? ? ? ? ? key = "#.#"
? ? ))
是监听注解
消息发送,通过ack 可以知道?Rabbit 消息消费成功。
如果消费者接受消息后,执行代码失败,可让消息重回队列,重新发送消息给消费端。
小结:
- 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
- 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
- 如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
如何保证消息的高可靠性传输?
1.
持久化
? exchange
要持久化
? queue
要持久化
? message
要持久化
2.
生产方确认
Confifirm
3.
消费方确认
Ack
4.Broker
高可用
三,消费端限流

?
? ? ? ? 代码实现:
? ? ? ? 1, 编写一个监听类,保证当前的监听类的消息处理机制是ACK(手动方式)
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
//1.获取消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
//3. 签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
2,在配置文件的listener-container 配置属性中添加配置(或者用@Confirguration)
<rabbit:listener‐container
connection‐factory
=
"connectionFactory"
acknowledge
=
"manual"
prefetch
=
"1"
>
配置说明:
perfetch = 1,
表示消费端每次从
mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下 一条消息。
或
/**
* Consumer ACK机制:
* 1. 设置手动签收。acknowledge="manual"
* 2. 让监听器类实现ChannelAwareMessageListener接口
* 3. 如果消息成功处理,则调用channel的 basicAck()签收
* 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
*/
@Component
public class ackListener implements ChannelAwareMessageListener {
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "ex_routing_cms_postpage",type = "direct"),
value = @Queue(value = "queue_cms_postpage_01",durable = "true"),
key = "#.#"
))
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("一个进来了");
Thread.sleep(10000);
System.out.println("一个结束了");
channel.basicAck(deliveryTag,true);
}catch (Exception e){
e.printStackTrace();
}
}
}
总结:
在
rabbit:listener-container
中配置
prefetch
属性设置消费端一次拉取多少消息
消费端的确认模式一定为手动确认。
acknowledge="manual"
四,TLL(TIME TO LIVE)
存活时间/过期时间: 当消息到达存活时间后,还没有被消费,会被自动清除.
RabbitMQ 可以对消息设置过期时间,也可以对整个队列(Queue) 设置过期时间.

代码实现:
????????设置队列的过期时间:
? ? ? ? 1,在消息的生产方,在spring-rabbitmq-producer.xml配置文件中,添加如下配置
<!‐‐ttl‐‐>
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<!‐‐设置queue的参数‐‐>
<rabbit:queue‐arguments>
<!‐‐x‐message‐ttl指队列的过期时间‐‐>
<entry key="x‐message‐ttl" value="100000" value‐type="java.lang.Integer"/>
</rabbit:queue‐arguments>
</rabbit:queue>
<rabbit:topic‐exchange name="test_exchange_ttl" >
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic‐exchange>
?2 编写发送消息测试方法
@Test
public void testTtl() {
for (int i = 0; i < 10; i++) {
// 发送消息
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
}
}
测试结果:当消息发送成功后,过
10s
后在
RabbitMQ的管理控制台会看到消息会自动删除。
设置单个消息的过期时间
? ? ? ? 编写代码测试,并且设置队列的过期时间为100s,单个消息的过期时间为5s,
@Test
public void testTtl() {
// 消息后处理对象,设置一些消息的参数信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1.设置message的信息
message.getMessageProperties().setExpiration("5000");
//消息的过期时间
//2.返回该消息
return message;
}
};
//消息单独过期
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
for (int i = 0; i < 10; i++) {
if(i == 5){
//消息单独过期
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
}else{
//不过期的消息
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
}
}
}
消息过期就是在发送消息后面加个参数,??MessagePostProcessor 类, 这个类里配置过期时间,那么这条消息过期时间就不一样
如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
????????
????????队列过期后,会将队列所有消息全部移除。
????????
????????消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉
)
总结:
设置队列过期时间使用参数:
x-message-ttl
,单位:
ms(
毫秒
)
,会对整个队列消息统一过期。
设置消息过期时间使用参数:
expiration
。单位:
ms(
毫秒
),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
如果两者都进行了设置,以时间短的为准。
五,死信队列
死信队列,英文缩写:
DLX
。
Dead Letter Exchange(死信交换机)
当消息成为
Dead message
后,可以被重新
发送到另一个交换机,这个交换机就是
DLX
死信队列:没有被及时消费的消息存放的队列

?消息成为死信的三种情况:
? ? ? ? 1,队列消息长度到达极限
? ? ? ? 2,消费者拒接消费消息
|