安装 RabbitMQ
请看另一篇文章教程:Linux 安装 RabbitMQ
强烈建议你使用虚拟机的方式安装 RabbitMQ,来模拟
如果为了方便也可以直接 Windows 安装 RabbitMQ,百度搜索:Windows 安装 RabbitMQ
初始化 SpringBoot 项目
我们使用Spring initalizr 初始化SpringBoot 项目,Spring initalizr
如果是 IDEA 旗舰版,有快捷创建的方式,点击新建即可创建,不需要这样。
导入依赖
pom.xml 文件中添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加配置
配置文件 application.yml
spring:
rabbitmq:
virtual-host: /
host: 192.168.43.171
port: 5672
username: admin
password: 123456
virtual-host 可以理解为每指定一个 virtual-host 就相当于设置了一个 RabbitMQ 服务器,不同的服务器是分离执行的,不同的 virtual-host 拥有的权限和其他配置也不同。可以在 RabbitMQ 后台界面的右上角可以看到有个 Virtual Host 标签,可以看到这个账号拥有的虚拟主机。
默认有个 guest 账号,账号名和密码都是 guest
进入 RabbitMQ 后台管理界面(比如我的是 http://192.168.43.171:15672/),可以看到用户和其虚拟主机,如果安装到了本地则是 http://127.0.0.1:15672/
添加配置类 RabbitMqConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;
@Configuration
public class RabbitMqConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
简单队列
RabbitMQ 的五种工作模式之最简单的:简单队列。
发送者
调用 send() 方法发送消息到名为 hello 的队列中。
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hello " + new Date();
System.out.println("Sender: 发送消息 " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
}
输入完你可能会注意到上面的 rabbitTemplate 会有个 Could not autowire. No beans of 'AmqpTemplate' type found. 的报红波浪线提示,不用理会,不影响可以正常运行。
消费者
通过@RabbitListener 注解定义该类对hello 队列的监听,并用@RabbitHandler 注解来指定对消息的处理方法。
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class Receiver {
@RabbitHandler
public void process(String s) {
System.out.println("Receiver: 消费成功" + s);
}
}
注意到上面的 RabbitListener 注解,其中 queuesToDeclare 它可以在队列存在的时候自动创建队列,不会出现 reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX' in vhost '/', class-id=50, method-id= 的异常。
测试类
测试的 Controller
import com.example.rabbittest.component.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class TestController {
@Autowired
private Sender sender;
@GetMapping("/")
@ResponseBody
public String testSend() {
sender.send();
return "ok";
}
}
单元测试类
在 test 包下的测试类中写入如下代码,我的测试类名为下面的 RabbitTestApplicationTests
import com.example.rabbittest.component.Sender;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class RabbitTestApplicationTests {
@Autowired
private Sender sender;
@Test
void contextLoads() {
sender.send();
}
}
但是不建议使用单元测试类对此进行测试,否则会有消息接收不到等问题,总之不建议使用(测试时找了好久这个问题o(╥﹏╥)o)
运行
运行 SpringBoot 项目,运行后打开浏览器进入 Rabbit 后台管理,进入 Connections 标签页,可以看到如下连接的信息
如果运行失败,要注意配置的 IP 地址. 端口和账号密码都要保持正确,检查一下,如果报错信息有 timeout 等信息,有可能是配置的 ip 有问题或者 RabbitMQ 服务没有开启
然后我们使用浏览器访问 localhost:8080 或者执行测试类中的 contextLoads 方法发送一条消息
可以看到消息被消费成功
进入 RabbitMQ 后台查看一下,可以看到有一个消息波动(5 秒钟刷新一次,修改刷新速度在右上角的位置)
我们创建的队列 hello
路由(交换机)
在讲交换机之前,我们再创建一个 world 队列,先看看不使用交换机时的情况。
在 RabbitMqConfig 类里添加如下代码
@Bean
public Queue worldQueue() {
return new Queue("world");
}
新建一个 WorldReceiver 消费者类
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queuesToDeclare = @Queue("world"))
public class WorldReceiver {
@RabbitHandler
public void process(String s) {
System.out.println("Receiver: 消费成功 " + s);
}
}
新建一个 WorldSender 消息发送者类
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class WorldSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "world " + new Date();
System.out.println("WorldSender: 发送消息 " + context);
this.rabbitTemplate.convertAndSend("world", context);
}
}
测试用的 Controller 修改为
@Controller
public class TestController {
@Autowired
private Sender sender;
@Autowired
private WorldSender worldSender;
@GetMapping("/")
@ResponseBody
public String testSend() {
sender.send();
worldSender.send();
return "ok";
}
}
重新运行 Application,然后再访问 localhost:8080,再去看看 RabbitMQ 后台的 Overview 和 Queues 标签页看看有什么变化。
(我们的队列现在看起来类似这样的)
这就就可以让两个消息通过不同队列到达消费者。如果有特别多的队列的话,还要指定不同的队列,那岂不是太麻烦了,我让他根据发送的消息自动的控制发送到不同的队列该怎么做?
傻瓜式:使用 if 判断。
但这样死板,甚至会很麻烦,项目大了可能会有很复杂的规则,使用 if 判断将会非常笨重且麻烦了,且不好维护。
这时交换机就派出用场,交换机用来处理以太网数据帧(包) 达到交换转发的目的,用来交换消息。
有几种类型,下面是最常用的两种:
后面将对他们进行讲解
创建一个交换机(Exchange)
在 RabbitMqConfig 类中添加如下代码
@Bean
public DirectExchange exchange() {
return new DirectExchange("testexchange");
}
但这只是创建了,还未绑定交换机上,类如下图所示,只是加入了一个交换机
交换机与队列的绑定
在 RabbitMqConfig 类中添加以下代码,将路由键(routingKey )绑定到不同的队列上
@Bean
public Binding bindingExchangeHello(Queue helloQueue, DirectExchange exchange) {
return BindingBuilder.bind(helloQueue).to(exchange).with("red");
}
@Bean
public Binding bindingExchangeWorld(Queue worldQueue, DirectExchange exchange) {
return BindingBuilder.bind(worldQueue).to(exchange).with("blue");
}
@Bean
public Binding bindingExchangeWorld2(Queue worldQueue, DirectExchange exchange) {
return BindingBuilder.bind(worldQueue).to(exchange).with("yellow");
}
这里绑定的分别是:
red > hello队列(hello )blue > world队列(world )yellow > world队列(world )
(绑定之后的效果)
然后我们就可以将 WorldSender 类删除,对 Sender 类的 send 的方法进行修改一下,让他一次发送多个消息。
public void send() {
String[] keys = {"red", "yellow", "blue"};
for (int i = 0; i <= 9; i++) {
int random = (int) (Math.random() * 3);
String key = keys[random];
System.out.printf("第 %d 个,发送消息:%s%n", i, key);
this.rabbitTemplate.convertAndSend(
"testexchange",
key,
String.format("%s (第 %d 个内容)", key, i)
);
}
}
rabbitTemplate.convertAndSend 参数:
测试的 Controller 重新改为如下
@Controller
public class TestController {
@Autowired
private Sender sender;
@GetMapping("/")
@ResponseBody
public String testSend() {
sender.send();
return "ok";
}
}
访问 localhost:8080 测试一下
可以看到输出消息中,hello 和 world 的消费成功的内容是符合绑定的名称的。这样就不必考虑你要发送到哪个队列里了,他会自动的查找符合逻辑的队列,自动转发到这个队列里。
可以理解为:DirectExchange 类型的交换机通过绑定的名称来查找不同的队列
Topic Exchange
topic 类型常用的是通配符类型,这种类型的交换机用起来更加的灵活,相比于 direct ,能够适配更多的类型。
还是按照上面的例子,我们稍作修改,将原有的交换机注释掉,我们新建一个统配型交换机:mytopic 。
将之前的 RabbitMqConfig 里的 exchange 方法改为如下代码,之前的 direct 版本的全注释掉,你也可以直接删除。
@Bean
public TopicExchange exchange(){
return new TopicExchange("mytopic");
}
@Bean
public Binding bindingExchangeHello(Queue helloQueue, TopicExchange exchange) {
return BindingBuilder.bind(helloQueue).to(exchange).with("red.#");
}
@Bean
public Binding bindingExchangeWorld(Queue worldQueue, TopicExchange exchange) {
return BindingBuilder.bind(worldQueue).to(exchange).with("blue.blue");
}
给通配型交换机绑定规则
两个 . 之间就是一个词
根据这个规则我们可以知道我们的绑定的名称的规则:
red.* 可以通配 red.blue ,不可以通配 red.blue.yellow red.# 上面两种情况都是可以通配的。
worldQueue 绑定则没有上面两个符号,则是:必须是red.blue 才可以通过
修改 Sender 类的 send 方法
public void send() {
String[] keys = {"red", "yellow", "blue"};
for (int i = 0; i <= 9; i++) {
int random = (int) (Math.random() * 3);
String key = keys[random] + "." + keys[random];
System.out.printf("第 %d 个,发送消息:%s%n", i, key);
this.rabbitTemplate.convertAndSend(
"mytopic",
key,
String.format("%s (第 %d 个内容)", key, i)
);
}
}
测试
访问 localhost:8080 查看输出,可以看到只有第 7 和第 8 个是符合规则的,hello 队列需要是名称第一个词为 red 的消息,world 队列需要是跟 blue.blue 一模一样名称的消息才符合。
可以理解为:TopicExchange 类型的交换机通过绑定时的规则来查找不同的队列
Headers Exchange 自定义规则
相较于上面两种类型。基本上来说,消息的 rouding_key 都是一个固定的值,而 handers 则是一种自定义规则匹配。这种模式使用最少,很少用,这里只作为知道即可。
Fanout Exchange 订阅模式
订阅模式就是我们熟悉的广播模式,可能学过计算机基础的都知道,在局域网内通过 ARP 获取目标地址的 MAC 地址,就是用的是广播,把这个包广播出去,所有的队列都会收到。
只要绑定了订阅交换机的所有队列都会收到发过来的包。
创建订阅交换机
注释掉或删除掉之前的 exchange bindingExchangeHello bindingExchangeWorld 方法,改为以下方法:
@Bean
public FanoutExchange exchange(){
return new FanoutExchange("myfanout");
}
@Bean
public Binding bindingExchangeHello(Queue helloQueue, FanoutExchange exchange) {
return BindingBuilder.bind(helloQueue).to(exchange);
}
@Bean
public Binding bindingExchangeWorld(Queue worldQueue, FanoutExchange exchange) {
return BindingBuilder.bind(worldQueue).to(exchange);
}
将 Sender 方法里的 mytopic 改为 myfanout ,如下:
this.rabbitTemplate.convertAndSend(
"myfanout",
key,
String.format("%s (第 %d 个内容)", key, i)
);
重新启动,然后访问 localhost:8080
可以看到后面的序号都有两个,我们可以发现:就算带了路由键 rouding_key 只要经过订阅交换机,它就会把所有经过的消息转发到所有与自己绑定的队列里面。
可以看一下几个类型的示意图
为什么叫“消息队列”,看上面的图就可以知道,每个箭头就是不断发送的消息,然后消息一个个“排队”发出去,也许不是按照发送时的顺序接收到消息,但是肯定都是有先后顺序的,所以叫它“消息队列”。
消失丢失
上面写会存在一个问题:消息会丢失。
我们首先了解下一条消息从生产到消费的整个流程如下:
生产 > MQ Broker > 消费。所以这三个环节都有丢失消息的可能。
发送者将数据发送到rabbitmq的时候,可能因为网络问题导致数据就在半路给搞丢了。
1.使用事务(性能差)
RabbitMQ 客户端中与事务机制相关的方法有三个: channel.txSelect 、channel.txCommit 和 channel.txRollback 。channel.txSelect 用于将当前的信道设置成事务模式,channel.txCommit 用于提交事务,channel.txRollback 用于事务回滚。在通过 channel.txSelect 方法开启事务之后,我们便可以发布消息给 RabbitMQ 了,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback 方法来实现事务回滚。注意这里的 RabbitMQ 中的事务机制与大多数数据库中的事务概念并不相同,需要注意区分。
事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息成功被RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制会“吸干”RabbitMQ 的性能。
2.发送回执确认(推荐)
发送者将信道设置成 confirm (确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack )给发送者(包含消息的唯一 ID ),这就使得发送者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ 回传给发送者的确认消息中的 deliveryTag 包含了确认消息的序号,此外 RabbitMQ 也可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理,注意辨别这里的确认和消费时候的确认之间的异同。
(Confirm处理流程)
注意要点:
- 事务机制和
publisher confirm 机制两者是互斥的,不能共存。 - 事务机制和
publisher confirm 机制确保的是消息能够正确地发送至 RabbitMQ,这里的“发送至 RabbitMQ”的含义是指消息被正确地发往至 RabbitMQ 的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。
开启RabbitMQ的数据持久化
为了防止 rabbitmq 自己弄丢了数据,这个你必须开启rabbitmq的持久化,就是消息写入之后会持久化到磁盘,哪怕是 rabbitmq 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,rabbitmq 还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。
设置持久化有两个步骤,第一个是创建queue 的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue 的元数据,但是不会持久化queue 里的数据;第二个是发送消息的时候将消息的deliveryMode 设置为2 ,就是将消息设置为持久化的,此时rabbitmq就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,rabbitmq哪怕是挂了,再次重启,也会从磁盘上重启恢复queue ,恢复这个queue 里的数据。
参考
|