IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka系列文章二(Producer消息分区策略) -> 正文阅读

[大数据]kafka系列文章二(Producer消息分区策略)

前言

上一篇文章kafka系列文章一(kafka介绍)
中描述了下Kafka的整体结构以及相关术语,本篇文章我们一起来探讨Kafka中一个重要的角色Producer。他是整个流程的源头,文章中会结合Go的Kafka客户端框架代码来分析Producer生产的消息如何进行分区
Kafka Go客户端仓库地址

Kafka 为什么要引入分区

通过第一篇文章的描述我们知道,每个分区中有一个Leader副本是直接对客户端提供读写服务的。所以我们可以通过横向扩容的方式,让Topic能够拥有更多的分区,并且让这些分区能够分布在不同的节点上从而提高kafka整体的吞吐量

Producer的分区策略

Producer的分区策略说直接点就是决定当前Producer生产的消息采用何种Balancer算法将消息发送到Kafka指定的分区。
首先我们来看下kafka-go中关于Balancer的定义(部分重要的英文注释我会翻译成中文)

// Balancer是消息发送的抽象接口
// 该接口的实例可以按照指定的算法将消息路由到指定的分区
// 在多协程换成中使用Balancer时,请确保线程安全
type Balancer interface {
	// Balance receives a message and a set of available partitions and
	// returns the partition number that the message should be routed to.
	//
	// An application should refrain from using a balancer to manage multiple
	// sets of partitions (from different topics for examples), use one balancer
	// instance for each partition set, so the balancer can detect when the
	// partitions change and assume that the kafka topic has been rebalanced.
	Balance(msg Message, partitions ...int) (partition int)
}

下图是Balancer接口的具体实现(实现消息路由到具体分区的算法),下文中我会选几个比较常用的实现来进行说明
在这里插入图片描述

RoundRobin

轮询: 当多个客户端写入消息时,他会将消息均匀的写入到Topic对应的分区中。比如: 一个topic有两个分区,Producer一共写入了3条消息,
那么Msg1会被写入分区1,Msg2会被写入分区2,Msg3写入分区1,剩下的消息以此类推。
在这里插入图片描述
在阅读客户端源码时我们会发现 「RoundRobin」是默认的路由策略

func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
	if w.Addr == nil {
		return errors.New("kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address")
	}

	w.group.Add(1)
	defer w.group.Done()

	if w.isClosed() {
		return io.ErrClosedPipe
	}

	if len(msgs) == 0 {
		return nil
	}
	// 获取 Balancer实例
	balancer := w.balancer()
	// 无用的代码已经删除
}

func (w *Writer) balancer() Balancer {
	if w.Balancer != nil {
		return w.Balancer
	}
	// 当Producer没有指定路由策略时,使用roundRobin
	return &w.roundRobin
}

  • 优点
    该路由策略能够将消息均匀的分布在所有的分区中,当分区所处的节点机器配置差距不大时对客户端能够实现很好的负载均衡,也不会出现数据倾斜的情况(某个分区存储了大量消息)

  • 缺点
    不能根据分区所处节点的配置来调整对应的权重,比如说让配置高点的机器存储更多的消息,配置低的机器存储少量的消息。还有一点就是不能做到消息的顺序性(有些业务系统要求发送的消息前后有关联)

Random

随机 :该路由策略是根据分区的数量,然后随机生成一个该区间内的整数(也就是分区号码),最后将消息发往这个分区

func (b randomBalancer) Balance(msg Message, partitions ...int) (partition int) {
	if b.mock != 0 {
		return b.mock
	}
	// 这段代码是重点
	return partitions[rand.Int()%len(partitions)]
}

其实「Random」和「RoundRobin」看起来都差不多,极端情况下「Random」还不如「RoundRobin」让消息分布的更均匀,所以这里简单带过即可(了解下就行)

Hash

首先我们来看它源码中的介绍(同样我精简了注释)

// 它使用提供的哈希函数来确定将消息路由到哪个分区。
// 该Balancer确保拥有相同Key的Msg被路由到相同的分区。
// 计算分区的逻辑是:
// hasher.Sum32() % len(partitions) =>分区
type Hash struct {
	rr     RoundRobin
	Hasher hash.Hash32

	// lock protects Hasher while calculating the hash code.  It is assumed that
	// the Hasher field is read-only once the Balancer is created, so as a
	// performance optimization, reads of the field are not protected.
	lock sync.Mutex
}

从注释中我们好像发现了春天,原来Hash Balancer就是我要找的能够让消息保持顺序的实现啊!不过能够让消息保持顺序的前提时,需要业务侧在生产消息时根据不同的业务将不同类型的消息指定对应的Msg key,这些具有相同key的Msg会被发送到相同的分区,这样一来数据不就有序了么!(我记得这点在面试中出现的频率还是挺高的),我们接下来可以简单看下他是如何进行分区的

func (h *Hash) Balance(msg Message, partitions ...int) int {
    // 注意: 如果消息中没有指定Key的话,则还是会走默认的RoundRobin来选择分区
	if msg.Key == nil {
		return h.rr.Balance(msg, partitions...)
	}

	hasher := h.Hasher
	if hasher != nil {
		h.lock.Lock()
		defer h.lock.Unlock()
	} else {
		hasher = fnv1aPool.Get().(hash.Hash32)
		defer fnv1aPool.Put(hasher)
	}

	hasher.Reset()
	if _, err := hasher.Write(msg.Key); err != nil {
		panic(err)
	}
	
	// 这里就是具体的算法实现了,感兴趣的可以自行研究下
	// uses same algorithm that Sarama's hashPartitioner uses
	// note the type conversions here.  if the uint32 hash code is not cast to
	// an int32, we do not get the same result as sarama.
	partition := int32(hasher.Sum32()) % int32(len(partitions))
	if partition < 0 {
		partition = -partition
	}

	return int(partition)
}

总结

以上我们分析了Kafka Producer选择分区的三种常用路由策略
1.RoundRobin
2.Random
3.Hash

其中RoundRobin在实际业务中用的是比较多的,因为他能够让Topic中的消息均匀的分布在所有的分区中。
而当你的业务场景要求消息有序时,你就可以使用Hash Balancer了,注意在使用该策略时记得根据不同业务来给这一组业务中的Msg设置相同的Key

如果你对文章内容有疑问,欢迎评论区留言

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-01 23:28:20  更:2022-04-01 23:28:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 14:38:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码