一、安装
-
将rabbigmq的安装包上传到linux系统中 erlang-22.0.7-1.el7.x86_64.rpm rabbitmq-server-3.7.18-1.el7.noarch.rpm -
安装Erlang依赖包 rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
-
安装RabbitMQ安装包(需要联网) yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
注意:默认安装完成后配置文件模板在:/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example目录中,需要将配置文件复制到/etc/rabbitmq/目录中,并修改名称为rabbitmq.config -
复制配置文件 cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
-
查看配置文件位置 ls /etc/rabbitmq/rabbitmq.config
-
修改配置文件(参见下图:) vim /etc/rabbitmq/rabbitmq.config
将上图中配置文件中红色部分去掉%%,以及最后的,逗号 修改为下图: -
执行如下命令,启动rabbitmq中的插件管理 rabbitmq-plubins enable rabbitmq_management
-
启动RabbitMQ的服务 systemctl start rabbitmq-server
systemctl restart rabbitmq-server
systemctl stop rabbitmq-server
-
查看服务状态 systemctl status rabbitmq-server
二、rabbitmq的工作原理
- rabbitmq是一个生产者—消费者的模型,生产者往消息队列里不断写入消息,另一端消费者读取或者订阅队列中的消息,基于AMQP协议。
- 核心组件是Exchange和Queue,在server端,生产者、消费者在应用端。
应用程序跟rabbitmq之间会创建一个TCP连接,连接通过后会创建一条信道(Channel),发布、订阅消息都是通过信道完成的;然后有一个交换器(exchange),用来接收生产者发送的消息并将这些消息路由给服务中的队列(Queue);队列用来保存消息直到发送给消费者;
三、模型
1.基本消息模型
RabbitMQ是一个消息代理:它接收和转发消息。你可以把它想象成一个邮局:当你把邮件放进邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。 RabbitMQ和邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。 P(producer/ publisher):生产者,一个发送消息的用户应用程序。 C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序 队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。 总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。
2.work消息模型
工作队列或者竞争消费者模式 工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多工人时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。 总之:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
3.订阅模型(三类)
解读: 1、1个生产者,多个消费者 2、每一个消费者都有自己的一个队列 3、生产者没有将消息直接发送到队列,而是发送到了交换机 4、每个队列都要绑定到交换机 5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的 X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。 Exchange类型有以下几种:
-
Fanout :广播,将消息交给所有绑定到交换机的队列 -
Direct:定向,把消息交给符合指定routing key的队列 -
Topic:通配符,把消息交给符合routing pattern(路由模式)的队列 Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
(1) 订阅模型-Fanout
Fanout,也称为广播。 在广播模式下,消息发送流程是这样的:
- 可以有多个消费者
- 每个消费者有自己的queue(队列)
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 交换机把消息发送给绑定过的所有队列
- 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
(2)订阅模型-Direct:
在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key),消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。 P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key. X:Exchange(交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列 C1:消费者,其所在队列指定了需要routing key为error的消息 C2:消费者,其所在队列制定了需要routing key为info、error、warning的消息
(3) 订阅模型-Topic
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型的Exchange可以让队列在绑定Routing key 的时候使用通配符! 通配符规则:
四、SpringBoot 中使用 RabbitMQ
1.搭建初始环境
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
application:
name: springboot_rabbitmq
rabbitmq:
host: 10.15.0.9
port: 5672
username: ems
password: 123
virtual-host: /ems
2.第一种hello world模型使用
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testHello(){
rabbitTemplate.convertAndSend("hello","hello world");
}
五、RabbitMQ 的应用场景
1.异步处理
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1.串行的方式 2.并行的方式
-
串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。这有一种做法让客户端等待没有必要等待的东西。 -
并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。 -
消息队列:假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并行已经提高了处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库就返回。消息队列:引入消息队列后,把发送邮件、短信不是必须的业务逻辑异步处理 由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列处理后,响应时间是串行的3倍,是并行的2倍。
2.应用解耦
场景:淘宝双11,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。 这种做法的缺点是:
- 当库存系统出现故障时,订单就会失败。
- 订单系统和库存系统高耦合。
引入消息队列
- 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
- 库存系统:订阅下单的消息,获取下单消息,进行库操作。
就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。
3.流量削峰
流量削峰一般在秒杀活动中应用广泛 场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。 作用: 1.可以控制活动人数,超过此一定阈值的订单直接丢弃(我为什么秒杀一次都没有成功过呢?) 2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单) 1.服务器收到用户的请求之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面。 2.秒杀业务根据消息队列中的请求信息,再做后续处理。
六、RabbitMQ 的集群
1.集群架构
1.1普通集群(副本集群)
默认情况下:RabbitMQ代理操作所需的所有数据/状态都将跨所有节点复制。这方面的一个例外是消息队列,默认情况下,消息队列位于一个节点上,尽管他们可以从所有节点看到和访问 1.架构图 核心解决问题: 当集群中某一时刻master节点宕机,可以对Queue中信息进行备份 2.集群搭建
-
0.集群规划 node1: 10.15.0.3 mq1 master 主节点
note2: 10.15.0.4 mq2 repl1 副本节点
note3: 10.15.0.5 mq3 repl2 副本节点
-
1.克隆三台机器主机名和ip映射 vim /etc/hosts加入:
10.15.0.3 mq1
10.15.0.4 mq2
10.15.0.5 mq3
node1: vim /etc/hostname 加入: mq1
node2: vim /etc/hostname 加入: mq2
node3: vim /etc/hostname 加入: mq3
-
2.三个机器安装rabbitmq,并同步cookie文件,在node1上执行: scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/
-
3.查看cookie是否一致 node1: cat /var/lib/rabbitmq/.erlang.cookie
node2: cat /var/lib/rabbitmq/.erlang.cookie
node3: cat /var/lib/rabbitmq/.erlang.cookie
-
4.后台启动rabbitmq所有节点执行如下命令,启动成功访问管理界面: rabbitmq-server -detached
-
5.在node2和node3执行加入集群命令: 1.关闭 rabbitmqctl stop_app
2.加入集群 rabbitmqctl join_cluster rabbit@mq1
3.启动服务 rabbitmqctl start_app
-
6.查看集群状态,任意节点执行: rabbitmqctl cluster_status
-
7.如果出现如下图示,集群搭建成功 Cluster status of node rabbit@mq3 ...
1.2 镜像集群
镜像队列机制就是将队列在三个节点之间设置主从关系,消息会在三个节点之间进行自动同步,且如果其中一个节点不可用,并不会导致消息丢失或服务不可用的情况,提高MQ集群的整体高可用性。 1.集群架构图 2.配置集群架构
-
0.策略说明 -
1.查看当前策略 rabbitmqctl list_policies
-
2.添加策略 rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
说明:策略正则表达式为 “^” 表示所有匹配所有队列名称 ^hello: 匹配hello开头队列 -
3.删除策略 rabbitmqctl clear_policy ha-all
-
4.测试集群
|