一、什么是消息中间件?
1、什么是消息中间件?
在分布式项目中,一个系统 A (消费者),调用另一个系统 B (提供者)去向用户发送一些成功提示消息(下单成功等)。如果我们直接让 A 去调用 B,那么会存在耦合性的问题,系统的性能也会收到局限
业务场景说明: 像这样的消息中间件(也叫消息队列)在大型电子商务类网站,如京东、淘宝、去哪儿等网站有着深入的应用,为什么会产生消息队列?有几个原因:
- 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间在添加一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;这就是降低耦合度
- 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;这就是对系统进行消峰处理
- 在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
2、什么是消息队列?
- 消息队列(MQ Message Queue)就是这样的一款消息中间件的系统,它是一种应用程序对应用程序的通信方法
- 消息队列就是数据结构中的 “先进先出” 队列,消息进入 MQ 后会排队等待订阅它的系统来处理它
- 消息传递:指的是程序之间通过消息发送数据j进行通信(上图第二种),而不是通过彼此之间调用来通信(上图第一种)
- 队列的主要作用是消除高并发访问高峰(消峰),加快网站的响应速度。
2.1、消息队列的三大作用
解耦服务、异步处理、流量消峰
1. 应用解耦
传统模式 传统模式下,系统间的耦合性太强;
中间件模式 中间件模式的优点:
- 将消息写入消息队列,需要消息的系统可以自己从消息中间件中订阅,无需消息发布者做任何修改 (发布订阅者模式)
2. 异步处理 场景说明:用户注册后,需要发注册消息和注册短信给用户,传统的方法有两种:
- 串行的方法
- 并行的方法
1)串行方式
将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。
这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。
2)并行方式
将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。但是顾客还是多等了无关紧要的 50ms
3)消息队列
引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理
这样可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计);
引入消息队列后处理后,响应时间是串行的3分之1,是并行的2分之1。
总之: 中间件模式将一些非必要的业务,以异步的方式执行,加快响应速度
3. 流量消峰 流量消峰一般在秒杀活动中应用广泛。
场景: 秒杀活动,一般会应为流量过大,导致服务挂掉。为了解决这个问题,一般在应用前端加入消息队列。
传统模式
如订单系统,在下单的时候就会向数据库中写入数据,但是数据库只能支撑 1000/s 左右的并发操作,并发量再高就会容易宕机。在高峰期的时候,并发量可能会突然激增,达到数据库承载极限,这个时候数据库可能就会卡死。
中间件模式
前端发送过来的数据被 消息中间件保留下来,通过消息队列一点一点(1000/s)的被后端服务订阅处理,这样中间件就会起到缓冲(消峰)的作用了
MQ:帮我们处理了流量洪峰,保护了系统 A
中间模式的优点
- 系统按照数据库能够处理的并发量(1000/s)慢慢地从消息队列中拉取消息。在生产中,短暂的高峰期积压是被允许的
- 流量消峰也叫做消峰填谷
- 使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”
2.2、AMQP 和 JMS
MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。
- AMQP 是一种高级消息队列协议(Advanced Message Queuing Protocol),更准确的说是一种 binary wire-level protocol(链接协议)。这是其和 JMS 的本质差别,AMQP 不从 API 层进行限定,而是直接定义网络交换的数据格式。
- JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
AMQP 和 JMS 的区别
- JMS 是定义了统一的接口,来对消息操作进行统一;AMQP 是通过规定协议来统一数据交互的格式
- JMS 限定了必须使用Java语言;AMQP 只是协议,不规定实现方式,因此是跨语言的。
- JMS 规定了两种消息模式;而 AMQP 的消息模式更加丰富
2.3、消息队列的产品
市场上常见的消息队列有如下:
- ActiveMQ:基于JMS
- ZeroMQ:基于C语言开发
- Rabbitmq:基于AMQP协议,erlang语言开发,稳定性好
- RocketMQ:基于JMS,阿里巴巴产品
- Kafka:类似MQ的产品;分布式消息系统,高吞吐量
2.4、RabbitMQ
RabbitMQ 简介
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,它只是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。
所以 RabbitMQ 是跨平台的
RabbitMQ 基础架构如下图:
- Broker:就相当于一个数据库服务
接收和分发消息的应用,RabbitMQ Server就是 Message Broker。 - Virtual host:就相当于是数据库服务中的一个库
出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等; - Connection:相当于 jdbc链接
publisher/consumer 和 broker 之间的 TCP 连接; - Channel:就是对一个链接进行多路复用,省去每次建立链接的开销
如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP - Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了- channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销; - Exchange:交换机或者说是路由,负责按照指定模式分发消息到多个消息队列中
message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:点对点 direct (point-to-point) ,发布订阅 topic (publish-subscribe) ,广播 fanout (multicast) - Queue:存储消息的容器,消息最终被送到这里,等待 consumer 取走
- Binding:通过 key 将一个或一类消息绑定到某一个消息 Queue 中
exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
二、RabbitMQ 的安装
1、安装
由于 RabbitMQ 是 erlang 语言开发的,所以我们要首先安装 erlang 语言的开发环境,此外 RabbitMQ 还依赖了 socat,所以要下载 socat 安装工具提取码:6rzc
RabbitMQ 版本对应的 erlang 版本: https://www.rabbitmq.com/which-erlang.html
-
顺序安装 rpm -ivh erlang-21.3.8.9-1.el7.x86_64.rpm rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm rpm -ivh rabbitmq-server-3.8.1-1.el7.noarch.rpm -
启动管理插件(/usr/lib/rabbitmq/bin/) rabbitmq-plugins enable rabbitmq_management -
启动 RabbitMQ systemctl start rabbitmq-server.service systemctl status rabbitmq-server.service systemctl restart rabbitmq-server.service systemctl stop rabbitmq-server.service -
测试 在虚拟机浏览器中输入:localhost:15672/ 默认账户:guest,密码:guest 注:guest 账户不支持远程访问 -
添加自定义账户(支持远程访问) 添加管理员账号密码:rabbitmqctl add_user admin admin 分配账号角色:rabbitmqctl set_user_tags admin administrator 修改密码:rabbitmqctl change_password admin 123456 查看用户列表:rabbitmqctl list_users
管理界面标签页介绍
- overview:概览
- connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
- channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
- Exchanges:交换机,用来实现消息的路由
- Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。
端口
- 5672:rabbitMq 的编程语言客户端连接端口
- 15672:rabbitMq 管理界面端口
- 25672:rabbitMq 集群的端口
2、卸载
- rpm -qa | grep rabbitmq
- rpm -e rabbitmq-server
3、管理界面
如果不使用 guest,我们也可以自己创建一个用户
3.1、添加用户及用户类别
添加用户
添加管理员账号密码:rabbitmqctl add_user admin admin 分配账号角色:rabbitmqctl set_user_tags admin administrator 修改密码:rabbitmqctl change_password admin 123456 查看用户列表:rabbitmqctl list_users
用户类别
- 超级管理员(administrator)
可登录管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。 - 监控者(monitoring)
可登录管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) - 策略制定者(policymaker)
可登录管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息。 - 普通管理者(management)
仅可登录管理控制台,无法看到节点信息,也无法对策略进行管理。 - 其他
无法登录管理控制台,通常就是普通的生产者和消费者。
3.2、创建 Virtual Hosts
虚拟主机:类似于 mysql 中的 database。他们都是以 “/” 开头
设置虚拟主机的权限
三、RabbitMQ 入门案例
官方文档:https://www.rabbitmq.com/getstarted.html
1、简单模式
需求:使用简单模式完成消息传递
步骤:
- 创建工程(生成者、消费者)
- 分别添加依赖
- 编写生产者发送消息
- 编写消费者接收消息
创建生产者
public class Producer {
public static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.17.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello, Rabbit2!!!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
channel.close();
connection.close();
}
}
创建消费者
public class Consumer {
public static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.17.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag = " + consumerTag);
System.out.println("envelope = " + envelope);
System.out.println("properties = " + properties);
System.out.println("message = " + new String(body, "UTF-8"));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消费者
2、Work 模式
先启动消费者,在启动生产者
生产者
public class Producer {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 0; i < 30; i++) {
String message = "Hello, Rabbit" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
channel.close();
connection.close();
}
}
消费者
多个消费者同时消费一个队列,默认是平均分配,每个消费者拿到的消息数量是相等的
如果想实现性能高的消费者处理的消息更多,性能差的消费者处理的消息较少,则需要:
- 指定Qos为 1;
- 手动确认消息
不确认消息会怎样?
- 如果消息没有被确认,一旦消费者退出,停止消费,未被确认的消息不会被删除,原位返回队列;
什么时候该确认消息?
- 不重要的消息自动确认即可,重要的消息手动确认
- 假如:当我们下订单的时候,订单成功(数据库写入)就确认消息;如果数据库写入失败(报异常)就不确认消息
public class Consumer1 {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message = " + new String(body, "UTF-8"));
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
public class Consumer2 {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message = " + new String(body, "UTF-8"));
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
3、发布与订阅模式
|