kafka数据存储
一、kafka选型
kafka的设计初衷,是为了解决海量数据的读写,能否快速读写,成了决定因素
1、写
顺序读写比随机读写快
2、读
常用两种索引
- B+tree索引
- Hash索引
- 等值查询,只需经过一次算法即可获得对应的键值,时间复杂度:O(1)
选择hash索引的原因(两种索引方式的区别可自行百度):
- 不需要额外的空间存储索引,不需要维护索引
- 等值查询更快(由offset进行数据定位)
二、数据存储
segment分为索引文件和日志文件
1、索引文件
包含两个部分:偏移量、存储位置(message的物理位置)
- 偏移量索引文件.index
- 根据消息偏移量查找映射关系
- 4个字节存储相对偏移量
- 4个字节存储position
- 时间索引文件.timeindex(0.10.1版本才有)
- 根据时间戳查找映射关系
- 8个字节timestamp存储
- 4个字节存储position
2、日志文件
.log默认大小1G
包含两个部分:消息、消息物理偏移地址
根据上图一般会有些困惑?
-
1、如何查找?以偏移量368775为例
- 根据index文件名排序,通过二分法定位具体index
- 找到不比368775大的.index文件(368769.index,442365.index)
- 368775比368769大6,所以找到位置,6,1407
- 在368769.log中定位到偏移量为1407的消息
- Message368775即为对应的消息
- (timeindex文件同理)
-
2、索引文件的编号为什么不连续?
- 为了将索引存在内存中,采用稀疏存储
- 避免索引占用空间,每隔4k简历一条索引
- 没有建立索引的offset,无法一次定位,但是扫描范围较小
3、消息
3.1 消息格式的版本
kafka版本 | message版本 | segment |
---|
0.10前 | v0 | .log+.index | 0.10.x | v1 | .log+.index+.timeindex | 0.11~之后 | v2 | .log+.index+.timeindex |
(上图为message版本)
(上图为attributes各位含义)
一条/一批次消息的各部分含义
- CRC32:消息的CRC校验码。
- magic:消息版本。当magic为0时,消息的offset使用绝对offset;当magic为1时,消息的offset使用相对offset。
- attributes:
- 0~2位表示消息使用的压缩类型,
- 0(000)-无压缩;
- 1(001)-gzip;
- 2(010)-snappy;
- 3(011)-lz4。
- 第3位表示时间戳类型,
- 其他属性如图
- Control消息用来支持事务功能
- key length,key,value length,value顾名思义,表示key、value的长度与值
- producer id,producer epoch,first sequence用于幂等
v2版本record batch中attribute属性废弃,但仍然保留字段
3.2版本小结
-
V0 V1缺陷:无论key/value是否存在,都需要4字节保存信息 -
V0最小消息14字节,V1版最小消息为22字节,V2版本最小消息集为61字节。小于最小消息则消息非法 -
v2虽然最小字节变大了,然而批量发送场景下,提升了效率(record只存了消息内容)
v0无timestamp,是否不支持时间策略?
查询时timestamp返回-1,不支持
三、日志清理策略
log.cleanup.policy = delete,compact
delete-日志删除;compact-日志压缩
kafka同时支持两种清理策略
1、日志删除
属于粗粒度的清理,直接删除segment
1.1 三种策略
1.2 配置
// 时间
log.retention.hours=168 //7d
// 大小
log.retention.bytes= 1073741824 //1G
log.retention.check.interval.ms=300000 //5min
log.segment.bytes=1073741824 //1G
log.cleaner.delete.retention.ms=86400000 // 1d
log.cleaner.backoff.ms=15000 //15s
配置含义:
- segment的大小为1GB,每5分钟检查一次是否有segment已经过了7d,如果有将其标记为.deleted(0.index.deleted)。
- 清理线程会每隔15秒检查一次,是否有标记为deleted的segment的保留时间超过一天了,如果有将其从文件系统删除
若多种策略同时配置,谁先到达配置阈值,就用哪种策略
2、日志压缩(Log Compaction)
属于细粒度的清理,以key为粒度的清理
使用场景:kafka异常恢复、只关心最新数据
2.1 效果
根据key只保留最新的value,如果value为null,该key一段时间后会被清理
2.2 选择合适的文件
根据日志污浊率限定清理范围
log.cleaner.min.cleanable.ratio(默认值为0.5)
dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes) 日志的污浊率 = dirty部分的日志占用大小 / (clean部分的日志占用大小+dirty部分的日志占用大小)
2.3 流程
- 1、压缩过程中将需要保留的消息拷贝到一个.clean为后缀的临时文件中(0000.log.clean)
- 2、压缩后将.clean文件修改为.swap文件(0000.log.clean->0000.log.swap)
- 3、删除原有log文件,将.swap后缀删除(0000.log.swap->0000.log)
只以log为例,压缩方式是针对整个segment(.index .log)
2.4 原理
-
0、每个broker会启动log.cleaner.thread(默认值为1)个线程负责执行清理任务
- 检查cleaner-offset-checkpoint文件
- firstDirtyOffset:每次清理的起点,清理完成后会更新
- 根据firstDirtyOffset将log文件分成clean(清理)、dirty(未清理)两部分
-
1、第一遍把消息的每个key的哈希值和最后出现的offset存储在SkimpyOffsetMap(哈希表)中 -
2、第二遍检查消息是否符合保留条件,将不符合的删除
- 如果value不为null,直接保留最新
- 如果value为null(墓碑消息tombstone,默认24h)
- 先常规清理——消息会保留一段时间,consumer能获取到该消息,且发现value被删除
- 一段时间后,key会被删除,consumer也无法获取
- (为后续压缩打标记)
如果要删除一个key,put一个null就行
2.5 重新分组
因为压缩后的各个segment都会变小,需要重新分组来重写log和index
将多个小的文件进行合并
分组规则
- log大小之和为1g,index文件之和为10m(默认情况)
- 清理墓碑消息、保留最新value
2.6 真正的清理过程
清理过程一共分为3步
- step1、遍历所有需要清理的segment,开始清理日志,生成新的segment
- step2、清除墓碑消息,并将1中的多个小的segment进行merge
- step3、merge step2生成的segment,重写log和index
注意:日志压缩、日志删除都不包含activeSegment(正在写入的segment)
|