一、Kafka生产者
1.生产者消息发送流程
1.1 发送原理简单介绍
在消息发送的过程中,涉及到了两个线程,一个是main线程(是调用send方法的主程序),一个是sender线程。mian线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker中。 在主线程中调用send方法向Kafka中发送消息,经过拦截器、序列化器和分区器,然后将数据缓存到了消息累加器RecordAccumulator中。 Sender线程负责从消息累加器RecordAccumulator中获取消息并将消息发送到Kafka中。
1.2 RecordAccumulator理解
RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认值为33554432B,即32M。如果生产者发送消息的速度超过发送到服务器的速度,会导致生产者空间不足,这个时候send方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms 的配置,此参数的默认值为60000,即60秒。
主线程调用send发送的消息都会被迫加到RecordAccumulator中的某个双端队列(Deque)中(内存池分配内存实现队列的创建和释放),RecordAccumulator内部为topic每个分区都维护了一个双端队列,消息写入缓存时,追加到双端队列的尾部。Sender读取消息时,从双端队列的头部读取。
发送的数据大小是ProducerBatch决定的,代表是消息批次的大小,默认为16K。
当一条消息进入RecordAccumulator时,会先寻找与消息分区对应的双端队列(如果没有就去新建)。
1.3 NetworkClient理解
sender从RecordAccumulator中获取到数据后将消息保存到InFlightRequests中,保存的主要格式为Map<Nodeld,Deque<request>> ,主要作用是缓存了已经发出去但还没有收到服务端响应的请求,Nodeld是节点id编号。 InFlightRequests中默认限制每个连接最多可以缓存5个未响应的请求,参数为max.in.flight.request.per.connextion ,超过这个数值后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应。 底层是通过Deque的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息。
1.4 发送原理再整理
如上图所示: 1)第一步,主线程main方法定义的KafkaProducer对象调用send方法向Kafka发送数据,数据经过拦截器、序列化器、分区器到达消息累加器RecordAccumulator中。 2)消息累加器RecordAccumulator内部为每个分区都维护了一个双端队列,到达消息累加器的数据根据分区器计算出自己应该去的分区,然后去相应的双端队列。 3)在双端队列的数据量到达batch.size时或者时间到达了linger.ms时,sender从消息累加器中拉取消息,此时数据格式为<分区,Deque<Producer Batch>> 4)sender拉取消息后,在NetorkClient中创建消息请求队列,队列中保存的是发送数据的请求,它为每一个node节点都保存这样一个队列,此时数据格式为Map<NodeId,Deque<request>> 。 5)sender将相应的请求写入到队列中,然后将这次请求的数据通过Selector发送给Kafka 6)如果发送成功,Kafka接收到数据后经过Selector返回一个确认收到,在NetorkClient中把相应的请求队列的请求删除,然后再消息累加器中把对应的消息清理掉。 7)如果发送失败,会进行重试发送的操作
2.生产者重要参数列表
参数名称 | 描述 |
---|
bootstrap.servers | 生产者连接集群所需的 broker 地 址 清 单 。 例 如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker 信息 | key.serializer 和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名 | buffer.memory | RecordAccumulator 缓冲区总大小,默认 32m | batch.size | 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加 | linger.ms | 如果数据迟迟未达到 batch.size,sender 等待linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间 | acks | 0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader 收到数据后应答。-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的 | max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字 | retries | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了 | retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms | enable.idempotence | 是否开启幂等性,默认 true,开启幂等性 | compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。 |
3.异步发送API
3.1 普通异步发送
异步发送可以发送一条数据(时间到了,但是大小还没达到),也可以发送多条数据(达到了容量大小),特点是不需要等待broker响应(就是不需要等待leader返回的ack),就可以接着发送。之前的发送原理图中介绍的就是异步发送。 代码如下:
public class CustomProducer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","ssl "+i));
}
kafkaProducer.close();
}
}
3.2 带回调函数的异步发送
普通异步发送只管发送数据,不管发送数据后的结果。 这里的异步发送带了回调函数,在producer收到了leader返回的ack后将调用这个回调函数。 回调函数中的两个参数,一个是元数据信息RecordMetadata,可以用来获取分区信息等;一个是异常信息Exception,如果为null,说明消息发送成功,如果不为null,说明消息发送失败。 代码如下:
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "ssl " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e==null){
System.out.println(" 主题: " + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
}else{
e.printStackTrace();
}
}
});
}
4.同步发送API
同步发送就是逐条发送。这时请求队列InFlightRequest中永远最多只有一个请求。 这也就相当于在异步模式下,设置参数:max.in.flight.requests.per.connection=1 & batch.size=1 ,这样的效果也是逐条发送,当收到leader返回的ack信息后,才能去发送下一条数据。在收到ack消息前,生产者一直处于阻塞状态。 代码如下:
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","ssl "+i)).get();
}
5.生产者分区
5.1 分区的好处
好处一: 便于合理使用存储资源,每个partition在不同的节点上存储。合理控制分区的任务,可以实现负载均衡的效果。 例如有三个节点,节点一有1T大小,节点二和节点三都是10T大小,为了合理利用资源可以设计一定的分区方式,将更多的数据存放到节点二和节点三种,以便达到更好的资源利用效果。 好处二: 可以提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位消费数据,大大提高了吞吐量。
5.2 生产者分区策略
1)默认的分区器DefaultPartitioner 在IDEA中ctrl+n,全局查找DefaultPartitioner,可以看到: 如果指定了一个分区,就使用指定的分区。 如果没有指定分区,但是声明了key的值,就用key的hashcode对分区个数取余数作为分区。 如果既没有指定分区也没有声明key,就选择一个粘性分区。 在IDEA中ctrl+n,全局查找ProducerRecord类,可以看到类中有以下构造方法: 重点说明一下粘性分区: 在第一次的时候Kafka会先随机选择一个分区并尽可能的使用这个分区,等到这个这个分区的batch.size已满或者达到超时时间后,Kafka会再随机选择一个分区进行使用(如果随机到的分区和上一个使用的分区一致,那么继续随机知道不一致为止)
5.3 生产者自定义分区器
在开发中,如果需要对数据进行清洗、过滤操作,可以考虑进行自定义分区器。
实现步骤: 1)定义类实现Partitioner接口 2)重写类中的partition()方法
实现需求: 实现一个自定义分区器,发送过来的数据如果包含ssl ,就发往0号分区,不包含ssl ,就发送1号分区
自定义分区器代码如下:
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String s = value.toString();
int partition;
if(s.contains("ssl")){
partition=0;
}else{
partition=1;
}
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
在KafkaProducer的配置文件中加入对应配置即可生效:
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.ssl.producer.MyPartitioner");
6.生产经验—提高吞吐量
6.1 提高吞吐量
影响吞吐量的参数有以下几个: 1)batch.size 一批次数据量的大小,默认为16K。更大的batch.size可以将更多的消息封装进同一个请求,从而减少总请求数。 2)linger.ms 等待发送时间,默认为0ms,调整为5-100ms。更大的linger.ms使producer等待更长的时间才发送消息。这样就能缓存更多的消息填满batch,提升吞吐量。但是可能会增加消息的延时。 3)compression.type 采用消息压缩算法,默认为none不采用。对消息进行压缩可以极大的减少网络传输量,从而可以提高吞吐量。 4)RecordAccumulator 这里说的是RecordAccumulator的缓冲区大小buffer.memory ,默认大小为32M。如果生产消息的速度远大于发送消息的速度,缓冲区很快就会被写满消息,此时producer立即进入阻塞状态。一旦阻塞时间超过最大时间max.block.ms 默认为60秒,就会抛出超时异常,所以可以适当调大该参数,调大至64M.
在代码中设置:
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16*1024);
properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,1024*1024*64);
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
7.生产经验—数据可靠性
7.1 数据可靠性ack
这里说的数据可靠性指的是ack这个参数。 ack有以下三种: 1)ack为0 ack为0时,生产者发送完数据不需要等到Leader的应答。此时如果在发送完数据后,Leader完成同步数据之前Leader发生了故障,那么会发生丢数据的情况。 2)ack为1 ack为1时,Leader接收完所有数据后给producer返回一个确认收到。之后Follower会从Leader中同步数据,如果此时Leader挂掉了,那么会从Follower中选出新的Leader,因为没有同步完成,所以也会出现数据丢失的情况。 3)ack为-1或为all 此时需要leader和ISR队列里面的所有节点都接收完数据后再进行应答,此时的数据可靠性最高。
7.2 ISR理解
Leader维护了一个动态的in-sync replica set(ISR) ,意思是和Leader保持同步的Follower+Leader集合。
ISR中存放的都是所有的能够跟Leader保持同步数据的副本的集合,如果出现以下情况就要将这个副本踢出ISR: 1)超过延迟时间。某个Followere长时间未向Leader发送通信请求或者同步数据,该时间由参数replica.lag.time.max.ms 确定。 2)超过延迟条数。如果某个Follower和Leader的数据条数超过了一定阈值,就将该Follower踢出ISR。但是在3.0版本中被弃用,因为可能出现频繁的踢出和进入ISR,需要经常跟zookeeper通信,浪费集群资源。
注意:Leader也在ISR里
7.3 可靠性总结
条件 | 效果 |
---|
acks=0 | 可靠性差,效率高 | acks=1 | 可靠性中等,效率中等 | acks=-1 | 可靠性最高,效率最低 |
这里acks=-1 时,如果设置副本个数为1或者设置ISR里最小应答的副本数量为1(默认为1),和ack=1的效果是一样的,仍然有丢数的风险,因为最小应答副本为1时,如果Leader挂了,就没有第二个拥有全部数据的Follower了。 因此,数据完全可靠性条件是:ack=-1 + 副本数量大于等于2 + ISR里应答的最小副本数量大于等于2
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别苏剧;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
代码设置:
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.RETRIES_CONFIG,3);
8.生产经验—数据去重
8.1 数据重复分析
如下图所示,当所有副本都同步数据完成之后,Leader准备发送ack时突然挂了,就会从ISR里的Follower中选出一个新的Leader,因为没有收到ack,所以还会重新发送一次数据,但是实际上已经接收过一次数据了,所以会造成数据重复。
8.2 三种语义
1)最多一次(At Least Once) 最多一次指的是Kafka最多收到数据了一次。把ack设置为0时,可以保证至少有一次Leader接收到了发送的数据,那么以后就不会在发了。 2)至少一次(At Most Once) 至少一次指的是Kafka至少能够收到数据一次,当然还有可能会收到同样的数据多次。ack=-1 + 副本数量大于等于2 + ISR里应答的最小副本数量大于等于2 可以保证至少一次。 3)总结 至少一次可以保证数据不丢失,但是不能保证数据不重复。 至多一次可以保证数据不重复,但是不能保证数据不丢失。
很多时候,我们要求可靠性级别达到精确一次(Exactly Once),要求数据既不能丢时也不能重复。
8.3 幂等性
幂等性是指Producer无论向Broker发送多少次重复的数据,Broker端都只会持久化一条,保证了不重复。 所以,精确一次 = 幂等性 + 至少一次(ack=-1 + 分区副本数>=2 + ISR最小应答副本数量>=2)
Kafka判断数据是否重复的标准: 数据被提交后,它会产生<PID,Partition,SeqNumber> 这样的一条信息,当这信息中三部分和之前某个信息都相等时,Broker只会持久化一条。 PID是Kafka每次重启后生成的新的ID号;Partition代表分区号;Sequence Number是单调自增的。
所以可以看出来幂等性只能保证在一次会话中的一个分区内不会有重复的数据。
开启幂等性(默认为开启状态):enable.idempotence设置为true
8.4 生产者事务
9.生产经验—数据乱序
产生原因: 在Kafka中,单个分区内消息是有序的,但是分区和分区间是无序的,因为一个Topic被分为了多个分区,一个消费者可以消费多个分区,而消费的速度是不可知的,所以对于一个消费者来说可能会消费无序的数据。
如何保证单分区有序: 1)开启幂等性下 设置max.in.flight.requests.per.connection 的值小于等于5,这个参数是发送数据的请求个数。在开启幂等性的条件下,设置这个参数小于等于5后,Kafka集群服务端会缓存Producer发来的最多最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。如果设置大于5个请求,因为最多保留5个,所以最终可能会导致乱序情况的发生。 整个流程: request1发送成功,根据序列化id自增可知它是第一个,所以直接输出。 request2发送成功,根据序列化id自增可知它是第二个,所以直接输出。 request3发送失败,进行重试。 request4发送成功,根据序列化id自增可知它前面有一个数据没有发送成功,因此先不输出,缓存起来。 request5发送成功,根据序列化id自增可知它前面有数据没有发送成功,因此先不输出,缓存起来。 request3重试成功,根据序列化id自增可知该它输出了,因此输出。 最后request4和request5依次输出。 2)不开启幂等性 设置max.in.flight.requests.per.connection 的值为1,此时要求只能最多有一个发送请求,前一个发送完成才能发送后一个,如果发送失败,就会去重试而不会让后面的发送,所以肯定时有序的。
|