一、环境说明
1.电脑或你的服务器需要安装zookeeper和kafka
可以参考我的这篇博客:请点击这里!
2.项目中需要下面的依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
二、简单消费者的书写:
package com.maoyan.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MyProducer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers","localhost:9092");
properties.put("retries",1);
properties.put("batch.size",16384);
properties.put("linger.ms",1);
properties.put("buffer.memory",33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i=0;i<10;i++){
producer.send(new ProducerRecord<String,String>("study","luzelong"+i));
}
producer.close();
}
}
三、有回调函数的生产者
package com.maoyan.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CallBackProducer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
for (int i=0;i<10;i++) {
producer.send(new ProducerRecord<String, String>("study" ,Integer.toString(i), Integer.toString(i)), (metadata, exception) -> {
if (exception == null) {
System.out.println("success->" +
" partition = " +metadata.partition()+" ~~~~~~~ offset = "+metadata.offset());
} else {
exception.printStackTrace();
} });
}
producer.close();
}
}
注意:如果在send()方法后接着调用get()方法,那么就是有序的同步方法,消息会一条接一条的发送send(xxx).get()
四、自定义分区策略的插入
先说一下不自定义的情况的情况: 分区的分配基本由ProducerRecord的参数决定:
(1)指明partition的情况下,直接将指明的值直接作为 partiton 值; (2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到 partition 值; (3)既没有partition值又没有key值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition总数取余得到partition值,也就是常说的round-robin算法。
但有时候上面的逻辑并不符合我们的实际开发,而该kafka客户端API也提供了自定义分区的方法,首先需要先书写分区的策略:
package com.maoyan.kafka.partititioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return 1;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
然后在生产者的properties中加入即可,其他的书写和上面其他的两个案例类似!
public class PartitionProducer {
public static void main(String[] args) {
.....
properties.put("partitioner.class","com.maoyan.kafka.partititioner.MyPartitioner");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
......
}
}
|