Kafka (7): Kafka Monitoring with Kafka Eagle and Spring Boot Integration
I. Installing the Kafka monitor (Kafka Eagle / EFAK)
1. Download the software archive from http://download.kafka-eagle.org/
2. After extracting the archive, the directory contains:
[root@CentOSA kafka-eagle]
total 0
drwxr-xr-x. 2 root root 33 Jan  2 22:26 bin
drwxr-xr-x. 2 root root 62 Jan  2 22:26 conf
drwxr-xr-x. 2 root root  6 Sep 13 01:12 db
drwxr-xr-x. 2 root root 23 Jan  2 22:26 font
drwxr-xr-x. 9 root root 91 Sep 13 01:12 kms
drwxr-xr-x. 2 root root  6 Sep 13 01:12 logs
3. Configuration file
[root@CentOSA conf]# cat system-config.properties
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
efak.zk.cluster.alias=cluster1,cluster2
# 1. Set the ZooKeeper cluster address list
cluster1.zk.list=CentOSA:2181,CentOSB:2181,CentOSC:2181
# cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
######################################
# broker size online list
######################################
cluster1.efak.broker.size=20
######################################
# zk client thread limit
######################################
kafka.zk.limit.size=32
######################################
# EFAK webui port  2. Web UI service port
######################################
efak.webui.port=8048
######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456
######################################
# kafka offset storage
######################################
cluster1.efak.offset.storage=kafka
# cluster2.efak.offset.storage=zk
######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
######################################
# kafka metrics, 15 days by default  3. Metrics charts; enabling this requires Kafka JMX to be enabled on the brokers (see the note after this file)
######################################
efak.metrics.charts=true
efak.metrics.retain=15
######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
######################################
# delete kafka topic token  4. Token required for admin operations such as deleting topics
######################################
efak.topic.token=keadmin
######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
#cluster2.efak.sasl.enable=false
#cluster2.efak.sasl.protocol=SASL_PLAINTEXT
#cluster2.efak.sasl.mechanism=PLAIN
#cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
#cluster2.efak.sasl.client.id=
#cluster2.efak.blacklist.topics=
#cluster2.efak.sasl.cgroup.enable=false
#cluster2.efak.sasl.cgroup.topics=
######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=
######################################
# kafka sqlite jdbc driver address
######################################
efak.driver=org.sqlite.JDBC
efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
efak.username=root
efak.password=www.kafka-eagle.org
######################################
# kafka mysql jdbc driver address  5. Database the tool stores its data in (keep either the SQLite block above or this MySQL block; comment out the other)
######################################
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=root
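Note on the metrics charts (item 3 above): efak.metrics.charts=true only produces data if the Kafka brokers expose JMX. A common way to do this, shown here as a sketch (the port 9999 is an arbitrary choice), is to set JMX_PORT before starting each broker; Kafka's startup scripts pick it up automatically:

# On every Kafka broker, expose JMX before starting the broker (port is an example)
export JMX_PORT=9999
bin/kafka-server-start.sh -daemon config/server.properties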
4. Configure KE_HOME
The startup script requires this environment variable, so set it before starting the service.
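A minimal sketch, assuming the archive was extracted to /usr/local/kafka-eagle (adjust the path to your install location); KE_HOME is exported in /etc/profile and the service is started with the bundled ke.sh script:

# /etc/profile (or ~/.bashrc)
export KE_HOME=/usr/local/kafka-eagle
export PATH=$PATH:$KE_HOME/bin

# reload the profile and start EFAK
source /etc/profile
chmod +x $KE_HOME/bin/ke.sh
ke.sh start

Once started, the web UI listens on the port configured by efak.webui.port (8048 in the configuration above).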
II. Spring Boot project integration
1. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sff</groupId>
    <artifactId>springbootkafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
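The project also needs a standard Spring Boot entry point; this is a minimal sketch (the package and class names are illustrative, not taken from the original project):

package com.sff;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Standard Spring Boot entry point; component scanning picks up the
// listener and producer beans shown in the sections below.
@SpringBootApplication
public class SpringbootKafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbootKafkaApplication.class, args);
    }
}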
2. Configuration file (application.properties)
spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,CentOSC:9092
#producer
spring.kafka.producer.retries=5
spring.kafka.producer.acks=all
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
## Enable transactions. Once this prefix is set, every send must run inside a transaction, either 1. via the transaction API in code or 2. via @Transactional; sending outside a transaction throws an error.
spring.kafka.producer.transaction-id-prefix=transaction-id-
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.enable.idempotence=true
# consumer
spring.kafka.consumer.group-id=springboot-kafka
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
## Batch consumption
# If not configured, the listener receives one record per call.
# If configured, the @KafkaListener-annotated method must accept a List of records; a single-record parameter will cause an error.
spring.kafka.listener.type=batch
spring.kafka.listener.poll-timeout=1000
spring.kafka.listener.concurrency=1
spring.kafka.consumer.max-poll-records=20
3. Kafka listener code
3.1 Consuming one record per call (applies when spring.kafka.listener.type=batch is not set)
@KafkaListeners(
        value = {
                @KafkaListener(topics = {"topic06"})
        }
)
// Single-record listener: each invocation handles exactly one ConsumerRecord
public void receive(ConsumerRecord<String, String> record) {
    System.out.println("topic06: " + record.value());
}
3.2 Consuming records in batches (with spring.kafka.listener.type=batch)
@KafkaListeners(
        value = {
                @KafkaListener(topics = {"topic06"})
        }
)
// Batch listener: with listener.type=batch the method receives up to
// max-poll-records records per invocation
public void receiveBatch(List<ConsumerRecord<String, String>> records) {
    System.out.println("consumer size:" + records.size());
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("topic06: " + record.value());
    }
}
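For context, a minimal sketch of the component these listener methods might live in (the class name is illustrative); Spring discovers it through component scanning:

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.KafkaListeners;
import org.springframework.stereotype.Component;

// Holds the @KafkaListeners methods shown above; only one of the
// single-record / batch variants should be active for the configured listener type.
@Component
public class KafkaConsumerComponent {

    @KafkaListeners(value = {@KafkaListener(topics = {"topic06"})})
    public void receiveBatch(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("topic06: " + record.value());
        }
    }
}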
4. Kafka producer code
4.1 Sending without a transaction (only valid when spring.kafka.producer.transaction-id-prefix is not configured; with the prefix set, a send outside a transaction throws an exception)
// Fire-and-forget send; no transaction is involved
public void sendMessage(String topic, String value) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
    kafkaTemplate.send(record);
}
4.2 Sending with a transaction, option one: KafkaTemplate.executeInTransaction
public void testSend() {
    // All sends inside the callback belong to one Kafka transaction;
    // they commit together when the callback returns normally
    kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() {
        @Override
        public Object doInOperations(KafkaOperations<String, String> operations) {
            for (int i = 0; i < 50; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("topic06", "testSend" + i);
                operations.send(record);
            }
            return null;
        }
    });
}
4.3 Sending with a transaction, option two: @Transactional
// Runs inside a Spring-managed Kafka transaction; requires a KafkaTransactionManager
// (Spring Boot auto-configures one when spring.kafka.producer.transaction-id-prefix is set)
@Transactional
public void sendMessage(String topic, String value) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
    kafkaTemplate.send(record);
}
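For completeness, a minimal sketch of a service class these send methods could belong to (the class name and constructor injection are illustrative, not taken from the original project); the KafkaTemplate is the one Spring Boot auto-configures from the spring.kafka.* properties above:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// Wraps the auto-configured KafkaTemplate used by the send methods above
@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Spring-managed Kafka transaction (see 4.3); rolls back if the method throws
    @Transactional
    public void sendMessage(String topic, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
        kafkaTemplate.send(record);
    }
}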