目录
1、rabbitMQ
2、rabbitMQ的下载、安装、使用
3、使用场景
3.1 异步处理
3.2 应用解耦
3.3 流量削峰
3.4 定时任务
4、rabbitmq管理界面
5、六种队列
5.1 简单队列(hello world)
5.2 工作队列(work queue)
5.3 发布/订阅队列(publish/subscribe)
5.4 路由队列(routing)
5.5 通配符队列(Topics)
5.6 远程通信队列(RPC)
7、高级应用
7.1 ack模式
7.2 qos合理分发
7.3 ttl消息过期时间
7.4 死信队列
7.5 优先队列
1、rabbitMQ
rabbitmq是基于erlang语言,AMQP(Advanced Message Queuing Protocol)协议,实现的一款轻量级开源消息中间件
2、rabbitMQ的下载、安装、使用
erlang下载:https://www.erlang.org/downloads
最新版本和历史版本根据机器适配版本选择:
rabbitmq下载:https://www.rabbitmq.com/download.html
最新版本和历史版本同样根据适配的版本进行选择:
windows下使用:
找到安装的地址H:\rabbitmq\rabbitmq_server-3.9.2\sbin输入命令: rabbitmq-plugins enable rabbitmq_management
这样就启动了管理工具,浏览器访问:http://localhost:15672/
?默认账号/密码:guest / guest
3、使用场景
3.1 异步处理
场景:用户注册后会给用发送注册短信和邮箱
串行方式:用户信息添加到数据库,发送注册短信和邮箱后同步返回注册信息给用户
并行方式:用户信息添加到数据库,发送注册短信和邮箱采用并行方式,减少处理时间
?消息队列:引入消息队列,将不必要的发送短信和邮箱采用异步方式处理,大大减少处理时间
?如果保存到数据库是50ms,发送短信和邮箱各是50ms,消息队列是5ms,那串行是150ms,并行是100ms,而消息队列是55ms。
3.2 应用解耦
场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.
这种做法有一个缺点:
- 当库存系统出现故障时,订单就会失败
-
订单系统和库存系统高耦合. 引入消息队列
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
库存系统:订阅下单的消息,获取下单消息,进行库操作,就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失?
3.3 流量削峰
流量削峰一般在秒杀活动中应用广泛 场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。 作用: 1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^) 2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面. 2.秒杀业务根据消息队列中的请求信息,再做后续处理.
3.4 定时任务
场景:指定时间上线一个商品
手动上架:比较传统的是手动操作上线一个商品
定时器:自己在系统上写一个Timer(定时器),但Timer线程是不会捕获异常的,如果TimerTask抛出的了未检查异常则会导致Timer线程终止,同时Timer也不会重新恢复线程的执行,他会错误的认为整个Timer线程都会取消。同时,已经被安排单尚未执行的TimerTask也不会再执行了,新的任务也不能被调度。故如果TimerTask抛出未检查的异常,Timer将会产生无法预料的行为
消息队列:相较于定时器,使用消息队列可以达到应用解耦,便于管理,异常ack机制确认
4、rabbitmq管理界面
?用户:
1、超级管理员(administrator) 可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。 2、监控者(monitoring) 可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) 3、策略制定者(policymaker) 可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。 4、普通管理者(management) 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。 5、其他 无法登陆管理控制台,通常就是普通的生产者和消费者
队列:
message TTL:消息生存时间,超过设置时间自动删除
auto expire:队列生存时间,超过设置时间自动删除
overflow behaviour:队列溢出行为,可设置drop-head, reject-publish 或reject-publish-dlx,drop-head会丢弃最老的消息,reject-publish或reject-publish-dlx会丢弃最新的消息
dead letter exchange:死信交换机,如果消息被拒绝或过期,将发布到改交换机绑定的队列
dead letter routing key:死信交换机路由,由该交换机根据路由键发给队列
max length:最大消息容量,超过会从头部丢弃
max length bytes:最大消息容量字节数,超过会从头部丢弃
maximum priority:最大优先级,消息的优先级参数越靠近最大优先级,越早被消费
lazy mode:惰性模式,在磁盘上保留尽可能多的消息,以减少RAM的使用,否则是一个内存缓存,尽可能快的传递消息
master locetor:集群相关
?交换机:
default Exchange(默认交换机):是没有名字的direct exchange,name为空字符串,所有的queue都binding到该交换机,所有binding到该交换机的queue,routing都和queue 的name一样
direct Exchange(直接交换机):一个queue通过 routing=K?binding到该交换机,那交换机可以通过routing将消息传给queue
fanout Exchange(展开交换机):交换机会把消息发给所有binding 在该交换机的queue
headers Exchange(头部交换机,自定义交换机):交换机把消息传给一个(any)或全部(all)满足header 元素内容的queue,header元素匹配规则:可用hashMap封装,内容自定义
topic Exchange(主题交换机,通配符交换机):交换机把消息传给一个或多个满足通配符规则的routing-key的queue,这里的routing-key有通配符:#表示匹配多个单词,*表示匹配一个单词
5、六种队列
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>
rabbitmq client实现参考:https://blog.csdn.net/kavito/article/details/91403659
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring 封装实现,下面队列所用,两种方式实现效果相同,根据实际情况使用
5.1 简单队列(hello world)
P:生产者
P和C之间:队列
C:消费者
简单队列是在默认交换机下完成的消息传递,routing 和queue name是一样的
springboot pom.xml上引入amqp后,在application.yml或application.properties上配置:
#rabbit配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
queue队列配置
package com.example.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Configuration
public class QueueAndExchangeConfig {
@Bean("myFirstQueue")
public Queue getFirstQueue(){
return new Queue("my-first-queue");
}
}
生产者:
package com.example.demo.controller;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("producer")
public class ProducerController{
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/producerSendFirstQueue")
public String sendMsg(@RequestParam(value = "msg") String msg){
rabbitTemplate.convertAndSend("my-first-queue",msg);
return msg;
}
}
消费者:
package com.example.demo.config;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerHandler {
@RabbitListener(queues = "my-first-queue")
public void getFirstQueue(String msg){
System.out.println("消费者1:"+msg);
}
}
结果:
postman工具测试发送请求:
5.2 工作队列(work queue)
?代码跟上一个队列基本一样,不过工作队列有两个消费者:
package com.example.demo.config;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerHandler {
@RabbitListener(queues = "my-first-queue")
public void getFirstQueue(String msg){
System.out.println("消费者1:"+msg);
}
@RabbitListener(queues = "my-first-queue")
public void getFirstQueueTwo(String msg){
System.out.println("消费者2:"+msg);
}
}
我用postman请求了四次,可以看出工作队列是以公平方式,轮训发送消息到消费者的
?那到这里问题来了
1、在只有一个消费者的情况下,业务逻辑出现异常,但是队列中的消息被消费了,有什么好的解决办法吗?
2、工作队列中,消费者1处理业务比较耗时,而消费者2处理业务比较快速,怎么让消费者2处理更多消息?
具体可查看高级应用
5.3 发布/订阅队列(publish/subscribe)
发布、订阅队列是以fanout 广播方式发送消息给消费者的
?queue、exchange配置
package com.example.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueAndExchangeConfig {
//第一个队列:my-first-queue
@Bean("myFirstQueue")
public Queue getFirstQueue(){
return new Queue("my-first-queue");
}
//第二个队列:my-second-queue
@Bean("mySecondQueue")
public Queue getSecondQueue(){
return new Queue("my-second-queue");
}
//第一个交换机:my-first-exchange
@Bean("myFirstExchange")
FanoutExchange getMyFirstExchange(){
return new FanoutExchange("my-first-exchange");
}
//将my-first-queue队列绑定到my-first-exchange交换机上
@Bean
Binding bindingFirstQueueToFanoutExchange(){
return BindingBuilder.bind(getFirstQueue()).to(getMyFirstExchange());
}
//将my-second-queue队列绑定到my-first-exchange交换机上
@Bean
Binding bindingSecondQueueToFanoutExchange(){
return BindingBuilder.bind(getSecondQueue()).to(getMyFirstExchange());
}
}
生产者:
package com.example.demo.controller;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("producer")
public class ProducerController{
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/producerSendFirstExchange")
public String sendMsg(@RequestParam(value = "msg") String msg){
rabbitTemplate.convertAndSend("my-first-exchange","",msg);
return msg;
}
}
消费者:
package com.example.demo.config;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerHandler {
@RabbitListener(queues = "my-first-queue")
public void getFirstQueue(String msg){
System.out.println("消费者1:"+msg);
}
@RabbitListener(queues = "my-second-queue")
public void getSecondQueue(String msg){
System.out.println("消费者2:"+msg);
}
}
结果:
跟工作队列(work queue)做个比较,有什么不同和相同点:
不同:发布/订阅队列需要创建一个交换机并将队列绑定到交换机,而工作队列不需要
相同:多个消费者监听同一个队列,消息不会被重复消费
5.4 路由队列(routing)
路由队列是以direct? routing = key方式发送消息给消费者的
queue 可以通过一个或多个routing key绑定到Exchange上,不同queue相同routing ke绑定到Exchange也是完全可以的
?queue、exchange配置:
package com.example.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueAndExchangeConfig {
//第一个队列:my-first-queue
@Bean("myFirstQueue")
public Queue getFirstQueue(){
return new Queue("my-first-queue");
}
//第二个队列:my-second-queue
@Bean("mySecondQueue")
public Queue getSecondQueue(){
return new Queue("my-second-queue");
}
//第一个交换机:my-first-exchange
@Bean("myFirstExchange")
DirectExchange getMyFirstExchange(){
return new DirectExchange("my-first-exchange");
}
//将以my-first-queue队列绑定到my-first-exchange交换机上
@Bean
Binding bindingFirstQueueToFanoutExchange(){
return BindingBuilder.bind(getFirstQueue()).to(getMyFirstExchange()).with("orange");
}
//将以my-second-queue队列绑定到my-first-exchange交换机上
@Bean
Binding bindingSecondQueueToFanoutExchange(){
return BindingBuilder.bind(getSecondQueue()).to(getMyFirstExchange()).with("green");
}
}
生产者:
package com.example.demo.controller;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("producer")
public class ProducerController{
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/producerSendFirstQueue")
public String sendMsg(@RequestParam(value = "msg") String msg){
rabbitTemplate.convertAndSend("my-first-exchange","green",msg);
return msg;
}
}
消费者:
package com.example.demo.config;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerHandler {
@RabbitListener(queues = "my-first-queue")
public void getFirstQueue(String msg){
System.out.println("消费者1:"+msg);
}
@RabbitListener(queues = "my-second-queue")
public void getSecondQueue(String msg){
System.out.println("消费者2:"+msg);
}
}
结果:
可以看到my-second-queue会接收到消息被消费
如果将my-first-queue与my-second-queue的routing-key(路由键)都设置成一样的值,那消费者1和消费者2都会收到消息,一个queue如果有多个routing-key,任意一个routing-key都可以路由到queue中,然后被消费
5.5 通配符队列(Topics)
通配符队列这种模式是基于routing 模式上优化的一种队列,需要了解两种通配符规则:
*:匹配任意一个单词
#:匹配任意一个或多个单词
例子:
rabbit.*:匹配rabbit.first
rabbit.#:匹配rabbit.first 或者 rabbit.first.two
如果没有匹配的路由到队列,那么该消息会丢失
queue、exchange配置:
package com.example.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueAndExchangeConfig {
//第一个队列:my-first-queue
@Bean("myFirstQueue")
public Queue getFirstQueue(){
return new Queue("my-first-queue");
}
//第二个队列:my-second-queue
@Bean("mySecondQueue")
public Queue getSecondQueue(){
return new Queue("my-second-queue");
}
//第一个交换机:my-first-exchange
@Bean("myFirstExchange")
TopicExchange getMyFirstExchange(){
return new TopicExchange("my-first-exchange");
}
//将以my-first-queue队列绑定到my-first-exchange交换机上
@Bean
Binding bindingFirstQueueToFanoutExchange(){
return BindingBuilder.bind(getFirstQueue()).to(getMyFirstExchange()).with("green.*");
}
//将以my-second-queue队列绑定到my-first-exchange交换机上
@Bean
Binding bindingSecondQueueToFanoutExchange(){
return BindingBuilder.bind(getSecondQueue()).to(getMyFirstExchange()).with("green.#");
}
}
生产者:
@GetMapping("/producerSendFirstQueue")
public String sendMsg(@RequestParam(value = "msg") String msg){
rabbitTemplate.convertAndSend("my-first-exchange","green.red",msg);
return msg;
}
消费者还是监听my-first-queue和my-second-queue两个队列,这里就不帖了,得到结果如下:
?可以看到两个消费都匹配,并且路由到queue上被消费了
如果生产者的路由键routing-key改成green.red.blue,哪个队列会收到消息呢?
@GetMapping("/producerSendFirstQueue")
public String sendMsg(@RequestParam(value = "msg") String msg){
rabbitTemplate.convertAndSend("my-first-exchange","green.red.blue",msg);
return msg;
}
结果当然是my-second-queue收到消息然后消费了,通配符队列只要理清楚*和#的规则,逻辑就会很清晰了。
5.6 远程通信队列(RPC)
Remote Procedure Call:远程过程调用,一次远程过程调用的流程即客户端发送一个请求到服务,服务端根据请求信息进行处理后返回响应信息,客户端收到响应信息后结束
这种队列比较少用,而且有时间限制,在指定时间内收不到消息会抛异常,默认5s;
参考:https://juejin.cn/post/6844903863665819656
Exchange headers 交换机是自定义类型交换机,配置中可以指定任意类型的数据,不像routing只能是String类型的数据,需要注意的是匹配的数据分为全匹配(all)和匹配任意一种(any)
queue、exchange配置
package com.example.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueAndExchangeConfig {
//第一个队列:my-first-queue
@Bean("myFirstQueue")
public Queue getFirstQueue(){
return new Queue("my-first-queue");
}
//第二个队列:my-second-queue
@Bean("mySecondQueue")
public Queue getSecondQueue(){
return new Queue("my-second-queue");
}
//第一个交换机:my-first-exchange
@Bean("myFirstExchange")
HeadersExchange getMyFirstExchange(){
return new HeadersExchange("my-first-exchange");
}
@Bean
Binding bindingFirstQueueToFanoutExchange(){
Map<String,Object> map = new HashMap<String,Object>();
map.put("one","123");
map.put("two","456");
return BindingBuilder.bind(getFirstQueue()).to(getMyFirstExchange()).whereAny(map).match();
}
@Bean
Binding bindingSecondQueueToFanoutExchange(){
Map<String,Object> map = new HashMap<String,Object>();
map.put("one","123");
map.put("two","456");
return BindingBuilder.bind(getSecondQueue()).to(getMyFirstExchange()).whereAll(map).match();
}
}
生产者
@GetMapping("/producerSendFirstQueue")
public String sendMsg(@RequestParam(value = "msg") String msg){
//rabbitTemplate.convertAndSend("my-first-exchange","green.redblue",msg);
Map<String,Object> map = new HashMap<String,Object>();
map.put("one","123");
//map.put("two","456");
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
messageProperties.getHeaders().putAll(map);
Message message = new Message(msg.getBytes(),messageProperties);
rabbitTemplate.convertAndSend("my-first-exchange","",message);
return msg;
}
消费者还是my-first-queue和my-second-queue,这里不在帖代码,结果是
my-first-queue得到了消息?,而my-second-queue没有,这里将map.put("two","456");注释去掉,得到的结果是my-first-queue和my-second-queue都得到了消息并且消费了 ,注意两个队列绑定的方法whereAny()和whereAll()
7、高级应用
7.1 ack模式
#rabbitmq配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
#开启ack手动确认
listener:
simple:
acknowledge-mode: manual
direct:
acknowledge-mode: manual
ack确认模式:
none:rabbitmq的自动确认
manual:rabbitmq的手动确认,springboot中手动确认操作
auto:rabbitmq手动确认,springboot会自动发送回执(默认)
消费者
@RabbitListener(queues = "my-first-queue")
public void getFirstQueue(String msg,Channel channel,Message message) throws IOException {
System.out.println("消费者1:"+msg);
try {
/**
* deliveryTag:该消息的index
* boolean: true,批量ack小于deliveryTag的消息 false,确认当前消息已经被消费
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
e.printStackTrace();
//拒绝消息
/**
* deliveryTag:该消息的index
* boolean: true,重新入队,false,丢失消息
*/
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
/**
* deliveryTag:该消息的index
* boolean: true,批量ack小于deliveryTag的消息 false,确认当前消息
* boolean1: true,重新入队,false,丢失消息
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
/**
* boolean true,是否重新入队并将消息尽可能的投递给其他消费者,false,重新投递给自己
*/
channel.basicRecover(true);
}
}
7.2 qos合理分发
#rabbitmq配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
#开启ack手动确认
listener:
simple:
acknowledge-mode: manual
prefetch: 1 #qos 1 默认250
direct:
acknowledge-mode: manual
qos 主要应用于work queue工作队列的实现合理分发,将prefetch设置为1之后,rabbitmq会根据每个消费者的消费时间,合理分发,处理越快的消费者,处理的消息越多
7.3 ttl消息过期时间
ttl可以定义队列和消息的过期时间,如果同时设置了队列和消息的过期时间,按最小的时间删除消息,超过指定时间消息还没有被消费则会自动丢失
queue队列过期时间
@Bean("myFirstQueue")
public Queue getFirstQueue(){
return QueueBuilder.durable("my-first-queue").ttl(30000).build();
}
消息过期时间:
@GetMapping("/directExchangeSend")
public String directExchangeSend(@RequestParam(value = "msg") String msg){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setExpiration("5000");
messageProperties.setContentType("UTF-8");
Message message = new Message(msg.getBytes(),messageProperties);
rabbitTemplate.convertAndSend("my-second-exchange","second-routing",msg);
return msg;
}
7.4 死信队列
死信队列是面试中比较常问的一道考题,首先得明白什么叫死信队列,是如果工作的?
DLX,全程dead-letter-Exchange,又称死信交换机。当消息在一个队列变成死信(dead message)之后,它能重新被发送到另一个交换机中,这个交换机就是DLX,绑定到该交换机的队列就是死信队列
消息变成死信的原因:
1、消息被拒绝
2、消息过期
3、队列达到最大长度
queue、exchange配置
package com.example.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueAndExchangeConfig {
//第一个队列:my-first-queue
@Bean("myFirstQueue")
public Queue getFirstQueue(){
//return new Queue("my-first-queue");
return QueueBuilder.durable("my-first-queue").ttl(10000).deadLetterExchange("my-second-exchange").build();
}
//第二个队列:my-second-queue
@Bean("mySecondQueue")
public Queue getSecondQueue(){
return new Queue("my-second-queue");
}
//第一个交换机:my-first-exchange
@Bean("myFirstExchange")
DirectExchange getMyFirstExchange(){
return new DirectExchange("my-first-exchange");
}
@Bean("mySecondExchange")
DirectExchange getMySecondExchange(){
return new DirectExchange("my-second-exchange");
}
//将以my-first-queue队列绑定到my-first-exchange交换机上
@Bean
Binding bindingFirstQueueToFanoutExchange(){
return BindingBuilder.bind(getFirstQueue()).to(getMyFirstExchange()).with("green");
}
//将以my-second-queue队列绑定到my-first-exchange交换机上
@Bean
Binding bindingSecondQueueToFanoutExchange(){
return BindingBuilder.bind(getSecondQueue()).to(getMySecondExchange()).with("green");
}
}
两个队列queue,两个交换机Exchange,如果my-first-exchange绑定的my-first-queue 队列10s内还没有被消费就会被发送到my-second-exchange绑定的my-second-queue队列中消费
生产者
@GetMapping("/producerSendFirstQueue")
public String sendMsg(@RequestParam(value = "msg") String msg){
rabbitTemplate.convertAndSend("my-first-exchange","green",msg);
return msg;
}
消费者
消费者注释掉my-first-queue,只让my-second-queue做消费
/*@RabbitListener(queues = "my-first-queue")
public void getFirstQueue(String msg){
System.out.println("消费者1:"+msg);
}*/
@RabbitListener(queues = "my-second-queue")
public void getSecondQueue(String msg){
System.out.println("消费者2:"+msg);
}
结果
?10s后my-first-queue队列中的消息会被my-second-queue消费
7.5 优先队列
优先队列是通过x-max-priority参数来实现的,数值越大的优先被消费。如果消费的速度大于生产的速度,而队列中没有消息堆积,那优先级队列是没有意义的
queue、exchange配置
@Bean("myFirstQueue")
public Queue getFirstQueue(){
return QueueBuilder.durable("my-first-queue").maxPriority(10).build();
}
生产者
@GetMapping("/producerSendFirstQueue")
public String sendMsg(@RequestParam(value = "msg") String msg,int priority){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
messageProperties.setPriority(priority);
Message message = new Message(msg.getBytes(),messageProperties);
rabbitTemplate.convertAndSend("my-first-exchange","green",message);
return msg;
}
往my-first-queue插入hello1-5,优先级1-5的数据,先不做消费,重启并开启my-first-queue消费,查看结果
?
|