1. 为什么需要消息系统
1.1 削峰
数据库的处理能力是有限的,在峰值期,过多的请求落到后台,一旦超过系统的处理能力,可能会使系统挂掉。
如上图所示,系统的处理能力是 2k/s,MQ 处理能力是 8k/s,峰值请求 5k/s,MQ 的处理能力远远大于数据库,在高峰期,请求可以先积压在 MQ 中,系统可以根据自身的处理能力以 2k/s 的速度消费这些请求。
这样等高峰期一过,请求可能只有 100/s,系统可以很快的消费掉积压在 MQ 中的请求。
注意,上面的请求指的是写请求,查询请求一般通过缓存解决。
1.2 解耦
如下场景,S 系统与 A、B、C 系统紧密耦合。由于需求变动,A 系统修改了相关代码,S 系统也需要调整 A 相关的代码。
过几天,C 系统需要删除,S 紧跟着删除 C 相关代码;又过了几天,需要新增 D 系统,S 系统又要添加与 D 相关的代码;再过几天…
这样各个系统紧密耦合,不利于维护,也不利于扩展。现在引入 MQ,A 系统变动,A 自己修改自己的代码即可;C 系统删除,直接取消订阅;D 系统新增,订阅相关消息即可。
这样通过引入消息中间件,使各个系统都与 MQ 交互,从而避免它们之间的错综复杂的调用关系。
2. Kafka 架构原理
Kafka 是一种分布式的,基于发布/订阅的消息系统,主要设计目标如下:
- 以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能。
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输。
- 支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 Partition 内的消息顺序传输。
- 同时支持离线数据处理和实时数据处理。
- Scale out:支持在线水平扩展。
Kafka 相关概念:
- Producer:Producer 即生产者,消息的产生者,是消息的入口。
- Kafka Cluster:
Broker:Broker 是 Kafka 实例,每个服务器上有一个或多个 Kafka 的实例,我们姑且认为每个 Broker 对应一台服务器。它接受生产者发送的消息并存入磁盘。Broker 同时服务消费者拉取分区消息的请求,返回目前已经提交的消息 每个 Kafka 集群内的 Broker 都有一个不重复的编号,如图中的 Broker-0、Broker-1 等…… Topic:消息的主题,可以理解为消息的分类,Kafka 的数据就保存在 Topic。在每个 Broker 上都可以创建多个 Topic。每条消息都属于某个 Topic,不同的 Topic 之间是相互独立的,即 Kafka 是面向 Topic 的。 Partition:Topic 的分区,每个 Topic 可以有多个分区,分区的作用是做负载,提高 Kafka 的吞吐量。 同一个 Topic 在不同的分区的数据是不重复的,Partition 的表现形式就是一个一个的文件夹! Replication:每一个分区都有多个副本,副本的作用是保障 Partition 的高可用。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为 Leader。 在 Kafka 中默认副本的最大数量是 10 个,且副本的数量不能大于 Broker 的数量,Follower 和 Leader 绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。 - Message:每一条发送的消息主体。
- Consumer:消费者,即消息的消费方,是消息的出口。
- Consumer Group:我们可以将多个消费者组成一个消费者组,在 Kafka 的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。但可以被多个消费者组消费。
同一个消费者组的消费者可以消费同一个 Topic 的不同分区的数据,这也是为了提高 Kafka 的吞吐量! - Leader:Replica 中的一个角色, Producer 和 Consumer 只跟 Leader 交互。
- Follower:Replica 中的一个角色,从 Leader 中复制数据。
- Controller:Kafka 集群中的其中一个服务器,用来进行 Leader Election 以及各种 Failover。
Zookeeper:Kafka 集群依赖 Zookeeper 来保存集群的的元信息,来保证系统的可用性。
2.1 Topic and Logs
Message 是按照 Topic 来组织的,每个 Topic 可以分成多个 Partition(对应 server.properties/num.partitions)。
Partition 是一个顺序的追加日志,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 Kafka 吞吐率)。
是的,Kafka 的消息是存在于文件系统之上的。Kafka 高度依赖文件系统来存储和缓存消息,一般的人认为 “磁盘是缓慢的”,所以对这样的设计持有怀疑态度。
实际上,磁盘比人们预想的快很多也慢很多,这取决于它们如何被使用;一个好的磁盘结构设计可以使之跟网络速度一样快。
现代的操作系统针对磁盘的读写已经做了一些优化方案来加快磁盘的访问速度。
比如,预读会提前将一个比较大的磁盘快读入内存。后写会将很多小的逻辑写操作合并起来组合成一个大的物理写操作。
并且,操作系统还会将主内存剩余的所有空闲内存空间都用作磁盘缓存,所有的磁盘读写操作都会经过统一的磁盘缓存(除了直接 I/O 会绕过磁盘缓存)。
综合这几点优化特点,如果是针对磁盘的顺序访问,某些情况下它可能比随机的内存访问都要快,甚至可以和网络的速度相差无几。
其结构如下:server.properties/num.partitions 表示文件 server.properties 中的 num.partitions 配置项,下同。
Partition 中的每条记录(Message)包含三个属性:Offset,messageSize 和 Data。
其中 Offset 表示消息偏移量;messageSize 表示消息的大小;Data 表示消息的具体内容。
Partition 是以文件的形式存储在文件系统中,位置由 server.properties/log.dirs 指定,其命名规则为 <topic_name>-<partition_id>。
比如,Topic 为"page_visits"的消息,分为 5 个 Partition,其目录结构为:
Partition 可能位于不同的 Broker 上,Partition 是分段的,每个段是一个 Segment 文件。Segment 文件又由 index file 和 data file 组成,他们总是成对出现,后缀 “.index” 和 “.log” 分表表示 Segment 索引文件和数据文件。
Segment 是 Kafka 文件存储的最小单位。Segment 文件命名规则:Partition 全局的第一个 Segment 从 0 开始,后续每个 Segment 文件名为上一个 Segment 文件最后一条消息的 offset 值。
数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。如 00000000000000368769.index 和 00000000000000368769.log。
Segment的常用配置有:
#server.properties
#segment文件的大小,默认为 1G
log.segment.bytes=1024*1024*1024
#滚动生成新的segment文件的最大时长
log.roll.hours=24*7
#segment文件保留的最大时长,超时将被删除
log.retention.hours=24*7
Partition 目录下包括了数据文件和索引文件,下图是某个 Partition 的目录结构:
Index 采用稀疏存储的方式,它不会为每一条 Message 都建立索引,而是每隔一定的字节数建立一条索引,避免索引文件占用过多的空间。
缺点是没有建立索引的 Offset 不能一次定位到 Message 的位置,需要做一次顺序扫描,但是扫描的范围很小。
索引包含两个部分(均为 4 个字节的数字),分别为相对 Offset 和 Position。
相对 Offset 表示 Segment 文件中的 Offset,Position 表示 Message 在数据文件中的位置。
其中以索引文件中元数据 ??, 497> 为例,依次在数据文件中表示第 3 个 Message(在全局 Partition 表示第 368769 + 3 = 368772 个 message)以及该消息的物理偏移地址为 497。
注意该 Index 文件并不是从0开始,也不是每次递增 1 的,这是因为 Kafka 采取稀疏索引存储的方式,每隔一定字节的数据建立一条索引。
它减少了索引文件大小,使得能够把 Index 映射到内存,降低了查询时的磁盘 IO 开销,同时也并没有给查询带来太多的时间消耗。
因为其文件名为上一个 Segment 最后一条消息的 Offset ,所以当需要查找一个指定 Offset 的 Message 时,通过在所有 Segment 的文件名中进行二分查找就能找到它归属的 Segment。
再在其 Index 文件中找到其对应到文件上的物理位置,就能拿出该 Message。
Kafka 是如何准确的知道 Message 的偏移的呢?这是因为在 Kafka 定义了标准的数据存储结构,在 Partition 中的每一条 Message 都包含了以下三个属性:
- Offset:表示 Message 在当前 Partition 中的偏移量,是一个逻辑上的值,唯一确定了 Partition 中的一条 Message,可以简单的认为是一个 ID。
- MessageSize:表示 Message 内容 Data 的大小。
- Data:Message 的具体内容。
总结:Kafka 的 Message 存储采用了分区(Partition),磁盘顺序读写,分段(LogSegment)和稀疏索引这几个手段来达到高效性。
2.2 Partition and Replica
一个 Topic 物理上分为多个 Partition,位于不同的 Broker 上。如果没有 Replica,一旦 Broker 宕机,其上所有的 Patition 将不可用。
每个 Partition 可以有多个Replica(对应server.properties/default.replication.factor),分配到不同的 Broker 上。
其中有一个 Leader 负责读写,处理来自 Producer 和 Consumer 的请求;其他作为 Follower 从 Leader Pull 消息,保持与 Leader 的同步。
如何分配 Partition 和 Replica 到 Broker 上?步骤如下:
- 将所有 Broker(假设共 n 个 Broker)和待分配的 Partition 排序。
- 将第 i 个 Partition 分配到第(i mod n)个 Broker 上。
- 将第 i 个 Partition 的第 j 个 Replica 分配到第((i + j) mode n)个 Broker 上。
根据上面的分配规则,若 Replica 的数量大于 Broker 的数量,必定会有两个相同的 Replica 分配到同一个 Broker 上,产生冗余。因此 Replica 的数量应该小于或等于 Broker 的数量。
2.3 Leader 选举
Kafka 在 Zookeeper 中(/brokers/topics/[topic]/partitions/[partition]/state)动态维护了一个 ISR(in-sync replicas)。
ISR 里面的所有 Replica 都"跟上"了 Leader,Controller 将会从 ISR 里选一个做 Leader。
具体流程如下:
- Controller 在 Zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 Broker 宕机时 Zookeeper 会 Fire Watch。
- Controller 从 /brokers/ids 节点读取可用 Broker。
- Controller 决定 set_p,该集合包含宕机 Broker 上的所有 Partition。
- 对 set_p 中的每一个 Partition,从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR,决定新 Leader,将新 Leader、ISR、controller_epoch 和 leader_epoch 等信息写入 State 节点。
- 通过 RPC 向相关 Broker 发送 leaderAndISRRequest 命令。
当 ISR 为空时,会选一个 Replica(不一定是 ISR 成员)作为 Leader;当所有的 Replica 都歇菜了,会等任意一个 Replica 复活,将其作为 Leader。
ISR(同步列表)中的 Follower 都"跟上"了Leader,"跟上"并不表示完全一致,它由 server.properties/replica.lag.time.max.ms 配置。
表示 Leader 等待 Follower 同步消息的最大时间,如果超时,Leader 将 Follower 移除 ISR。配置项 replica.lag.max.messages 已经移除。
2.4 Replica 同步
Kafka 通过"拉模式"同步消息,即 Follower 从 Leader 批量拉取数据来同步。
具体的可靠性,是由生产者(根据配置项 producer.properties/acks)来决定的。
In Kafka 0.9,request.required.acks=-1 which configration of producer is replaced by acks=all, but this old config is remained in docs. 在 0.9 版本,生产者配置项 request.required.acks=-1 被 acks=all 取代,但是老的配置项还保留在文档中。 PS:最新的文档 2.2.x request.required.acks 已经不存在了。
在 Acks=-1 的时候,如果 ISR 少于 min.insync.replicas 指定的数目,将会抛出 NotEnoughReplicas 或 NotEnoughReplicasAfterAppend 异常。
这里 ISR 列表中的机器是会变化的,根据配置 replica.lag.time.max.ms,多久没同步,就会从 ISR 列表中剔除。
以前还有根据落后多少条消息就踢出 ISR,在 1.0 版本后就去掉了,因为这个值很难取,在高峰的时候很容易出现节点不断的进出 ISR 列表。
从 ISA 中选出 Leader 后,Follower 会把自己日志中上一个高水位后面的记录去掉,然后去和 Leader 拿新的数据。
因为新的 Leader 选出来后,Follower 上面的数据,可能比新 Leader 多,所以要截取。
这里高水位的意思,对于 Partition 和 Leader,就是所有 ISR 中都有的最新一条记录。消费者最多只能读到高水位。
从 Leader 的角度来说高水位的更新会延迟一轮,例如写入了一条新消息,ISR 中的 Broker 都 Fetch 到了,但是 ISR 中的 Broker 只有在下一轮的 Fetch 中才能告诉 Leader。
也正是由于这个高水位延迟一轮,在一些情况下,Kafka 会出现丢数据和主备数据不一致的情况,0.11 开始,使用 Leader Epoch 来代替高水位。
2.5 多集群
随着业务发展,我们往往需要多集群,通常处于下面几个原因:
当构建多个数据中心时,往往需要实现消息互通。举个例子,假如用户修改了个人资料,那么后续的请求无论被哪个数据中心处理,这个更新需要反映出来。又或者,多个数据中心的数据需要汇总到一个总控中心来做数据分析。
上面说的分区复制冗余机制只适用于同一个 Kafka 集群内部,对于多个 Kafka 集群消息同步可以使用 Kafka 提供的 MirrorMaker 工具。
本质上来说,MirrorMaker 只是一个 Kafka 消费者和生产者,并使用一个队列连接起来而已。它从一个集群中消费消息,然后往另一个集群生产消息。
3. Producer 如何发送消息
Producer 首先将消息封装进一个 ProducerRecord 实例中。
消息路由:
- 发送消息时如果指定了 Partition,则直接使用。
- 如果指定了 Key,则对 Key 进行哈希,选出一个 Partition。这个 Hash(即分区机制)由 producer.properties/partitioner.class 指定的类实现,这个路由类需要实现 Partitioner 接口。
- 如果都未指定,通过 Round-Robin 来选 Partition。
消息并不会立即发送,而是先进行序列化后,发送给 Partitioner,也就是上面提到的 Hash 函数,由 Partitioner 确定目标分区后,发送到一块内存缓冲区中(发送队列)。
Producer 的另一个工作线程(即 Sender 线程),则负责实时地从该缓冲区中提取出准备好的消息封装到一个批次内,统一发送到对应的 Broker 中。Kafka Produce 都是批量请求,会积攒一批,然后一起发送,不是调 send() 就立刻进行网络发包。
当 Broker 接收到消息后,如果成功写入则返回一个包含消息的主题、分区及位移的 RecordMetadata 对象,否则返回异常。
生产者接收到结果后,对于异常可能会进行重试。
其过程大致是这样的:
4. Consumer 如何消费消息
每个 Consumer 都划归到一个逻辑 Consumer Group 中,一个 Partition 只能被同一个 Consumer Group 中的一个 Consumer 消费,但可以被不同的 Consumer Group 消费。
若 Topic 的 Partition 数量为 p,Consumer Group 中订阅此 Topic 的 Consumer 数量为 c, 则:
p < c: 会有 c - p 个 consumer闲置,造成浪费
p > c: 一个 consumer 对应多个 partition
p = c: 一个 consumer 对应一个 partition
应该合理分配 Consumer 和 Partition 的数量,避免造成资源倾斜,最好 Partiton 数目是 Consumer 数目的整数倍。
4.1 如何将 Partition 分配给 Consumer
生产过程中 Broker 要分配 Partition,消费过程这里,也要分配 Partition 给消费者。
类似 Broker 中选了一个 Controller 出来,消费也要从 Broker 中选一个 Coordinator,用于分配 Partition。
下面从顶向下,分别阐述一下:
- 怎么选 Coordinator
- 交互流程
- Reblance 的流程
4.1.1 选 Coordinator
看 Offset 保存在那个 Partition;该 Partition Leader 所在的 Broker 就是被选定的 Coordinator。
这里我们可以看到,Consumer Group 的 Coordinator,和保存 Consumer Group Offset 的 Partition Leader 是同一台机器。
4.1.2 交互流程
把 Coordinator 选出来之后,就是要分配了。整个流程是这样的:
- Consumer 启动、或者 Coordinator 宕机了,Consumer 会任意请求一个 Broker,发送 ConsumerMetadataRequest 请求。
Broker 会按照上面说的方法,选出这个 Consumer 对应 Coordinator 的地址。 - Consumer 发送 Heartbeat 请求给 Coordinator,返回 IllegalGeneration 的话,就说明 Consumer 的信息是旧的了,需要重新加入进来,进行 Reblance。
返回成功,那么 Consumer 就从上次分配的 Partition 中继续执行。
4.1.3 Reblance
当 Partition 或 Consumer 数量发生变化时,比如增加 Consumer,减少 Consumer(主动或被动),增加 Partition,都会进行 Rebalance。
其过程如下:
- Consumer 给 Coordinator 发送 JoinGroupRequest 请求。这时其他 Consumer 发 Heartbeat 请求过来时,Coordinator 会告诉他们,要 Rebalance了。其他 Consumer 也发送 JoinGroupRequest 请求。
- Coordinator 在 Consumer 中选出一个 Leader,其他作为 Follower,通知给各个 Consumer,对于 Leader,还会把 Follower 的 Metadata 带给它。
- Consumer Leader 根据 Consumer Metadata 重新分配 Partition。
- Consumer 向 Coordinator 发送 SyncGroupRequest,其中 Leader 的 SyncGroupRequest 会包含分配的情况。Coordinator 回包,把分配的情况告诉 Consumer,包括 Leader。
重平衡是 Kafka 一个很重要的性质,这个性质保证了高可用和水平扩展。
不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。
而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。
消费者通过定期发送心跳(Hearbeat)到一个作为组协调者(Group Coordinator)的 Broker 来保持在消费组内存活。
这个 Broker 不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。
如果消费者超过一定时间没有发送心跳,那么它的会话(Session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。
可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费。
通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。
在 0.10.1 版本,Kafka 对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。
另外更高版本的 Kafka 支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。
4.2 Consumer Fetch Message
Consumer 采用"拉模式"消费消息,这样 Consumer 可以自行决定消费的行为。
Consumer 调用 Poll(duration)从服务器拉取消息。拉取消息的具体行为由下面的配置项决定:
#consumer.properties
#消费者最多 poll 多少个 record
max.poll.records=500
#消费者 poll 时 partition 返回的最大数据量
max.partition.fetch.bytes=1048576
#Consumer 最大 poll 间隔
#超过此值服务器会认为此 consumer failed
#并将此 consumer 踢出对应的 consumer group
max.poll.interval.ms=300000
在 Partition 中,每个消息都有一个 Offset。新消息会被写到 Partition 末尾(最新的一个 Segment 文件末尾), 每个 Partition 上的消息是顺序消费的,不同的 Partition 之间消息的消费顺序是不确定的。
若一个 Consumer 消费多个 Partition, 则各个 Partition 之前消费顺序是不确定的,但在每个 Partition 上是顺序消费。
若来自不同 Consumer Group 的多个 Consumer 消费同一个 Partition,则各个 Consumer 之间的消费互不影响,每个 Consumer 都会有自己的 Offset。
Consumer A 和 Consumer B 属于不同的 Consumer Group。Cosumer A 读取到 Offset=9, Consumer B 读取到 Offset=11,这个值表示下次读取的位置。
也就是说 Consumer A 已经读取了 Offset 为 0~8 的消息,Consumer B 已经读取了 Offset 为 0~10 的消息。
下次从 Offset=9 开始读取的 Consumer 并不一定还是 Consumer A 因为可能发生 Rebalance。
5. Offset 如何保存
Consumer 消费 Partition 时,需要保存 Offset 记录当前消费位置。
Offset 可以选择自动提交或调用 Consumer 的 commitSync() 或 commitAsync() 手动提交,相关配置为:
#是否自动提交 offset
enable.auto.commit=true
#自动提交间隔。enable.auto.commit=true 时有效
auto.commit.interval.ms=5000
以前保存在 ZK 中,由于 ZK 的写性能不好,以前的解决方法都是 Consumer 每隔一分钟上报一次。
这里 ZK 的性能严重影响了消费的速度,而且很容易出现重复消费。在 0.10 版本后,Kafka 把这个 Offset 的保存,从 ZK 总剥离,保存在名叫 __consumeroffsets 的 Topic 中。
写消息的 Key 由 GroupId、Topic、Partition 组成,Value 是 Offset。Topic 配置的清理策略是 Compact。总是保留最新的 Key,其余删掉。
一般情况下,每个 Key 的 Offset 都是缓存在内存中,查询的时候不用遍历 Partition,如果没有缓存,第一次就会遍历 Partition 建立缓存,然后查询返回。
__consumeroffsets 的 Partition 数量由下面的 Server 配置决定:
offsets.topic.num.partitions=50
Offset 保存在哪个分区上,即 __consumeroffsets 的分区机制,可以表示为:
groupId.hashCode() mode groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount 是上面配置的分区数。因为一个 Partition 只能被同一个 Consumer Group 的一个 Consumer 消费,因此可以用 GroupId 表示此 Consumer 消费 Offeset 所在分区。
6. 消息投递语义
Kafka 支持 3 种消息投递语义:
- At most once:最多一次,消息可能会丢失,但不会重复。
- At least once:最少一次,消息不会丢失,可能会重复。
- Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11 中实现,仅限于下游也是 Kafka)
在业务中,常常都是使用 At least once 的模型,如果需要可重入的话,往往是业务自己实现。
6.1 At most once
先获取数据,再 Commit Offset,最后进行业务处理:
- 生产者生产消息异常,不管,生产下一个消息,消息就丢了。
- 消费者处理消息,先更新 Offset,再做业务处理,做业务处理失败,消费者重启,消息就丢了。
6.2 At least once
先获取数据,再进行业务处理,业务处理成功后 Commit Offset:
- 生产者生产消息异常,消息是否成功写入不确定,重做,可能写入重复的消息。
- 消费者处理消息,业务处理成功后,更新 Offset 失败,消费者重启的话,会重复消费。
6.3 Exactly once
思路是这样的,首先要保证消息不丢,再去保证不重复。所以盯着 At least once 的原因来搞。
首先想出来的:
- 生产者重做导致重复写入消息:生产保证幂等性。
- 消费者重复消费:消灭重复消费,或者业务接口保证幂等性重复消费也没问题。
由于业务接口是否幂等,不是 Kafka 能保证的,所以 Kafka 这里提供的 Exactly once 是有限制的,消费者的下游也必须是 Kafka。
所以以下讨论的,没特殊说明,消费者的下游系统都是 Kafka(注:使用 Kafka Conector,它对部分系统做了适配,实现了 Exactly once)。生产者幂等性好做,没啥问题。
解决重复消费有两个方法:
- 下游系统保证幂等性,重复消费也不会导致多条记录。
- 把 Commit Offset 和业务处理绑定成一个事务。
本来 Exactly once 实现第 1 点就 OK 了。但是在一些使用场景下,我们的数据源可能是多个 Topic,处理后输出到多个 Topic,这时我们会希望输出时要么全部成功,要么全部失败。这就需要实现事务性。
既然要做事务,那么干脆把重复消费的问题从根源上解决,把 Commit Offset 和输出到其他 Topic 绑定成一个事务。
6.4 生产幂等性
思路是这样的,为每个 Producer 分配一个 Pid,作为该 Producer 的唯一标识。
Producer 会为每一个 Broker 维护一个单调递增的 Seq。类似的,Broker 也会为每个 Producer 记录下最新的 Seq。
当 req_seq == broker_seq+1 时,Broker 才会接受该消息,因为:
- 消息的 Seq 比 Broker 的 Seq 大超过时,说明中间有数据还没写入,即乱序了。
- 消息的 Seq 不比 Broker 的 Seq 小,那么说明该消息已被保存。
6.5 事务性/原子性广播
场景是这样的:
- 先从多个源 Topic 中获取数据。
- 做业务处理,写到下游的多个目的 Topic。
- 更新多个源 Topic 的 Offset。
其中第 2、3 点作为一个事务,要么全成功,要么全失败。这里得益于 Offset 实际上是用特殊的 Topic 去保存,这两点都归一为写多个 Topic 的事务性处理。
基本思路是这样的:
- 引入 Tid(transaction id),和 Pid 不同,这个 ID 是应用程序提供的,用于标识事务,和 Producer 是谁并没关系。
就是任何 Producer 都可以使用这个 Tid 去做事务,这样进行到一半就死掉的事务,可以由另一个 Producer 去恢复。 - 同时为了记录事务的状态,类似对 Offset 的处理,引入 Transaction Coordinator 用于记录 Transaction Log。
在集群中会有多个 Transaction Coordinator,每个 Tid 对应唯一一个 Transaction Coordinator。 注:Transaction Log 删除策略是 Compact,已完成的事务会标记成 Null,Compact 后不保留。
做事务时,先标记开启事务,写入数据,全部成功就在 Transaction Log 中记录为 Prepare Commit 状态,否则写入 Prepare Abort 的状态。
之后再去给每个相关的 Partition 写入一条 Marker(Commit 或者 Abort)消息,标记这个事务的 Message 可以被读取或已经废弃。成功后在 Transaction Log记录下 Commit/Abort 状态,至此事务结束。
数据流:
- 首先使用 Tid 请求任意一个 Broker(代码中写的是负载最小的 Broker),找到对应的 Transaction Coordinator。
- 请求 Transaction Coordinator 获取到对应的 Pid,和 Pid 对应的 Epoch,这个 Epoch 用于防止僵死进程复活导致消息错乱。
当消息的 Epoch 比当前维护的 Epoch 小时,拒绝掉。Tid 和 Pid 有一一对应的关系,这样对于同一个 Tid 会返回相同的 Pid。 - Client 先请求 Transaction Coordinator 记录的事务状态,初始状态是 Begin,如果是该事务中第一个到达的,同时会对事务进行计时。
Client 输出数据到相关的 Partition 中;Client 再请求 Transaction Coordinator 记录 Offset 的事务状态;Client 发送 Offset Commit 到对应 Offset Partition。 - Client 发送 Commit 请求,Transaction Coordinator 记录 Prepare Commit/Abort,然后发送 Marker 给相关的 Partition。
全部成功后,记录 Commit/Abort 的状态,最后这个记录不需要等待其他 Replica 的 ACK,因为 Prepare 不丢就能保证最终的正确性了。
这里 Prepare 的状态主要是用于事务恢复,例如给相关的 Partition 发送控制消息,没发完就宕机了,备机起来后,Producer 发送请求获取 Pid 时,会把未完成的事务接着完成。
当 Partition 中写入 Commit 的 Marker 后,相关的消息就可被读取。所以 Kafka 事务在 Prepare Commit 到 Commit 这个时间段内,消息是逐渐可见的,而不是同一时刻可见。
6.6 消费事务
前面都是从生产的角度看待事务。还需要从消费的角度去考虑一些问题。
消费时,Partition 中会存在一些消息处于未 Commit 状态,即业务方应该看不到的消息,需要过滤这些消息不让业务看到,Kafka 选择在消费者进程中进行过来,而不是在 Broker 中过滤,主要考虑的还是性能。
Kafka 高性能的一个关键点是 Zero Copy,如果需要在 Broker 中过滤,那么势必需要读取消息内容到内存,就会失去 Zero Copy 的特性。
7. 搭建 Kafka
7.1 下载 Kafka
这里以 Mac OS 为例,在安装了 Homebrew 的情况下执行下列代码:
brew install kafka
由于 Kafka 依赖了 Zookeeper,所以在下载的时候会自动下载。
7.2 启动服务
我们在启动之前首先需要修改 Kafka 的监听地址和端口为 localhost:9092:
vi /usr/local/etc/kafka/server.properties
然后修改成下图的样子:
依次启动 Zookeeper 和 Kafka:
brew services start zookeeper
brew services start kafka
然后执行下列语句来创建一个名字为 “test” 的 Topic:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
我们可以通过下列的命令查看我们的 Topic 列表:
kafka-topics --list --zookeeper localhost:2181
7.3 发送消息
然后我们新建一个控制台,运行下列命令创建一个消费者关注刚才创建的 Topic:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
用控制台往刚才创建的 Topic 中添加消息,并观察刚才创建的消费者窗口:
kafka-console-producer --broker-list localhost:9092 --topic test
能通过消费者窗口观察到正确的消息:
8. FAQ
8.1 如何保证消息不被重复消费?(消息的幂等性)
对于更新操作,天然具有幂等性。对于新增操作,可以给每条消息一个唯一的 id,处理前判断是否被处理过。这个 id 可以存储在 Redis 中,如果是写数据库可以用主键约束。
8.2 如何保证消息的可靠性传输?(消息丢失的问题)
根据 Kafka 架构,有三个地方可能丢失消息:Consumer,Producer 和 Server。
消费端弄丢了数据:当 server.properties/enable.auto.commit 设置为 True 的时候,Kafka 会先 Commit Offset 再处理消息,如果这时候出现异常,这条消息就丢失了。
因此可以关闭自动提交 Offset,在处理完成后手动提交 Offset,这样可以保证消息不丢失;但是如果提交 Offset 失败,可能导致重复消费的问题, 这时保证幂等性即可。
Kafka 弄丢了消息:如果某个 Broker 不小心挂了,此时若 Replica 只有一个,Broker 上的消息就丢失了。
若 Replica>1,给 Leader 重新选一个 Follower 作为新的 Leader,如果 Follower 还有些消息没有同步,这部分消息便丢失了。
可以进行如下配置,避免上面的问题:
- 给 Topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 Partition 必须有至少 2 个副本。
- 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 Leader 至少感知到有至少一个 Follower 还跟自己保持联系,没掉队,这样才能确保 Leader 挂了还有一个 Follower 吧。
- 在 Producer 端设置 acks=all:这个是要求每条数据,必须是写入所有 Replica 之后,才能认为是写成功了。
- 在 Producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。
Producer弄丢了消息:在 Producer 端设置 acks=all,保证所有的 ISR 都同步了消息才认为写入成功。
8.3 如何保证消息的顺序性?
Kafka 中 Partition 上的消息是顺序的,可以将需要顺序消费的消息发送到同一个 Partition 上,用单个 Consumer 消费。但是 Kafka 只会保证在 Partition 内消息是有序的,而不管全局的情况。
8.4 为什么 Kafka 是 Pull 模型
消费者应该向 Broker 要数据(Pull)还是 Broker 向消费者推送数据(Push)?
作为一个消息系统,Kafka 遵循了传统的方式,选择由 Producer 向 Broker Push 消息并由 Consumer 从 Broker Pull 消息。
一些 logging-centric system,比如 Facebook 的 Scribe 和 Cloudera 的 Flume,采用 Push 模式。事实上,Push 模式和 Pull 模式各有优劣。
Push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 Broker 决定的。
Push 模式的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
而 Pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。
对于 Kafka 而言,Pull 模式更合适。Pull 模式可简化 Broker 的设计,Consumer 可自主控制消费消息的速率。
同时 Consumer 可以自己控制消费方式,即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
|