[Big Data] Fetching Kafka Metadata in Java

Author/source: https://cloud.tencent.com/developer/article/1554002

1. Use Java to fetch basic Kafka information: query all topics, then iterate over each topic's partition metadata and offsets.

package kafkamonitor;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class KafkaOffset_test {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaOffset_test.class);

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = Maps.newHashMap();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            LOGGER.error("Error fetching offset data from the broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    /**
     * @param brokers broker addresses ("host:port")
     * @param topic   topic name
     * @return map of partition id -> partition metadata
     */
    public static Map<Integer, PartitionMetadata> findLeader(List<String> brokers, String topic) {
        Map<Integer, PartitionMetadata> map = Maps.newHashMap();
        for (String broker : brokers) {
            SimpleConsumer consumer = null;
            try {
                String[] hostAndPort = broker.split(":");
                consumer = new SimpleConsumer(hostAndPort[0], Integer.parseInt(hostAndPort[1]), 100000, 64 * 1024, "leaderLookup" + new Date().getTime());
                List<String> topics = Lists.newArrayList(topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        map.put(part.partitionId(), part);
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Error communicating with broker [" + broker + "] to find leader for [" + topic + "]. Reason: " + e);
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        return map;
    }

    public static List<JSONObject> monitor(List<String> brokers, List<String> topics) {
        if (brokers == null || brokers.isEmpty()) {
            return null;
        }
        if (topics == null || topics.isEmpty()) {
            return null;
        }

        List<JSONObject> list = new ArrayList<>();
        for (String topicName : topics) {
            JSONObject json = new JSONObject();
            Map<Integer, PartitionMetadata> metadata = findLeader(brokers, topicName);
            long totalSize = 0L;

            JSONArray jsonArray = new JSONArray();
            for (Map.Entry<Integer, PartitionMetadata> entry : metadata.entrySet()) {
                int partition = entry.getKey();
                String leadBroker = entry.getValue().leader().host();
                String clientName = "Client_" + topicName + "_" + partition;
                SimpleConsumer consumer = new SimpleConsumer(leadBroker, entry.getValue().leader().port(), 100000, 64 * 1024, clientName);

                // whichTime selects which offset to fetch: -1 = latest, -2 = earliest
                long endOffset = getLastOffset(consumer, topicName, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                System.out.println("endOffset = " + endOffset);
                long startOffset = getLastOffset(consumer, topicName, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
                System.out.println("startOffset = " + startOffset);
                long total_number = endOffset - startOffset;

                String loadUrlPattern = "partition=%s,startOffset=%s,endOffset=%s,partition_number=%s";
                String format = String.format(loadUrlPattern, partition, startOffset, endOffset, total_number);
                totalSize += total_number;
                jsonArray.add(format);
                consumer.close();
            }
            json.put("topicName", topicName);
            json.put("infos", jsonArray.toJSONString());
            json.put("totalSize", totalSize);
            list.add(json);
        }
        return list;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());   // key deserializer
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); // value deserializer
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);  // auto-commit offsets
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.x.xx:9092"); // broker address, used to find the group coordinator
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "tuyou");         // consumer group id

        // List every topic on the cluster via the modern consumer API
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        Set<String> strings = consumer.listTopics().keySet();
        System.out.println(strings);
        consumer.close();

        ArrayList<String> topicList = Lists.newArrayList("ods.rds_core.plateform_stable.order_driver");
        ArrayList<String> serverList = Lists.newArrayList("192.168.x.xx:9092", "192.168.x.xx:9092", "192.168.x.xx:9092");
        List<JSONObject> monitorList = monitor(serverList, topicList);
        System.out.println("monitorList = " + monitorList);
    }
}
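Note that SimpleConsumer, TopicMetadataRequest, and the rest of the kafka.javaapi package belong to the legacy Scala client, which was removed in Kafka 2.0, so this class only compiles with the old kafka_2.xx artifact on the classpath. A minimal sketch of the Maven coordinates (the artifact/version pair shown is one known-good combination; adjust to your Scala and Kafka versions):

<!-- Legacy Scala client providing kafka.javaapi.* (version is illustrative) -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.1</version>
</dependency>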


Run result:
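With the format string above, a run against a hypothetical three-partition topic prints per-partition offsets followed by the aggregated JSON; all offset values below are illustrative, not captured output:

endOffset = 120000
startOffset = 20000
...
monitorList = [{"topicName":"ods.rds_core.plateform_stable.order_driver","infos":"[\"partition=0,startOffset=20000,endOffset=120000,partition_number=100000\", ...]","totalSize":300000}]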

2. Querying monitoring information via JMX:

import com.google.common.collect.Lists;

/**
 * @program: flink-scala-opt
 * @description: Polls Kafka broker metrics over JMX every five seconds.
 * @author: Mr.Wang
 * @create: 2021-07-13 14:02
 **/
public class Kafka_JMX_test {
    public static void main(String[] args) {
        try {
            KafkaJmxConnection jmxConn = new KafkaJmxConnection("192.168.7.15:9393");
            // Stop if the JMX connection could not be established
            if (!jmxConn.init()) {
                return;
            }

            while (true) {
                String topicName = "aaa";
                // A metric that is not tied to any topic
                Object o1 = jmxConn.getValue(
                        "kafka.server:type=ReplicaManager,name=PartitionCount",
                        Lists.newArrayList("Value"));
                System.out.println(o1);
                // A per-topic metric
                Object o2 = jmxConn.getValue(topicName,
                        "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec",
                        Lists.newArrayList("Count", "OneMinuteRate", "FiveMinuteRate"));
                System.out.println(o2);
                Thread.sleep(5000);
            }
        } catch (Exception e) {
            // The javax.management exceptions all got identical handling,
            // so a single catch block is sufficient.
            e.printStackTrace();
        }
    }
}
The KafkaJmxConnection helper class used above:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * @program: flink-scala-opt
 * @description: Thin wrapper around an MBeanServerConnection for reading Kafka metrics.
 * @author: Mr.Wang
 * @create: 2021-07-13 14:08
 **/
public class KafkaJmxConnection {

    private Logger log = LoggerFactory.getLogger(this.getClass());

    private MBeanServerConnection conn;
    /**
     * Default host:port to connect to
     */
    private String ipAndPort = "localhost:9999";

    public KafkaJmxConnection(String ipAndPort) {
        this.ipAndPort = ipAndPort;
    }

    public boolean init() {
        String jmxURL = "service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi";
        log.info("init jmx, jmxUrl: {}, and begin to connect it", jmxURL);
        try {
            // Initialize the JMX connection
            JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
            JMXConnector connector = JMXConnectorFactory.connect(serviceURL, null);
            conn = connector.getMBeanServerConnection();
            if (conn == null) {
                log.error("getValue connection return null!");
                return false;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    public String getName(String metric, String topicName) {
        if (topicName == null) {
            return metric;
        }
        // Some metrics are per-topic; append the topic qualifier when one is given
        return metric + ",topic=" + topicName;
    }

    public Map<String, Object> getValue(String topicName, String metric, Collection<String> attrs) {
        ObjectName objectName;
        try {
            objectName = new ObjectName(this.getName(metric, topicName));
        } catch (MalformedObjectNameException e) {
            e.printStackTrace();
            return null;
        }

        Map<String, Object> result = new HashMap<>();
        // Fetch each requested attribute and collect attribute -> value into the map
        for (String attr : attrs) {
            result.put(attr, getAttribute(objectName, attr));
        }
        return result;
    }

    public Map<String, Object> getValue(String metric, Collection<String> attrs) {
        return getValue(null, metric, attrs);
    }

    private Object getAttribute(ObjectName objName, String objAttr) {
        if (conn == null) {
            log.error("jmx connection is null");
            return null;
        }

        try {
            return conn.getAttribute(objName, objAttr);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

}
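This requires the broker to have been started with JMX enabled (for example by exporting JMX_PORT before launching Kafka). Beyond the two metrics queried above, the same helper can read any broker MBean. A short sketch with a few commonly watched ones (the MBean names are standard Kafka broker metrics; the class name here is hypothetical):

import com.google.common.collect.Lists;

public class KafkaJmxMoreMetrics_test {
    public static void main(String[] args) {
        KafkaJmxConnection jmxConn = new KafkaJmxConnection("192.168.7.15:9393");
        if (!jmxConn.init()) {
            return;
        }
        // Broker-level metrics (not topic-specific):
        System.out.println(jmxConn.getValue(
                "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions",
                Lists.newArrayList("Value")));
        System.out.println(jmxConn.getValue(
                "kafka.controller:type=KafkaController,name=ActiveControllerCount",
                Lists.newArrayList("Value")));
        // Per-topic variant, same pattern as BytesInPerSec above:
        System.out.println(jmxConn.getValue("aaa",
                "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec",
                Lists.newArrayList("Count", "OneMinuteRate")));
    }
}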

JMX parameter reference:

https://cloud.tencent.com/developer/article/1554002

For a complete open-source project, Didi's kafka-manager (LogiKM) is quite good:
https://github.com/didi/LogiKM/blob/master/docs/user_guide/kafka_metrics_desc.md

Summary: most of this code was pieced together from search results, and the APIs involved are all broadly similar. For Kafka 2.0 and later, the same start/end offsets can be read with the modern consumer API alone, as sketched below.
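A minimal sketch of that modern-API approach (the broker address and topic name are the placeholders from above; no consumer group is needed for these calls):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class KafkaOffsetModern_test {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.x.xx:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            String topic = "ods.rds_core.plateform_stable.order_driver";
            // Build one TopicPartition per partition of the topic
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            // beginningOffsets/endOffsets replace the old OffsetRequest round-trips
            Map<TopicPartition, Long> start = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            for (TopicPartition tp : partitions) {
                System.out.println("partition=" + tp.partition()
                        + ",startOffset=" + start.get(tp)
                        + ",endOffset=" + end.get(tp)
                        + ",partition_number=" + (end.get(tp) - start.get(tp)));
            }
        }
    }
}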
