IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka消费者配置-手动提交offset-初级入门五 -> 正文阅读

[大数据]kafka消费者配置-手动提交offset-初级入门五

消费者部分常用配置介绍:

#消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
group.id
?
#为true则自动提交偏移量
enable.auto.commit
?
#自动提交offset周期
auto.commit.interval.ms
?
#重置消费偏移量策略,消费者在读取一个没有偏移量的分区或者偏移量无效情况下(因消费者长时间失效、包含偏移量的记录已经过时并被删除)该如何处理,
#默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更 才可以
auto.offset.reset
?
#序列化器
key.deserializer

?配置项代码:

    public static Properties getProperties() {

        Properties props = new Properties();

        //broker地址
        props.put("bootstrap.servers", "xxx.xxx.xx.xxx:9092");

        //消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
        props.put("group.id", "wnn-g1");

        //开启自动提交offset
        props.put("enable.auto.commit", "true");

        //自动提交offset延迟时间
        props.put("auto.commit.interval.ms", "1000");

        //反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }

消息订阅:

?topic=wnn-topic-test-12.18, offset=5,key=wnn-key0,value=wnn-content-value0?
topic=wnn-topic-test-12.18, offset=19,key=wnn-key1,value=wnn-content-value1?
topic=wnn-topic-test-12.18, offset=20,key=wnn-key2,value=wnn-content-value2?


自动提交offset问题
?? ?没法控制消息是否正常被消费
?? ?适合非严谨的场景,比如日志收集发送?

下面介绍手动提交offset:

????????手工提交offset分2种:
????????同步 commitSync 阻塞当前线程 (自动失败重试)
????????异步 commitAsync 不会阻塞当前线程 (没有失败重试,回调callback函数获取提交信息,记录日志)

异步手动提交代码:

    @Test
    public void simpleConsumerTest(){

        Properties properties = getProperties();

        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);

        //订阅主题
        kafkaConsumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME));


        while (true){
            //领取时间,阻塞超时时间
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));

            for(ConsumerRecord record : records){
                System.err.printf("topic=%s, offset=%d,key=%s,value=%s %n",record.topic(),record.offset(),record.key(),record.value());
            }

            //同步阻塞提交offset 用的相对较少
            //kafkaConsumer.commitSync();

            if(!records.isEmpty()){
                //异步提交offset
                kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {

                        if(exception == null){
                            System.err.println("手工提交offset成功:"+offsets.toString());
                        }else {
                            System.err.println("手工提交offset失败:"+offsets.toString());
                        }
                    }
                });
            }
        }

    }

?

如果需要从头消费partition消息,怎操作?
auto.offset.reset 配置策略即可
默认是latest,需要改为 earliest 且消费者组名变更 ,即可实现从头消费
//默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效 props.put("auto.offset.reset","earliest");
?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-23 15:49:12  更:2021-12-23 15:49:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 7:40:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码