1、高可用机制
高可用性是指系统无间断地执行其功能的能力,代表系统的可用性程度。Kafka提供了高可用机制,可保障一个或多个Broker宕机后,其他Broker及所有Partition都能继续提供服务,且存储的消息不丢失。 对于分布式系统来说,当集群规模上升到一定程度后,一台或多台机器宕机的可能性大大增加,Kafka采用多机备份和消息应答确认的方式来解决数据丢失问题,并通过一套失败恢复机制解决服务不可用的问题。
1.2、高可用保障机制
1.2.1、消息备份机制
上一篇文章提到,Kafka的每个分区(Partition)都有一个副本集合AR,每个副本集合包含一个Leader副本,及0个以上的Follower副本。生产者将消息发送对应Partition的Leader副本,Follower副本从Leader副本同步消息,Kafka的Leader机制在保障数据一致性的同时,也降低了消息备份的复杂度。 同一个Partition的副本不会存储在同一个Broker上,Kafka会尽量将所有的Partition以及其各个副本均匀地分配在整个集群中,这样既做好了负载均衡,又提高了容错能力。
1.2.2、ISR
所有与Leader副本保持一定程度同步的副本(包括Leader副本)组成副本列表 ISR(In-Sync Replicas),实际存储的是副本所在Broker的BrokerId。这里的保持同步并不是Follower副本与Leader副本数据完全一致,只需要在一定时间内保持有效连接即可,这个时间由参数replica.lag.time.max.ms设定,默认值为10s。
Follower会周期性的向Leader发送FetchRequest请求获取要存储的消息,发送的时间间隔由参数replica.fetch.wait.max.ms设定,默认值为500ms。
在消息同步期间,Follower副本相对于Leader副本具有一定程度的滞后,Leader副本负责维护和跟踪ISR集合中所有Follower副本的滞后状态,并将ISR的变更同步至ZooKeeper。当Follower副本落后太多或失效时,Leader副本会将它从ISR集合中剔除,被移除ISR的Follower可以继续发送FetchRequest请求,尝试再次跟上Leader并重新进入ISR。追赶上Leader副本的判定标准是,此Follower副本的LEO不小于Leader副本的HW。
通常只有在ISR集合中的副本才有资格被选举为新的Leader。当Kafka中的unclean.leader.election.enable(是否可以从非ISR集合中选举leader副本,默认值为 false)配置为true,并且ISR中所有副本都宕机的情况下,才允许ISR外的副本(即OSR,上一篇文章有介绍)被选举为Leader,由于OSR副本不保证和Leader副本同步,可能造成数据丢失。
1.2.2.1、分区Leader副本的选举
同一个分区,同一个Broker节点中不允许出现多个副本,当分区的Leader节点发生功能故障时,其中一个Follower节点就会成为新的Leader节点。
分区Leader副本的选举由Kafka Controller负责具体实施。当创建分区(创建主题或增加分区都有创建分区的动作)或分区上线(比如分区中原先的Leader副本下线,此时分区需要选举一个新的Leader上线来对外提供服务)的时候都需要执行Leader的选举动作。
选举的基本思路是按照AR集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。一个分区的AR集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。注意这里是根据AR的顺序而不是ISR的顺序进行选举的。
分区进行重分配(reassign)的时候也需要执行leader的选举动作。同样也是从重分配的AR列表中找到第一个存活的副本,且这个副本在目前的ISR列表中。分区重分配指的是,当集群中新增Broker节点时,只有新创建的主题分区才有可能被分配到这个节点上,之前的主题分区是不会自动分配到新节点上的,就会出现新旧节点的负载不均衡,此时需要进行分区重分配,达到分区平衡。
优先副本的选举逻辑相同,直接将优先副本设置为Leader即可,AR集合中的第一个副本即为优先副本。
1.2.3、消息应答确认机制
生产者发送消息中包含acks字段,该字段代表Leader应答生产者前副本的写入数量
-
acks=0 生产者无需等待服务端任何确认,消息被添加到生产者套接字缓存区后即视为已发送,因此不保证服务端已收到消息 -
acks=1 Leader将消息写入本地日志后,无需等待Follower的消息确认就做出应答,如果Leader在响应生产者消息已接收之后立即宕机,其他Follower均为完成消息的复制,这批消息就会丢失 -
acks=-1/all Leader将等待ISR中所有副本都写入完成,并应答给Leader后,Leader才会响应生产者消息已接收。因此只有ISR中任意一个副本还存活,这些消息就不会丢失
Broker端的配置参数min.insync.replicas表示分区ISR集合中至少要有多少个副本,默认值为1。当ISR中的副本数量小于min.insync.replicas时,Leader停止写入生产者的消息,并向生产者抛出NotEnoughReplicas异常,阻塞等待更多的Follower赶上并重新进入ISR。 被Leader写入的消息都至少有min.insync.replicas个副本,因此容忍min.insync.replicas - 1个副本同时宕机。 为了保证不丢失消息,可以配置生产者acks=all 并且 min.insync.replicas >= 2
1.2.4、LEO和HW
上一篇文章介绍了消息的追加过程,提到了日志偏移量,Kafka的每个副本对象有两个重要的偏移量属性:
- LEO(log end offset),即日志末端位移,指向副本日志中下一条消息的偏移量(即下一条消息的写入位置)
- HW(High Watermark),高水位线,指已同步ISR中Follower副本的偏移量标识
所有高水位线以下的消息都是备份过的,消费者仅可以消费各个分区Leader高水位线以下的消息,所以Leader的HW值是由ISR中所有备份的LEO最小值决定的
1.3、故障恢复机制
Kafka使用ZooKeeper存储Broker、Topic等状态数据,Kafka集群中的Controller和Broker会在ZooKeeper指定节点上个注册Watcher(时间监听器),以便在特定事件触发时,由ZooKeeper将事件通知到对应的Broker。
1.3.1、Broker故障恢复分析
1.3.1.1、场景一:Broker0与其他Broker断开连接
由于Broker0和ZooKeeper还在正常连接中,因此ZooKeeper认为Broker0依然存活,则对于两个分区有不同的操作:
-
Partition0 Broker的副本为Partition0的Leader副本,当Broker0超过replica.lag.time.max.ms(默认值为10s)没有收到Broker1、Broker2的FetchRequest请求后,Broker0选择将Partition0的ISR收缩到仅剩Broker0本身,并将ISR的变更同步到ZooKeeper。Broker0根据min.insync.replicas的配置值决定是否继续接收生产者的消息 -
Partition1 超过replica.lag.time.max.ms后,Broker1会将Broker0的副本从Partition1的ISR中移除。如果后续Broker0恢复与其他Broker的连接,相应的Follower副本赶上Broker1,还会将其重新加入ISR中
1.3.1.2、场景二:Broker0与ZooKeeper断开连接
ZooKeeper会认为Broker0已经宕机,会删除Broker0的节点,对于两个分区也有不同的操作:
-
Partition0 ZooKeeper删除节点后该节点上注册的Watcher会通知控制器(Controller),控制器会发现Broker0是Partition0的Leader,于是从当前存活的ISR中选择Broker2作为Partition0的新Leader。控制器将Leader的变更通知Broker1、Broker2,Broker1改向Broker2发送FetchRequest请求数据。 生产者每隔60s从bootstrap.servers中的Broker获取最新的元数据(metadata),当发现Partition0的Leader节点发生变更后,会改向新的Leader发送消息。 Broker0由于收不到ZooKeeper的通知,依然认为自己是Partition0的Leader,当发现Broker1、Broker2不再向自己发送FetchRequest请求数据,缺失了ISR应答的Broker0停止写入acks=all的消息,但可以写入acks=1的消息。在replica.lag.time.max.ms时间之后,Broker0尝试向ZooKeeper发送ISR变更发现无法正常连接,便不再接收新的生产者的消息。 当Broker0与ZooKeeper恢复连接后,发现自己不再是Partition0的Leader,会向Broker2发送FetchRequest请求数据,并且将自己的本地日志阶段,为了与Leader数据一致。在失联开始与生产者重新向Broker2发送消息这段时间内的消息会丢失。 -
Partition1 由于Broker0与Broker1依然保持连接,因此Broker0依然会向Broker1发送FetchRequest请求数据。只要Broker0能继续保持同步,Broker1也不会向ZooKeeper变更ISR。
1.3.2、Controller故障恢复分析
1.3.2.1、场景一:Controller与某个Broker(如Broker0)断开连接
Controller无法通知Broker0,会认为Broker0已经宕机,会执行上述1.3.1.2中的场景
1.3.2.2、场景二:Controller与ZooKeeper断开连接
ZooKeeper会将Controller临时节点删除,进行重新选举新的Controller。Kafka中的控制器选举依赖于ZooKeeper,成功竞选为控制器的broker会在ZooKeeper中创建/controller这个临时节点。
选举成功后,控制器会读取ZooKeeper中各个节点的数据来初始化上下文信息(ControllerContext),并且需要管理这些上下文信息。
选举过程如下:
- 每个Broker启动的时候,都会去尝试读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其他Broker节点成功竞选为控制器,当前Broker就会放弃竞选。
- 如果ZooKeeper中不存在/controller节点或者该节点数据异常,那么就会去尝试创建/controller节点。
- 当前Broker取创建节点的时候,也有可能其他Broker同时去创建这个节点,只有创建成功的Broker才会成为控制器,创建失败的Broker竞选失败。
选举触发时机:
- ZooKeeper中/controller节点被删除
- 控制器对应的Broker宕机
- 手动更改/controller节点中的brokerid对应的数据
2、高性能
Kafka虽然采用的是磁盘存储,却有着高性能、高吞吐的特点,吞吐量可以达到上百万。
2.2、高性能实现机制
Kafka的架构如图所示,Producer生产消息,以Partition为维度,按照一定的路由策略,提交消息到Broker集群中各个Partition的Leader节点。Consumer同样以Partition为维度,从Broker中的Leader节点拉取并消费消息。
Producer生产消息后发送给Broker会涉及大量的消息网络传输,所以Kafka采用了批量发送的方式。Broker在持久化消息、读取消息的时候,如果采用传统IO读写方式,会严重影响性能,所以Kafka采用了顺序写+零拷贝的方式。
2.2.1、批量发送消息
生产者架构如图所示,消息会被追加到消息累加器(RecordAccumulator)的各个Partition的双端队列中的批记录(ProducerBatch),然后Sender线程从队列中读取消息并发送数据到Broker中
发送到Broker需满足消息大小达到阈值(由batch.size参数来指定,默认值为16KB),或者消息等待发送时间达到阈值(由linger.ms参数来指定,默认值为0,即有消息立刻发送)
Sender线程发送具体过程如下:
- Sender从消息累加器中找到已经准备好的Broker节点,前提条件是Partition等待发送消息的大小或等待发送时间达到阈值
- 如果客户端还没有与Broker节点建立连接,则创建连接
- 获取每个Partition要发送的ProducerBatch
- 创建要发送各个Broker的客户端请求,并将消息发送给Broker
2.2.2、日志持久化机制
Kafka消息是存储在磁盘上的,以日志(Log)的形式存储,每个Partition的的每个副本都有日志,为了防止日志过大,引入了日志分段(LogSegment),将Log切分成多个LogSegment,便于维护和清理。
每个LogSegment又有日志文件(.log)、偏移量索引文件(.index)、时间戳索引文件(.timeindex)、其他文件
- 偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置
- 时间戳索引文件是根据指定的时间戳(timestamp)来查找对应的偏移量信息
索引文件是以稀疏索引(sparse index)的方式构造消息的索引,每当写入一定量(由参数log.index.interval.bytes指定,默认值为4KB),偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。稀疏索引通过MappedByteBuffer将索引文件映射到内存中,加快索引查询速度。
日志分段的切分条件,满足任意一个即可:
- 日志分段文件大小超过log.segment.bytes配置的值,默认值为1GB
- 日志分段中消息的最大时间戳与当前系统的时间戳的差值大于log.roll.hours- 配置的值,默认值为7天
- 偏移量索引文件或时间戳索引文件的大小达到了log.index.size.max.bytes配置的值,默认值为10M
- 要写入的偏移量数值超过Integer.MAX_VALUE,即偏移量索引文件无法再写入
2.2.2.1、偏移量索引
偏移量索引以Key-Value的形式存储,Key为相对偏移量(relativeOffset),表示消息相对于基准偏移量(baseOffset)的偏移量,占用4个字节,当前索引文件的名称即为baseOffset的值,使用相对偏移量是为了减小索引文件占用的时间。Value表示为物理地址(position),占用4个字节,也就是消息在日志分段文件中对应的物理位置。
2.2.2.2、时间戳索引
时间戳索引以Key-Value的形式存储,Key为时间戳(timestamp),占用8个字节,表示当前日志分段的最大时间戳。Value为相对偏移量(relativeOffset),占用4个字节,表示时间戳所对应消息的相对偏移量。
每个要追加的时间戳索引项的timestamp必须大于之前追加的索引项的timestamp,都在不予追加。
2.2.2.3、基于索引文件的查询
- 查找偏移量为678的消息,先查找到00000500.index这个索引文件,678-500=178,则相对偏移量为178。
- 根据二分查找,查找到不大于178的最大索引项,即(100,459)这个索引项。
- 拿到该物理地址459,从日志数据文件中物理地址459处开始向后读取消息,直到查找到偏移量为678的消息
2.2.2.4、日志清理
Kafka提供了两种日志清理策略:
2.2.3、顺序写
Kafka采用顺序写入磁盘的方式提高吞吐量,通过文件追加的方式写入消息,即只能在日志文件的尾部追加新的消息,并且不允许修改已写入的消息。
2.2.3.1、磁盘读取流程
当需要从磁盘读取数据时,系统将数据逻辑地址传给磁盘,磁盘的控制电路将逻辑地址翻译成物理地址,即确定要读取数据的磁道和扇区,流程如下
- 寻找磁道:磁头移动定位到指定磁道,这段时间最长,最长可达0.1s左右
- 磁盘旋转:等待指定扇区旋转到磁头下,与硬盘自身性能有关,xxxx转/分
- 数据传输:数据通过系统总线从磁盘传送到内存的时间
2.2.3.2、顺序IO与随机IO
2.2.3.3、磁盘I/O流程
-
写操作 用户调用fwrite把数据写入C库标准IObuffer后返回,即写操作通常是异步操作。写入IObuffer后,不会立即刷新到磁盘,会将多次小数据量相邻写操作先缓存起来合并,最终调用write函数一次性写入(或者将大块数据分解多次write调用)页缓存。数据到达页缓存后也不会立即刷新到磁盘,内核有pdflush线程在不停地检测脏页,判断是否写入磁盘,需要写入的则发起磁盘I/O请求。 -
读操作 读操作是同步的,用户调用fread到C库标准IObuffer中读取数据,如果成功则返回。否则继续到页缓存中读取数据,如果成功则返回。否则继续发起I/O请求,从磁盘中读取,读取到数据后将数据缓存到页缓存和C库标准IObuffer并返回。 -
磁盘I/O请求 通用块层根据I/O请求构造一个或多个bio结构并提交给调度层。调度器将bio结构进行排序、合并(将一个或多个进程的读操作合并,将一个或多个进程的写操作合并)到请求队列中,然后调度器再将这些请求提交给驱动,由驱动开始从磁盘读取。
2.2.3.4、页缓存
页缓存(pagecache)是操作系统实现的一种主要的磁盘缓存,以此来减少对磁盘I/O的操作,把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。
当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页(page)是否在页缓存中,如果存在(命中)则直接返回数据,从而避免对物理磁盘的I/O操作。如果没有命中,则从磁盘中读取并将数据页存入页缓存,之后再将数据返回给进程。
Kafka中大量使用了页缓存,先将消息写入页缓存,然后由操作系统负责具体的刷盘任务。
2.2.4、零拷贝
零拷贝是指将数据直接数据从磁盘文件复制到网卡设备中,而不需要经过应用程序。零拷贝大大提高了应用程序的性能,减少了内核态和用户态之间的上下文切换。 以将磁盘文件传送给用户举例,先将文件复制出来放到内存buf中,然后再将这个buf通过套接字(Socket)传输给用户,如图所示,文件经历了4次复制过程:
- 读取文件时,DMA(Direct Memory Access,直接存储器访问)将文件从磁盘复制到内核态的Read Buffer中
- CPU控制将内核态数据复制到用户态之下
- 将文件写入时,将用户态下的内容复制到内核态下的Socket Buffer中
- 将内核态下Socket Buffer的数据复制到网卡设备(NIC Buffer)中传送
内核态和用户态的上下文切换也发生了4次 零拷贝技术依赖于Linux底层的sendfile()方法实现,如图所示,数据通过DMA拷贝到内核态Reader Buffer后,直接通过DMA拷贝到网卡设备(NIC Buffer),无需CPU拷贝,这也是零拷贝叫法的来源。
这里数据只经历2次复制就从磁盘中传送出去了,并且上下文切换也变成了2次。
Kafka的数据传输通过TransportLayer来完成,最终通过Java NIO的transferTo()和transferFrom()方法实现零拷贝,但是不能保证一定能使用零拷贝,还需要看操作系统是否提供了sendfile这样的零拷贝调用。
3、常见问题分析
3.1、消息积压
3.1.1、积压可能原因
- 消费流程卡死
- 消息消费耗时过长
- 消费组客户端启动失败
- 消费线程过少,消费能力不够。
3.1.2、正确做法
- 消费逻辑的业务处理尽量时间不要太长,如果存在长耗时逻辑尽量异步处理。
- 不要过多和外系统服务进行交互,避免其它服务问题导致消费能力下降。
- 消费线程要对异常进行分类处理,不要发生异常轻易终止或者关闭消费节点的注册。
- 对于单Partition消息消费在不需要保证有序的情况下开启并行消息。
- 发现问题及时扩容Partition并扩容消费者机器。
- 优化消费逻辑,能异步处理的尽量异步处理。
3.2、消息丢失
3.2.1、丢失可能原因
- Kafka partition leader选举策略问题造成消息丢失。
- 数据可靠性级别未设置为ack=-1。
- 消息过大造成发送失败
- 业务存在超时丢弃消息逻辑
3.2.2、正确做法
- 业务消费未执行成功不要返回消费成功。
- 如果对消息丢失零容忍可设置客户端 ack=-1。
- 不要发送超过1M以上消息
|