以下测试皆在windows下进行,请根据自己情况酌情配置kafka zookeeper等环境
本人使用的是jdk11,代码中可能存在jdk9的新特性,使用jdk9以前的jdk的朋友请自行转换
kafka环境变量等暂时略过
1.java导入依赖
<!--导入kafka依赖-->
? ? ? ?<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
? ? ? ?<dependency>
? ? ? ? ? ?<groupId>org.apache.kafka</groupId>
? ? ? ? ? ?<artifactId>kafka-clients</artifactId>
? ? ? ? ? ?<version>2.8.0</version>
? ? ? ?</dependency>
? ? ? ?<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
? ? ? ?<dependency>
? ? ? ? ? ?<groupId>org.apache.kafka</groupId>
? ? ? ? ? ?<artifactId>kafka_2.13</artifactId>
? ? ? ? ? ?<version>2.8.0</version>
? ? ? ?</dependency>
kafka Producer
导入相关依赖后,创建测试类ProducerDemo;
-
创建生产者对象 使用KafkaProducer 创建kafka生产者对象,这时可以发现kafka不允许我们使用空构造来创建对象; 那么我们就选用传入properties的方式创建kafka生产者 创建生产者的时候,跟控制台命令一样,我们需要指定集群名称以及序列化器,而这些相关设置都会存储在我们的配置文件中; kafka给我们提供了ProducerConfig类,并在其中已经给我们提前准备好了我们所需要的key,在向properties中put键值时,可以直接使用producerConfig的静态常量作为key;并传入相应value -
向kafka中发送信息 使用kafkaProducer向kafka中发送信息,可以使用其提供的send()方法 ;使用时可以看到其需要传入ProducerRecord以及一个可选的Callback ProducerRecord: 即为每条数据所封装成的对象 CallBack:可选;获取函数的回调 -
close() 在真实生产环境中,我们可能不需要手动调用close方法关闭kafkaProducer,但是目前的测试阶段,如果不使用close关闭,可能会导致发送的信息在设置等待的时间内,不会被真正的发送; 流在关闭的时候会对数据进行回收操作
/**
* 描述:kafkaProducer生产者
*
* <pre>
* HISTORY
* ****************************************************************************
* ID ? ? DATE ? ? ? ? PERSON ? ? ? ? REASON
* 1 ? ? 2021/8/10 23:14 ? Bambi ? ? ? Create
* ****************************************************************************
* </pre>
*
* @author Bambi
* @since 1.0
*/
public class ProducerPartitionerDemo01 {
? ?public static void main(String[] args) {
? ? ? ?Properties properties = new Properties();
? ? ? ?//自行修改为对应的集群地址 kafka默认为9092,此处我没有更改
? ? ? ?properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
? ? ? ?//需要传入序列化器的全类名,kafka需要通过反射全类名去获取序列化器
? ? ? ?properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
? ? ? ?properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
?
? ? ? ?KafkaProducer kafkaProducer = new KafkaProducer(properties);
? ? ? ?for (int i = 0; i < 10; i++) {
? ? ? ? ? ?//使用callBack收集回调信息,使用了lamdba表达式
? ? ? ? ? ?kafkaProducer.send(new ProducerRecord("此处使用自己存在的主题","value"),((metadata, exception) ->{
? ? ? ? ? ? ? ?if(exception==null){
? ? ? ? ? ? ? ? ? ?System.out.println("没有错误,数据添加成功");
? ? ? ? ? ? ? }
? ? ? ? ? } ));
? ? ? }
//关闭
? ? ? ?kafkaProducer.close();
? }
}
自定义分区器
如果想要自己根据业务需求编写自定义的分区规则,可以自定义分区器;
说到自定义,就势必需要去实现某个接口或者继承某个类
这里, 我们需要实现的是kafka给我们的Partitioner接口,实现后重写方法
/**
* 描述: 自定义分区器
*
* <pre>
* HISTORY
* ****************************************************************************
* ID ? ? DATE ? ? ? ? PERSON ? ? ? ? REASON
* 1 ? ? 2021/8/10 22:24 ? Bambi ? ? ? Create
* ****************************************************************************
* </pre>
*
* @author Bambi
* @since 1.0
*/
public class MyPartitioner implements Partitioner {
?
? ?/**
? ? * 编写分区规则
? ? */
? ?@Override
? ?public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
? ? ? ?//根据业务需求编写分区规则
? ? ? ?return 0;
? }
?
? ?@Override
? ?public void close() {
?
? }
?
? ?/**
? ? * 读取配置信息
? ? * @param configs
? ? */
? ?@Override
? ?public void configure(Map<String, ?> configs) {
?
? }
}
在编写规则时可以参考Kafka对Partitioner的默认实现 DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
? ? ? ? ? ? ? ? ? ? ? ? int numPartitions) {
? //如果key也不存在,则会对可用分区进行轮询
? ? ? ?if (keyBytes == null) {
? ? ? ? ? ?return stickyPartitionCache.partition(topic, cluster);
? ? ? }
? //如果没有指定分区,且存在key值,则会根据key的hash进行取模来选择分区
? ? ? ?// hash the keyBytes to choose a partition
? ? ? ?return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
? }
实现同步发送
正常情况下kafka生产者发送信息采用的是异步发送的方式,主线程将信息发送给共享变量 RecordAccumulator ,Sender线程不同的从共享变量中拉取数据发送到broker上;
实现逻辑
在两个线程其中一个执行的时候去阻塞另一个线程,实现串行
我们可以发现kafkaProducer的send()方法是存在返回值 Future 的;
而我们知道,当future对象调用get()方法时,不仅会获得当前线程回调的对象,还会阻塞当前线程
我们便使用这个方法来实现同步发送
同步发送的使用场景相对较少,我们可以使用同步发送来确保区内有序,即当上一条信息发送后,未接收到ack之前,阻塞发送线程,不继续发送,从而实现有序
消费者API
编写消费者api的逻辑与生产者十分的相像,使用kafka提供的 KafkaConsumer 来创建消费者对象
并在配置文件中传递对应信息,可以使用ConsumerConfig中的静态属性充当key值
/**
* 描述:kafka消费者
*
*
* <pre>
* HISTORY
* ****************************************************************************
* ID ? ? DATE ? ? ? ? PERSON ? ? ? ? REASON
* 1 ? ? 2021/8/11 0:21 ? Bambi ? ? ? Create
* ****************************************************************************
* </pre>
*
* @author Bambi
* @since 1.0
*/
public class ConsumerTest {
?
? ?public static void main(String[] args) {
? ? ? ?Properties properties = new Properties();
? ? ? ?//连接的集群
? ? ? ?properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
? ? ? ?//生产者需要指定序列化器,那么消费者就需要指定对应的反序列化器
? ? ? ?properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
? ? ? ?properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
?
? ? ? ?//自动提交
? ? ? ?properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
? ? ? ?//自动提交的延迟,提交的是消费者的offset
? ? ? ?properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
?
? ? ? ?//消费者组
? ? ? ?properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroup01");
?
? ? ? ?//创建消费者
? ? ? ?KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
? ? ? ?//订阅主题
? ? ? ?//此处可以添加多个集群
? ? ? ?//可以看到这里没有返回值,也就是说,这里只是单纯的指定了主题,如果想获取主题中的信息,需要使用别的方法
? ? ? ?kafkaConsumer.subscribe(List.of("你的主题"));
?
?
? ? ? ?while (true){
? ? ? ? ? ?//获取的类型与Producer类似,不过为ConsumerRecords类,想要得到单个数据,需要遍历输出
? ? ? ? ? ?//新版本建议使用传入Duration的方式,直接传入毫秒数的方式以过时
? ? ? ? ? ?ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ZERO);
? ? ? ? ? ?consumerRecords.forEach(stringStringConsumerRecord -> {
? ? ? ? ? ? ? ?//可以看到,使用consumerRecord去调用方法的时候,可以获取到Key,所以key并不只是用来划分分区之用,如果没有指定key,会输出null
? ? ? ? ? ? ? ?System.out.println(stringStringConsumerRecord.key()+":"+stringStringConsumerRecord.value()+" ? :"+stringStringConsumerRecord.offset());
? ? ? ? ? });
? ? ? }
? ? ? ?//consumer进行订阅拉去信息的时候不需要手动关闭,因为顺序执行完毕后,jvm会关闭;所以可以使用一个while循环来持续消费
? }
}
启动消费者,会发现我们可以连接到对应的主题,但是不会获取到先前已经存在的信息;
在我们使用控制台调用消费者时,如果我们想获取该分区已经存在的信息,我们可以使用 --from-beginning指令将offset放到最前端从头获取;
java api中也是一样;
kafka中 命令行能做的事情,在配置文件中应该都有相关的配置
我们进入ConsumerConfig,可以查看到其已经给我们提供了 AUTO_OFFSET_RESET_CONFIG这个属性;
根据下方的doc描述,该属性默认值为lastest,这也是我们为什么在不设置的时候会无法获取已存在信息的原因,我们可以手动在配置文件中传入
earierlast
此处注意这个指定的生效条件:
关于offset的手动提交
我们为什么需要手动提交? 自动提交无法保证准确的提交时机
-
如果设置的提交延时过短,会丢是数据 -
如果设置的延时过长,会导致数据重复
1.在配置文件中关闭自动提交
既然我们需要手动提交,则必然需要在配置文件中将自动提交置为false
ENABLE_AUTO_COMMIT_CONFIG,<----将它改成false
-
在消费结束后进行手动提交 使用consumer的 commitSync() 同步提交,或commitAsync() 异步提交
自定义存储offset
手动提交虽然可以解决丢是数据的问题,但是仍然会存在数据重复的现象;
kafka也早已考虑到这种情况,所以允许我们自定义存储offset的规则,(比如我们可以和MySQL的写入操作进行事务绑定...)
但是相对于自定义分区器,自定义存储offset要相对麻烦一些;在0.9版本之后,kafka会将offset暂存在kafka内置的一个主题中,想要去维护一个offset,就需要考虑到消费者的Rebalance问题
即,如果当前消费者所消费的分区挂掉了,消费者需要转移到另一分区去消费,此时的offset需要定位到这个分区最近提交的offset
为此,我们需要实现kafka提供的ConsumerRebalanceListener
/**
* 描述: 自定义存储Offset
*
* <pre>
* HISTORY
* ****************************************************************************
* ID ? ? DATE ? ? ? ? PERSON ? ? ? ? REASON
* 1 ? ? 2021/8/11 23:26 ? Bambi ? ? ? Create
* ****************************************************************************
* </pre>
*
* @author Bambi
* @since 1.0
*/
public class ConsumerConfigOffset {
? ?//创建一个Map在暂存当前offset
? ?private static Map<TopicPartition,Long> currentOffset = new ConcurrentHashMap<>();
? ?
? ?public static void main(String[] args) {
? ? ? ?//配置文件较为冗长,我写了个工具类进行配置,相关配置内容已经提到过,就不再赘述
? ? ? ?PropertiesUtils propertiesUtils = new PropertiesUtils();
? ? ? ?Properties properties = propertiesUtils.ConsumerProperties("localhost:9092", "bambiOffset", "false", "100", 1);
?
? ? ? ?KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
?
? ? ? ?//在此处创建ConsumerRebalanceListener类
? ? ? ?kafkaConsumer.subscribe(List.of("solo1"), new ConsumerRebalanceListener() {
? ? ? ? ? ?//在Rebalance之前调用
? ? ? ? ? ?@Override
? ? ? ? ? ?public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
? ? ? ? ? ? ? ?commitOffset(currentOffset);
? ? ? ? ? }
?
? ? ? ? ? ?//在Rebalance之后调用
? ? ? ? ? ?@Override
? ? ? ? ? ?public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
? ? ? ? ? ? ? ?currentOffset.clear();
? ? ? ? ? ? ? ?partitions.forEach(partition -> {
? ? ? ? ? ? ? ? ? ?//定位到分区中最近的offset,继续消费
? ? ? ? ? ? ? ? ? ?kafkaConsumer.seek(partition,getOffset(partition));
? ? ? ? ? ? ? });
? ? ? ? ? }
? ? ? });
?
? ? ? ?while (true){
? ? ? ? ? ?ConsumerRecords<String,String> poll = kafkaConsumer.poll(Duration.ofMillis(1000));
? ? ? ? ? ?poll.forEach(consumerRecord ->{
? ? ? ? ? ? ? ?System.out.printf("offset = %d %n",consumerRecord.offset());
? ? ? ? ? ? ? ?System.out.printf("key = %s %n",consumerRecord.key());
? ? ? ? ? ? ? ?System.out.printf("value = %s %n",consumerRecord.value());
?
? ? ? ? ? ? ? ?//将下标缓存到offset中
? ? ? ? ? ? ? ?currentOffset.put(new TopicPartition(consumerRecord.topic(),consumerRecord.partition()),consumerRecord.offset());
? ? ? ? ? });
? ? ? ? ? ?commitOffset(currentOffset);
? ? ? }
? }
?
? ?/**
? ? * 提交当前offset
? ? * @param currentOffset
? ? */
? ?private static void commitOffset(Map<TopicPartition , Long> currentOffset){
? ? ? ?//处理异步提交的业务逻辑
? }
?
? ?//获取当前分区的offset
? ?private static long getOffset(TopicPartition partition){
? ? ? ?return 0;
? }
}
|