RabbitMQ
对于中间件的学习,我们主要学习它的设计思想和使用方式,并且要学会在不同情况下,灵活使用中间件解决业务问题
不要过分纠结于中间件的底层实现
一、MQ 的优势
为什么需要使用消息队列?
我们先来看一下传统的远程调用和使用了消息队列后的区别:
可以看到,使用消息队列之后,可以再调用者服务器之间形成一个缓冲,减少网络问题带来的影响
上面只是一个粗略的消息队列的优势,其优势还有下面这些:
1、任务解耦
2、任务异步处理
可能有读者不明白,这所谓的异步处理,是什么个实现原理,这里我要稍微解释一下
原本的订单系统,是要等库存、支付、物流这三个子系统真正的处理完毕了,才会给用户以响应,但是这三个子系统每个耗时都不小,等全部处理完了,用户等的花都谢了
而是用消息队列的话,我们可以不等子系统处理完毕,而是先给用户一个处理结束的返回,让用户以为已经结束了,事实上,库存、支付、物流这三个子系统,还在服务器上自己运行
所以说,即使调用每个子服务的耗时没有变,但是用户的体验升级了
3、削峰填谷
这就是秒杀服务常有的场景了
可以保证大量请求情况下的高可用
说他是削峰填谷,其实是一个十分形象的说法,削峰,就是削去瞬时超大并发量的峰,填谷,就是把之前削去的部分,让后面并发量比较小的时段来填,就像下面这张图:
二、MQ 的劣势
万物有利也有弊, MQ 也不例外
使用 MQ 的劣势如下:
如果 mq 挂了,会影响系统的运行
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息,如果 B 系统、C 系统处理成功,D 系统处理 失败。如何保证消息数据处理的一致性?
三、MQ 产品
市面上 MQ 产品很多,可选的有这么几类
我们以比较火热的 RabbitMQ 为学习重点。
四、MQ 实现方式
MQ 的实现方式有两个 AMQP 和 JMS
1、AMQP
Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用 层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,遵 循此协议,不收客户端和中间件产品和开发语言限制。2006年,AMQP 规范发布。类比HTTP。
AMQP 的架构如下:
2、JMS
JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间 件的API
JMS 是 JavaEE 规范中的一种,类比JDBC
很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有
3、二者区别
AMQP 和 JMS 的区别,就和 http 和 rpc 差不多
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模式;而AMQP的消息模式更加丰富
五、RabbitMQ
1、RabbitMQ 简介
下面是 RabbitMQ 的架构图:
我们可以对比一下 AMQP 的架构图,发现是差不多的,无非就是多了 Connection 和 vh
- Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
- Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类 似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务 时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
- Connection:publisher/consumer 和 broker 之间的 TCP 连接
- Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接, 如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含 了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- Queue:消息最终被送到这里等待 consumer 取走
- Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息
被保存到 exchange 中的查询表中,用于 message 的分发依据
RabbitMQ 有如下 6 种模式:
简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由 模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍);
https://www.rabbitmq.com/getstarted.html
2、RabbitMQ 安装
为了方便,我们使用 docker 进行安装
1、拉去MQ镜像,这里我们要选带 management 版本的,因为后面我们需要使用 MQ 管理控制台
docker pull rabbitmq:3.7.7-management
2、创建挂载镜像文件夹,后期我们要做配置的话,都要通过挂载的文件夹进行
3、启动容器
docker run -d --name rabbitmq3.7.7 \
-p 5672:5672 \
-p 15672:15672 \
-v `pwd`/rabbitmq:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123 \
2888deb59dfc
4、使用 ECS 的用户,一定别忘了把 15672 端口添加到安全组中
5、登录管理界面
输入:http://<你的主机ip>:15672
登录后,就会出现如下界面
3、RabbitMQ 入门案例
我们快速搭建一个生产者,一个消费者,来演示一下 RabbitMQ 的使用
模式的话,我们使用简单模式即可
1)生产者
Maven 依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
生产者:
public class Producer {
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("<输入RabbitMQ所在的主机的ip>");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
String message = "Hello ,this is faroz";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("已经发送消息:"+message);
channel.clearConfirmListeners();
connection.close();
}
}
在执行结束之后,我们来看看 rabbitmq 的管理控制台:
可以看到,queue 多了一条可以被消费的信息
我们点开来看,确实是我们发送的 message
2)消费者
我们先将创建连接的操作封装起来
public class ConnectionUtil {
private static ConnectionFactory factory = new ConnectionFactory();
static {
factory.setHost("47.117.129.89");
factory.setPort(5672);
factory.setVirtualHost("/test");
factory.setUsername("admin");
factory.setPassword("123");
}
public static Connection getConn() {
try {
return factory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
public static void close(Connection conn, Channel channel) {
try {
if (channel!=null) {
channel.close();
}
if (conn!=null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void close(Connection conn) {
close(conn,null);
}
public static void close(Channel channel) {
close(null,channel);
}
}
消费者:
public class Consumer {
public static void main(String[] args) throws IOException {
Connection conn = ConnectionUtil.getConn();
Channel channel = conn.createChannel();
channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:" + envelope.getRoutingKey());
System.out.println("交换机为:" + envelope.getExchange());
System.out.println("消息id为:" + envelope.getDeliveryTag());
System.out.println("接收到的消息为:" + new String(body, "utf-8"));
}
};
channel.basicConsume(Producer.QUEUE_NAME,true,consumer);
}
}
执行结果如下:
我们在多执行几次 producer,可以发现,消费者打印出了多条信息:
4、RabbitMQ 工作模式
之前提到,RabbitMQ 中,有六种工作模式
1)简单模式
快速入门案例中,使用的就是简单模式,这里不再赘述
2)工作队列模式(working queue)
这种模式相比于简单模式,只是多了几个消费者,所以这种模式,适用于任务较重的情况
因为代码都差不多,这里就不再赘述了
这里要注意,这种情况是多个消费者监听一个队列,对队列中的资源获取,是竞争性的
3)订阅模式(publish/subscribe)
相较于前两个模式,这个模式多了一个对象,交换机(exchange)
就是上图的 X
**交换机(图中的 X)**一方面,接收生产者发送的消息。另一方面,知道如何处理消 息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange的类型。Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
1-发布订阅模式 (Publisher/Subscribe)
发布订阅,其实就是广播,生产者会把消息广播到所有绑定的队列中,所有相关队列的消费者都可以进行消费
Publisher:
这里我们使用广播模式发送消息,即所有被绑定的队列,都会获得发布者发布的消息
public class PSPublisher {
public static final String FANOUT_EXCHANGE = "fanout_exchange";
public static final String FANOUT_QUEUE1 = "fanout_queue1";
public static final String FANOUT_QUEUE2 = "fanout_queue2";
public static void main(String[] args) throws IOException {
Connection conn = ConnectionUtil.getConn();
Channel channel = conn.createChannel();
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(FANOUT_QUEUE1,true,false,false,null);
channel.queueDeclare(FANOUT_QUEUE2,true,false,false,null);
channel.queueBind(FANOUT_QUEUE1,FANOUT_EXCHANGE,"");
channel.queueBind(FANOUT_QUEUE2,FANOUT_EXCHANGE,"");
for (int i = 0; i < 10; i++) {
String message = "这是FANOUT模式发送的信息:" + i;
channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());
System.out.println("发送的消息为:"+message);
}
ConnectionUtil.close(conn,channel);
}
}
执行完成后,我们查看一下控制台:
Subscriber1:
public class PSConsumer1 {
public static void main(String[] args) throws IOException {
Connection conn = ConnectionUtil.getConn();
Channel channel = conn.createChannel();
channel.exchangeDeclare(PSPublisher.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(PSPublisher.FANOUT_QUEUE1,true,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:"+envelope.getRoutingKey());
System.out.println("交换机为:"+envelope.getExchange());
System.out.println("消息 id 为:"+envelope.getDeliveryTag());
System.out.println("消息为:"+new String(body,"utf-8"));
}
};
channel.basicConsume(PSPublisher.FANOUT_QUEUE1,true,consumer);
}
}
Subscriber2:
消费者2 和消费者1差不多,只要修改绑定的队列即可
public class PSConsumer2 {
public static void main(String[] args) throws IOException {
Connection conn = ConnectionUtil.getConn();
Channel channel = conn.createChannel();
channel.exchangeDeclare(PSPublisher.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(PSPublisher.FANOUT_QUEUE2,true,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:"+envelope.getRoutingKey());
System.out.println("交换机为:"+envelope.getExchange());
System.out.println("消息 id 为:"+envelope.getDeliveryTag());
System.out.println("消息为:"+new String(body,"utf-8"));
}
};
channel.basicConsume(PSPublisher.FANOUT_QUEUE2,true,consumer);
}
}
我们两个消费者都执行以下,可以看到,队列中已经没有消息了:
2- 路由模式(routing)
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey (路由key)
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey 。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列 的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息
图解:
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
这里为了编写方便,我们将路由简化为只有两个,一个是插入,一个是更新:
生产者:
public class RPublisher {
public static final String ROUTER_EXCHANGE = "router_exchange";
public static final String QUEUE_INSERT = "queue_insert";
public static final String QUEUE_UPDATE = "queue_update";
public static void main(String[] args) throws IOException {
Connection conn = ConnectionUtil.getConn();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ROUTER_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_INSERT,true,false,false,null);
channel.queueDeclare(QUEUE_UPDATE,true,false,false,null);
channel.queueBind(QUEUE_INSERT,ROUTER_EXCHANGE,"insert");
channel.queueBind(QUEUE_UPDATE,ROUTER_EXCHANGE,"update");
String msg1 = "新增商品;路由模式为:insert router-key 为:insert";
channel.basicPublish(ROUTER_EXCHANGE,"insert",null,msg1.getBytes());
String msg2 = "更新商品;路由模式为:update router-key 为:update";
channel.basicPublish(ROUTER_EXCHANGE,"update",null,msg2.getBytes());
ConnectionUtil.close(conn,channel);
}
}
消费者1:
public class RConsumer1 {
public static void main(String[] args) throws IOException {
Connection conn = ConnectionUtil.getConn();
Channel channel = conn.createChannel();
channel.exchangeDeclare(RPublisher.ROUTER_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(RPublisher.QUEUE_INSERT,true,false,false,null);
channel.queueBind(RPublisher.QUEUE_INSERT,RPublisher.ROUTER_EXCHANGE,"insert");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key为:"+envelope.getRoutingKey());
System.out.println("交换机为:"+envelope.getExchange());
System.out.println("消息 id为:"+envelope.getDeliveryTag());
System.out.println("消费者1收到的消息为:"+new String(body,"utf-8"));
}
};
channel.basicConsume(RPublisher.QUEUE_INSERT,true,consumer);
}
}
消费者2:
消费者2同理,只要将消费者1 中的队列信息进行更换即可
3 - 通配符模式(topic)
通配符模式和路由模式一样,都是可以通过路由 key 指定消息发送的位置
区别在于,通配符模式可以使用通配符,一次匹配多个路由 key
通配符规则:
# :匹配多个单词
* :匹配一个单词
item.# :能够匹配 item.insert.abc 或者 item.insert
item.* :只能匹配 item.insert
示例其实和上面的路由模式差不多,要改的地方在于把之前生产者中声明的交换机,交给消费者声明,然后将路由 key 换为通配符
|