IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ延时队列 -> 正文阅读

[大数据]RabbitMQ延时队列


想要实现RabbitMQ延时队列有两种方式

一:TTL(消息过期) + 死信队列

TTL + 死信队列的实现方式主要是,TTL 来控制延时时间,等到延时时间过期,消息就会被扔到死信队列来处理,从而达到延时队列的效果。

TTL 过期时间有两种设置方式:

单独指定消息的过期时间

@GetMapping("/topic/sendMsg2")
	public String topicSendMsg2(){
		String msg = "hello World";
		// 对每条消息设置过期时间
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setExpiration("20000"); // 时间单位为 毫秒
		Message message = new Message(msg.getBytes(),messageProperties);
		System.out.println("topicSendMsg2{}"+  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
		rabbitTemplate.convertAndSend("test_topic_exchange_name","test_topic_routing_name",message);
		return "ok~";
	}

优点:每条消息的过期时间都可以自由的控制,控制粒度小。
缺点:没有统一的控制,如果过期时间一致的话,则需要每条都写过期配置
消息推送到队列后,如果指定时间内没有被消费,则会自动过期。

注意:
RabbitMQ只会对队列头部的消息进行过期淘汰。如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。会造成消息不会及时地过期淘汰,导致消息的堆积。

给队列设置消息过期时间,队列中的所有消息都有同样的过期时间。

// 正常的队列
	@Bean
	public Queue createTopicQueue(){
		Map<String, Object> arguments = new HashMap<>(2);
		// 设置过期时间,对 队列设置统一的过期时间,单位为 毫秒
		arguments.put("x-message-ttl",30000);

		// 绑定死信交换机
		arguments.put("x-dead-letter-exchange", TEST_DEAD_TOPIC_EXCHANGE_NAME);
		// 绑定死信的路由key
		arguments.put("x-dead-letter-routing-key", TEST_DEAD_TOPIC_ROUTING_NAME);
		// 绑定死信队列的交换机和路由
		return new Queue(TEST_TOPIC_QUEUE_NAME,true,false,false,arguments);
	}

对队列设置过期时间,这队列的每条消息的过期时间都一致,
注意:如果两个过期时间都设置的话,则以时间最短的那个为主。

二:RabbitMQ插件使用

Docker 安装RabbitMQ的延时插件

下载插件

根据自己的版本下载对应的插件
rabbitmq-delayed-message-exchange
在这里插入图片描述

安装

上传到服务器的/zhanghang/rabbitmq/plugs文件夹下,然后进行如下操作
在这里插入图片描述
在这里插入图片描述

#拷贝到rabbitmq容器 773067241f96 中
docker cp /zhanghang/rabbitmq/plugs/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbit:/plugins
# rabbit 是我启动的rabbitmq的别名

#进入容器
docker exec -it rabbit /bin/bash

#启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

#查看
rabbitmq-plugins list

#重新启动容器
docker restart rabbit

在这里插入图片描述
在这里插入图片描述

使用

@Configuration
public class TopicRabbitConfig1 {
	// 正常的topic 队列
	public static final String TEST_TOPIC_EXCHANGE_NAME = "test_topic_exchange_name1";
	public static final String TEST_TOPIC_QUEUE_NAME = "test_topic_queue_name1";
	public static final String TEST_TOPIC_ROUTING_NAME = "test_topic_routing_name1";


	// 正常的队列
	@Bean
	public Queue createTopicQueue1(){
		return new Queue(TEST_TOPIC_QUEUE_NAME,true,false,false);
	}

    // 定义延时的交换机
	@Bean
	public CustomExchange createTopicExchange1(){
		Map<String, Object> args = new HashMap<String, Object>();
		args.put("x-delayed-type", "direct");
		// 参数说明:
		// name:    交换机名称
        // type:  交换机类型
		// durable: 是否持久化
		// autoDelete: 是否自动删除
        // arguments:  配置
		return new CustomExchange(TEST_TOPIC_EXCHANGE_NAME,"x-delayed-message",true,false,args);
	}

	@Bean
	public Binding createTopicBinding1(){
		return BindingBuilder.bind(createTopicQueue1()).to(createTopicExchange1()).with(TEST_TOPIC_ROUTING_NAME).noargs();
	}

}
// 发送消息
@GetMapping("/topic/sendMsg2")
	public String topicSendMsg2(){
		String msg = "hello World";
		// 对每条消息设置过期时间
		MessageProperties messageProperties = new MessageProperties();
        // 设置延时时长,单位为毫秒
		messageProperties.setDelay(1 * 60 * 1000);
		Message message = new Message(msg.getBytes(),messageProperties);
		System.out.println("topicSendMsg2{}"+  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
		rabbitTemplate.convertAndSend("test_topic_exchange_name1","test_topic_routing_name1",message);
		return "ok~";
	}

测试发现经过一分钟后消费者收到消息

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-24 15:37:49  更:2021-08-24 15:40:05 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 18:55:23-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码