IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka: 按partition同步手动提交offset -> 正文阅读

[大数据]Kafka: 按partition同步手动提交offset

package com.cisdi.dsp.modules.metaAnalysis.rest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;
import java.util.function.Consumer;

public class ManualSubmitOffsetByPartition {
    public static void main(String[] args) {
        //定义topic
        String topic="testTopic2";
        //定义broker
        String server="localhost:9092";
        //定义消费者组
        String group="consumerGroupTest2";

        //定义Properties对象来构建kafka Consumer
        Properties properties=new Properties();
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,server);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        //构建Kafka Consumer
        KafkaConsumer<String,String> myConsumer=new KafkaConsumer<>(properties);
        //订阅topic
        myConsumer.subscribe(Arrays.asList(topic));
        try{
            while(true){
                //每隔2秒从服务器获取消息
                ConsumerRecords<String,String> records=myConsumer.poll(Duration.ofMillis(2000));
                //从ConsumerRecords对象获取所有的TopicPartition集合
                Set<TopicPartition> partitions = records.partitions();
                //遍历TopicPartition集合
                for(TopicPartition topicPartition: partitions){
                    //获取收到的消息中属于某个partition的所有消息记录
                    List<ConsumerRecord<String, String>> recordList = records.records(topicPartition);
                    //消费消息
                    recordList.forEach(new Consumer<ConsumerRecord<String, String>>() {
                        @Override
                        public void accept(ConsumerRecord<String, String> stringStringConsumerRecord) {
                            System.out.println(stringStringConsumerRecord.value());
                        }
                    });
                    //获取某个partition中最大的消息offset
                    long latestOffsetInOneTopicPartition=recordList.get(recordList.size()-1).offset();
                    //提交某个partition的消费offset
                    myConsumer.commitSync(Collections.singletonMap(topicPartition,new OffsetAndMetadata(latestOffsetInOneTopicPartition+1)));
                }
            }
        }catch (Exception ex){
            myConsumer.close();
        }
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-11 22:17:06  更:2022-03-11 22:21:24 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 19:06:21-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码