1. Fetch basic Kafka information from Java: list all topics, then iterate over their partition info
package kafkamonitor;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class KafkaOffset_test {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaOffset_test.class);
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = Maps.newHashMap();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
LOGGER.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
/**
* @param brokers broker addresses ("host:port")
* @param topic topic name
* @return map of partition id to that partition's PartitionMetadata
*/
public static Map<Integer, PartitionMetadata> findLeader(List<String> brokers, String topic) {
Map<Integer,PartitionMetadata> map = Maps.newHashMap();
for (String broker : brokers) {
SimpleConsumer consumer = null;
try {
String[] hostAndPort = broker.split(":");
consumer = new SimpleConsumer(hostAndPort[0], Integer.parseInt(hostAndPort[1]), 100000, 64 * 1024, "leaderLookup" + new Date().getTime());
List<String> topics = Lists.newArrayList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
map.put(part.partitionId(), part);
}
}
} catch (Exception e) {
LOGGER.error("Error communicating with Broker [" + broker + "] to find Leader for [" + topic + ", ] Reason: " + e);
} finally {
if (consumer != null){
consumer.close();
}
}
}
return map;
}
public static List<JSONObject> monitor(List<String> brokers, List<String> topics) {
if (brokers == null || brokers.isEmpty()) {
return null;
}
if (topics == null || topics.isEmpty()) {
return null;
}
List<JSONObject> list = new ArrayList<>();
for (String topicName : topics) {
JSONObject json = new JSONObject();
Map<Integer,PartitionMetadata> metadata = findLeader(brokers, topicName);
long totalSize = 0L;
JSONArray jsonArray = new JSONArray();
for (Map.Entry<Integer,PartitionMetadata> entry : metadata.entrySet()) {
int partition = entry.getKey();
String leadBroker = entry.getValue().leader().host();
String clientName = "Client_" + topicName + "_" + partition;
SimpleConsumer consumer = new SimpleConsumer(leadBroker, entry.getValue().leader().port(), 100000, 64 * 1024, clientName);
// whichTime: the timestamp to fetch offsets for; -1 = latest, -2 = earliest
long endOffset = getLastOffset(consumer, topicName, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
System.out.println("endOffset = " + endOffset);
long startOffset = getLastOffset(consumer, topicName, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
System.out.println("startOffset = " + startOffset);
long totalNumber = endOffset - startOffset;
String infoPattern = "partition=%s,startOffset=%s,endOffset=%s,partition_size=%s";
String format = String.format(infoPattern, partition, startOffset, endOffset, totalNumber);
totalSize += totalNumber;
jsonArray.add(format);
consumer.close();
}
json.put("topicName",topicName);
json.put("infos",jsonArray.toJSONString());
json.put("totalSize",totalSize);
list.add(json);
}
return list;
}
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());// key deserializer
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());// value deserializer
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// auto-commit offsets
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.x.xx:9092");// broker address, used to locate the group coordinator
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "tuyou");// consumer group id
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
Set<String> topicNames = consumer.listTopics().keySet();// every topic visible on the cluster
System.out.println("all topics = " + topicNames);
consumer.close();
ArrayList<String> topicList = Lists.newArrayList("ods.rds_core.plateform_stable.order_driver");
ArrayList<String> serverList = Lists.newArrayList("192.168.x.xx:9092", "192.168.x.xx:9092", "192.168.x.xx:9092");
List<JSONObject> monitorList = monitor(serverList, topicList);
System.out.println("monitorList = " + monitorList);
}
}
Run output:

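The SimpleConsumer/OffsetRequest classes used above belong to the legacy Scala client, which has been removed from recent Kafka releases. Below is a minimal sketch of the same per-partition offset query using the modern consumer API; the class name, broker address, and group id are placeholders, not from the original code:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
import java.util.stream.Collectors;

public class KafkaOffsetModernSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.x.xx:9092"); // placeholder broker address
        props.put("group.id", "offset-monitor"); // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            String topic = "ods.rds_core.plateform_stable.order_driver";
            // partitionsFor replaces the manual leader lookup via TopicMetadataRequest
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            // beginningOffsets/endOffsets replace the old OffsetRequest round-trips
            Map<TopicPartition, Long> start = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            for (TopicPartition tp : partitions) {
                long size = end.get(tp) - start.get(tp);
                System.out.printf("partition=%d,startOffset=%d,endOffset=%d,partition_size=%d%n",
                        tp.partition(), start.get(tp), end.get(tp), size);
            }
        }
    }
}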
2. Querying metrics via JMX:
import com.google.common.collect.Lists;
import javax.management.*;
import java.io.IOException;
/**
* @program: flink-scala-opt
* @description:
* @author: Mr.Wang
* @create: 2021-07-13 14:02
**/
public class Kafka_JMX_test {
public static void main(String[] args) {
try {
KafkaJmxConnection jmxConn = new KafkaJmxConnection("192.168.7.15:9393");
jmxConn.init();
while(true) {
String topicName = "aaa";
// a metric that is not tied to any topic
Object o1 = jmxConn.getValue(
"kafka.server:type=ReplicaManager,name=PartitionCount",
Lists.newArrayList("Value"));
System.out.println(o1);
// a topic-level metric (the topic name is appended to the ObjectName)
Object o2 = jmxConn.getValue(topicName,
"kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec",
Lists.newArrayList("Count", "OneMinuteRate", "FiveMinuteRate"));
System.out.println(o2);
Thread.sleep(5000);
}
} catch (Exception e) {
// all of the checked JMX exception types were handled identically, so one catch suffices
e.printStackTrace();
}
}
}
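The KafkaJmxConnection helper used above wraps the JMX connection setup and the attribute reads: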
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
* @program: flink-scala-opt
* @description:
* @author: Mr.Wang
* @create: 2021-07-13 14:08
**/
public class KafkaJmxConnection {
private Logger log = LoggerFactory.getLogger(this.getClass());
private MBeanServerConnection conn;
/**
* Default host:port of the JMX endpoint
*/
private String ipAndPort = "localhost:9999";
public KafkaJmxConnection(String ipAndPort){
this.ipAndPort = ipAndPort;
}
public boolean init() throws Exception {
String jmxURL = "service:jmx:rmi:///jndi/rmi://" +ipAndPort+ "/jmxrmi";
log.info("init jmx, jmxUrl: {}, and begin to connect it",jmxURL);
try {
// initialize the JMX connection
JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
JMXConnector connector = JMXConnectorFactory.connect(serviceURL, null);
conn = connector.getMBeanServerConnection();
if(conn == null){
log.error("getValue connection return null!");
return false;
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
public String getName(String metric, String topicName){
if (topicName == null){
return metric;
}
// some metrics are per-topic; append the topic name to the ObjectName
return metric + ",topic=" + topicName;
}
public Map<String, Object> getValue(String topicName, String metric, Collection<String> attrs){
ObjectName objectName;
try {
objectName = new ObjectName(this.getName(metric, topicName));
} catch (MalformedObjectNameException e) {
e.printStackTrace();
return null;
}
Map<String, Object> result = new HashMap<>();
// read each requested attribute and collect attribute -> value into the map
for (String attr : attrs) {
result.put(attr, getAttribute(objectName, attr));
}
return result;
}
public Map<String, Object> getValue(String metric, Collection<String> attrs){
return getValue(null, metric, attrs);
}
private Object getAttribute(ObjectName objName, String objAttr){
if (conn == null) {
log.error("jmx connection is null");
return null;
}
try {
return conn.getAttribute(objName, objAttr);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
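Note: this only works when the broker actually exposes JMX. Kafka's startup scripts honor the JMX_PORT environment variable (for example, export JMX_PORT=9393 before running kafka-server-start.sh), and that port must match the one passed to KafkaJmxConnection.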
JMX metric reference:
https://cloud.tencent.com/developer/article/1554002
For a complete open-source project, Didi's kafka-manager (LogiKM) is quite good: https://github.com/didi/LogiKM/blob/master/docs/user_guide/kafka_metrics_desc.md
Summary: most of this code came from Baidu searches, and the various examples use roughly the same APIs.