1、搭建初始环境
1.1 引入相关依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.2 配置配置文件
spring:
rabbitmq:
host: 192.168.242.138
port: 5672
username: admin
password: 123
virtual-host: /
1.3 SpringBoot对RabbitMQ的自动配置
- ConnectionFactory:自动配置了连接工厂
- RabbitProperties:封装了RabbitMQ的配置
- RabbitTemple:给RabbitMQ发送和接收消息
- AmqpAdmin:RabbitMQ系统管理功能组件,创建和删除 Queue、Exchange、Binding
- @EnableRabbit + @RabbitListener 监听消息队列的内容
2.HelloWorld模型的使用
HelloWorld模型也就是点对点模型,生产者发消息到队列,消费者从队列中取消息
2.1 生产者代码
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testHello(){
rabbitTemplate.convertAndSend("hello","hello world");
}
2.2 消费者代码
@Component
@RabbitListener(queuesToDeclare = {@Queue("hello")})
public class HelloCustomer {
@RabbitHandler
public void receive(String message){
System.out.println("message="+message);
}
}
3.工作队列模型的使用
工作队列就是生产者向队列发送消息,队列会把消息平均分发给不同的消费者,不同消费者是竞争关系。同一条消息不会被两个消费者消费。
3.1 生产者代码
@Test
public void testWorkQueue() {
for (int i = 1; i < 11; i++) {
rabbitTemplate.convertAndSend("work", "work模型"+i);
}
}
3.2 消费者代码
@Component
public class WorkConsumer {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message) {
System.out.println("message1=" + message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message) {
System.out.println("message2=" + message);
}
}
输出结果:
message2=work模型1 message1=work模型2 message2=work模型3 message1=work模型4 message2=work模型5 message1=work模型6 message2=work模型7 message1=work模型8 message2=work模型9 message1=work模型10
这两种模型都是不需要指定交换机的,使用的是默认交换机,路由键直接指定队列名,就可以将消息发送到指定队列
4.发布订阅模型的使用
发布订阅模型需要使用fanout类型的交换机,所以也叫作扇出模型或者是广播模型。这种模型是将队列绑定到交换机上,生产者发布消息到交换机,交换机会将消息发送给所有与他绑定的队列。
4.1 生产者代码
@Test
public void testFanout(){
rabbitTemplate.convertAndSend("logs","","fanout模型");
}
4.2 消费者代码
@Component
public class FanoutConsumer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = "logs", type = "fanout")
)
})
public void receive1(String message) {
System.out.println("message1=" + message);
}
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = "logs", type = "fanout")
)
})
public void receive2(String message) {
System.out.println("message2=" + message);
}
}
输出结果
message2=fanout模型 message1=fanout模型
5.路由模型的使用
在使用fanout模型时,我们在绑定queue与exchange中,忽略了路由键。而在使用路由模型时,我们需要指定路由键。队列与交换机绑定的时候会声明一个routingKey,生产者在发送消息给交换机时需要指定一个路由键,当routingKey与路由键相同时,交换机会把消息转发个这个队列。
5.1 生产者代码
@Test
public void testDirect(){
rabbitTemplate.convertAndSend("directs","info","发送info的key的路由信息");
}
5.2 消费者代码
@Component
public class DirectConsumer {
@RabbitListener(
bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "directs", type = "direct"),
key = {"info", "error", "warn"}
)
)
public void receive1(String message) {
System.out.println("message1=" + message);
}
@RabbitListener(
bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "directs", type = "direct"),
key = {"error"}
)
)
public void receive2(String message) {
System.out.println("message2=" + message);
}
}
输出结果
message1=发送info的key的路由信息
6.主题模型的使用
主题模型是在路由模型的基础上加入了通配符的效果。
*:(星号)可以正好代替一个词。 #:(hash) 可以代替零个或多个单词。
5.1 生产者代码
@Test
public void testTopic(){
rabbitTemplate.convertAndSend("topics","user.save","user.save的路由信息");
}
5.2 消费者代码
@Component
public class TopicConsumer {
@RabbitListener(bindings =
@QueueBinding(value = @Queue,
exchange = @Exchange(type = "topic", name = "topics"),
key = {"user.save","user.*"})
)
public void receive1(String message) {
System.out.println("message1="+message);
}
@RabbitListener(bindings =
@QueueBinding(value = @Queue,
exchange = @Exchange(type = "topic", name = "topics"),
key = {"order.#","produce.#","user.*"})
)
public void receive2(String message) {
System.out.println("message2="+message);
}
}
输出结果
message2=user.save的路由信息 message1=user.save的路由信息
7. @RabbitListener注解
这个注解是用来接收队列的消息。有三个参数需要重点关注
- queues
- queuesToDeclare
- bindings
参数1:返回值是字符串类型,传入一个需要监听的队列名称。使用这个参数需要提前声明队列,如果不存在此队列将会报错。 参数2:返回值是一个@Queue的注解,用来声明一个需要监听的队列,如果不存在此队列会先创建好这个队列 参数3:返回值是一个@QueueBinding的注解。用来声明队列和交换机以及它们之间的绑定关系。
@Queue:具有声明一个队列所需要的所有参数,如果不传参,就是指定一个临时队列。 @QueueBinding中的key参数,用来指定路由键,可以声明多个。
|