重复消费问题:
为了解决消费端因为种种原因而造成的消息丢失问题,我们都知道根源在于因为RabbitMQ的自动ack机制,所以为了避免以上问题,我们会选中手动ack,以确保消息不会因为某些原因而丢失。
但随之而来的也有一个问题:如果忘记ack,或者又因为种种原因消费者端没能给RabbitMQ对应ack,无法确认消息已经被消费完了,那这条未被“约束”的消息也许就会被另一个消费者消费,就会造成重复消费问题
如果是进行增加,或者一些非幂等性操作,比如扣费业务,那可就完犊子了
而其中用Redis似乎是对解决重复消费问题的一个比较好的方案:
思路如下:
在消费者消费消息之前,先将消息的id放到Redis中
最明了的方式:设置id为0时为正在执行业务,id为1是为业务执行成功,消费完毕
若手动ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,若key已经存在,则获取他的值,如果值为0,即当前消息还没消费完,当前消费者就什么都不做,如果值为1,则直接ack操作。
当然,这也会有一种非常严重的隐患:
若第一个消费者在执行业务时,出现了死锁问题,便会无法获取key的资源
为此我们应该在将消息id放入Redis时,为它设置一个生存时间,也可以叫过期时间,避免在极端情况下的死锁问题。
?
话不多,先来测试实践一下
一、测试依赖导导导
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
?
?二、在测试前用了一个先前编写好的创建连接的工具类,就不用一直配置连接信息
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQClient {
public static Connection getConnection(){
// 创建Connection工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("test");
factory.setPassword("test");
factory.setVirtualHost("/test");
// 创建Connection
Connection conn = null;
try {
conn = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
// 返回Connection
return conn;
}
}
?三、生产者代码实现
import java.util.UUID;
public class Publisher {
@Test
public void publish() throws Exception {
//使用工具类创建消息队列连接
Connection connection = RabbitMQClient.getConnection();
//创建Channel
Channel channel = connection.createChannel();
//指定路由规则及消息属性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(1) //0:消息不持久化 1:消息持久化
.messageId(UUID.randomUUID().toString()) //生成随机信息id
.build();
String msg = "This is MQ Test!"; //消息体
channel.basicPublish("","MqTest",true,properties,msg.getBytes());
// 参数1:exchange:指定exchange,使用"",使用默认的交换机
// 参数2:routingKey:指定路由的规则,使用具体的队列名称。
// 参数3:mandatory:若exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,会将消息返还给生产者。
// 参数4:BasicProperties :指定传递的消息所携带的properties属性。
// 参数5 byte[]:指定发布的具体消息
System.out.println("生产者发布消息成功!");
channel.close();
connection.close();
}
}
四、消费者代码实现
import com.yj.utils.RabbitMQClient;
import com.rabbitmq.client.*;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import java.io.IOException;
public class Consumer {
@Test
public void consume() throws Exception {
Connection connection = RabbitMQClient.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("MqTest",true,false,false,null);
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true,设置false后MQ重启后队列全部删除)
//参数3:exclusive - 是否排外(conn.close()当前队列会被自动删除,还有当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
//自定义监听器监听队列
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//1、连接redis
Jedis jedis = new Jedis("127.0.0.1", 6379);
//2、获取队列中消息id
String messageId = properties.getMessageId();
//3、setnx到redis中,value默认为0,设置过期时间10s
String result = jedis.set(messageId, "0", "NX", "EX", 10);
if (result != null && result.equalsIgnoreCase("OK")){
System.out.println("接收到信息:" + new String(body,"UTF-8"));
//4、若成功添加,即消费成功,设置消息id,表示正在执行业务
//这里就不设置1了,6868为了在测试数据库中显眼些,
jedis.set(messageId,"68686868");
//5、手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
//deliveryTag:该消息的index;
//multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
}else {
//6、如果2中的setnx失败,获取key对应的value,如果是0什么都不做,如果是1则手动ack
String s = jedis.get(messageId);
if ("1".equalsIgnoreCase(s)){
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
}
};
channel.basicConsume("MqTest",true,consumer);
System.out.println("消费者开始监听队列!");
System.in.read();//让程序不停止,只有键盘录入数据才会往下走
// 释放资源
channel.close();
connection.close();
}
}
五、简单测试一下
?
?
测试成功!
?终:看来还是可行的+_+
|