rebbitmq延迟消息示例
官方插件仅支持 >= 3.6x版本中支持。
官方推荐插件 rabbitmq-delayed-message-exchange。
官方文档:guthub,Rabbitmq插件列表。
安装
rabbitmq并未内置该插件,需要手动下载安装。已安装的插件可通过rabbitmq-plugins list查看。 下载后解压,将其拷贝至rabbitmq服务目录
(Linux Debian/RPM)/usr/lib/rabbitmq/plugins
(windows和其他系统)安装目录\rabbitmq_server-version\plugins
启用插件
cmd 安装目录\sbin 查看已安装列表 输入:
rabbitmq-plugins list
启用插件 输入:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
机制
安装插件后会生成新的Exchange类型x-delayed-message,支持延迟投递机制,接收到消息后不立即将消息投递至目标队列中,而是存储在mnesia(分布式数据系统)表中,检测消息延迟时间,达到可投递时间时将其通过x-delayed-type类型标记的交换机类型投递至目标队列。
java使用过程
声明x-delayed-message消息交换机
-
rabbitmq java client 实现 Map<String,Object> args = new HashMap<String,Object>();
args.put("x-delayed-type","direct");
channel.exchangeDeclare("my-exchange","x-delayed-message",true,false,args);
-
spring rabbitmq template 实现 Map<String,Object> aregs = new Hash<String,Object>();
ares.put("x-delayed-type","direct");
Exchange exchange = new CustomExchange("test.exchange","x-delayed-message",true,false,args) ;
admin.declareExchange(exchange);
消息发送
-
rabbitmq java client 实现 byte[] messageBodyBytes = "delayed pyload".getBytes("UTF-8");
Map<String,Object> headers = new HashMap<String,Object>();
headers.put("x-delay",5000);
AMQP.BesiProperties.Builder pros = new AMQP.BasiProperties.Builder().hearders(heards);
channel.basicPublish("test.exchange","test",props.build(),messageBobyBytes);
-
spring rabbitmq template 实现 MessageProperties properties = new MessageProperties();
properties.setHearder("x-delay",1000);
template.converAndSend("test.exchage","test",new Message(body,properties));
|