本文主要是普及一下RocketMQ的一些基础概念,为接下来的结群部署和消息实战做铺垫~
一、RocketMQ的消息模型
1、RocketMQ的基础消息模型,一个简单的Pub/Sub模型
上图就是一个基本的消息系统模型,包括生产者 (Producer),消费者 (Consumer),中间进行基于消息主题(Topic) 的消息传送。
在基于主题的系统中,消息被发布到主题或命名通道上。消费者将收到其订阅主题上的所有消息,生产者负责定义订阅者所订阅的消息类别。这是一个基础的概念模型,而在实际的应用中,结构会更复杂。例如为了支持高并发和水平扩展,中间的消息主题需要进行分区,同一个Topic会有多个生产者,同一个信息会有多个消费者,消费者之间要进行负载均衡等。
2、RocketMQ 扩展后的消息模型
上图就是一个扩展后的消息模型,包括两个生产者,两个消息Topic,以及两组消费者 Comsumer。
存储消息Topic的 代理服务器( Broker ),是实际部署过程对应的代理服务器。
- 为了消息写入能力的水平扩展,RocketMQ 对 Topic进行了分区,这种操作被称为队列(MessageQueue)。
- 为了消费能力的水平扩展,ConsumerGroup的概念应运而生。
3、RocketMQ的部署模型
Producer、Consumer又是如何找到Topic和Broker的地址呢?消息的具体发送和接收又是怎么进行的呢?当 当 当,RocketMQ的部署模型亮相啦~ 在接下来 系统结构 模型中详细讲解RocketMQ的系统架构~
二、RocketMQ的系统架构
先为大家罗列出各个组件以及作用
- Producer:消息的发送者,如:寄件人
- Producer Group :生产者组,生产者组是同一类生产者的集合,这类 Producer 发送相同 Topic 类型的消息。一个生产者组可以同时发送多个主题的消息。
- Consumer:消息的接受者,如:收件人
- Consumer Group:消费组,每一个 Consumer 实例都属于一个 Consumer Group。
(注意:每一条消息只会被同一个 Consumer Group 里的一个 Consumer 实例消费。广播模式除外,同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列。不同的 Consumer Group 可以同时消费同一条消息) - Broker:暂存和传输消息,如:快递公司
- NameServer:管理Broker,如:快递公司的管理机构
- Topic:区分消息的种类。
- 一个发送者可以发送消息给一个或者多个Topic
- 一个消息的接受者可以订阅一个或者多个Topic
- Message Queue:相当于 Topic 的分区,用于并行发送和接收消息
RocketMQ架构上主要由四部分构成:
1、Producer
消息生产者, 负责生产消息。 一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。
? 例如,业务系统产生的日志写入到 MQ 的过程,就是消息生产的过程。再如,电商平台中用户提交的秒杀请求写入到 MQ 的过程,就是消息生产的过程。
? RocketMQ 中的消息生产者都是以生产者组( Producer Group )的形式出现的。生产者组是同一类生产者的集合,这类 Producer 发送相同 Topic 类型的消息。一个生产者组可以同时发送多个主题的消息。
2、Consumer
? 消息消费者, 负责消费消息。一个消息消费者会从 Broker 服务器中获取到消息,并对消息进行相关业务处理 。一般是后台系统负责异步消费。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
- 支持以推(push),拉(pull)两种模式对消息进行消费。
- 同时也支持集群方式和广播方式的消费。
- 提供实时消息订阅机制,可以满足大多数用户的需求。
? 例如, QoS 系统从 MQ 中读取日志,并对日志进行解析处理的过程就是消息消费的过程。 ? 再如,电商平台的业务系统从 MQ 中读取到秒杀请求,并对请求进行处理的过程就是消息消费的过程。
? 为了消费能力的水平扩展,RocketMQ 中的消息消费者都是以消费者组 ( Consumer Group ) 的形式出现的. 消费者组是同一类消费者的集合, 这类 Consumer 消费的是同一个 Topic 类型的消息。
相同的ConsumerGroup下的消费者主要有两种负载均衡模式,即广播模式,和集群模式(图中是最常用的集群模式)。
-
在集群模式下,消费者组使得在消费消息方面,实现 负载均衡 和 容错 的目标变得非常容易 。同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费,如图中 ConsumerGroupA 订阅 TopicA,TopicA 对应 3个队列,则 GroupA 中的 Consumer1 消费的是 MessageQueue 0和 MessageQueue 1的消息,Consumer2是消费的是MessageQueue2的消息。
- 负载均衡:将一个 Topic 中的不同 Queue 平均分配给同一个 Consumer Group的不同的 Consumer, 注意:并不是将消息负载均衡。
- 容错:一个 Consumer 挂了, 该 Consumer Group 中的其他 Consumer 可以接着消费 原Consumer 消费的Queue
-
在广播模式下,同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列。需要注意的是,广播模式下因为每个 Consumer 实例都需要处理全部的消息,因此这种模式仅推荐在通知推送、配置同步类小流量场景使用。
需要注意以下问题:
- 消费者组在某一时刻只能消费一个 Topic 的消息,不能同时消费多个 Topic 消息;
- 一个消费者组中的消费者必须订阅完全相同的 Topic;
- 消费者组中 Consumer 的数量应该小于等于订阅 Topic 的 Queue 数量。如果超出 Queue 数量,则多出的Consumer 将不能消费消息。
- 一个Topic类型的消息可以被多个消费者组同时消费。(当多个消费者组订阅同一个Topic时)
3、Name Server
NameServer 是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。
? RocketMQ 的思想来自于 Kafka,而 Kafka 是依赖了 Zookeeper 的。所以,在 RocketMQ 的早期版本,即在 MetaQ v1.0 与 v2.0 版本中,也是依赖于 Zookeeper 的。从 MetaQ v3.0 ,即 RocketMQ 开始去掉了Zookeeper 依赖,使用了自己的 NameServer 。
其主要提供了以下两个功能:
- 管理 Broker ,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
- 路由信息管理,每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
详细谈谈 Name Server 的功能吧~
3.1、路由注册
NameServer通常会有多个实例部署,不过, NameServer 是无状态的,即 NameServer 集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。那么各节点是如何进行数据同步的呢?
注意,这是与其它像 zk 、 Eureka 、 Nacos 等注册中心不同的地方。那么NameServer这种无状态的方式,有什么优缺点呢?
- 优点:NameServer 集群搭建简单,扩容简单。
- 缺点:对于 Broker ,必须明确指出所有 NameServer 地址,否则未指出的将不会去注册。也正因为如此,NameServer 并不能随便扩容。因为,若 Broker 不重新配置,新增的 NameServer 对于 Broker 来说是不可见的,即其不会向这个 NameServer 进行注册。
Broker 节点是如何证明自己活着呢?
? Broker为了维护与 NameServer 间的长连接,会将最新的信息以 心跳包 的 方式上报给 NameServer ,每 30 秒发送一次心跳。心跳包中包含 BrokerId 、 Broker 地址 (IP+Port) 、 Broker 名称以及Broker 所属集群名称等等。 NameServer 在接收到心跳包后,会更新心跳时间戳, 记录这个 Broker 的最新存活时间。
3.2、路由剔除
由于Broker 关机、宕机或网络抖动等原因,NameServer在超过一定时间内没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除。
? 实现原理就是:NameServer 中有一个定时任务,每隔10秒就会扫描一次Broker表,查看每一个Broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定Broker失效,然后将其从Broker列表中剔除 。
扩展:对于 RocketMQ 日常运维工作,例如 Broker 升级,需要停掉 Broker 的工作。 OP(运维工程师)需要怎么做?
OP 需要将 Broker 的读写权限禁掉。一旦 client(Consumer 或 Producer) 向 broker 发送请求,都会收到 broker 的 NO_PERMISSION 响应,然后 client 会进行对其它 Broker 的重试。当 OP 观察到这个 Broker 没有流量后,再关闭它,实现 Broker 从 NameServer 的移除。
3.3、路由发现
我们先来谈谈路由发现有哪几种模型?
- Push 模型,即 推送模型。其实时性较好,是一个 “ 发布 - 订阅 ” 模型,需要维护一个长连接。而长连接的维护是需要资源成本的。该模型适合于的场景:实时性要求较高 Client 数量不多, Server 数据变化较频繁。
- Pull 模型,即 拉取模型。存在的问题是,实时性较差。
- Long Polling 模型,即长轮询模型。其是对 Push 与 Pull 模型的整合,充分利用了这两种模型的优势,屏蔽了它们的劣势。
而RocketMQ 的路由发现采用的是 Pull 模型。当 Topic 路由信息出现变化时, NameServer 不会主动推送给客户端,而是客户端定时拉取主题最新的路由。默认客户端每 30 秒会拉取一次最新的路由。
3.4、Client 对 NameServer选择策略
这里的Client(客户端)指的是 Producer 与 Consumer
客户端在配置时必须要写上 NameServer 集群的地址,那么客户端到底连接的是哪个 NameServer 节点 呢?
? 客户端首先会生产一个随机数,然后再与NameServer 节点数量取模,此时得到的就是所要连接的节点索引,然后就会进行连接。如果连接失败,则会采用round-robin 策略,逐个尝试着去连 接其它节 点。即 : 首先采用的是 随机策略 进行的选择,失败后采用的是 轮询策略 。
4、Broker
4.1、Broker概述
功能介绍
? Broker 充当着消息中转角色, 负责存储消息、转发消息. Broker 在 RocketMQ 系统中负责接收并存储从生产者发送来的消息, 同时为消费者的拉取请求作准备. Broker 同时也存储着消息相关的元数据, 包括消费者组消费进度偏移offset、主题、队列等 .
模块构成
下图为 Broker Server 的功能模块示意图。 Remoting Moudle,整个 Broker 的实体, 负责处理来自 Client 端的请求. 而这个 Broker 实体则由以下模块构成
- Client Manager,客户端管理器。负责接收、解析客户端(Producer/Consumer)请求,管理客户端。例如:维护 Consumer 的 Topic 订阅信息。
- Store Service,存储服务。提供方便简单的 API 接口, 处理 消息存储到物理硬盘 和 消息查询 功能。
- HA Service,高可用服务。提供 Master Broker 和 Slave Broker 之间的数据同步功能。
- Index Service,索引服务。根据特定的 Message Key,对投递到 Broker 的消息进行索引服务,同时也提供根据 Message Key 对消息进行快速查询的功能。
4.2、关于Broker的集群部署
NameServer几乎无状态节点,因此可集群部署,节点之间无任何信息同步。Broker部署相对复杂。
? 为了增强 Broker 性能与吞吐量, Broker 一般都是以集群形式出现的。各集群节点中可能存放着相同Topic 的不同 Queue 。不过,这里有个问题,如果某Broker 节点宕机,如何保证数据不丢失呢?其解决方案是,将每个Broker 集群节点进行横向扩展,即将 Broker 节点再建为一个 HA 集群,解决单点问 题。 Broker 节点集群是一个主从集群,即集群中具有 Master 与 Slave 两种角色。
? 在 Master-Slave 架构中,Broker 分为 Master 与 Slave。一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。Master 与 Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。
- Master 负责处理读写操作请求;
- Slave 负责对 Master 中的数据进行备份。
当 Master 挂掉了, Slave 则会自动切换为 Master 去工作。所以这个 Broker 集群是主备集群。
部署模型小结
- 每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所 NameServer。
- Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
- Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。
梳理一下整个工作流程
- 启动 NameServer , NameServer 启动后开始监听端口,等待 Broker 、 Producer 、 Consumer 连接。
- 启动 Broker 时, Broker 会与所有的 NameServer 建立并保持长连接,然后每 30 秒向 NameServer 定时 发送心跳包。
- 发送消息前,可以先创建 Topic ,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,当然,在创建 Topic 时也会将 Topic 与 Broker 的关系写入到 NameServer 中。不过,这步是可选的,也可以在发送消息时自动创建Topic 。
- Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取路由信息,即当前发送的 Topic 消息的 Queue 与 Broker 的地址( IP+Port )的映射关系。然后根据算法策略从队选择一个Queue ,与队列所在的 Broker 建立长连接从而向 Broker 发消息。当然,在获取到路由信息后, Producer 会首先将路由信息缓存到本地,再每 30 秒从NameServer更新一次路由信息。
- Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取其所订阅 Topic 的路由信息,然后根据算法策略从路由信息中获取到其所要消费的 Queue ,然后直接跟 Broker 建立长连接,开始消费其中的消息。Consumer 在获取到路由信息后,同样也会每 30 秒从 NameServer 更新一次路由信息。不过 不同于 Producer 的是, Consumer 还会向 Broker 发送心跳,以确保Broker的存活状态。
4.3、创建 Topic
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
手动创建 Topic 时,有两种模式:
- 集群模式 :该模式下创建的 Topic 在该集群中,所有 Broker 中的 Queue 数量是相同的 。
- Broker 模式 :该模式下创建的 Topic 在该集群中,每个 Broker 中的 Queue 数量可以不同。
自动创建 Topic 时,默认采用的是 Broker 模式,会为每个 Broker 默认创建 4 个 Queue 。
关于读写队列
? 从物理上来讲,读 / 写队列是同一个队列。所以,不存在读 / 写队列数据同步问题。读 / 写队列是逻辑上进行区分的概念。一般情况下,读 / 写队列数量是相同的。
-
如:当写队列 小于 读队列时 ? 创建 Topic 时设置的写队列数量为 4 ,读队列数量为 8 ,此时系统会创建 8 个 Queue ,分别是 0 1 2 3 4 5 6 7 。 Producer 会将消息写入到 0 1 2 3 这 四 个队列,但Consumer 只会消费 0 1 2 3 4 5 6 7 这 八 个队列中 的消息,但是 4 5 6 7 中是没有消息的。 此时假设 Consumer Group 中包含两个 Consuer , Consumer1 消 费 0 1 2 3 ,而 Consumer2 消费 4 5 6 7 。但实际情况是, Consumer2 是没有消息可消费的。 -
如:当写队列 大于 读队列时 ? 创建 Topic 时设置的写队列数量为 8 ,读队列数量为 4 ,此时系统会创建 8 个 Queue ,分别是 0 1 2 3 4 5 6 7 。 Producer 会将消息写入到这 8 个队列,但 Consumer 只会消费 0 1 2 3 这 四 个队列中的消息, 4 5 6 7 中的消息是不会被消费到的。
也就是说,当读 / 写队列数量设置不同时,总是有问题的。那么,为什么要这样设计呢?
其这样设计的目的是为了,方便 Topic 的 Queue 的缩容(将不需要的站点从原网络中移除的过程)。
如:原来创建的 Topic 中包含 16 个 Queue ,如何能够使其 Queue 缩容为 8 个,还不会丢失消息?
? 可以动态修改写队列数量为 8 ,读队列数量不变。此时新的消息只能写入到前 8 个队列,而消费都消费的却是 16 个队列中的数据。当发现后 8 个 Queue 中的消息消费完毕后,就可以再将读队列数量动态设置为 8 。整个缩容过程,没有丢失任何消息。
perm 用于设置对当前创建 Topic的操作权限:
三、RocketMQ中的消息
首先什么是消息?
消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题 。
RocketMQ 消息构成非常简单,如下图所示。
- topic,表示要发送的消息的主题。
- body 表示消息的存储内容
- properties 表示消息属性
- transactionId 会在事务消息中使用。
- Tag: 不管是 RocketMQ 的 Tag 过滤还是延迟消息等都会利用 Properties 消息属性机制,这些特殊信息使用了系统保留的属性Key,设置自定义属性时需要避免和系统属性Key冲突。
- Keys: 服务器会根据 keys 创建哈希索引,设置后,可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能保证 key 唯一,例如订单号,商品 Id 等。注意:使用过程中需要避免RocketMQ系统保留的属性key。
Message 可以设置的属性值包括:
字段名 | 默认值 | 必要性 | 说明 |
---|
Topic | null | 必填 | 消息所属 topic 的名称 | Body | null | 必填 | 消息体 | Tags | null | 选填 | 消息标签,方便服务器过滤使用。目前只支持每个消息设置一个 | Keys | null | 选填 | 代表这条消息的业务关键词 | Flag | 0 | 选填 | 完全由应用来设置,RocketMQ 不做干预 | DelayTimeLevel | 0 | 选填 | 消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费 | WaitStoreMsgOK | true | 选填 | 表示消息是否在服务器落盘后才返回应答。 |
1、Topic 主题
Topic 表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,Topic 是 RocketMQ 进行消息订阅的基本单位 .
- Topic:message 1:n,主题与消息是一对多的关系。
- Message:Topic 1:1,消息与主题是一对一的关系。
一个生产者可以同时发送多条 Topic 的消息,而每一个消费者只对某种特定的 Topic 感兴趣,即只可以订阅 和 消费一种 Topic 的消息 .
- Producer:Topic 1:n,消息生产者与主题是一对多的关系。
- Consumer:Topic 1:1,消息消费者与主题是一对一的关系。
2、Tag 标签
Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。使用 Tag 可以实现对 Topic 中的消息进行过滤。
- Topic:消息主题,通过 Topic 对不同的业务消息进行分类。
- Tag:消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出即带上的属性。
Topic 和 Tag 的关系如下图所示~
eg:一天黎明港口来了许多集装箱(消息),每个集装箱外都会有信息牌,我们可以根据信息知道集装箱中是什么货物(Topic)以及 货物来自于哪里(Tag)。
tag= 上海
tag= 江苏
tag= 浙江
------- 消费者 ---0--
topic= 货物A tag = 上海
topic= 货物B tag = 上海 | 浙江
topic= 货物C tag = *
既然Topic和Tag都是业务上用来归类的标识,那我们什么时候该用Topic,什么时候该用 Tag?
我们可以从以下几个方面进行判断:
- 消息类型是否一致:如普通消息、事务消息、定时(延迟)消息、顺序消息,不同的消息类型使用不同的Topic,无法通过Tag进行区分。
- 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫消息,电气类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
- 消息优先级是否一致:如同样是物流消息,河马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
- 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的Topic。
总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。
3、Queue 队列
为了支持高并发和水平扩展,需要对 Topic 进行分区,在 RocketMQ 中这被称为队列,一个 Topic 可能有多个队列,并且可能分布在不同的 Broker 上。
? 队列,是存储消息的物理实体。一个 Topic 中可以包含多个 Queue,每个 Queue 中存放的就是该 Topic 的消息. 一个 Topic 的 Queue 也被称为一个 Topic 中消息的分区 (Partrion) 。 注意:一个 Topic 的 Queue 中的消息只能被一个消费者组中的一个消费者消费,一个 Queue 中的消息不允许同一个消费者组中的多个消费者同时消费 。
在学习参考其它相关资料时,还会看到一个概念:分片( Sharding )。分片不同于分区。在 RocketMQ中,分片指的是存放相应 Topic 的 Broker 。 每个分片中会创建出相应数量的分区,即Queue,每个 Queue 的大小都是相同的。
一般来说一条消息,如果没有重复发送(比如因为服务端没有响应而进行重试),则只会存在在 Topic 的其中一个队列中,消息在队列中按照先进先出的原则存储,每条消息会有自己的位点,每个队列会统计当前消息的总条数,这个称为最大位点 MaxOffset;队列的起始位置对应的位置叫做起始位点 MinOffset。队列可以提升消息发送和消费的并发度。
4、MessageId/Key 消息标识
? RocketMQ 中每个消息拥有唯一的 MessageId ,且可以携带具有业务标识的 Key ,以方便对消息的查询。
? 不过需要注意的是, MessageId 有两个,当生产者在发送消息时会自动生成一个 MessageID,即 msgId 。当消息到达 Broker 后,Broker 也会自动生成一个 MessageID,即 offsetMsgId 。msgId 、offsetMsgId与key 都 称为消息标识。
- msgId,由 Producer 端生成,其生成规则为: producerIp + 进程pid + MessageClientIDServer 类的ClassLoader的hashCode + 当前时间 + AutomicInteger自增计数器;
- offserMsgId,由 Broker 端生成,其生成规则为:brokerIp + 物理分区的offset(Queue的偏移量)
Key,由用户制定的业务相关的唯一标识
? RocketMq 每个消息可以在业务层面的设置唯一标识,即 key 字段,方便将来定位消息丢失问题。应用可以通过 topic、key 来查询这条消息内容,以及消息被谁消费。
? 由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
// 订单Id
String orderId = "20034568923546";
message.setKeys(orderId);
|