前言
简单来讲Kafka的Consumer Group是由多个Consumer实例共同组成的一个消费组,Consumer Group由一个Group ID来标识,该组内的所有Consumer共同协调来消费Topic下的所有分区,当然一个Consumer实例只能够消费一个分区。
所以最为理想的情况下当你的Consumer Group下的Consumer实例个数和你的Topic分区个数相同时,那么每个Consumer都能消费一个分区的数据。但如果你的Consumer个数比分区数还多的话,比如: 3个Consumer实例,Topic中只有两个分区,那么总有一个Consumer实例处于空闲状态,那么的话就太浪费了
Offset
在Kafka中有一个专门的术语Offset用来记录Consumer在消费过程中的位移。而对于Consumer Group来说,它是用一组键值对来记录,key= groupId+topic+分区编号,value=Consumer针对该分区的位移量,在新版本的Kafka中该键值对由Kafka内部维护(老版本的存储在zookeeper中)
Rebalance
Rebalance(重平衡)规定了一个 consumer group 是如何达成一致来分配订阅 topic 的所有分区的,说简单点就是让组内的所有Consumer实例来均匀的订阅分区
如上图所示,一共有三种类型的场景会导致Kafka Reblance的发生
1.所订阅的Topic分区数发生变化,比如说Topic1新增了一个分区3,那么会导致订阅该Topic的所有Consumer Group发生Rebalance 2.Consumer Group中的Consumer实例个数发生变化,比如说新增Consumer实例或者某个Consumer实例因故障被剔除Consumer Group(减少Consumer实例)也会导致Rebalance 3.订阅的Topic数量发生变更,一般这种场景是Consumer Group使用了正则表达式来匹配Topic,此时如果你新增的Topic满足该表达式的要求的话,那么就会导致Reblance的产生
当发生Rebalance时,Consumer Group下的所有Consumer实例都需要参与Rebalance。Kafka 提供了三种分区分配策略来进行Rebalance(Consumer Group中的Consumer选择Topic中的那个分区),下文会详细描述这三种策略
Rebalance 策略
range
该策略的原理是按照Consumer个数和分区个数整除得出一个数量(每个消费者分配多少个分区)。由于考虑到会有余数的产生,所以总有几个消费者是比其他的要多出一个分区的,这里给出计算公式:
假设 a = 分区数/消费者数, b(余数)= 分区数%消费者数, 那么我们可以很明显的知道前b个消费者每个消费者分配a+1个分区, 而后面的消费者则分配a个分区,下图描述了这一细节
round robin
该策略的实现方式比较简单,就是将分区按照字典序排序,然后遍历这些分区将分区逐一分配到每个Consumer中
sticky
该策略是Kafka从0.11才支持的分配策略,该策略在保证分配均匀的同时还要求这次的分配尽量和之前的相同。这么做的目的很明确,Consumer和Kafka之间是通过TCP进行连接,如果把连接都断开然后分配完成后重新建立连接,那么无故就会增多TCP的4次挥手(断联)和3次握手(建联)
总结
上述文章我们描述了Consumer Group发生Rebalance的原因,以及Rebalance的三种策略。 很明显「sticky」策略是最理想的策略,不过对于你的Kafka版本有要求(0.11 及以后的版本)。 另外当Consumer Group 发生Rebalance时,JVM(kafka由scala编写,最终会编译成.class文件交由java虚拟机)会STW(stop the world),当发生STW时,Consumer会等待Rebalance完成后才继续消费,所以我们要尽可能的避免Rebalance的产生。那么如何尽量的减少Rebalance的产生呢?
这里有两个参数需要你注意下 sesstion.timeout.ms : 一次session的连接超时时间 heartbeat.interval.ms: 发送心跳的间隔时间,建议设置成session连接超时时间的1/3
2.Consumer因消费超时导致Rebalance
这里同样有两个参数 max.poll.interval.ms: Consumer 间隔多长时间拉取消息 max.poll.records: Consumer 一次拉取的消息条数 这里给出的建议是:结合自身的业务特点,尽可能的在max.poll.interval.ms周期内消费完 max.poll.records
|