背景:kafka 客户端之producer API发送消息以及简单源码分析已经介绍了producer的异步发送和异步回调发送消息的基本使用,但是都是使用内置的负载均衡策略。kafka的负载均衡是在客户端实现的。
自定义负载均衡实现
在某些特殊的业务场景下我们经常会有自定义负载均衡算法的需求,在Kafka中可以通过实现Partitioner接口来自定义Partition负载均衡器。
kafka自带的有三种实现
- DefaultPartitioner:如果record中指定了分区,则使用它;如果未指定分区但存在key,则根据key的hash选择分区;如果不存在分区或key,则选择在批处理已满时更改的sticky partition。
- UniformStickyPartitioner:如果record中指定了分区,则使用它;否则选择batch已满时更改的sticky partition。 注意:与 DefaultPartitioner 相比,record key不用作此分区器中分区策略的一部分。 具有相同键的record不保证发送到同一个分区。 有关sticky partition的详细信息,请参阅 KIP-480
- RoundRobinPartitioner: “循环”分区器 当用户希望将写入平均分配到所有分区时,可以使用此分区策略。这是与record key hash无关的行为
自己实现Partitioner接口
public class MyPartition implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int partitionsNum = cluster.partitionsForTopic(topic).size();
return ThreadLocalRandom.current().nextInt(partitionsNum);
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
new 一个producer实例的时候,把自己的负载均衡实现类的全路径名导入进去。
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.xt.kafkademo.producer.MyPartition");
Producer异步发送消息(带回调函数和自定义Partition负载均衡)
public static void producerSendWithCallbackAndPartition(Producer<String,String> producer){
for(int i=0;i<10;i++){
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null){
e.printStackTrace();
}else{
System.out.println("partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
}
}
});
}
producer.close();
}
完整代码
public class ProducerSample {
private final static String TOPIC_NAME="xt";
public static Producer<String, String> createProducer(boolean mypartition) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka服务器IP:9092");
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,"5000");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
if(mypartition == true){
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.xt.kafkademo.producer.MyPartition");
}
return new KafkaProducer<>(properties);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
Producer<String,String> producer = ProducerSample.createProducer(true);
producerSendWithCallbackAndPartition(producer);
}
public static void producerSendWithCallbackAndPartition(Producer<String,String> producer){
for(int i=0;i<10;i++){
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null){
e.printStackTrace();
}else{
System.out.println("partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
}
}
});
}
producer.close();
}
}
kafka是怎样调用我们的负载均衡实现类的
在自己的负载均衡类上的方法打断点 然后debug运行自己的主程序 我们可以发现是如下代码调用的我们自己的实现 KafkaProducer的 partition方法
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
而这个partitioner实例正是我们自己实现的那个负载均衡类的实例,他在KafkaProducer类中被定义为了私有 final 字段。
private final Partitioner partitioner;
而partitioner 是通过如下形式来实例化的
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
config是一个ProducerConfig实例,是在KafkaProducer的构造函数new出来的。如下代码所示。而下面的Map<String, Object> configs 正是由kafka封装的Utils.propsToMap(properties)转化而来,properties是我们主程序传入的配置。所以我们的设置是怎么传递的基本弄清楚了。
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)),
keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
}
那config是怎么去new 我们自定义负载均衡实现类的呢? 先前我们说到了config是由我们传入的配置信息new 出来的,而他是一个ProducerConfig类的实例,他既然保存了我们的传入的配置信息,然后他再找出来是易于反掌
如下,已经拿到了我们自定义负载均衡类的全路径 如下就是AbstractConfig类保存配置信息的字段,是一个Map,虽然是一个私有字段,但是ProducerConfig类的实例通过从AbstractConfig类继承的getClass方法(此方法是public的)拿到了配置信息
private final Map<String, Object> values;
拿到负载均衡类的全路径之后,kafka使用自己的封装的Utils来new 实例
private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs) {
if (klass == null)
return null;
Object o;
if (klass instanceof String) {
try {
o = Utils.newInstance((String) klass, t);
} catch (ClassNotFoundException e) {
throw new KafkaException("Class " + klass + " cannot be found", e);
}
} else if (klass instanceof Class<?>) {
o = Utils.newInstance((Class<?>) klass);
} else
throw new KafkaException("Unexpected element of type " + klass.getClass().getName() + ", expected String or Class");
if (!t.isInstance(o))
throw new KafkaException(klass + " is not an instance of " + t.getName());
if (o instanceof Configurable)
((Configurable) o).configure(configPairs);
return t.cast(o);
}
进去Utils.newInstance一看,对的,还是使用的反射来创建的负载均衡实例
public static <T> T newInstance(Class<T> c) {
if (c == null)
throw new KafkaException("class cannot be null");
try {
return c.getDeclaredConstructor().newInstance();
} catch (NoSuchMethodException e) {
throw new KafkaException("Could not find a public no-argument constructor for " + c.getName(), e);
} catch (ReflectiveOperationException | RuntimeException e) {
throw new KafkaException("Could not instantiate class " + c.getName(), e);
}
}
(写博客主要是对自己学习的归纳整理,资料大部分来源于书籍、网络资料和自己的实践,整理不易,但是难免有不足之处,如有错误,请大家评论区批评指正。同时感谢广大博主和广大作者辛苦整理出来的资源和分享的知识。)
|