<Kafka核心技术与实战>学习笔记 -- 客户端实践 & 原理剖析
生产者消息分区机制
使用 Apache Kafka 生产和消费消息的时候,肯定是希望能够将数据均匀 地分配到所有服务器上
为什么分区?
Kafka 的主题(Topic): 承载真实数据的逻辑容器 在主题之下还分为若干个分区 Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息
主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份
Kafka 的三级结构图
为什么使用分区的概念而不是直接使用多个主题呢?
其实分区的作用就是提供负载均衡的能力 或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)
不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理 并且,还可以通过添加新的节点机器来增加整体系统的吞吐量
多个分区允许多个consumer同时消费 多个broker可以组成集群,提高性能,扩展性,以及可用性
分区的概念以及分区数据库早在 1980 年就已经有人在做了,比如那时候有个叫 Teradata 的数据库就引入了分区的概念 不同的分布式系统对分区的叫法也不尽相同 在 Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫 Region,在 Cassandra 中又被称作 vnode
从表面看起来它们实现原理可能不尽相同,但对底层分区(Partitioning)的整体思想却从未改变 除了提供负载均衡这种最核心的功能之外,利用分区也可以实现其他一些业务级别的需求,比如实现业务级别的消息顺序 的问题
Kafka 生产者的分区策略
分区策略: 决定生产者将消息发送到哪个分区的算法 Kafka提供默认的分区策略,同时也支持自定义分区策略
自定义分区策略
如果要自定义分区策略,需要显式地配置生产者端的参数partitioner.class
实现org.apache.kafka.clients.producer.Partitioner接口
接口定义了两个方法:`partition()和close()`,通常只需要实现最重要的 partition 方法
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
topic、key、keyBytes、value和valueBytes都属于消息数据
cluster则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)
计算出消息要被发送到哪个分区中
设置partitioner.class参数为实现类的 Full Qualified Name ,那么生产者程序就会按照代码逻辑对消息进行分区
分区策略 – 轮询
Round-robin 策略,即顺序分配 Kafka Java 生产者 API 默认提供的分区策略
轮询策略示意图
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是最常用的分区策略之一
分区策略 – 随机
Randomness 策略, 随意地将消息放置到任意一个分区上
随机策略示意图
随机策略版的 partition 方法
计算出该主题总的分区数,然后随机地返回一个小于它的正整数
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
随机策略是老版本生产者使用的分区策略,表现逊于轮询策略,在新版本中已经改为轮询了
按消息键保序策略
Kafka 允许为每条消息定义消息键,简称为 Key 这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等 也可以用来表征消息元数据 特别是在 Kafka 不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进 Key 里面的 一旦消息被定义了 Key,那么就可以保证同一个 Key 的所有消息都进入到相同的分区 里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示。
实现这个策略的 partition 方法:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
Kafka 默认分区策略
- 如果指定了 Key,那么默认实现按消息键保序策略;
- 如果没有指定 Key,则使用轮询策略
如何实现消息的顺序问题 方案1 : 给 Kafka 主题设置单分区,这样所有的消息都只在这一个分区内读写 这样做虽然实现了因果关系的顺序性,但也丧失了 Kafka 多分区带来的高吞吐量和负载均衡的优势 方案2 : 具有因果关系的消息都有一定的特点,比如在消息体中都封装了固定的标志位,可以对此标志位设定专门的分区策略,保证同一标志位的所有消息都发送到同一分区 这样既可以保证分区内的消息顺序,也可以享受到多分区带来的性能红利
基于个别字段的分区策略本质上就是按消息键保序的思想,其实更加合适的做法是把标志位数据提取出来统一放到 Key 中,这样更加符合 Kafka 的设计思想
其他分区策略
基于地理位置的分区策略 一般只针对那些大规模的 Kafka 集群,特别是跨城市、跨国家甚至是跨大洲的集群
假设所有服务都部署在北京的一个机房, 考虑在广州再创建一个机房 从两个机房中选取一部分机器共同组成一个大的 Kafka 集群 这个集群中有一部分机器在北京,另外一部分机器在广州
假设要为每个新注册用户提供一份注册礼品,南方的用户注册可以免费得到一碗“甜豆腐脑”,北方的新注册用户可以得到一碗“咸豆腐脑” 如果用 Kafka 来实现则很简单,只需要创建一个双分区的主题 ,然后再创建两个消费者程序分别处理南北方注册用户逻辑 即可
但问题是需要把南北方注册用户的注册消息正确地发送到位于南北方的不同机房 中,因为处理这些消息的消费者程序只可能在某一个机房中启动着
此时就可以根据 Broker 所在的 IP 地址实现定制化的分区策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream()
.filter(p -> isSouth(p.leader().host()))
.map(PartitionInfo::partition).findAny()
.get();
可以从所有分区中找出那些 Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送
小结
切记分区 是实现负载均衡 以及高吞吐量 的关键 在生产者这一端就要仔细盘算合适的分区策略,避免造成消息数据的“倾斜”,使得某些分区成为性能瓶颈,这样极易引发下游数据消费的性能下降
生产者压缩算法
压缩(compression) 用时间去换空间的经典 trade-off 思想 用 CPU 时间去换磁盘空间或网络 I/O 传输量,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输
怎么压缩
Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的
不论是哪个版本,Kafka 的消息层次 都分为两层:消息集合(message set)以及消息(message) 一个消息集合中包含若干条日志项(record item) ,而日志项才是真正封装消息的地方 Kafka 底层的消息日志由一系列消息集合日志 项组成 Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作
V2 版本主要是针对 V1 版本的一些弊端做了修正
消息的公共部分抽取出来放到外层消息集合 里面,不用每条消息都保存这些信息- 保存压缩消息的方法发生了变化
V1: 把多条消息进行压缩然后保存到外层消息的消息体字段中 V2: 对整个消息集合进行压缩
在相同条件下, 不论是否启用压缩,V2 版本都比 V1 版本节省磁盘空间 当启用压缩时,这种节省空间的效果更加明显
何时压缩
在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端
生产者程序中配置 compression.type 参数 即表示启用指定类型的压缩算法 。比如下面这段程序代码展示了如何构建一个开启 GZIP 的 Producer 对象:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存 两种 Broker 会重新压缩消息的情况
- Broker 端指定了和 Producer 端不同的压缩算法
Broker 端也有一个参数叫 compression.type, 默认值是 producer,表示 Broker 端会“尊重”Producer 端使用的压缩算法 如果在 Broker 端设置了不同的 compression.type 值,可能会发生预料之外的压缩 / 解压缩操作,通常表现为 Broker 端 CPU 使用率飙升 - Broker 端发生了消息格式转换
Kafka 集群中同时保存多种版本的消息格式, 为了兼容老版本的格式,Broker 端会对新版本消息执行向老版本格式的转换, 这个过程中会涉及消息的解压缩和重新压缩, 还会让 Kafka 丧失 Zero Copy 特性
Zero Copy零拷贝 当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝,从而实现快速的数据传输
何时解压缩
producer压缩,broker保持,consumer解压缩 使用的压缩算法信息被封装进消息集合
除了在 Consumer 端解压缩,Broker 端也会进行解压缩 每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证 对 Broker 端性能是有一定影响的,特别是对 CPU 的使用率而言
各种压缩算法对比
在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4 从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd) – Facebook 开源的一个压缩算法,能够提供超高的压缩比(compression ratio)
Facebook Zstandard 官网提供的一份压缩算法 benchmark 比较结果 zstd 算法有着最高的压缩比,而在吞吐量上的表现只能说中规中矩 LZ4 算法,它在吞吐量方面表现优异
对于 Kafka 而言,性能测试结果
- 在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP
- 而在压缩比方面,zstd > LZ4 > GZIP > Snappy
使用 Snappy 算法占用的网络带宽最多,zstd 最少,这是合理的,毕竟 zstd 就是要提供超高的压缩比 在 CPU 使用率方面,各个算法表现得差不多,只是在压缩时 Snappy 算法使用的 CPU 较多一些,而在解压缩时 GZIP 算法则可能使用更多的 CPU
实践
启用压缩的条件:
- Producer 程序运行机器上的 CPU 资源要很充足
- 环境中带宽资源有限
解压缩: 尽量保证不要出现消息格式转换的情况
小结
无消息丢失配置
Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证
- 已提交的消息: Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交
- 有限度的持久化保证: 假如消息保存在 N 个 Kafka Broker 上,这 N 个 Broker 中至少有 1 个存活, Kafka 就能保证这条消息永远不会丢失
消息丢失 – 生产者程序丢失数据
Kafka Producer 是异步发送消息的 producer.send(msg) fire and forget 调用后不管结果
不成功的原因:
- 网络抖动, 消息压根就没有发送到 Broker 端
- 消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等
Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg) ,而要使用 producer.send(msg, callback) 。
瞬时错误,那么仅仅让 Producer 重试就可以了; 如果是消息不合格造成的,那么可以调整消息格式后再次发送
消息丢失 – 消费者程序丢失数据
Consumer 程序 – 位移: 这个 Consumer 当前消费到的 Topic 分区的位置 如图, Consumer A 的位移值就是 9;Consumer B 的位移值是 11 保证消息不丢失的办法:
- 先消费消息(阅读)
- 再更新位移(书签)
可能会带来消息的重复处理
丢失消息的情况: Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移 假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了 因此这条消息对于 Consumer 而言实际上是丢失了
解决方案: 如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移
注意: 单个 Consumer 程序使用多线程来消费消息说起来容易,写成代码却异常困难 因为很难正确地处理位移的更新 ,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况
Kafka 无消息丢失的配置实践
- 不要使用 producer.send(msg),而要使用
producer.send(msg, callback) 。记住,一定要使用带有回调通知的 send 方法
Producer 端参数:
- 设置
acks = all 。acks 参数代表了对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交” 。这是最高等级的“已提交”定义 - 设置 retries 为一个较大的值。这里的 retries 对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够
自动重试消息发送,避免消息丢失
Broker 端参数:
- 设置
unclean.leader.election.enable = false ,控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生 - 设置
replication.factor >= 3 ,表述的是最好将消息多保存几份,毕竟目前防止消息丢失 的主要机制就是冗余 - 设置
min.insync.replicas > 1 ,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性 。在实际环境中千万不要使用默认值 1
- 确保
replication.factor > min.insync.replicas 。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1 - 确保
消息消费完成再提交 。Consumer 端有个参数 enable.auto.commit ,最好把它设置成 false,并采用手动提交位移的方式 。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的
第二条与第六条是否冲突? 其实不冲突。如果ISR中只有1个副本了,acks=all也就相当于acks=1了,引入min.insync.replicas 的目的就是为了做一个下限的限制: 不能只满足于ISR全部写入,还要保证ISR中的写入个数不少于min.insync.replicas
小结
开放讨论
Kafka 中特别隐秘的消息丢失场景:增加主题分区 当增加主题分区后,在某段“不凑巧”的时间间隔后,Producer 先于 Consumer 感知到新增加的分区,而 Consumer 设置的是“从最新位移处”开始读取消息,因此在 Consumer 感知到新分区前,Producer 发送的这些消息就全部“丢失”了,或者说 Consumer 无法读取到这些消息 这个小缺陷有什么解决的办法吗?
A: 新建分区丢失是因为没有offset就从lastest开始读取,可以改成没有offset的时候从ealiest读取应该就可以了
单个 Consumer 程序使用多线程来消费消息 Java 实践
【原创】Kafka Consumer多线程实例续篇
Kafka 拦截器
拦截器基本思想: 允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件 处理逻辑链。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑
Spring MVC 拦截器的工作原理 拦截器 1 和拦截器 2 分别在请求发送之前、发送之后以及完成之后三个地方插入了对应的处理逻辑 而 Flume 中的拦截器也是同理,它们插入的逻辑可以是修改待发送的消息,也可以是创建新的消息,甚至是丢弃消息 这些功能都是以配置拦截器类 的方式动态插入到应用程序中的,故可以快速地切换不同的拦截器而不影响主程序逻辑
Kafka 拦截器借鉴了这样的设计思路。可以在消息处理的前后多个时点动态植入不同的处理逻辑,比如在消息发送前或者在消息被消费后
拦截器detail
生产者拦截器 允许在发送消息前以及消息提交成功后植入你的拦截器逻辑; 而消费者拦截器 支持在消费消息前以及提交位移后编写特定逻辑
这两种拦截器都支持链的方式,即可以将一组拦截器串连成一个大的拦截器 ,Kafka 会按照添加顺序依次执行拦截器逻辑
在 Producer 端指定拦截器
生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor");
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……
Producer 端拦截器实现类都要继承 org.apache.kafka.clients.producer.ProducerInterceptor 接口
实现两个方法:
1. onSend:该方法会在消息发送之前被调用
2. onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用
与发送回调通知 callback 相比, onAcknowledgement 的调用要早于 callback 的调用
onAcknowledgement 方法和 onSend 不是在同一个线程中被调用的,因此如果要在这两个方法中调用了某个共享可变对象,一定要保证线程安全
onAcknowledgement 方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则 Producer 的 TPS 会直线下降
消费者拦截器也是同样的方法,只是具体的实现类要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口
1. onConsume:该方法在消息返回给 Consumer 程序之前调用
2. onCommit:Consumer 在提交位移之后调用该方法。通常可以在该方法中做一些记账类的动作,比如打日志等。
指定拦截器类时要指定它们的全限定名,即 full qualified name,并且还要保证你的 Producer 程序能够正确加载你的拦截器类。
典型应用场景
Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计 等多种功能在内的场景
端到端系统性能检测
Kafka 默认提供的监控指标都是针对单个客户端或 Broker 的,很难从具体的消息维度去追踪集群间消息的流转路径 同时,如何监控一条消息从生产到最后消费的端到端延时 也是很多 Kafka 用户迫切需要解决的问题
方案1: 在客户端程序中增加统计逻辑 在应用代码中编写统一的监控逻辑其实是很难的 监控逻辑与主业务逻辑耦合也是软件工程中不提倡的做法
方案2: Kafka 拦截器 可插拔的机制 √
消息审计
设想公司把 Kafka 作为一个私有云消息引擎平台向全公司提供服务,这必然要涉及多租户以及消息审计的功能 作为私有云的 PaaS 提供方,肯定要能够随时查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费 一个可行的做法就是编写一个拦截器类,实现相应的消息审计逻辑 ,然后强行规定所有接入 Kafka 服务的客户端程序必须设置该拦截器
案例分析 – 编写拦截器类来统计消息端到端处理的延时
计算总延时 需要让生产者和消费者程序都能访问 假设保存在redis中
生产者端拦截器实现
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
private Jedis jedis;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
jedis.incr("totalSentMessage");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
消费者端的拦截器实现
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Jedis jedis;
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
for (ConsumerRecord<String, String> record : records) {
lantency += (System.currentTimeMillis() - record.timestamp());
}
jedis.incrBy("totalLatency", lantency);
long totalLatency = Long.parseLong(jedis.get("totalLatency"));
long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
小结
讨论
Producer 拦截器 onSend 方法的签名如下:
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
如果return null 会怎么办?
onSend传null会在KafkaProducer类中调用doSend时引发NPE (NullPointException),并通过 ProducerInterceptors.onSendError 方法传导至onAcknowledgement,以及throw到用户编写的Producer中
Java生产者是如何管理TCP连接的?
幂等生产者和事务生产者是一回事吗?
消费者组到底是什么?
|