一、为什么使用RabbitMq
- RabbitMq是一个实现了AMQP(advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用erlang语言实现
- 可靠性,RabbitMQ的持久化支持,保证消息的稳定性·;
- 高并发高可用,RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开放的语言,本身具有高并发的优势
- 集群部署简单,(Erlang功劳)
- 使用频率较高
二、RabbitMq工作机制
- 生产者:消息的创建者,负责创建和推动数据到消息服务器;
- 消费者:消息的接受方用于处理数据和确认消息
- 代理:就是本身RabbitMQ本身,用于扮演“跑腿”的角色,本身不产生消息
原理流程:就是生产者发送到消息到虚拟主机,虚拟主机把消息交给指定的交换机,交换机按照规则扔给消息队列进行存储,消息队列等待消费者来消费
消息持久化原理:
- Rabbit会将持久化消息写入磁盘上的持久化日志,等消息被消费后,Rabbit会把这条消息标识为等待垃圾回收
持久化缺点
-因为写入硬盘要比写入内存性能低很多,从而降低了服务器的吞吐量,景观使用SSD硬盘可以使这种情况得到缓解,但同时仍然使Rabbit性能大幅度下降,当消息成千上万要写入磁盘的时候性能是很低的
三、 MQ应用场景
1.同步消息变异步消息
- 当用户下单完成,发送邮件和短信通知
- 运用消息队列之后,用户下单完成之后,下单的信息写入数据库,再写入消息队列,发送邮件消息和发送短信各自去消息队列进行读取
- 节省时间提高效率
2、应用解耦
场景:用户下单后,订单需要多渠道通知用户
- a.下单服务系统:用户使用下单服务后,讲下单信息写入数据库,下单成功。
- b.短信服务系统:用户下单后将短信写入消息队列,已发送短信信息通知用户交易信息‘
- b.邮件服务系统:用户下单后将邮件写入消息队列,已发送邮件信息通知用户交易信息
这样,如果微信通知不能正常使用,也不影响用户下单,用户下单后,只把下单的通知消息写入消息队列,也不需要关心后续的操作,实现了订单系统和通知系统的解耦
2、流量削峰
使用场景:(电商里秒杀或者团购活动)秒杀活动中一般流量暴增会导致应用挂掉。针对这个问题,一般在应用前段加入消息
- 1、可以控制活动人数
- 2、可以缓解短时间内高流量的应用压力
原因 服务器接收到用户的请求后,首先写入消息队列,如果消息队列的数量大于最大数量,则直接抛弃用户请求或者跳转到错误页面
消息流程
-
生产者(producer) 把消息发送给交换机。当你创建交换机的时候,你需要指定类型。交换机的类型接下来会讲到。 -
交换机(exchange) 接收消息并且负贵对消息进行路由。根据交换机的类型,消息的多个属性会被使用,例如路由键。 -
绑定(bindlng) 需要从交换机到队列的这种方式来进行创建。在这个例子里,我们可以看到交换机有到两个不同队列的绑定,交换机根据消息的属性来把消息分发到不同的队列上。 -
消息(message) 消息会一直留在队列里直到被消费。 -
消费者(consumer) 处理消息。
RabbitMQ核心概念
- ·生产者(Producer):发送消它的应用。
- 消费者(Consumer):接收消息的应用。
- 队列(Queue):存储消息的惯存。
- 消息(Message):又生产者通过RabbitMQ发送给消费者的信息。
- 连接(Connection):连接RabbitMQ和应用服务器的TCP连度。
- 通道(Channel):连接里的一个虚拟通道。当你通过消息队列发送或者接收消息时,这个操作都是通过通道进行的。交换机(Exchange):从生产者那里接收消息,并根据交换类型分发到对应的消息列队里。要实现消息的接收,一个队列必须绑定一个交换机。
- 绑定(Binding):绑定是队列和交换机的一个链接。
四、RabbitMQ几种集群模式
保证幂等性
场景1 :假设你有个系统,消费一条往数据库里插入一条,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下已经消费过了,直接扔了,不就保南了一条数据? 一条数据重复出现两次,数据库甲就只有一条数据,这就保证了系统的幕等性幂等性,我通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。 解决方案:
- 拿到数据要写库,先根据主键声一下,如果数据存在,则不能插入,update一下
- 通过redis,那没问题了,反正每次都是set,天然幂等性
场景2 :若上面场景不能满足业务需求 这时换个解决方案:
- 需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,
- 消息消费时,先根据此id去查一下,比如redis里查一下,之前消费过吗?
- 如果没有消费过,则正常进行消息处理,然后讲此id写入redis。
- 如果消费过,则拒绝进行处理,保证对每条消息只进行一次处理;
其他方案 比如基于数据库的唯一键来保证重复数据不会重复插入多条,我们之前线上系统就有这个问题,就是拿到数据的时候,每次重启可能会有重复,因为kafka消费者还没来得及提交offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据
五、消息丢失
-
场景:生产者将数据发送到rabbitmq的时候,可能数据就在半路给搞丢了,因为网络啥的问题,都有可能 -
生产者端丢失数据解决:
- rabbitmq事务机制(同步的):生产者发送数据之前开启rabbitmq事务(channel.b:Select),然后发送消息,如果消盘没有成功被rabbitmq接收到,那么生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重试发送消息;如果收到了消息,那么可以提交事务(channel.txCommit)
- 开启confirm模式(异步的),在生产者那里设置开启confirm楼式之后,你每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmg中,rabbitmq会给你回传一个ack消息,告诉你说这个消息ok了。如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发
-
rabbitmq弄丢了数据 就是rabbitmq自己弄丢了数据,这个你必须开启rabbitmq的持久化,就是消息写入之后会持久化到磁盘,哪怕是rabbitmq自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,rabbitmq还役持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。
-
步骤一:一个是创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里的数据; -
第二个是发送消息的时候将消息的dellveryMode设置为2,就是将消息设置为持久化的,此时rabbitmq就会将消息持久化到磁盘上去,必须要同时设置这两个持久化才行,rabbitmq哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。而且持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,rabbitmq挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。哪怕是你给rabbitmq开启了持久化机制,也有一种可能,就是这个消息写到了rabbitmq中,但是还没来得及结久化到磁盘上,结果不巧,此时rabbitmq挂了,就会导致内存里的一点点数据会丢失。 -
消费者端丢失数据
rabbitmq如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,rabbitmq认为你都消费了,这数据就丢了。这个时候得用rabbitmq提供的ack机制,简单来说,就是你关闭rabbitmq自动ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那rabbitmq就认为你还没处理完,这个时候rabbitmq会把这个消费分配给别的consumer去处理,消息是不今王的
六、消息顺序性
|