- 获取topic下所有的partion
- 计算每个partion的offset
- 将分区offset移动最新的位置
- 提交分区最新的位置
如下代码
public class KafkaConsumerOffsetManager {
private KafkaConsumer consumer;
private String topic;
public KafkaConsumerRunnable(KafkaConsumer consumer, String topic) {
this.consumer = consumer;
this.topic = topic;
}
public void run() {
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
//找到topic下所有的分区
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> tp =new ArrayList<TopicPartition>();
if (null != partitionInfos && partitionInfos.size() > 0) {
partitionInfos.forEach(p -> {
tp.add(new TopicPartition(topic,p.partition()));
//消费者分配到该分区
consumer.assign(tp);
//移动到最新offset
consumer.seekToEnd(tp);
//获取到该分区的last offset
long position = consumer.position(new TopicPartition(topic, p.partition()));
offset.put(new TopicPartition(topic, p.partition()), new OffsetAndMetadata(position + 1));
});
}
consumer.commitAsync(offset, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
/** 异常同步去提交 **/
if (null != exception) {
consumer.commitSync(offsets);
}
}
});
}
}
|