什么是消息队列
消息队列就是一个使用队列来通信的组件 但就现在而言我们日常所说的 消息队列常常指代的是消息中间件,它的存在不仅仅只是为了通信这个问题。
为什么需要消息队列
异步处理、 服务解耦、 流量控制
异步处理
项目请求链路越来越长,调用链路长,响应就慢 消息队列可以减少请求的等待,还能让服务异步并发处理,提升系统总体性能
随着公司的发展你可能会发现你项目的请求链路越来越长,例如刚开始的电商项目,可以就是粗暴的扣库存、下单。慢慢地又加上积分服务、短信服务等。 这一路同步调用下来客户可能等急了,这时候就是消息队列登场的好时机。 调用链路长、响应就慢了,并且相对于扣库存和下单,积分和短信没必要这么的"及时"。因此只需要在下单结束那个流程,扔个消息到消息队列中就可以直接返回响应了。而且积分服务和短信服务可以并行的消费这条消息。 可以看出消息队列可以减少请求的等待,还能让服务异步并发处理,提升系统总体性能
服务解耦
上面我们说到加了积分服务和短信服务,这时候可能又要来个营销服务,之后领导又说想做个大数据, 又来个数据分析服务等等。 可以发现订单的下游系统在不断的扩充,为了迎合这些下游系统订单服务需要经常地修改,任何一个下 游系统接口的变更可能都会影响到订单服务,这订单服务组可疯了,真 ·「核心」项目组。 所以一般会选用消息队列来解决系统之间耦合的问题,订单服务把订单相关消息塞到消息队列中,下游 系统谁要谁就订阅这个主题。这样订单服务就解放啦!
流量控制
想必大家都听过「削峰填谷」,后端服务相对而言都是比较「弱」的,因为业务较重,处理时间较长。 像一些例如秒杀活动爆发式流量打过来可能就顶不住了。因此需要引入一个中间件来做缓冲,消息队列再适合不过了。 网关的请求先放入消息队列中,后端服务尽自己最大能力去消息队列中消费请求。超时的请求可以直接返回错误。 当然还有一些服务特别是某些后台任务,不需要及时地响应,并且业务处理复杂且流程长,那么过来的请求先放入消息队列中,后端服务按照自己的节奏处理。这也是很 nice 的。 上面两种情况分别对应着生产者生产过快和消费者消费过慢两种情况,消息队列都能在其中发挥很好的缓冲效果。
注意
引入消息队列固然有以上的好处,但是多引入一个中间件系统的稳定性就下降一层,运维的难度抬高一层。因此要权衡利弊,系统是演进的。
消息队列基本概念
消息队列有两种模型:队列模型和发布/订阅模型。
队列模型
生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费 者, 但是消费者之间是竞争关系,即每条消息只能被一个消费者消费。
发布/订阅模型
为了解决一条消息能被多个消费者消费的问题,发布/订阅模型就来了。该模型是将消息发往一个Topic 即主题中,所有订阅了这个 Topic 的订阅者都能消费这条消息。 其实可以这么理解,发布/订阅模型等于我们都加入了一个群聊中,我发一条消息,加入了这个群聊的人都能收到这条消息。 那么队列模型就是一对一聊天,我发给你的消息,只能在你的聊天窗口弹出,是不可能弹出到别人的聊天窗口中的。 讲到这有人说,那我一对一聊天对每个人都发同样的消息不就也实现了一条消息被多个人消费了嘛。 是的,通过多队列全量存储相同的消息,即数据的冗余可以实现一条消息被多个消费者消费。 RabbitMQ 就是采用队列模型,通过 Exchange 模块来将消息发送至多个队列,解决一条消息需要被多个消费者消费问题 RabbitMQ 采用队列模型, RocketMQ 和 Kafka 采用发布/订阅模型。
常用术语
一般我们称发送消息方为生产者 Producer ,接受消费消息方为消费者 Consumer ,消息队列服务端为Broker 。 消息从 Producer 发往 Broker , Broker 将消息存储至本地,然后 Consumer 从 Broker 拉取消息,或者 Broker 推送消息至 Consumer ,最后消费。 为了提高并发度,往往发布/订阅模型还会引入队列或者分区的概念。即消息是发往一个主题下的某个队列或者某个分区中。 RocketMQ 中叫队列, Kafka 叫分区,本质一样
例如某个主题下有 5 个队列,那么这个主题的并发度就提高为 5 ,同时可以有 5 个消费者并行消费该主题的消息。一般可以采用轮询或者 key hash 取余等策略来将同一个主题的消息分配到不同的队列中。
与之对应的消费者一般都有组的概念 Consumer Group , 即消费者都是属于某个消费组的。一条消息会发往多个订阅了这个主题的消费组
假设现在有两个消费组分别是 Group 1 和 Group 2 ,它们都订阅了 Topic-a 。此时有一条消息发往 Topic-a ,那么这两个消费组都能接收到这条消息。 然后这条消息实际是写入 Topic 某个队列中,消费组中的某个消费者对应消费一个队列的消息。 在物理上除了副本拷贝之外,一条消息在 Broker 中只会有一份,每个消费组会有自己的 offset 即消费点位来标识消费到的位置。在消费点位之前的消息表明已经消费过了。当然这个 offset 是队列级别的。每个消费组都会维护订阅的 Topic 下的每个队列的 offset 。 来个图看看应该就很清晰了。
如何保证消息不丢失
生产消息
生产者发送消息至 Broker ,需要处理 Broker 的响应,不论是同步还是异步发送消息,同步和异步回调都需要做好 try-catch ,妥善的处理响应,如果 Broker 返回写入失败等错误消息,需要重试发送。 当多次发送失败需要作报警,日志记录等。 这样就能保证在生产消息阶段消息不会丢失。
存储消息
存储消息阶段需要在消息刷盘之后再给生产者响应,假设消息写入缓存中就返回响应,那么机器突然断 电这消息就没了,而生产者以为已经发送成功了。 如果 Broker 是集群部署,有多副本机制,即消息不仅仅要写入当前 Broker ,还需要写入副本机中。那 配置成至少写入两台机子后再给生产者响应。这样基本上就能保证存储的可靠了。一台挂了还有一台还 在呢(假如怕两台都挂了…那就再多些)。
消费消息
这里经常会有同学犯错,有些同学当消费者拿到消息之后直接存入内存队列中就直接返回给 Broker 消 费成功,这是不对的。 你需要考虑拿到消息放在内存之后消费者就宕机了怎么办。所以我们应该在消费者真正执行完业务逻辑 之后,再发送给 Broker 消费成功,这才是真正的消费了。 所以只要我们在消息业务逻辑处理完成之后再给 Broker 响应,那么消费阶段消息就不会丢失
小结一下 可以看出,保证消息的可靠性需要三方配合。 生产者 需要处理好 Broker 的响应,出错情况下利用重试、报警等手段。 Broker 需要控制响应的时机,单机情况下是消息刷盘后返回响应,集群多副本情况下,即发送至两个 副本及以上的情况下再返回响应。 消费者 需要在执行完真正的业务逻辑之后再返回响应给 Broker 。
但是要注意消息可靠性增强了,性能就下降了,等待消息刷盘、多副本同步后返回都会影响性能。因此 还是看业务,例如日志的传输可能丢那么一两条关系不大,因此没必要等消息刷盘再响应。
如何处理重复消息
重复消息的产生
我们先来看看能不能避免消息的重复。 生产者: 假设我们发送消息,就管发,不管 Broker 的响应,那么我们发往 Broker 是不会重复的。 但是一般情况我们是不允许这样的,这样消息就完全不可靠了,我们的基本需求是消息至少得发到Broker 上,那就得等 Broker 的响应,那么就可能存在 Broker 已经写入了,当时响应由于网络原因生产者没有收到,然后生产者又重发了一次,此时消息就重复了。 消费者: 再看消费者消费的时候,假设我们消费者拿到消息消费了,业务逻辑已经走完了,事务提交了,此时需要更新 Consumer offset 了,然后这个消费者挂了,另一个消费者顶上,此时 Consumer offset 还没更新,于是又拿到刚才那条消息,业务又被执行了一遍。于是消息又重复了。
可以看到正常业务而言消息重复是不可避免的,因此我们只能从另一个角度来解决重复消息的问题。 关键点就是幂等。既然我们不能防止重复消息的产生,那么我们只能在业务上处理重复消息所带来的影响
幂等处理重复消息
幂等是数学上的概念,我们就理解为同样的参数多次调用同一个接口和调用一次产生的结果是一致的 例如这条 SQL
update t1 set money = 150 where id = 1 and money = 100;
执行多少遍 money 都是150,这就叫幂等。 因此需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果 可以通过上面我那条 SQL 一样,做了个前置条件判断,即 money = 100 情况,并且直接修改, 更通用的是做个 version 即版本号控制,对比消息中的版本号和数据库中的版本号。 或者通过数据库的约束例如唯一键,例如 insert into update on duplicate key… 。 或者记录关键的key,比如处理订单这种,记录订单ID,假如有重复的消息过来,先判断下这个ID是否已经被处理过了,如果没处理再进行下一步。当然也可以用全局唯一ID等等。 基本上就这么几个套路,真正应用到实际中还是得看具体业务细节。
如何保证消息的有序性
有序性分:全局有序和部分有序。
全局有序
如果要保证消息的全局有序,首先只能由一个生产者往 Topic 发送消息,并且一个 Topic 内部只能有 一个队列(分区)。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的! 不过一般情况下我们都不需要全局有序,即使是同步 MySQL Binlog 也只需要保证单表消息有序即可
部分有序
因此绝大部分的有序需求是部分有序,部分有序我们就可以将 Topic 内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。这样即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率。 图中我画了多个生产者,一个生产者也可以,只要同类消息发往指定的队列即可。
如何处理消息堆积
消息的堆积往往是因为生产者的生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了。
因此我们需要先定位消费慢的原因,如果是 bug 则处理 bug ,如果是因为本身消费能力较弱,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次我们批量处理,比如数据库的插入,一条一条插和批量插效率是不一样的。 假如逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加 Topic 的队列数和消费者数量, 注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者。
如何写个消息中间件?
首先我们需要明确地提出消息中间件的几个重要角色,分别是生产者、消费者、Broker、注册中心。 简述下消息中间件数据流转过程,无非就是生产者生成消息,发送至 Broker,Broker 可以暂缓消息,然后消费者再从 Broker 获取消息,用于消费。
而注册中心用于服务的发现包括:Broker 的发现、生产者的发现、消费者的发现,当然还包括下线,可以说服务的高可用离不开注册中心。
然后开始简述实现要点,可以同通信讲起:各模块的通信可以基于 Netty 然后自定义协议来实现,注册中心可以利用 zookeeper、consul、eureka、nacos 等等,也可以像 RocketMQ 自己实现简单的namesrv (这一句话就都是关键词)。 为了考虑扩容和整体的性能,采用分布式的思想,像 Kafka 一样采取分区理念,一个 Topic 分为多个partition,并且为保证数据可靠性,采取多副本存储,即 Leader 和 follower,根据性能和数据可靠的权衡提供异步和同步的刷盘存储。 并且利用选举算法保证 Leader 挂了之后 follower 可以顶上,保证消息队列的高可用。 也同样为了提高消息队列的可靠性利用本地文件系统来存储消息,并且采用顺序写的方式来提高性能。 可根据消息队列的特性利用内存映射、零拷贝进一步的提升性能,还可利用像 Kafka 这种批处理思想提高整体的吞吐。
之后可以深挖的点就很多了,比如提到的 Netty,各种注册中心就能问很多,比如各注册中心之间的选 型对比等。 你还提到了选举算法,所以可能会问 Bully 算法、Raft 算法、ZAB 算法等等。 你还提到了分区,可能会问这个分区和 RocketMQ 的队列有什么不同啊?具体分区要怎么实现? 然后你提到顺序写,可能会问为什么要顺序写啊?你说的内存映射和零拷贝又是什么啊?那你知道 RocketMQ 和 Kafka 用了哪个吗? 当然还有可能问各种细节,比如消息的写入如何存储、消息的索引如何生成等等,来深挖看你有没有看 过消息中间件的源码。 可以问的还很多,这篇文章我也不可能每个点都延伸开说,这些知识点还是得靠大家日积月累和平日的 多加思考。 当然日后的文章可以写一写今天提到的一些点,比如 Netty、选举算法啊,多种注册中心对比啊啥的。
消息队列设计成推消息还是拉消息?RocketMQ和Kafka是怎么做的?
推拉模式 首先明确一下推拉模式到底是在讨论消息队列的哪一个步骤,一般而言我们在谈论推拉模式的时候指的是 Comsumer 和 Broker 之间的交互。 默认的认为 Producer 与 Broker 之间就是推的方式,即 Producer 将消息推送给 Broker,而不是Broker 主动去拉取消息。
推模式
推模式指的是消息从 Broker 推向 Consumer,即 Consumer 被动的接收消息,由 Broker 来主导消息的发送。
我们来想一下推模式有什么好处? 1.消息实时性高, Broker 接受完消息之后可以立马推送给 Consumer。 2.对于消费者使用来说更简单,简单啊就等着,反正有消息来了就会推过来。 推模式有什么缺点? 1推送速率难以适应消费速率,推模式的目标就是以最快的速度推送消息,当生产者往 Broker 发送消息的速率大于消费者消费消息的速率时,随着时间的增长消费者那边可能就“爆仓”了,因为根本消费不过来啊。当推送速率过快就像 DDos 攻击一样消费者就傻了。 并且不同的消费者的消费速率还不一样,身为 Broker 很难平衡每个消费者的推送速率,如果要实现自适应的推送速率那就需要在推送的时候消费者告诉 Broker ,我不行了你推慢点吧,然后 Broker 需要维护每个消费者的状态进行推送速率的变更。 这其实就增加了 Broker 自身的复杂度。 所以说推模式难以根据消费者的状态控制推送速率,适用于消息量不大、消费能力强要求实时性高的情况下。
拉模式
拉模式指的是 Consumer 主动向 Broker 请求拉取消息,即 Broker 被动的发送消息给 Consumer。 我们来想一下拉模式有什么好处? 拉模式主动权就在消费者身上了,消费者可以根据自身的情况来发起拉取消息的请求。假设当前消费者觉得自己消费不过来了,它可以根据一定的策略停止拉取,或者间隔拉取都行。
拉模式下 Broker 就相对轻松了,它只管存生产者发来的消息,至于消费的时候自然由消费者主动发起,来一个请求就给它消息呗,从哪开始拿消息,拿多少消费者都告诉它,它就是一个没有感情的工具人,消费者要是没来取也不关它的事。 拉模式可以更合适的进行消息的批量发送,基于推模式可以来一个消息就推送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者到底能不能一次性处理这么多消息。而拉模式就更加合 理,它可以参考消费者请求的信息来决定缓存多少消息之后批量发送。 拉模式有什么缺点? 1消息延迟,毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢?所以它只能不断地拉取,但是 又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker 了。因此需要降低请求的频率,比如隔个 2 秒请求一次,你看着消息就很有可能延迟 2 秒了。 2消息忙请求,忙请求就是比如消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的, 在做无用功。
那到底是推还是拉
RocketMQ 和 Kafka 都选择了拉模式,当然业界也有基于推模式的消息队列如 ActiveMQ。 我个人觉得拉模式更加的合适,因为现在的消息队列都有持久化消息的需求,也就是说本身它就有个存储功能,它的使命就是接受消息,保存好消息使得消费者可以消费消息即可。 而消费者各种各样,身为 Broker 不应该有依赖于消费者的倾向,我已经为你保存好消息了,你要就来拿好了。 虽说一般而言 Broker 不会成为瓶颈,因为消费端有业务消耗比较慢,但是 Broker 毕竟是一个中心点, 能轻量就尽量轻量。 那么竟然 RocketMQ 和 Kafka 都选择了拉模式,它们就不怕拉模式的缺点么? 怕,所以它们操作了一波,减轻了拉模式的缺点。
长轮询
RocketMQ 和 Kafka 都是利用“长轮询”来实现拉模式 具体的做法都是通过消费者等待消息,当有 消息的时候 Broker 会直接返回消息,如果没有消息都会采取延迟处理的策略,并且为了保证消息的及时性,在对应队列或者分区有新消息到来的时候都会提醒消息来了,及时返回消息。 一句话说就是消费者和 Broker 相互配合,拉取消息请求不满足条件的时候 hold 住,避免了多次频繁的拉取动作,当消息一到就提醒返回。
|