流程图设计
????????
?创建配置类进行绑定
/**
* TTL 队列 配置文件类
*/
@Configuration
public class TtlQueueConfig {
/**
* 普通交换机的名称
*/
private static final String X_EXCHANGE = "X";
/**
* 死信交换机的名称
*/
private static final String Y_DEAD_LETTER_EXCHANGE = "Y";
/**
* 普通队列的名称
*/
private static final String QUEUE_A = "QA";
private static final String QUEUE_B = "QB";
/**
* 死信队列的名称
*/
private static final String DEAD_LETTER_QUEUE = "QD";
/**
* 声明X交换机
*/
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
/**
* 声明Y交换机
*/
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
/**
* 声明队列 QA 绑定死信交换机Y routingKey=YD TTL 过期时间10S
*/
@Bean("queueA")
public Queue queueA() {
Map<String, Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "YD");
//设置TTL 单位是ms
arguments.put("x-message-ttl", 10 * 1000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
/**
* 声明队列 QB 绑定死信交换机Y routingKey=YD TTL 过期时间40S
*/
@Bean("queueB")
public Queue queueB() {
Map<String, Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "YD");
//设置TTL 单位是ms
arguments.put("x-message-ttl", 40 * 1000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
/**
* 死信队列 QD
*/
@Bean("queueD")
public Queue queueD() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
/**
* 队列绑定 QA和X交换机绑定 routingKey=XA
*/
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
/**
* 队列绑定 QB和X交换机绑定 routingKey=XB
*/
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
/**
* 队列绑定 QD和Y交换机绑定 routingKey=YD
*/
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
创建死信队列监听消费者
/**
* 延迟队列TTL 消费者
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
/**
* 监听死信队列QD 接收消息
*/
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信消息队列:{}", new Date().toString(), msg);
}
}
创建简单的消息推送接口
/**
* 返送延迟消息
*/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 消息推送接口
*
* @param message 推送消息
*/
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date().toString(), message);
//进行消息推送
rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列:" + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列:" + message);
}
}
启动执行结果
调用接口
http://localhost:8080/ttl/sendMsg/娃哈哈
控制台打印结果
2021-07-24 15:01:40.714 INFO 38495 --- [nio-8080-exec-1] c.s.r.s.controller.SendMsgController : 当前时间:Sat Jul 24 15:01:40 CST 2021,发送一条信息给两个TTL队列:娃哈哈
2021-07-24 15:01:50.733 INFO 38495 --- [ntContainer#0-1] c.s.r.s.c.DeadLetterQueueConsumer : 当前时间:Sat Jul 24 15:01:50 CST 2021,收到死信消息队列:消息来自ttl为10s的队列:娃哈哈
2021-07-24 15:02:20.725 INFO 38495 --- [ntContainer#0-1] c.s.r.s.c.DeadLetterQueueConsumer : 当前时间:Sat Jul 24 15:02:20 CST 2021,收到死信消息队列:消息来自ttl为40s的队列:娃哈哈
自定义延迟时间队列配置
/**
* 普通队列的名称
*/
private static final String QUEUE_C = "QC";
//声明QC
@Bean("queueC")
public Queue queueC() {
Map<String, Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "YD");
//无需定义过期时间
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
/**
* 队列绑定 QC和X交换机绑定 routingKey=XC
*/
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
自定义时间消息接口
/**
* 自定义时间消息推送接口
*/
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
log.info("当前时间:{},发送一条时长{}毫秒的信息给一个TTL队列:{}", new Date().toString(), ttlTime, message);
//进行消息推送
rabbitTemplate.convertAndSend("X", "XC", "消息来自ttl为" + ttlTime + "ms的队列:" + message, msg -> {
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}
启动执行结果
调用接口
http://localhost:8080/ttl/sendExpirationMsg/娃哈哈1/20000
http://localhost:8080/ttl/sendExpirationMsg/娃哈哈2/2000
控制台打印结果
2021-07-24 16:37:05.475 INFO 61239 --- [nio-8080-exec-4] c.s.r.s.controller.SendMsgController : 当前时间:Sat Jul 24 16:37:05 CST 2021,发送一条时长20000毫秒的信息给一个TTL队列:娃哈哈1
2021-07-24 16:37:10.360 INFO 61239 --- [nio-8080-exec-5] c.s.r.s.controller.SendMsgController : 当前时间:Sat Jul 24 16:37:10 CST 2021,发送一条时长2000毫秒的信息给一个TTL队列:娃哈哈2
2021-07-24 16:37:25.477 INFO 61239 --- [ntContainer#0-1] c.s.r.s.c.DeadLetterQueueConsumer : 当前时间:Sat Jul 24 16:37:25 CST 2021,收到死信消息队列:消息来自ttl为20000ms的队列:娃哈哈1
2021-07-24 16:37:25.477 INFO 61239 --- [ntContainer#0-1] c.s.r.s.c.DeadLetterQueueConsumer : 当前时间:Sat Jul 24 16:37:25 CST 2021,收到死信消息队列:消息来自ttl为2000ms的队列:娃哈哈2
存在问题
先后发送了两条消息,第一条延迟20s,第二条延迟2s,最终结果是同时到达。因为RabbitMQ只会检查第一条消息是否过期,过期就丢到死信队列进行排队,如果第一条消息延迟时间很长,那么之后的消息也不会得到优先执行。这个问题可以通过RabbitMQ插件得到解决。
|