package com.cisdi.dsp.modules.metaAnalysis.rest;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
import java.util.function.Consumer;
/**
 * Demo Kafka consumer that turns off auto-commit and commits offsets manually,
 * one partition at a time.
 *
 * <p>Each poll result is split by partition; the records of a partition are
 * printed to standard output and then the offset following the last printed
 * record is committed synchronously for that partition only.
 */
public class ManualSubmitOffsetByPartition {

    /**
     * Subscribes to {@code testTopic2} on {@code localhost:9092} as group
     * {@code consumerGroupTest2} and consumes records in an endless loop.
     *
     * <p>The loop only ends when polling or committing throws. In that case the
     * consumer is closed by the try-with-resources statement and the exception
     * propagates out of {@code main} instead of being silently discarded.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Topic to consume from.
        String topic = "testTopic2";
        // Broker bootstrap address.
        String server = "localhost:9092";
        // Consumer group id.
        String group = "consumerGroupTest2";

        // Configuration used to build the KafkaConsumer.
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        // Offsets are committed explicitly below, so auto-commit is disabled.
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // try-with-resources guarantees the consumer is closed however the loop exits.
        try (KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(properties)) {
            // Subscribe to the topic.
            myConsumer.subscribe(Collections.singletonList(topic));

            while (true) {
                // Fetch records, blocking for at most 2000 ms when none are available.
                ConsumerRecords<String, String> records = myConsumer.poll(Duration.ofMillis(2000));

                // Handle the returned records partition by partition.
                for (TopicPartition topicPartition : records.partitions()) {
                    // All records in this batch that belong to the current partition.
                    List<ConsumerRecord<String, String>> recordList = records.records(topicPartition);

                    // Consume the records.
                    for (ConsumerRecord<String, String> record : recordList) {
                        System.out.println(record.value());
                    }

                    // Offset of the last record of this partition in the batch.
                    long latestOffsetInOneTopicPartition = recordList.get(recordList.size() - 1).offset();

                    // Commit this partition only; the committed value is the offset of the
                    // next record to read, hence the +1.
                    myConsumer.commitSync(Collections.singletonMap(
                            topicPartition,
                            new OffsetAndMetadata(latestOffsetInOneTopicPartition + 1)));
                }
            }
        }
    }
}
|