RabbitMQ介绍及使用
对rabbitMQ的介绍及简单使用(整合SpringBoot),暂不包括高级特性
1 概述
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
1.1 优势与劣势
作用:
- 解耦,不同服务之间的通信都只需要交给mq就可以了
- 削峰填谷,当用户峰值高时不会导致服务压力过大而宕机,mq可以限制消息流通的速度
缺点:
-
系统可用性降低 系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用? -
系统复杂度提高 MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性? -
一致性问题 A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?
1.2 AMQP 和 JMS
AMQP
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,遵循此协议,不收客户端和中间件产品和开发语言限制。2006年,AMQP 规范发布。类比HTTP。
JMS
JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API
JMS 是 JavaEE 规范中的一种,类比JDBC
很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,不过开源社区有
AMQP 与 JMS 区别
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模式;而AMQP的消息模式更加丰富
1.3 架构
RabbitMQ 基础架构如下图:
RabbitMQ 中的相关概念:
- Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
- Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
- Connection:publisher/consumer 和 broker 之间的 TCP 连接
- Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- Queue:消息最终被送到这里等待 consumer 取走
- Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍);
官网对应模式介绍: https://www.rabbitmq.com/getstarted.html
2 简单模式与工作模式
先实现一个简单模式案例:
2.1 安装mq
教程很多,暂不列举。安装后建议手动开启控制台,便于查看。
创建maven项目,导下依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
2.2 连接mq的工具类
public class ConnectionUtils {
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
return connection;
}
}
2.3 生产者
public class Producer {
static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "你好:qtds";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已发送消息:" + message);
channel.close();
connection.close();
}
}
2.4 消费者
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("接收到的消息为:" + new String(body, StandardCharsets.UTF_8));
}
};
channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
}
}
2.5 工作队列模式
只是多了一个消费端,另外加个消费端的类(Consumer2),和Consumer1一样就可以了
工作模式的消费者是公平接收的,也就是说有多个消费端的情况下,他们会交替消费,像下图这样:
3 订阅、路由、通配符模式
这些模式都是在之前模式的基础上添加了交换机exchange,并分配多个队列给消费端:
3.1 订阅模式
生产者需要声明交换机和队列,并将对应的队列绑定至交换机:
public class Producer {
public static String FANOUT_EXCHANGE = "fanout_exchange";
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");
channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");
for (int i = 0; i < 10; i++) {
String message = "你好:qtds ----" + i;
channel.basicPublish(FANOUT_EXCHANGE, "", null, message.getBytes());
System.out.println("已发送消息:" + message);
}
channel.close();
connection.close();
}
}
消费者在声明队列后同样要绑定对应的交换机:
channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);
channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE, "");
这样消息将会通过交换机发送消息到所有对应的队列,每个队列都会接收到一样的消息,也就是经典的消息发布/订阅模式!
3.2 路由模式
在有交换机的基础上,我们可以另外指定路由key,将不同的消息分开发送到队列中:
public class Producer {
public static String DIRECT_EXCHANGE = "direct_exchange";
static final String DIRECT_QUEUE_1 = "direct_queue_1";
static final String DIRECT_QUEUE_2 = "direct_queue_2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DIRECT_QUEUE_1, true, false, false, null);
channel.queueDeclare(DIRECT_QUEUE_2, true, false, false, null);
channel.queueBind(DIRECT_QUEUE_1, DIRECT_EXCHANGE, "insert");
channel.queueBind(DIRECT_QUEUE_1, DIRECT_EXCHANGE, "update");
channel.queueBind(DIRECT_QUEUE_2, DIRECT_EXCHANGE, "remove");
String message = "你好:qtds ----" + i;
channel.basicPublish(DIRECT_EXCHANGE, "insert", null, message.getBytes());
channel.close();
connection.close();
}
}
消费者绑定交换机同样指定好路由key:
channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");
3.3 通配符模式Topics
指定路由key时,可以使用通配符:
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
# :匹配一个或多个词
* :匹配不多不少恰好1个词
举例:
item.# :能够匹配 item.insert.abc 或者 item.insert
item.* :只能匹配 item.insert
4 SpringBoot整合
以topics模式为案例
4.1 生产者搭建
先搭个springBoot模块,导下依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.1.0.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rabbitmq-producer</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
配置文件:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /qtds
username: guest
password: guest
配置类:
package com.qtds.rabbitmq.config;
@Configuration
public class RabbitMQConfig {
public static final String ITEM_TOPIC_EXCHANGE = "springboot_item_topic_exchange";
public static final String ITEM_QUEUE = "springboot_item_queue";
@Bean("itemTopicExchange")
public Exchange itemTopicExchange() {
return ExchangeBuilder
.topicExchange(ITEM_TOPIC_EXCHANGE)
.durable(true)
.build();
}
@Bean("itemQueue")
public Queue itemQueue() {
return QueueBuilder.durable(ITEM_QUEUE).build();
}
@Bean
public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
@Qualifier("itemTopicExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
}
}
启动类:
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
测试类:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() {
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.insert", "商品新增,routing key 为item.insert");
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.update", "商品修改,routing key 为item.update");
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.delete", "商品删除,routing key 为item.delete");
}
}
运行测试类后,就可以在mp的控制台中看到对应的路由和队列了,消息也发送在队列中了
4.2 消费者
消费者也是同样方式创建一个springboot项目,导入相关依赖,编辑配置类。
然后配置一个消息监听器,绑定队列就能接收数据了;
@Component
public class MyListener {
@RabbitListener(queues = "springboot_item_queue")
public void myListener(String message) {
System.out.println("接收到的消息为:" + message);
}
}
启动类:
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
创建写个测试类测试一下:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ConsumerApplication.class)
public class ConsumerTest {
@Test
public void test() {
while (true) {
}
}
}
可以看到已经接收到我们生产者刚消费的消息了;
|