前言
上一篇文章kafka系列文章一(kafka介绍) 中描述了下Kafka的整体结构以及相关术语,本篇文章我们一起来探讨Kafka中一个重要的角色Producer。他是整个流程的源头,文章中会结合Go的Kafka客户端框架代码来分析Producer生产的消息如何进行分区 Kafka Go客户端仓库地址
Kafka 为什么要引入分区
通过第一篇文章的描述我们知道,每个分区中有一个Leader副本是直接对客户端提供读写服务的。所以我们可以通过横向扩容的方式,让Topic能够拥有更多的分区,并且让这些分区能够分布在不同的节点上从而提高kafka整体的吞吐量
Producer的分区策略
Producer的分区策略说直接点就是决定当前Producer生产的消息采用何种Balancer算法将消息发送到Kafka指定的分区。 首先我们来看下kafka-go中关于Balancer的定义(部分重要的英文注释我会翻译成中文)
type Balancer interface {
Balance(msg Message, partitions ...int) (partition int)
}
下图是Balancer接口的具体实现(实现消息路由到具体分区的算法),下文中我会选几个比较常用的实现来进行说明
RoundRobin
轮询: 当多个客户端写入消息时,他会将消息均匀的写入到Topic对应的分区中。比如: 一个topic有两个分区,Producer一共写入了3条消息, 那么Msg1会被写入分区1,Msg2会被写入分区2,Msg3写入分区1,剩下的消息以此类推。 在阅读客户端源码时我们会发现 「RoundRobin」是默认的路由策略
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
if w.Addr == nil {
return errors.New("kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address")
}
w.group.Add(1)
defer w.group.Done()
if w.isClosed() {
return io.ErrClosedPipe
}
if len(msgs) == 0 {
return nil
}
balancer := w.balancer()
}
func (w *Writer) balancer() Balancer {
if w.Balancer != nil {
return w.Balancer
}
return &w.roundRobin
}
Random
随机 :该路由策略是根据分区的数量,然后随机生成一个该区间内的整数(也就是分区号码),最后将消息发往这个分区
func (b randomBalancer) Balance(msg Message, partitions ...int) (partition int) {
if b.mock != 0 {
return b.mock
}
return partitions[rand.Int()%len(partitions)]
}
其实「Random」和「RoundRobin」看起来都差不多,极端情况下「Random」还不如「RoundRobin」让消息分布的更均匀,所以这里简单带过即可(了解下就行)
Hash
首先我们来看它源码中的介绍(同样我精简了注释)
type Hash struct {
rr RoundRobin
Hasher hash.Hash32
lock sync.Mutex
}
从注释中我们好像发现了春天,原来Hash Balancer就是我要找的能够让消息保持顺序的实现啊!不过能够让消息保持顺序的前提时,需要业务侧在生产消息时根据不同的业务将不同类型的消息指定对应的Msg key,这些具有相同key的Msg会被发送到相同的分区,这样一来数据不就有序了么!(我记得这点在面试中出现的频率还是挺高的),我们接下来可以简单看下他是如何进行分区的
func (h *Hash) Balance(msg Message, partitions ...int) int {
if msg.Key == nil {
return h.rr.Balance(msg, partitions...)
}
hasher := h.Hasher
if hasher != nil {
h.lock.Lock()
defer h.lock.Unlock()
} else {
hasher = fnv1aPool.Get().(hash.Hash32)
defer fnv1aPool.Put(hasher)
}
hasher.Reset()
if _, err := hasher.Write(msg.Key); err != nil {
panic(err)
}
partition := int32(hasher.Sum32()) % int32(len(partitions))
if partition < 0 {
partition = -partition
}
return int(partition)
}
总结
以上我们分析了Kafka Producer选择分区的三种常用路由策略 1.RoundRobin 2.Random 3.Hash
其中RoundRobin在实际业务中用的是比较多的,因为他能够让Topic中的消息均匀的分布在所有的分区中。 而当你的业务场景要求消息有序时,你就可以使用Hash Balancer了,注意在使用该策略时记得根据不同业务来给这一组业务中的Msg设置相同的Key
如果你对文章内容有疑问,欢迎评论区留言
|