消费者部分常用配置介绍:
#消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
group.id
?
#为true则自动提交偏移量
enable.auto.commit
?
#自动提交offset周期
auto.commit.interval.ms
?
#重置消费偏移量策略,消费者在读取一个没有偏移量的分区或者偏移量无效情况下(因消费者长时间失效、包含偏移量的记录已经过时并被删除)该如何处理,
#默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更 才可以
auto.offset.reset
?
#序列化器
key.deserializer
?配置项代码:
public static Properties getProperties() {
Properties props = new Properties();
//broker地址
props.put("bootstrap.servers", "xxx.xxx.xx.xxx:9092");
//消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
props.put("group.id", "wnn-g1");
//开启自动提交offset
props.put("enable.auto.commit", "true");
//自动提交offset延迟时间
props.put("auto.commit.interval.ms", "1000");
//反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
消息订阅:
?topic=wnn-topic-test-12.18, offset=5,key=wnn-key0,value=wnn-content-value0? topic=wnn-topic-test-12.18, offset=19,key=wnn-key1,value=wnn-content-value1? topic=wnn-topic-test-12.18, offset=20,key=wnn-key2,value=wnn-content-value2?
自动提交offset问题 ?? ?没法控制消息是否正常被消费 ?? ?适合非严谨的场景,比如日志收集发送?
下面介绍手动提交offset:
????????手工提交offset分2种: ????????同步 commitSync 阻塞当前线程 (自动失败重试) ????????异步 commitAsync 不会阻塞当前线程 (没有失败重试,回调callback函数获取提交信息,记录日志)
异步手动提交代码:
@Test
public void simpleConsumerTest(){
Properties properties = getProperties();
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
//订阅主题
kafkaConsumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME));
while (true){
//领取时间,阻塞超时时间
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for(ConsumerRecord record : records){
System.err.printf("topic=%s, offset=%d,key=%s,value=%s %n",record.topic(),record.offset(),record.key(),record.value());
}
//同步阻塞提交offset 用的相对较少
//kafkaConsumer.commitSync();
if(!records.isEmpty()){
//异步提交offset
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if(exception == null){
System.err.println("手工提交offset成功:"+offsets.toString());
}else {
System.err.println("手工提交offset失败:"+offsets.toString());
}
}
});
}
}
}
?
如果需要从头消费partition消息,怎操作? auto.offset.reset 配置策略即可 默认是latest,需要改为 earliest 且消费者组名变更 ,即可实现从头消费 //默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效 props.put("auto.offset.reset","earliest"); ?
|