Consumer消费数据流程
offset相关 Consumer从kafka的磁盘中消费数据,所以不用担心数据丢失问题。
但是,Consumer作为一个消费者,是有可能出现宕机等问题的,也就意味着会出现重启后,继续消费的问题,那么就必须要消费者偏移量,消费到哪条数据了。
结论:offset是用来记录Consumer的消费位置的,由Consumer自己负责维护(提交),保存在kafka的broker的内置topic中
# consumer重启offset机制,三个可选值,过早的offset记录会被删除。
auto.offset.reset=latest # 默认值,从最新的offset继续消费数据。
# 自动提交offset
默认情况下Consumer的offset自动提交。
# ------------------配置参数-----------------------
# 自动提交开启
enable.auto.commit=true # 默认值
# 自动提交的时间间隔
auto.commit.interval.ms=5000 # 默认值5000 单位毫秒。```
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
# 手动提交offset
通过代码的方式手动明确offset提交的方式。
`config.put(ConsumerConfig.ENABLE_AUTO_CO``MMIT_CONFIG,"false");
while (true){
ConsumerRecords<String, String> crs = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> cr : crs) {
System.out.println("cr = " + cr);
}
kafkaConsumer.commitAsync();
}
while (true){
ConsumerRecords<String, String> crs = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> cr : crs) {
System.out.println("cr = " + cr);
}
kafkaConsumer.commitSync();
}
|