想要实现RabbitMQ延时队列有两种方式
一:TTL(消息过期) + 死信队列
TTL + 死信队列的实现方式主要是,TTL 来控制延时时间,等到延时时间过期,消息就会被扔到死信队列来处理,从而达到延时队列的效果。
TTL 过期时间有两种设置方式:
单独指定消息的过期时间
@GetMapping("/topic/sendMsg2")
public String topicSendMsg2(){
String msg = "hello World";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("20000");
Message message = new Message(msg.getBytes(),messageProperties);
System.out.println("topicSendMsg2{}"+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
rabbitTemplate.convertAndSend("test_topic_exchange_name","test_topic_routing_name",message);
return "ok~";
}
优点 :每条消息的过期时间都可以自由的控制,控制粒度小。 缺点 :没有统一的控制,如果过期时间一致的话,则需要每条都写过期配置 消息推送到队列后,如果指定时间内没有被消费,则会自动过期。
注意: RabbitMQ只会对队列头部的消息进行过期淘汰。如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。会造成消息不会及时地过期淘汰,导致消息的堆积。
给队列设置消息过期时间,队列中的所有消息都有同样的过期时间。
@Bean
public Queue createTopicQueue(){
Map<String, Object> arguments = new HashMap<>(2);
arguments.put("x-message-ttl",30000);
arguments.put("x-dead-letter-exchange", TEST_DEAD_TOPIC_EXCHANGE_NAME);
arguments.put("x-dead-letter-routing-key", TEST_DEAD_TOPIC_ROUTING_NAME);
return new Queue(TEST_TOPIC_QUEUE_NAME,true,false,false,arguments);
}
对队列设置过期时间,这队列的每条消息的过期时间都一致, 注意:如果两个过期时间都设置的话,则以时间最短的那个为主。
二:RabbitMQ插件使用
Docker 安装RabbitMQ的延时插件
下载插件
根据自己的版本下载对应的插件 rabbitmq-delayed-message-exchange
安装
上传到服务器的/zhanghang/rabbitmq/plugs 文件夹下,然后进行如下操作
docker cp /zhanghang/rabbitmq/plugs/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbit:/plugins
docker exec -it rabbit /bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq-plugins list
docker restart rabbit
使用
@Configuration
public class TopicRabbitConfig1 {
public static final String TEST_TOPIC_EXCHANGE_NAME = "test_topic_exchange_name1";
public static final String TEST_TOPIC_QUEUE_NAME = "test_topic_queue_name1";
public static final String TEST_TOPIC_ROUTING_NAME = "test_topic_routing_name1";
@Bean
public Queue createTopicQueue1(){
return new Queue(TEST_TOPIC_QUEUE_NAME,true,false,false);
}
@Bean
public CustomExchange createTopicExchange1(){
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
return new CustomExchange(TEST_TOPIC_EXCHANGE_NAME,"x-delayed-message",true,false,args);
}
@Bean
public Binding createTopicBinding1(){
return BindingBuilder.bind(createTopicQueue1()).to(createTopicExchange1()).with(TEST_TOPIC_ROUTING_NAME).noargs();
}
}
@GetMapping("/topic/sendMsg2")
public String topicSendMsg2(){
String msg = "hello World";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDelay(1 * 60 * 1000);
Message message = new Message(msg.getBytes(),messageProperties);
System.out.println("topicSendMsg2{}"+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
rabbitTemplate.convertAndSend("test_topic_exchange_name1","test_topic_routing_name1",message);
return "ok~";
}
测试发现经过一分钟后消费者收到消息
|