远赴人间惊鸿宴,老君山上吃泡面
Spring 整合RabbitMQ
1、创建工程spring-rabbitmq-producer和spring-rabbitmq-consumer,引入依赖和插件
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
2、生产者端创建?spring-rabbitmq.xml?整合配置文件,内容如下
指定要连接的MQ服务器,因为是简单模式,就只是创建队列(指定队列名)就好了
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="192.168.2.108"
port="5672"
username="admin"
password="123456"
virtual-host="MyVirtualHost"/>
<!--定义admin组件管理(创建)交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
默认交换机类型为direct,名字为:"",路由键为队列的名称
-->
<!--简单模式,没有交换机,1个队列
name="队列的名字"
id: 队列bean对象的唯一表示
-->
<rabbit:queue id="spring_simple_queue" name="spring_simple_queue" durable="true"
auto-delete="false" auto-declare="true" exclusive="false"></rabbit:queue>
<!--定义rabbitTemplate对象(模板对象,封装了大量的API操作)操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
生产端创建一个测试类:
convert :直译为“转换”
convertAndSend()方法的3个参数分别指:
参数1:交换机名称?
参数2:路由键名(广播设置为空)?
参数3:发送的消息内容
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducerTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void t1() {
String message = "简单模式: hello spring rabbitmq";
//简单模式 路由键与队列同名
rabbitTemplate.convertAndSend("", "spring_simple_queue", message);
}
}
运行生产端测试方法t1,在rabbitMq管理控制台中可以看到
消费端创建spring-rabbitmq.xml?
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="192.168.2.108"
port="5672"
username="admin"
password="123456"
virtual-host="MyVirtualHost"/>
<bean id="springQueueListener" class="com.atguigu.rabbitmq.listener.SpringQueueListener"/>
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_simple_queue"/>
</rabbit:listener-container>
</beans>
看的出来消费端出现一个很重要的东西?队列监听器,
干什么作用的呢,就是在消费端在消费某一个队列的消息时会和一个监听器匹配,消息的内容可由该监听器拿到
public class SpringQueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
System.out.printf("接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
message.getMessageProperties().getConsumerQueue(),
msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费端创建测试类:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:?spring-rabbitmq.xml")
public class ConsumerTest {
@Test
public void test1(){
boolean flag = true;
//作为消费方,服务不停
while (flag){
}
}
}
?消费端接收到消息,控制台输出
工作模式
生产者端在spring-rabbitmq.xml 加入内容
<!--工作模式 1个队列-->
<rabbit:queue id="spring_work_queue" name="spring_work_queue" durable="true"
auto-delete="false" auto-declare="true" exclusive="false"></rabbit:queue>
在测试类ProducerTest里加测试方法。
convertAndSend()方法的3个参数分别指:
/**
* 参数1:交换机名称
* 参数2:路由键名(广播设置为空)
* 参数3:发送的消息内容
*/
@Test
public void t2() {
for (int i = 0; i < 10; i++) {
String message = "工作模式:hello spring rabbitmq---"+i;
rabbitTemplate.convertAndSend("", "spring_work_queue", message);
}
}
在消费者端就更简单了?,只需要在spring-rabbitmq.xml中配置监听器和队列名的绑定关系。在工作模式里,就是多个消费者监听同一个队列,配置如下:
测试结果
剩下的3种模式就一起说了吧
convertAndSend()方法的3个参数分别指: /** * 参数1:交换机名称 * 参数2:路由键名(广播设置为空) * 参数3:发送的消息内容 */
由于这个方法需要的参数,我们可以推论出在,这三种模式下都是需要指定交换机的名字,类型。并且队列和交换机之间是有不同的绑定关系的。定向交换机(路由模式)就用了key作为路由key。而通配符交换机使用pattern作为路由key。
<!--广播交换机 fanout 一个交换机两个队列 队列和交换机之间的关系 : 绑定 -->
<rabbit:queue id="spring_fanout_queue1" name="spring_fanout_queue1" durable="true" auto-delete="false" auto-declare="true" exclusive="false"></rabbit:queue>
<rabbit:queue id="spring_fanout_queue2" name="spring_fanout_queue2" durable="true" auto-delete="false" auto-declare="true" exclusive="false"></rabbit:queue>
<rabbit:fanout-exchange name="spring_fanout_ex" id="fanoutEx" auto-declare="true" auto-delete="false" durable="true">
<!--绑定-->
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue1"></rabbit:binding>
<rabbit:binding queue="spring_fanout_queue2"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!--定向交换机 direct 一个交换机 两个队列 有绑定关系,且指定绑定条件 routingkey-->
<rabbit:queue id="spring_direct_queue1" name="spring_direct_queue1" durable="true" auto-delete="false" auto-declare="true" exclusive="false"></rabbit:queue>
<rabbit:queue id="spring_direct_queue2" name="spring_direct_queue2" durable="true" auto-delete="false" auto-declare="true" exclusive="false"></rabbit:queue>
<rabbit:direct-exchange name="spring_direct_ex" durable="true" auto-declare="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="spring_direct_queue1" key="springmq"></rabbit:binding>
<rabbit:binding queue="spring_direct_queue2" key="springmq123"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- topic 通配符交换机 一个交换机两个队列 实现绑定关系 且 指定条件 注意: 条件支持通配符 # *-->
<rabbit:queue id="spring_topic_queue1" name="spring_topic_queue1" durable="true" auto-delete="false" auto-declare="true" exclusive="false"></rabbit:queue>
<rabbit:queue id="spring_topic_queue2" name="spring_topic_queue2" durable="true" auto-delete="false" auto-declare="true" exclusive="false"></rabbit:queue>
<rabbit:topic-exchange name="spring_topic_ex" durable="true" auto-declare="true" auto-delete="false">
<rabbit:bindings>
<!--pattern 路由key -->
<rabbit:binding pattern="topic.*" queue="spring_topic_queue1"></rabbit:binding>
<rabbit:binding pattern="topic.#" queue="spring_topic_queue2"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
对应的测试方法里
@Test
public void t3() {
String message = "发布订阅模式(广播): hello spring rabbitmq";
/**
* 参数1:交换机名称
* 参数2:路由键名(广播设置为空)
* 参数3:发送的消息内容
*/
rabbitTemplate.convertAndSend("spring_fanout_ex", "", message);
}
@Test
public void t4() {
String message = "路由模式: hello spring rabbitmq";
rabbitTemplate.convertAndSend("spring_direct_ex", "springmq", message);
}
@Test
public void t5() {
/**
* 参数1:交换机名称
* 参数2:路由键名
* 参数3:发送的消息内容
*/
String message = "通配符模式: hello spring rabbitmq";
rabbitTemplate.convertAndSend("spring_topic_ex", "topic.a", message);
}
在消费端,就不写了。累了,毁灭吧
SpringBoot提供了快速整合RabbitMQ的方式
创建工程producer-springboot并引入依赖
<!--
1. 父工程依赖
-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
</parent>
<dependencies>
<!--2. rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
创建配置文件application.yml、启动类ProducerApplication
spring:
rabbitmq:
host: 192.168.2.108
port: 5672
username: admin
password: 123456
virtual-host: MyVirtualHost
创建配置类RabbitMQConfig
bindQueueExchange方法参数入参:默认使用autoWired装配。(先类型再名字)
但是队列不可能只有一个,其实只要注意,还是可以不用@Qualifier的。
基本信息在yml中配置,队列交换机以及绑定关系在配置类中使用Bean的方式配置
@SpringBootConfiguration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "boot_ex";
public static final String QUEUE_NAME = "boot_queue";
// 1 交换机
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2.Queue 队列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//3. 队列和交互机绑定关系 Binding
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,
@Qualifier("bootExchange") Exchange exchange){
//noargs():表示不指定参数
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
生产者启动,投递消息
生产端直接注入RabbitTemplate完成消息发送
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.boot","mq hello");
}
}
创建消费者工程consumber-springboot,引入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
</parent>
<dependencies>
<!--RabbitMQ 启动依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
创建application.yml和启动类ConsumerApplication
spring:
rabbitmq:
host: 192.168.2.108
port: 5672
username: admin
password: 123456
virtual-host: MyVirtualHost
创建监听器,就可以自动监听到队列“boot_queue”
消费端直接使用@RabbitListener完成消息接收
@Component
public class MyListener {
//监听指定队列,队列数据一旦获取,把数据赋值给message
@RabbitListener(queues = "boot_queue")
public void myMessage(Message message){
System.out.println(new String(message.getBody()));
}
}
启动主程序类,控制台就可以调用我们自定义的myMessage方法,并拿到该队列中的消息了
|