理解下死信,就是永远都不能被消费到的信息。死信队列其实就是为了防止消息未消费成功丢失的场景。通过死信队列存储这类消息,然后再对死信队列的消息做处理操作。场景比如订单支付挂起。
maven配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>SpringbootTest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>SpringbootTest</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
定义两个消费者,一个01消费正常队列,一个02消费死信队列。
public class DeadConsumer01 {
private static final String DEAD_EXCHANGE = "dead-1exchange";
private static final String NORMAL_EXCHANGE = "normal-1exchange";
private static final String DEAD_QUEUE = "dead-1queue";
private static final String NORMAL_QUEUE = "normal-1queue";
@SneakyThrows
public static void main(String[] args) {
Channel channel = MqChanneUtils.getChannel();
//声明交换机
channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//声明正常队列-需绑定死信交换机
Map<String,Object> paramMap = new HashMap<>();
paramMap.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置与之绑定的死信交换机
paramMap.put("x-dead-letter-routing-key","dead");//设置死信队列的绑定key
paramMap.put("x-max-length",4);//设置队列消息最大数
paramMap.put("x-message-ttl",10000);//设置队列中消息过期时间
channel.queueDeclare(NORMAL_QUEUE,false,false,false,paramMap);
//绑定
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
DeliverCallback deliverCallback = (comsumeTag, delivery)->{
String message = new String(delivery.getBody(), "UTF-8");
if (message.equals("消息3")){
channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);
}else {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
System.out.println("consumer01已经收到消息" + message);
}
};
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,comsumeTag->{
});
}
public class DeadConsumer02 {
private static final String DEAD_QUEUE = "dead-1queue";
@SneakyThrows
public static void main(String[] args) {
Channel channel = MqChanneUtils.getChannel();
DeliverCallback deliverCallback = (comsumeTag, delivery)->{
String message = new String(delivery.getBody(),"UTF-8");
System.out.println("consumer02 死信队列已经收到消息"+message);
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,comsumeTag->{
});
}
}
定义一个生产者
?
public class Producter01 {
private static final String NORMAL_EXCHANGE = "normal-1exchange";
@SneakyThrows
public static void main(String[] args) {
Channel channel = MqChanneUtils.getChannel();
/* AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().expiration("10000").build();*/
for (int i =0; i < 5;i++) {
String message = "消息";
channel.basicPublish(NORMAL_EXCHANGE, "normal", false, null,(message+i).getBytes(StandardCharsets.UTF_8));
}
}
}
?
?原理就是根据消费者01的定义的一些参数来控制是否进入死信队列。如它的消息在10s内没被消费,这就可以用作订单支付超时的场景。如它的消息长度超过最大长度限制,这可以控制高并发下的抢单场景。如消费者这边对于某些特殊消息要做拒绝处理,但又需要留痕到数据库。都可以用死信队列在做逻辑。? 其实死信队列跟普通队列一致,就是你定义一个普通队列来解决死信的问题也是OK的,只不过MQ里头把它作为一个参数。定义这个更加使用灵活。
|