IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka控制器,复制与存储小结 -> 正文阅读

[大数据]kafka控制器,复制与存储小结

【README】

  • 1,本文主要总结kafka复制,存储细节;
  • 2,本文的kafka集群版本是3.0.0, 有3个broker,分别是 centos201, centos202, centos203 对应的brokerid为 1, 2, 3 ;

【1】kafka内部原理

【1.1】broker-消息中心点

1)broker一个独立的kafka服务器节点;也称为发送消息的中心点

  • kafka使用zk维护集群成员关系;
  • 每个broker都有自己的id存储在zk;broker启动时,创建zk节点把自己id注册到zk;

2)zk存储的kafka集群信息的节点列表

# zk存储的kafka集群信息的节点
[zk: localhost:2181(CONNECTED) 1] ls /
[cluster,
 controller_epoch,
 controller,
 brokers,
 zookeeper,
 feature,
 admin,
 isr_change_notification,
 consumers,
 log_dir_event_notification,
 latest_producer_id_block,
 config]

查看zk中的 broker id

# 查看kafka brokerid 和 topic
[zk: localhost:2181(CONNECTED) 2] ls /brokers 
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/ids
[1, 2, 3]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[hello04, hello05, hello02, hello03, hello01, hello10, __consumer_offsets]

【1.2】控制器

1)控制器定义:集群里第一个启动的broker通过在zk创建临时节点 /controller 让自己成为控制器;

其他broker也尝试创建 controller 节点,若已存在,则报错;其他 broker 会在控制器节点上创建 zk watch 对象,这样非控制器节点可以收到控制器节点状态变更的通知;(干货——这种方式可以确保一个集群只能有一个控制器存在,防止脑裂问题

2)控制器选举策略:一旦控制器被关闭或与zk断开,其他broker通过watch对象就会收到控制器消失的通知,这些 非控制器broker 会竞争在 zk 上创建 controller节点,谁最先创建成功,谁就是集群控制器; 然后其他broker在控制器节点上创建 zk watch对象;

  • 2.1)每次控制器选举后: 控制器纪元值(时代值)controller_epoch? 都会递增;其他broker若收到控制器发出的包含旧 epoch 的消息,就会忽略;

3)控制器实验

step1) 查看 控制器和控制器纪元

[zk: localhost:2181(CONNECTED) 5] get /controller_epoch
6

[zk: localhost:2181(CONNECTED) 6] get /controller
{"version":1,"brokerid":1,"timestamp":"1638692039821"}

显然, epoch是6,控制器是broker1;

step2)停止掉 broker1;?

这个时候,broker2,3 会竞争选举为控制器;我们再次查看控制器,发现控制器现在是broker2了;且 epoch自增为7;?

[zk: localhost:2181(CONNECTED) 7] get /controller_epoch
7

[zk: localhost:2181(CONNECTED) 8] get /controller      
{"version":1,"brokerid":2,"timestamp":"1638733315396"}

4)控制器作用

  • 控制器负责在broker加入或离开时进行分区首领选举;
  • 控制器使用 epoch 避免脑裂问题

【补充】脑裂指两个节点同时认为自己是集群控制器;?

5)zk的作用

  1. kafka使用zk的临时节点来选举控制器;
  2. zk在broker加入或退出集群时通知控制器;

【1.3】复制

复制功能是kafka架构的核心;在kafka 文档里,kafka把自己描述为 一个分布式的,可分区的,可复制的提交日志服务;(kakfa的日志就是数据或消息);

【1.3.1】副本

1)数据存储

kafka使用主题来组织数据(逻辑);使用分区为单位来读写数据(物理);

为什么说kakfa以分区为单位读写? 是因为我们创建带有分区数和副本数的主题后, kakfa会创建以这个分区命名的文件夹,分区文件夹下存储消息内容,索引文件等;


?2)主题,分区,副本关系

  • 1个主题对应多个分区;
  • 1个分区对应多个副本;
  • 1个副本对应多个分段文件;(分段存储)?

3)副本类型?

  • 3.1)首领副本:每个分区都有一个首领副本,消息读写首先会操作首领副本;
  • 3.2)跟随者副本:首领副本以外的副本;它们不处理读写请求,唯一任务是从首领副本复制消息,与首领保持数据同步;如果首领发生崩溃,其中一个同步的跟随者副本被提升为首领副本;

补充1:跟随者副本在成为不同步副本前的时间是通过 replica.lag.time.max.ms 来配置;

补充2:跟随者从首领副本复制消息时的请求,与消费者从首领副本消费消息时发出的请求是一样的;


【1.4】处理请求

1)broker处理请求过程

  • step1)broker会在监听端口上运行一个 Acceptor线程可以理解为服务器套接字 ServerSocket),这个线程会创建一个连接(类似ServerSocket.accept() 方法),把请求交给 Processor线程(网络线程)去处理;
  • step2)Processor线程从客户端获取请求消息,把它放进请求队列,然后从响应队列获取响应结果并发送给客户端;
  • step3) 在请求被放入请求队列后, IO线程会处理它们,并把处理结果放入 响应队列;

?2)常见请求类型

  • 生产请求:生产者发送的请求,包含要写入的消息;
  • 获取请求:消费者或跟随者副本所在broker需要从首领副本所在broker获取消息而发送的请求;

【注意】

  1. 生产请求和获取请求都必须发送给分区的首领副本,跟随者副本不参与消息读写,仅做备份和支持集群高可用;
  2. kafka客户端要自己负责把生产请求和获取请求发送到正确的broker上;否则broker会返回错误响应;

3)客户端怎么知道请求发送到哪里呢?

3.1)客户端在发送请求前,先发送元数据请求

  • 这种请求的响应结果包括 主题,主题分区,分区副本以及首领副本;

3.2)客户端会缓存这些元数据信息;

  • 获取元数据信息后,会直接往对应的 broker发送请求和获取请求;
  • 当然,客户端需要定时刷新元数据缓存; 刷新时间间隔通过? metadata.max.age.ms 来配置;?

?


【1.4.1】生产请求

1)生产者acks有3个值;

  • acks=0 ; 生产者在发送消息后,默认发送成功;而不会等待服务器响应;
  • acks=1 ; 只要集群的首领节点收到消息,生产者就会收到发送成功的响应;而不管副本节点是否收到消息;
  • acks=all; 需要集群的首领节点和跟随节点(副本节点)都收到消息后,生产者才会收到发送成功的响应;

2)首领副本所在broker收到生产请求后,会对请求做一些验证:

  • 发送数据的用户是否有写入权限;
  • acks的值是否合法; (只允许出现 0, 1, all);
  • 根据acks的值,进行副本复制策略;

【1.4.2】获取请求

1)首领副本所在broker收到获取请求后,会根据客户端指定的请求偏移量从分区里读取消息;

2)kafka使用 零复制技术 向客户端发送消息,即kafka直接把消息从文件发送到网络通道,而不经过任何中间缓冲区;(干货——这是kakfa与大部分数据库不一样的地方,其他数据库在把数据发送到客户端前,会把数据保存到本地缓存)

  • 零复制技术优点:避免了字节复制,也不需要管理内存缓冲区,从而获取更好性能;?

3)消费者客户端只能读取已经被写入所有同步副本的消息,而不是所有消息

  • 因为还没有被足够多副本复制的消息被认为是不安全的;如果首领副本所在broker发送崩溃,另一副本成为新首领,那这些不安全的消息就会丢失;

4)扩展 ISR, HW高水位

  1. ISR, In-Sync-Replica set, 同步副本集合,即所有与首领副本保持同步的副本集合;
  2. LEO,log end offset,日志末端偏移量 ,即副本收到的最后一次提交的偏移量;
  3. HW高水位,High Watermark, 所有副本最小的LEO

小结: 消费者只能看到已经复制所有副本的消息;


【1.4.3】其他请求

OffsetCommitRequest, 偏移量提交请求;

OffsetFetchRequest;

ListOffsetsRequest;


【1.5】物理存储

1)kafka的基本存储单元是分区; 分区会在所属broker上的kafka数据根目录下新建名为分区名的文件夹,如 hello04-2(主题为hello04的2号分区文件夹),kafka数据根目录由 server.properties 中的 log.dirs 来指定;

2)主题,分区,副本关系

  • 1个主题对应多个分区;
  • 1个分区对应多个副本;
  • 1个副本对应多个分段文件;(分段存储)?

?


【1.5.1】分区分配

1)创建指定分区和副本数的topic来做实验

# 创建分区数3副本数2的主题 
kafka-topics.sh --bootstrap-server centos201:9092 
--create --topic hello11 --partitions 3 --replication-factor 2 
# 副本数量必须小于等于broker数量,但分区数没有这个限制;

查看分区详情

[root@centos201 hello04-1]# kafka-topics.sh --bootstrap-server centos201:9092 \ 
--describe --topic hello11
Topic: hello11  TopicId: IliU_BDeS8ycreLufxCMMw PartitionCount: 3       ReplicationFactor: 2    Configs: segment.bytes=1024
 Topic: hello11 Partition: 0    Leader: 2       Replicas: 2,3   Isr: 2,3
 Topic: hello11 Partition: 1    Leader: 3       Replicas: 3,1   Isr: 3,1
 Topic: hello11 Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2

查看具体存储数据的文件夹,以broker1为例;?

根据topic详情,我们知道 broker1 存储了topic hello11的1号和2号分区; 且它是2号分区首领所在的broker

进入 broker1的kafka数据根目录,

?

?进入其中一个分区文件夹查看? hello11-1 ,如下:

再查看分区文件夹前,我们先写入10条消息; 指定topic hello11, 1号分区

 for (int i = 0; i < 10; i++) {
	Future<RecordMetadata> future = producer.send(
new ProducerRecord<String, String>("hello11", 1,"", String.format("[%s] ", order++) + now + " > " + DataFactory.INSTANCE.genOneHundred()));
	try {
		System.out.println("[生产者] " + future.get().partition() + "-" + future.get().offset());
	} catch (Exception e) {
		e.printStackTrace();
	}
}

查看分区文件夹下的文件 ;

2) kafka的分段存储

因为在一个大文件里查找和删除消息很耗时;所以把一个分区分成若干片段进行存储;默认情况下,一个片段存储1g数据,为了实验,这里我修改为 1k,可以在 server.properties文件中设置 log.segment.bytes=1024 来实现;

3)kafka的稀疏索引

  • kafka并没有对每条消息建立索引,那样太大了,而是采用稀疏索引(稀疏存储)的方式,即一条索引记录指向一个消息范围;

例如: 索引值 1~100 指向 数据文件1.log中的消息1到消息100的消息范围的起始地址;

refer2 Apache Kafka ;

当消费者指定消费某个offset记录时, kafka集群通过二分查找从索引文件找出包含offset的索引值,通过索引值找到对应数据文件的起始地址,然后从起始地址开始顺序读取对应offset的消息;


【1.5.2】文件格式

1)kafka 使用零复制技术给消费者发送消息,避免了对生产者已经压缩过的消息进行解压和再压缩;?

2)普通消息与压缩消息格式 ?

?可以看出,多个压缩消息共用同一个消息头,从而减少消息大小;

【References】

  1. kafka权威指南;
  2. Apache Kafka
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-06 15:20:16  更:2021-12-06 15:20:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 13:52:57-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码