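Project overview:
1. Using a dead-letter queue
How it works: messages in the target queue are given a time-to-live (TTL). If a message is not consumed within that time after being sent, it is dead-lettered and routed to the dead-letter queue, where it is finally consumed.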
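The TTL-then-dead-letter flow described above can be pictured with a small in-memory model. This is only a sketch to illustrate the idea, not how RabbitMQ is implemented; the class name `TtlQueueModel` and its methods are hypothetical, and time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of a queue with a queue-level TTL and a dead-letter target. */
final class TtlQueueModel {

    /** A message body together with the time (ms) at which it entered the queue. */
    private static final class Entry {
        final String body;
        final long enqueuedAtMs;

        Entry(String body, long enqueuedAtMs) {
            this.body = body;
            this.enqueuedAtMs = enqueuedAtMs;
        }
    }

    private final long ttlMs;
    private final Deque<Entry> waiting = new ArrayDeque<>();
    private final List<String> deadLettered = new ArrayList<>();

    TtlQueueModel(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Puts a message into the target queue; nobody consumes from this queue. */
    void publish(String body, long nowMs) {
        waiting.addLast(new Entry(body, nowMs));
    }

    /** Moves every message that has waited at least ttlMs to the dead-letter list. */
    void expire(long nowMs) {
        while (!waiting.isEmpty() && nowMs - waiting.peekFirst().enqueuedAtMs >= ttlMs) {
            deadLettered.add(waiting.pollFirst().body);
        }
    }

    /** Messages that a dead-letter consumer would now be able to receive. */
    List<String> deadLettered() {
        return deadLettered;
    }

    int waitingCount() {
        return waiting.size();
    }
}
```

Because every message gets the same TTL, messages leave the target queue in the order they arrived, each one delayed by the same amount.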
Project dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
        <relativePath/>
    </parent>
    <groupId>cn.xwl</groupId>
    <artifactId>xrabbitmq</artifactId>
    <version>1.0</version>
    <name>xrabbitmq</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
YAML configuration
server:
  port: 8070
spring:
  application:
    name: xwl-rabbitmq
  rabbitmq:
    host: 10.250.200.106
    port: 5672
    username: xwl
    password: 123456
    virtualHost: xwlvm # custom virtual host
    listener:
      simple:
        acknowledge-mode: auto # automatic acknowledgement, convenient for testing
        prefetch: 1
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
Queue configuration
Define the dead-letter queue (TimeQueue) and the target queue (TimeDirectQueue). The target queue must set x-message-ttl (required, in milliseconds) and name the dead-letter exchange and routing key.
package cn.xwl.xrabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TimeRabbitConfig {

    // Dead-letter queue: expired messages end up here and are consumed by the listener.
    @Bean
    public Queue TimeQueue() {
        return new Queue("TimeQueue", true);
    }

    // Dead-letter exchange.
    @Bean
    DirectExchange TimeExchange() {
        return new DirectExchange("TimeExchange", true, false);
    }

    @Bean
    Binding bindingTimeDlDirect() {
        return BindingBuilder.bind(TimeQueue()).to(TimeExchange()).with("timedlRouteKey");
    }

    // Target queue: messages wait here until their TTL expires, then are dead-lettered.
    @Bean
    public Queue TimeDirectQueue() {
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", "TimeExchange");
        params.put("x-dead-letter-routing-key", "timedlRouteKey");
        params.put("x-message-ttl", 20000); // TTL in milliseconds (20 seconds)
        params.put("x-max-length", 50);     // maximum number of messages in the queue
        return new Queue("TimeDirectQueue", true, false, false, params);
    }

    // Exchange that producers publish to.
    @Bean
    DirectExchange TimeDirectExchange() {
        return new DirectExchange("TimeDirectExchange", true, false);
    }

    @Bean
    Binding bindingTimeDirect() {
        return BindingBuilder.bind(TimeDirectQueue()).to(TimeDirectExchange()).with("timeRouteKey");
    }
}
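Since the queue arguments are plain strings and numbers, a typo or a negative TTL only surfaces when the broker rejects the declaration. One possible way to guard against that is a small helper that builds the argument map and validates the values first. `DelayQueueArguments` is a hypothetical helper and not part of the original project; it uses only the argument names already shown in the configuration above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper that builds the arguments for a TTL queue with a dead-letter target. */
final class DelayQueueArguments {

    private DelayQueueArguments() {
    }

    static Map<String, Object> build(String deadLetterExchange, String deadLetterRoutingKey,
                                     int ttlMillis, int maxLength) {
        Objects.requireNonNull(deadLetterExchange, "deadLetterExchange");
        Objects.requireNonNull(deadLetterRoutingKey, "deadLetterRoutingKey");
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("x-message-ttl must not be negative: " + ttlMillis);
        }
        if (maxLength < 0) {
            throw new IllegalArgumentException("x-max-length must not be negative: " + maxLength);
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", deadLetterExchange);
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        args.put("x-message-ttl", ttlMillis); // milliseconds
        args.put("x-max-length", maxLength);  // number of messages
        return args;
    }
}
```

With this sketch, the target queue bean could be written as `new Queue("TimeDirectQueue", true, false, false, DelayQueueArguments.build("TimeExchange", "timedlRouteKey", 20000, 50))`.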
Sending the message
// Controller method; assumes the enclosing controller has an injected RabbitTemplate field named rabbitTemplate.
@GetMapping("/sendTime")
public String sendTime() {
    String messageId = String.valueOf(UUID.randomUUID());
    String messageData = "test message, hello! waitTime";
    String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    Map<String, Object> map = new HashMap<>();
    map.put("messageId", messageId);
    map.put("messageData", messageData);
    map.put("createTime", createTime);
    System.out.println("Sending message: " + messageId);
    // Publish to the target exchange; the message waits in TimeDirectQueue until its TTL expires.
    rabbitTemplate.convertAndSend("TimeDirectExchange", "timeRouteKey", map, new CorrelationData(messageId));
    return "ok";
}
Receiving the message
Note: do not create a consumer for the target queue, that is, the queue bound with timeRouteKey (TimeDirectQueue). A consumer there would take each message immediately, before its TTL expires. With this approach only the dead-letter queue needs a consumer.
package cn.xwl.xrabbitmq.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

// Listens on the dead-letter queue, not on the target queue.
@Component
@RabbitListener(queues = "TimeQueue")
public class DlcTimeDirectReceiver {

    @RabbitHandler
    public void process(Map testMessage, Channel channel, Message message) throws IOException {
        System.out.println("TimeQueue consumer 1 received message: " + testMessage.toString());
        System.out.println("Message handled from the dead-letter queue");
    }
}
Test result:
The message appears on the console only 20 seconds after it was sent.
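Rather than timing the 20 seconds by eye, the consumer could compute the delay from the createTime field that the sender already puts into the message. The sketch below assumes the same "yyyy-MM-dd HH:mm:ss" pattern as the sending code; `DelayCheck` and `elapsedSeconds` are hypothetical names.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that measures how long a message was delayed. */
final class DelayCheck {

    // Must match the pattern used by the sender for the createTime field.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private DelayCheck() {
    }

    /** Whole seconds between the createTime string and the moment of receipt. */
    static long elapsedSeconds(String createTime, LocalDateTime receivedAt) {
        LocalDateTime sentAt = LocalDateTime.parse(createTime, FORMAT);
        return Duration.between(sentAt, receivedAt).getSeconds();
    }
}
```

In the listener this could be called as `DelayCheck.elapsedSeconds((String) testMessage.get("createTime"), LocalDateTime.now())`. Because createTime is truncated to whole seconds, the result may be off by up to one second.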
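2. Using the plugin
How it works: the delay is applied when the message is published, so the consumer receives it only after the delay has elapsed. This approach is simpler, but it requires installing the rabbitmq_delayed_message_exchange plugin, and the plugin version must match the broker version. RabbitMQ 3.9 is used as the reference here.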
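To picture what a delayed exchange does, here is a toy in-memory model: each message carries its own delay, the exchange holds it back, and it is routed to the queue only once it is due. This is an illustrative sketch and not the plugin's actual implementation; `DelayedExchangeModel` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy in-memory model of an exchange that holds each message for its own delay. */
final class DelayedExchangeModel {

    private static final class Held {
        final String body;
        final long dueAtMs;
        final long seq; // keeps publish order for messages due at the same time

        Held(String body, long dueAtMs, long seq) {
            this.body = body;
            this.dueAtMs = dueAtMs;
            this.seq = seq;
        }
    }

    private final PriorityQueue<Held> held = new PriorityQueue<>(
            Comparator.comparingLong((Held h) -> h.dueAtMs).thenComparingLong(h -> h.seq));
    private long nextSeq = 0;

    /** Accepts a message and holds it until nowMs + delayMs. */
    void publish(String body, long delayMs, long nowMs) {
        held.add(new Held(body, nowMs + delayMs, nextSeq++));
    }

    /** Returns, in due order, the messages that would be routed to the queue by nowMs. */
    List<String> release(long nowMs) {
        List<String> routed = new ArrayList<>();
        while (!held.isEmpty() && held.peek().dueAtMs <= nowMs) {
            routed.add(held.poll().body);
        }
        return routed;
    }
}
```

Unlike the queue-level TTL used in the first approach, each message can have a different delay here, and a message with a short delay is released before an earlier message with a long one.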
Queue configuration
Define a delayed exchange by calling setDelayed(true) and bind an ordinary queue to it. No dead-letter queue or x-message-ttl is needed with this approach.
package cn.xwl.xrabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TimeRabbitConfig2 {

    // Delayed exchange; requires the delayed message plugin on the broker.
    @Bean
    public DirectExchange TimeExchange2() {
        DirectExchange exchange = new DirectExchange("TimeExchange2", true, false);
        exchange.setDelayed(true);
        return exchange;
    }

    // Ordinary durable queue that receives messages once their delay has elapsed.
    @Bean
    public Queue TimeQueue2() {
        return new Queue("TimeQueue2", true);
    }

    @Bean
    public Binding TimeBinding2() {
        return BindingBuilder.bind(TimeQueue2()).to(TimeExchange2()).with("TimeKey2");
    }
}
Sending the message
// Controller method; assumes the enclosing controller has an injected RabbitTemplate field named rabbitTemplate.
@GetMapping("/sendTime2")
public String sendTime2() {
    String messageId = String.valueOf(UUID.randomUUID());
    String messageData = "test message, hello! waitTime";
    String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    Map<String, Object> map = new HashMap<>();
    map.put("messageId", messageId);
    map.put("messageData", messageData);
    map.put("createTime", createTime);
    System.out.println("Sending message: " + messageId);
    rabbitTemplate.convertAndSend("TimeExchange2", "TimeKey2", map, message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        message.getMessageProperties().setDelay(6000); // per-message delay in milliseconds (6 seconds)
        return message;
    }, new CorrelationData(messageId));
    return "ok";
}
Receiving the message
Note: here the consumer listens on the target queue directly, because the delay is already applied when the message is sent.
package cn.xwl.xrabbitmq.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
@RabbitListener(queues = "TimeQueue2")
public class CommonTimeDirectReceiver2 {

    @RabbitHandler
    public void process(Map testMessage, Channel channel, Message message) throws IOException {
        System.out.println("TimeQueue2 consumer 1 received message: " + testMessage.toString());
        System.out.println("Delayed message received");
    }
}
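Test result:
The code itself is correct, but running the test fails with UNKNOWN EXCHANGE TYPE 'X-DELAYED-MESSAGE' because the plugin is not installed. Download the plugin release that matches your RabbitMQ version from its Git repository. Note that older RabbitMQ versions may not have a matching release. For the installation steps, see https://www.freesion.com/article/79811198095/
After installing the plugin, run the test again and it works.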
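Summary
Delayed queues are a good fit for order payment timeouts. When an order is created, a delayed message is sent to the MQ. If the order has not been paid by the time the delay elapses, the message is delivered to the cancellation system and the order is cancelled. If the payment succeeds, a status field in the database is updated, and when the delayed message arrives the consumer checks that field to decide whether the order still needs to be cancelled.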
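The check that the consumer performs when the delayed message arrives might look like the sketch below. It is a minimal illustration under stated assumptions: an in-memory map stands in for the database, and `OrderTimeoutHandler`, `Status` and the method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical order-timeout logic; an in-memory map stands in for the database. */
final class OrderTimeoutHandler {

    enum Status { UNPAID, PAID, CANCELLED }

    private final Map<String, Status> orders = new HashMap<>();

    /** Called when the order is created, at the same time the delayed message is sent. */
    void createOrder(String orderId) {
        orders.put(orderId, Status.UNPAID);
    }

    /** Called when payment succeeds; returns false if the order is no longer unpaid. */
    boolean markPaid(String orderId) {
        return orders.replace(orderId, Status.UNPAID, Status.PAID);
    }

    /** Called when the delayed message arrives; cancels the order only if it is still unpaid. */
    boolean onDelayedMessage(String orderId) {
        return orders.replace(orderId, Status.UNPAID, Status.CANCELLED);
    }

    Status statusOf(String orderId) {
        return orders.get(orderId);
    }
}
```

Cancelling only when the status is still UNPAID also makes the handler safe if the same delayed message is delivered more than once.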