思想
Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。清理策略针对的是过期的segment文件,而不是某条过期的数据。可以单独针对某topic配置,也可针对kafka集群配置(config/server.properties)。策略分三种:基于时间,基于日志文件大小,基于日志文件起始偏移量。为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。
topic级别修改命令
修改单个topic清理策略:bin/kafka-configs.sh --zookeeper ZK_IP:2181 --describe --entity-type topics --entity-name test
删除topic:bin/kafka-topics.sh --delete --topic test --zookeeper ZK_IP:2181
保留topic,删除数据,通过修改数据保留时间实现(如下:保留10s):bin/kafka-configs.sh --zookeeper ZK_IP:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=10000
查看topic清理策略:bin/kafka-configs.sh --zookeeper ZK_IP:2181 --describe --entity-type topics --entity-name test
删除策略:bin/kafka-configs.sh --zookeeper ZK_IP:2181 --entity-type topics --entity-name test --alter --delete-config retention.ms
配置文件级别删除
通过log.cleanup.policy,分为删除和压缩。
删除策略
当日志达到log.segment.bytes大小,会创建新的segment。当segment超过log.segment.bytes或保留时长达到log.retention.hours,就会被清理掉。
属性名 | 含义 | 默认值 |
---|
log.cleanup.polict | 日志清理保存策略,delete/compact | delete | log.retention.hours | 日志保存时间,可以选择hours,minutes和ms | 168(7day) | log.retention.bytes | 删除前日志文件允许保存的最大值 | -1 | log.segment.delete.delay.ms | 日志文件被真正删除前的保留时间 | 60000(1min) | log.cleanup.interval.mins | 清理工作执行时间间隔 | 10 | log.retention.check.interval.ms | 清理工作执行时间间隔(新版本) | 300000 |
达到清理条件的日志文件,进行“delete”标注,文件无法被索引到。log.segment.delete.delay.ms这个时间后,文件才会被真正的从文件系统中删除。
配置server.properties,添加删除策略:
# 默认delete策略
# 日志保留时间
log.retention.hours=168
# 超过此大小的segment被删除,默认1G
#log.retention.bytes=1073741824
# 达到此大小,创建一个新的segment,默认1G
log.segment.bytes=1073741824
# 清理工作执行的间隔
log.retention.check.interval.ms=300000
刷新策略
属性名 | 含义 | 默认值 |
---|
log.flush.interval.messages | 消息达到该条时,将数据写入到日志文件 | 10000 | log.flush.interval.ms | 当达到该时间间隔时,强制执行一次flush | null | log.flush.scheduler.interval.ms | 周期性检查,是否需要将消息flush | - |
分段策略属性
属性名 | 含义 | 默认值 |
---|
log.roll.{hours,ms} | 日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment | 168(7day) | log.segment.bytes | 每个segment的最大容量。到达指定容量时,将强制生成一个新的segment | 1G(-1表示不限制) | log.retention.check.interval.ms | 日志片段文件检查的周期时间 | 60000 |
压缩
将数据压缩,只保留每个key最后一个版本的数据,offset可能不连续。
#启用cleaner,默认是关闭的
log.cleaner.enable=true
log.cleanup.policy=compact
这种策略只适合特殊场景,比如消息的key是用户ID,消息体是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。压缩策略支持删除,当某个Key的最新版本的消息没有内容时,这个Key将被删除,这也符合以上逻辑。
据起始偏移量删除
日志配置
修改日志级别为INFO,防止长时间产生过多日志。将config/log4j.properties中必要选项改为INFO。 修改日志(server.log, controller.log, state-change.log等)存放位置,放置长时间产生日志占满磁盘。可以将日志目录配置到更大空间的分区盘。可通过更改kafka-run-class.sh中的LOG_DIR变量值实现。
参考
https://blog.csdn.net/shykevin/article/details/90103364. http://www.openskill.cn/article/550. https://blog.csdn.net/chenyixin121738/article/details/109590355. https://blog.csdn.net/u013256816/article/details/80418297.
|