Producer概览
Producer SDK
Producer就是负责向Kafka集群中写入消息数据的应用程序,自 Kafka 0.9 版本提供了Java版本的Producer SDK供用户使用,
Kafka官方支持的语言SDK较少,更多都是由第三方社区维护的SDK,如果需要使用对应语言的SDK,需要额外下载,
第三方库信息地址:docs.confluent.io/platform/cu…
Producer 通信协议
Kafka自己内部封装了一套二进制通信协议,用户可使用任意语言按照该协议进行编程和消息发送,官方提供的SDK和第三方库都是基于该协议实现的,除了易用性和性能方面没有其他的差别,
kafka为不同的协议类型定义了各自的紧凑的二进制字节数组格式,通过Socket发送给Broker,之后等待Broker处理完成返回响应(Response),
由于是自定义的二进制格式,消息的收发将不用依赖任何外部序列化框架,显得更加轻量级以及具有良好的拓展性。
Producer 要做的事情
Producer 在功能实现上要比 Consumer 消费者来的简单,因为每个 Producer 都是独立工作的,它们之间没有交集,不会涉及到复杂的组管理操作。
1. 使用分区器(partitioner)向分区发送消息
Producer首要功能是向topic的分区中发送消息,每个topic都是有若干个分区的,因此Producer需要确定将该消息到底发向何处,这个确认过程就是 partitioner 分区器要实现的功能了。
默认分区器
Kafka Producer 默认提供了一个分区器,对于每条消息,如果该消息存在key,那么该 partitioner 将根据key的哈希值选择目标分区;如果该消息没有key,该 partitioner 则使用轮询的方式确认目标分区(保证分区上消息的均匀性)。
自定义分区器
用户也可以实现自定义的分区策略,而非使用默认的 partitioner,可以根据自身的业务需求使用不同的分区策略,将消息发送到合适的分区上
跳过分区器
Producer的API允许用户跳过分区器直接指定消息该发送到哪个分区上。
2. 寻找分区leader
确定目标分区后,Producer需要找到该分区对应的副本leader,只有副本leader才能响应Producer发送的请求,其余的副本将与副本leader保持数据同步,即ISR,
在Producer发送消息时,对Broker的响应有多种选择,是要求Broker不等待任意副本写入成功便返回响应成功,还是副本leader写入成功后再返回响应成功,以及其他等等。
Java版Producer发送消息的过程
Java版本的Producer SDK是官方提供的,并且使用最为广泛,其工作原理过程如下:
-
Producer在用户线程中将消息封装成一个ProducerRecord类实例,将其序列化后发送给 partitioner -
partitioner确定目标分区后将其发送到位于Producer程序中的一块内存缓冲区中 -
Producer内部的Sender线程(IO发送线程),负责实时从缓冲区中提取就绪的消息封装进一个批次(batch),发送给对应的broker
Java版Producer开发
Producer的开发步骤如下:
1. 构造Properties配置对象
Properties(java.util.Properties)用于配置Producer对象访问broker的参数配置,其中有三个参数是必须要指定的:
-
bootstrap.servers 该参数用于指定borker服务器地址,多个地址之间用,分隔,如果broker集群很多,也不用全部都指定,producer会根据配置的borker发现全部的broker, 之所以要指定多个,是方便故障转移使用,即使 bootstrap.servers 中的某一台挂了,producer 也可通过其他的地址接入 kafka 集群, 因为kafka内部采用FQDN(Fully Qualified Domain Name), 因此如果broker端没有显式配置 listeners 使用IP地址,最好 bootstrap.servers 参数中的地址配置为主机名,而非IP地址。 -
key.serializer 发送到broker端的任何消息格式必须都是字节数组,因此发送消息前,必须将消息进行序列化,再进行发送, key.serializer参数是为了消息key的序列化使用的,即指定key的序列化方式,该参数的值必须是 org.apache.kafka.common.serialization.Serializer 接口的实现类, kafka提供了 org.apache.kafka.common.serialization.StringSerializer 类,该类会将字符串类型转换为字节数组,一般消息的key都是String类型,因此可以直接使用该类作为属性值, 当然用户也可以自定义序列化器,只要实现 Serializer 接口就好。 即使Producer程序发送消息时,不指定key,这个参数也必须要配置,否则会抛出 ConfigException 异常,提示 key.serializer 无默认值,需要配置。 -
value.serializer 与上面的 key.serializer 一样,只不过 value.serializer 是针对消息体部分做序列化的。
序列化方式的值必须是全限定类型,否则将会抛出错误。
2. 构造Producer对象
根据上面提到的三个必要属性,即可创建出一个Producer对象
// 参数
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// new一个Producer对象
Producer<String, String> producer = new KafkaProducer<>(props);
复制代码
也可简化为
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
Producer<String, String> producer = new KafkaProducer<>(props,
new StringSerializer(), new StringSerializer());
复制代码
3. 构造ProducerRecord对象
Producer创建完毕后,就可以发送消息了,因此我们需要构造一个消息(ProducerRecord)对象出来。
new ProducerRecord<>(topic, key, value)
复制代码
ProducerRecord的构造函数中,可指定topic,key,value几个关键参数,也支持指定消息发送的分区、消息时间戳等,
消息时间戳最好不要随意指定,由kafka自行指定比较稳妥,因为时间戳索引文件中的索引项是严格按照时间戳顺序排列的,如果在producer端随意指定时间戳,
会导致消息的时间序列混乱,在使用根据时间戳查询位移的功能时,将无法找到该消息,同时Kafka的消息留存策略也会受到影响。
4. 发送消息
producer发送消息的方法是send,在底层实现了异步化发送,通过Java提供的Future特性实现了同步、异步+回调的两种发送方式,
还有第三种发送方式,被称为fire and forget,即发送后不理会发送结果,这种方式在生产环境下不被推荐使用,因为无法获悉消息是否发送成功。
- 异步发送
onCompletion回调中,如果成功了 exception 为空,否则 metadata 为空,通过if判断知晓消息是否发送成功。
producer.send(new ProducerRecord<>(topic, key, value), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
// 消息发送成功
} else {
// 执行错误逻辑处理
}
}
});
复制代码
- 同步发送
通过 Future.get将一直等待broker将结果返回给Producer,当结果get时,要么返回正常的结果,要么抛出异常交由Producer自行处理,
如果没有异常发生,get将返回RecordMetadata类型的结果,该结果中包含了已发送消息的所有元数据信息,即topic、分区、消息所在分区位移信息等等。
producer.send(new ProducerRecord<>(topic, key, value)).get();
复制代码
- fire and forget
不推荐
producer.send(new ProducerRecord<>(topic, key, value));
复制代码
发送异常
无论是同步还是异步发送,都有可能出现发送失败的异常,发送异常分为两类:可重试异常和不可重试异常。
- 可重试异常
-
LeaderNotAvailableException:分区leader副本不可用,此时通常leader在经历选举 -
NotControllerException:controller不可用,此时通常controller在经历选举 -
NetworkException:网络瞬时故障导致的异常
以上的异常,如果在producer程序中配置了重试次数,那么只要在规定的次数内自行恢复了,错误将不会出现在 onCompletion 的 exception 中,否则会被封装进 exception 中由 producer 自行处理该异常。
所有可重试异常都继承自 org.apache.kafka.common.errors.RetriableException 抽象类,理论上来说,所有没有继承该类的异常都属于不可重试异常,这些异常表示一些严重的错误或kafka无法处理的问题。
- 不可重试异常
以上的异常都无法通过重试来解决,因此会抛给Producer程序自行处理。
由于可重试/不可重试都可能会被封装到exception中,而这两种异常可能需要不同的处理,因此代码可以这么判断:
producer.send(new ProducerRecord<>(topic, key, value), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
// 消息发送成功
} else {
// 执行错误逻辑处理
if (exception instanceof RetriableException) {
// 处理可重试瞬间异常
} else {
// 处理不可重试瞬间异常
}
}
}
});
复制代码
5. 关闭Producer
Producer被创建后,会向系统申请相关的线程、内存、socket等资源,当Producer完成了既定的任务,不再使用后,应当及时关闭。
producer.close();
复制代码
调用普通的无参close方法,producer会等待没有发送完的消息到发送完毕为止,即优雅退出(graceful shutdown),
close方法可指定 timeout 超时时间,即超时后,不再等待消息发送完毕,强行退出,这种方式会丢失所有未发送和未收到应答的消息,要谨慎使用。
Producer主要参数
1. acks
acks参数用于控制Producer发送后的消息的持久性(durability)策略。
当一条消息由Producer发送给topic分区的leader broker时,Producer将等待leader broker返回消息的写入结果以确认消息被成功提交(等待时间具有超时限制),
当Producer确认当前消息成功提交后,将继续发送下一条消息,Kafka保证了Consumer永远不会读取到尚未提交完成的消息。
leader broker何时返回写入结果给Producer是一个问题,其响应结果的快慢将直接影响到Producer端的吞吐量以及消息的持久性质量,
Producer端越快的接收到leader broker响应,其就能越快的发送下一条消息,吞吐量也就越大,acks参数就是用于控制响应速度的,
acks参数的值代表的是在leader broker响应producer前,leader broker必须要确保写入该消息的副本数。
acks取值如下:
-
acks = 0,即Producer不等待leader broker的响应结果,一条消息发送完毕后,立即发送下一条消息,吞吐量最高,但是由于不等待响应结果,所以无法判定消息是否被提交成功,这种状态下Producer不保证消息会被发送成功。 -
acks = all 或者 -1,leader broker将消息写入本地日志,并且等待ISR中的副本们也写入本地日志,再返回结果给Producer,这种情况可保证消息不会丢失,但是吞吐量最低。 -
acks = 1,是上面两种的折中方案,leader broker仅仅将消息写入自己的本地日志后就返回结果,只要leader broker不宕机,该消息将不会丢失,即保证吞吐量,又保证持久性。
acks | producer吞吐量 | 消息持久性 | 使用场景 |
---|
0 | 最高 | 最低 | 允许消息丢失,不关心消息是否发送成功 | 1 | 适中 | 适中 | 一般场景即可 | all或-1 | 最差 | 最高 | 不能容忍消息丢失 |
2. buffer.memeory
该参数指定Producer端用于缓存消息的缓冲区大小,单位是字节,默认是32MB,因为kafka的消息是异步发送,消息会先放到缓冲区中,由另一个sender线程从缓冲区中读取再真正的发送,
如果Producer向缓冲区写消息的速度超过专属IO线程发送消息的速度,将造成缓冲区空间的不断增大,此时Producer会停止工作,等待Sender线程追上来,若一段时间后还没有追上来,将抛出异常提示用户,
如果Producer程序需要向很多分区发送消息,需要合理的设置该参数防止过小的缓冲区降低Producer的整体吞吐量。
3. compression.type
Producer发送消息时,如果针对消息进行压缩,将显著减低网络IO的传输开销以提升整体吞吐量,但同时会增加Producer机器CPU的开销,
如果Broker端的压缩参数与Producer不同,那么Broker在写入消息时,对消息进行解压缩再重压缩的操作也会占用Broker的CPU资源。
kafka支持的压缩算法有 GZIP、Snappy、LZ4、Zstandard 等,根据实际使用经验来看,采用Zstandard算法的效率最高,默认情况下,compression.type 的值为 none,即不压缩消息。
4. retries
Kafka Broker在处理写入请求时,可能由于瞬时故障(瞬时leader选举或网络抖动)导致消息发送失败,这种故障一般可以自行恢复,即使把错误返回给Producer回调函数,回调函数能做的也只有重发,
因此不如在Producer中直接进行重试,retries的值就是重试的次数,默认是0,即不重试,设置大于0的值可以很好的应对瞬时错误,但是重试会带来两个问题:
- 重试造成消息重复发送
由于瞬时网络都懂导致broker端已成功写入消息,但没有成功将结果返回给producer,因此producer将认为消息发送失败,从而开启重试机制,
为了应对这种风险,kafka要求consumer端必须执行去重处理,kafka 自0.11 版本开始支持 精确一次 的处理语义,使用幂等性避免了消息重试时的重复写入。
- 重试造成消息乱序
Producer会将多个发送消息的请求缓存在内存中(默认是5),如果消息发生了重试,则可能造成消息流的乱序,
Producer SDK提供了 max_in_flight_requests_per_connection 参数,将该值设置为1,即同一时刻仅允许发送一个消息请求避免乱序发生,不过该设置会降低性能,该问题也可以通过设置幂等性解决。
重试带来的两个问题其实最终合理也是最简单的解决方法就是开启幂等性 enable.idempotence=true 处理。
幂等性的实现原理大概如下:
关键的两个术语是 Producer ID(即PID)和 Sequence Number。
对于每个PID,该Producer发送消息的每个<Topic, Partition>都对应一个单调递增的Sequence Number,
同样,Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每Commit一条消息时将其对应序号递增。
对于接收的每条消息,如果其序号比Broker维护的序号大 1,则Broker会接受它,否则将其丢弃,
如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,
如果消息序号比Broker维护的序号差值比 1 大,说明中间有数据尚未写入,即乱序,此时Broker拒绝该消息,
消息发送失败后会重试,即保证每个消息都被发送到broker,消费者从 partition 中取出来数据的时候,也一定是有顺序的。
retry.backff.ms
producer对于失败的消息重试会有一个间隔,防止频繁重试消耗系统的资源,该时间由 retry.backff.ms 参数配置,
默认值是100毫秒,一般leader选举是比较常见的瞬时错误,可通过计算平均leader选举时间来合理设定 retries 和 retry.backff.ms 的值。
5. batch.size / linger.ms
producer会将送往同一分区的消息封装到一个batch中,当一个batch中的消息大小达到了batch.size的限制时,会将该batch中的消息批量发送到broker中,
不过producer并不会一直等待消息达到batch.size的大小,不然的话,如果迟迟达不到该量,岂不是陷入死等待了,
因此 linger.ms 就是控制最多等待多久的后发送的参数,该值默认为0,即立即发送不等待。
batch.size的值默认是 16384,即16KB,该值如果过小,一次写入的消息数会很少,将降低producer的吞吐量,
该值如果过大,也会给系统内存带来压力,因为不管是否能填满,producer都会为该batch分配固定大小的内存,因此对batch.size的设置是对时间和空间权衡的体现,可在内存富足的情况下,适当的调大该值,以此得到更高的吞吐量。
对于 linger.ms,默认是0,即立即发送,大多数情况下我们都希望消息被尽快的发送到broker中随后被消费者消费,
但是这样做的方式会降低producer的吞吐量,每次请求积攒的消息越多,吞吐量就越高,因此想提升吞吐量的话,可适当的调整 linger.ms 的值,在这段延时中积攒消息量,一并发送。
6. max.request.size
该参数用于设置一条消息的最大大小,默认值是 1048576字节,如果需要发送的消息内容大于该值,则需要调整该参数,否则会出现异常。
7. request.timeout.ms
当producer将消息发送给broker后,broker需要在规定时间内返回结果给producer,即producer会在一定时间内等待结果,
这个时间就是 request.timeout.ms,默认是30秒,如果超过该时间broker还未返回结果,producer将在回调函数中抛出TimeoutException的异常,
一般30秒足够使用,如果producer的负载较大,可适当调整该值。
消息分区机制
1. 分区策略
producer发送消息过程中需要确定消息发送到指定topic中的哪个分区中,并提供分区策略以及对应的分区器(partitioner)供用户选择使用,
默认的partitioner会尽力确保相同key的消息被发送到相同的分区上,如果没有指定key,则采用轮询的方式确保在topic的所有分区上均匀分配。
2. 自定义分区机制
Java版本的Producer默认的 partitioner 使用 murmur2 算法计算消息 key 的哈希值,然后对总分区数进行求模运算得到消息需要发送的分区号,用户可使用producer提供的自定义分区策略决定消息的去处。
自定义分区机制需要做一下两件事:
-
创建一个实现 org.apache.kafka.clients.producer.Partitioner 接口的类,用于实现自定义分区的规则。 partition方法用于确定目标分区,该方法接收消息所属的topic、key、value、集群元数据信息,通过这些信息计算得出最终的目标分区号并返回, close方法用于Partitioner关闭时,清理创建时的初始化资源等。
public interface Partitioner extends Configurable, Closeable {
/**
* 计算消息发送到哪个分区
*
* @param topic topic名称
* @param key 消息键或null
* @param keyBytes 消息键序列化字节数组或null
* @param value 消息体或null
* @param valueBytes 消息体序列化字节数组或null
* @param cluster 集群元数据
*/
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
/**
* 关闭 partitioner
*/
void close();
}
复制代码
- 在构造KafkaProducer中的Properties对象中设置 partitioner.class 参数。
实现消息分区
假设现有业务需求,有一类消息专门用于审计功能使用,当消息的key为 audit 时,将该类消息发送到topic的最后一个分区上,方便后续处理,而该topic下的其他消息则采用随机策略发送到其他分区上,代码实现如下:
先创建自定义分区机制类:
package com.kafka.producer;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
public class AuditPartitioner implements Partitioner {
private Random random;
@Override
public void configure(Map<String, ?> configs) {
// 初始化资源
random = new Random();
}
@Override
public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes,
Cluster cluster) {
// 得到消息的key,转成String类型
String key = (String) keyObj;
// 得到topic下可用分区数量
List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
int partitionCounts = partitionInfos.size();
// 得到最后一个分区数字
int auditPartition = partitionCounts - 1;
// 当消息为null、空字符串、非audit时,在除了最后一个分区之间随机选择一个分区返回,否则返回最后一个分区
return key == null || key.isEmpty() || !"audit".equals(key) ? random.nextInt(partitionCounts - 1) :
auditPartition;
}
@Override
public void close() {
// 清理资源
random = null;
}
}
复制代码
使用自定义分区类:
public class KafkaProducerApp {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// 自定义分区类,必须给出完整的类限定名
props.put("partitioner.class", "com.kafka.producer.AuditPartitioner");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
String topic = "my-test";
producer.send(new ProducerRecord<>(topic, "non-key record")).get();
producer.send(new ProducerRecord<>(topic, "non-key record")).get();
producer.send(new ProducerRecord<>(topic, "non-key record")).get();
producer.send(new ProducerRecord<>(topic, "other", "non-key record")).get();
producer.send(new ProducerRecord<>(topic, "other", "non-key record")).get();
producer.send(new ProducerRecord<>(topic, "other", "non-key record")).get();
producer.send(new ProducerRecord<>(topic, "audit", "audit record")).get();
}
}
复制代码
创建一个分区总数为3的topic:
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic my-test --partitions 3 --replication-factor 1
复制代码
运行 KafkaProducerApp main方法进行消息的发送,然后查看分区中的消息数量:
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-test
复制代码
可以看到,发送了7条消息,只有一条是audit,最后一个分区,只有一条消息:
其他的两个分区被随机发送了,虽然第一个分区也只有一条,不过我们可以继续运行 KafkaProducerApp ,然后再看结果:
很明显再运行一次后,前两个分区都被随机发送了消息,最后一个分区增加了1,因为运行了两次,只发送了两次audit的消息,说明我们的自定义分区策略生效了。
消息序列化
1. 默认序列化
kafka支持任意的消息类型,可以是字符串、整数、数组、或者其他的对象类型,但是在发送数据到broker前,producer需要将数据转换为字节形式在网络中传输,
也就是说,不管是什么类型的消息,在发送前都需要有一个序列化器(serializer)将消息转化为字节数组,
而接收方都需要一个解序列化(deserializer)将接收到的字节数组转换为相应的对象,serializer与deserializer的关系如下:
Kafka默认提供了较多的序列化器,常用的serializer如下:
-
StringSerializer:序列化String类型 -
ByteBufferSerializer:序列化ByteBuffer类型 -
BytesSerializer:序列化Kafka自定义的Bytes类 -
DoubleSerializer:序列化Double类型 -
IntegerSerializer:序列化Integer类型 -
LongSerializer:序列化Long类型
如果用户有更复杂的序列化需求,可自行定义 serializer 。
在构造Producer对象时,指定相应的序列化值即可使用序列化:
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
复制代码
2. 自定义序列化
与自定义分区类型,自定义序列化也需要编写相应的实现类,步骤如下:
-
定义数据对象格式 例如,我们有一个User类的对象需要进行序列化发送:
public class User {
private String name;
private String pwd;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPwd() {
return pwd;
}
public void setPwd(String pwd) {
this.pwd = pwd;
}
}
复制代码
-
创建自定义序列化类 这里使用jackson对User对象进行json序列化,自定义的序列化器必须实现 org.apache.kafka.common.serialization.Serializer 接口。
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.1</version>
</dependency>
复制代码
public class UserSerializer implements Serializer {
private ObjectMapper objectMapper;
@Override
public void configure(Map configs, boolean isKey) {
this.objectMapper = new ObjectMapper();
}
@Override
public byte[] serialize(String topic, Object data) {
try {
return objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() {
this.objectMapper = null;
}
}
复制代码
- 设置自定义序列化类
因为我们发送的Value是User类型,因此将 value.serializer 设置为我们自己写好的User序列化器,然后进行发送即可。
public class KafkaProducerApp {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// 自定义分区类,必须给出完整的类限定名
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.kafka.producer.UserSerializer");
Producer<String, User> producer = new KafkaProducer<>(props);
String topic = "my-test";
producer.send(new ProducerRecord<>(topic, new User())).get();
}
}
复制代码
如果序列化器指定错误,例如下面的 StringSerializer 无法序列化 User 对象,将抛出异常。
producer拦截器
producer拦截器(interceptor) 可以在消息发送前或者producer回调逻辑处理前对消息进行一些通用的处理,例如修改消息内容等,
interceptor可以被指定多个,按照指定顺序作用于同一条消息形成一个拦截链(interceptor chain),
interceptor的接口为:org.apache.kafka.clients.producer.ProducerInterceptor,主要的方法如下:
-
onSend(ProducerRecord<K, V> record) 该方法运行在用户的主线程中,在消息被序列化和确定分区前被调用,用于可以在该方法中对消息做任意处理,但最好不要修改所属的topic和分区,否则将影响后面的目标分区计算。 -
onAcknowledgement(RecordMetadata metadata, Exception exception) 该方法在消息被应答前或消息发送失败时调用,通常在producer回调逻辑前触发,其运行在Producer的IO线程中,在这个方法里不要做一些比较耗时的操作,否则会降低Producer的消息发送效率。 -
void close() 关闭 interceptor,执行一些资源清理工作。
interceptor不保证线程安全性,需要用户自行保障线程安全,多个interceptor按照指定顺序调用,同时interceptor中捕获的异常将记录到错误日志中,而不是向上传递。
实际使用
假设我们现在要创建两个 interceptor,一个是在消息的前面加上发送时的时间戳,一个是统计消息发送成功和失败的次数。
TimeStampPrependerInterceptor:
public class TimeStampPrependerInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 在消息内容前加上时间戳
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(),
System.currentTimeMillis() + "," + record.value());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
复制代码
CounterInterceptor:
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private AtomicInteger errorCounter = new AtomicInteger(0);
private AtomicInteger successCounter = new AtomicInteger(0);
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 统计成功和失败次数
if (exception == null) {
successCounter.incrementAndGet();
} else {
errorCounter.incrementAndGet();
}
}
@Override
public void close() {
// 关闭前打印出成功和失败的次数
System.out.println("成功发送: " + successCounter.get());
System.out.println("失败发送: " + errorCounter.get());
}
@Override
public void configure(Map<String, ?> configs) {
}
}
复制代码
main运行类:
public class KafkaProducerApp {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 添加拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add("com.kafka.producer.TimeStampPrependerInterceptor");
interceptors.add("com.kafka.producer.CounterInterceptor");
props.put("interceptor.classes", interceptors);
Producer<String, String> producer = new KafkaProducer<>(props);
String topic = "my-interceptor";
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>(topic, i + "", i + "")).get();
}
producer.close();
}
}
复制代码
运行结果:
消费者监听到的消息:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-interceptor --from-beginning
复制代码
防止消息丢失配置
Producer采用异步消息发送,每次send前会将消息放到batch缓冲区中,由专门的IO线程提取消息进行发送,这意味着如果IO线程提取消息时,Producer出现崩溃,那么之前放到batch缓冲区中的消息就都丢失了,
其次在没有开启幂等性,又设置了瞬时异常重试的情况下,由于 max_in_flight_requests_per_connection 的值默认是5,因此也会发生消息乱序的情况,
对于消息丢失的情况,采用同步发送看起来可规避,但是这样会大大降低性能,因此我们考虑在异步情况下解决这个问题,
在不开启幂等性的情况下,对于上面的两种情况通过对producer和broker进行相应的配置可进行最大程度的规避:
1. producer端配置
-
max.block.ms 该值用于设置当缓冲区已满或元数据不可用时的阻塞时间,默认是 60000 (1分钟)。 -
acks = all 必须所有的follower都响应了发送消息才认为消息提价成功,最大程度保证持久性。 -
retries = Integer.MAX_VALUE 设置为一个超大值,当发生瞬时可恢复异常时,最好交给Producer不停的重试并将消息提交成功。 -
max_in_flight_requests_per_connection = 1 这个配置用于防止同topic同分区的消息乱序问题,该参数限制Producer在单个broker连接上能够发送的未响应请求的数量,设置为1后,则是串行化发送,当前面的没有收到响应,后面的肯定无法发送过去,保证了顺序性。 -
带有回调机制的send 如果消息发送失败,该方法不会得到任何通知,可能造成数据的丢失,因此要带上回调机制,对失败的消息进行相应的处理。 -
失败时显式关闭Producer 在回调机制中当发生了不可逆转的错误时,同时不希望剩下的消息发送出去,那么就需要使用KafkaProducer.close(0)立即进行关闭,如果使用close(),Producer会将剩余的消息继续发送出去,这样可能会造成消息乱序。
2. broker端配置
-
unclean.leader.election.enable = false 不允许非ISR中的副本被选举为leader,防止因日志水位截断造成消息丢失。 -
replication.factor >= 3 使用多个副本保存消息,防止消息丢失。 -
min.insync.replicas > 1 在Producer端的acks设置为all或-1时生效,控制消息在broker中至少写入多少个副本时,才算成功。 -
确保replication.factor > min.insync.replicas 如果二者相等,那么只要有一个副本挂掉,分区就无法正常工作,持久性虽然很高但是可用性降低了,因此推荐是 replication.factor = min.insync.replicas + 1
消息压缩
在生产环境中,如果IO资源非常紧张,例如Producer程序消耗了大量的网络带宽或broker端的磁盘占用率非常高,但Producer的CPU资源非常富足,那么可以考虑为Producer开启消息压缩,反之则不需要设置消息压缩,以节省CPU资源。
压缩的性能与batch中的消息大小相关,batch大小越大,压缩时间则越长,如果压缩比较慢,说明系统瓶颈在用户主线程而非IO发送线程,此时可以增加多个用户主线程同时发送消息,增加Producer吞吐量。
多线程发送消息
在实际的使用过程中,Producer实例有两种使用方式:
-
多线程单KafkaProducer实例 全局共用一个KafkaProducer实例,然后在多个线程中共享使用该实例,KafkaProducer实例是线程安全的,可以放心使用。 -
多线程多KafkaProducer实例 在全局实例影响到吞吐量或性能的情况下,每个用户线程可自己单独创建自己的KafkaProducer实例。
| 说明 | 优势 | 劣势 |
---|
多线程单KafkaProducer实例 | 所有线程共享一个实例 | 实现简单,性能较好 | 1. 所有线程共享一个内存缓冲区,需要较多的内存 2. 一旦某个producer 线程崩溃导致KafkaProducer实例被破坏(例如关闭), 则所有producer线程都无法工作 | 多线程多KafkaProducer实例 | 每个线程使用单独的实例 | 1. 每个线程拥有自己的实例,缓冲区空间和其他配置可单独控制 2. 某个实例崩溃不会影响到其他producer线程工作 | 需要较大的内存分配开销 |
如果是分区数不对的kafka集群,可以使用第一种方式,如果分区数较多,不同的线程有自己的配置,可使用第二种方式。
新旧版本producer区别
建议采用新版本的Producer SDK进行开发,新旧之间最大的区别是:
- 代码入口,API参数列表等都不相同
- 旧版本默认同步发送,新版本异步发送
- 旧版本与Zookeeper通信进行数据发送,新版本则不需要
ProducerConfig
在上面的示例代码中,很多参数我们都是以字符串的形式直接写入 Properties 的,这样很容易出错,Producer SDK为我们提供了 ProducerConfig 类,用于直接使用,例如:
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"
..... 还有很多,可以查看源码
|