RabbitMQ-实现延迟队列
1、为什么需要延迟队列
延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
**场景:**在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。
因为我们项目中本身就使用到了Rabbitmq,所以基于方便开发和维护的原则,我们使用了Rabbitmq延迟队列来实现定时任务,
Rabbitmq延迟队列
Rabbitmq本身是没有延迟队列的,只能通过Rabbitmq本身队列的特性来实现,想要Rabbitmq实现延迟队列,需要使用Rabbitmq的死信交换机(Exchange)和消息的存活时间TTL(Time To Live)
死信交换机 一个消息在满足如下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机可以对应很多队列。
一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
上面的消息的TTL到了,消息过期了。
队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
死信交换机就是普通的交换机,只是因为我们把过期的消息扔进去,所以叫死信交换机,并不是说死信交换机是某种特定的交换机
消息TTL(消息存活时间) 消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。
2、创建交换机(Exchanges)和队列(Queues)
上篇文章教大家使用代码创建交换机和队列,所以本文使用的管理平台来操作! 创建死信交换机
创建自动过期消息队列
创建一个一个名为delay_queue1的自动过期的队列,当然图片上面的参数并不会让消息自动过期,因为我们并没有设置x-message-ttl参数,如果整个队列的消息有消息都是相同的,可以设置,这里为了灵活,所以并没有设置
- x-message-ttl:队列中消息存活时间, 单位ms, 超时则认为是死信
- x-dead-letter-exchange:出现死信时路由到该参数指定的Exchange(dlx)
- x-dead-letter-routing-key: dlx路由到其他队列使用的routingKey
创建消息处理队列 这个队列才是真正处理消息的队列,所有进入这个队列的消息都会被处理! 注意两个队列的区别! 消息队列绑定到交换机
进入交换机详情页面,将创建的2个队列(delay_queue1和delay_queue2)绑定到交换机上面
点击我们建立的这个交换机 绑定成功后如图:
代码创建:
@Configuration
public class DelayRabbitConfig {
@Bean
public Queue delayQueue01(){
Map<String, Object> map = new HashMap<>();
map.put("x-message-ttl",6000);
map.put("x-dead-letter-exchange","delay_Exchange");
map.put("x-dead-letter-routing-key","delay2");
return new Queue("delay_queue1",true,false,false,map);
}
@Bean
public Queue delayQueue02(){
return new Queue("delay_queue2");
}
@Bean
DirectExchange delay_Exchange() {
return new DirectExchange("delay_Exchange",true,false);
}
@Bean
Binding bindingDirect01() {
return BindingBuilder.bind(delayQueue01()).to(delay_Exchange()).with("delay1");
}
@Bean
Binding bindingDirect02() {
return BindingBuilder.bind(delayQueue02()).to(delay_Exchange()).with("delay2");
}
}
实现延迟队列的流程
3、编写生产者代码
@RestController
@RequestMapping("/rabbit")
public class DelayQueueController {
@Autowired
private RabbitTemplate template;
@GetMapping("/delay")
public String TestMessageAck() {
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
+" 延迟6秒的消息";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("6000");
messageProperties.setCorrelationId(UUID.randomUUID().toString());
Message message = new Message(createTime.getBytes(),messageProperties);
template.convertAndSend("delay_Exchange", "delay1",message);
return "ok";
}
}
3、编写消费者代码
注意消费者绑定的队列是:delay_queue2
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames("delay_queue2");
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("消费延迟6秒队列中的消息======================");
System.out.println("消费者收到消息:" + new String(body)+",当前时间:"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
});
return container;
}
}
代码运行结果如下:
|