1. Environment Setup
1.1 Packages and Environment Preparation
zookeeper:3.7.0 (apache-zookeeper-3.7.0-bin.tar.gz)
kafka:2.8.0 (kafka_2.13-2.8.0.tgz)
jdk:jdk-8u301 (jdk-8u301-linux-x64.tar.gz)
Both ZooKeeper and Kafka are deployed as clusters. The hosts run CentOS 7.9; environment details:
Hostname | IP | Deployed applications
---|---|---
node1 | 192.168.206.201 | zookeeper, kafka
node2 | 192.168.206.202 | zookeeper, kafka
node3 | 192.168.206.203 | zookeeper, kafka
The hosts file on every node is configured as follows:
$ vi /etc/hosts
192.168.206.201 node1
192.168.206.202 node2
192.168.206.203 node3
1.2 JDK Installation
Extract the archive on each server and configure the environment variables. This is standard, but for reference (required on all three servers):
$ tar -xzvf jdk-8u301-linux-x64.tar.gz -C /usr/local/
$ vi .bash_profile
PATH=$PATH:$HOME/bin
export PATH
export JAVA_HOME=/usr/local/jdk1.8.0_301
export PATH=$JAVA_HOME/bin:$PATH
$ source .bash_profile
Test:
$ java -version
Output:
java version "1.8.0_301"
Java(TM) SE Runtime Environment (build 1.8.0_301-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.301-b09, mixed mode)
1.3 ZooKeeper Installation
Extract and install on each server:
$ tar -xzvf apache-zookeeper-3.7.0-bin.tar.gz
$ cd apache-zookeeper-3.7.0-bin/conf/
$ vi zoo.cfg
tickTime=2000
dataDir=/data/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
$ mkdir -p /data/zookeeper
$ vi /data/zookeeper/myid
1
## Write into this file the ID matching the server.N=host:2888:3888 lines at the end of zoo.cfg: 1 on node1, 2 on node2, 3 on node3
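To reduce copy/paste mistakes, the ID can be derived from the hostname; a minimal sketch to run on each node (the case mapping is an assumption matching the zoo.cfg above):
case $(hostname) in
  node1) echo 1 > /data/zookeeper/myid ;;
  node2) echo 2 > /data/zookeeper/myid ;;
  node3) echo 3 > /data/zookeeper/myid ;;
esac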
Repeat the configuration on all three servers, making sure each myid matches the server.N numbering in zoo.cfg (the sketch above automates this). Then start ZooKeeper:
$ bin/zkServer.sh start
On success:
ZooKeeper JMX enabled by default
Using config: /root/apache-zookeeper-3.7.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
Client test:
$ bin/zkCli.sh -server 127.0.0.1:2181
In the zk interactive shell, run a test command:
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /
[zookeeper]
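You can also confirm the ensemble has formed by checking each node's role; with three nodes, one should report leader and the other two follower (output abridged):
$ bin/zkServer.sh status
Mode: follower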
1.4 Kafka Installation
Extract and install on each server:
$ tar -xzvf kafka_2.13-2.8.0.tgz
$ cd kafka_2.13-2.8.0
$ vi config/server.properties
broker.id=1           # must be unique on each server; distinguishes the different brokers
log.dirs=/data/kafka  # storage path for Kafka's log (message data) files
zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka
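The trailing /kafka in zookeeper.connect is a ZooKeeper chroot that keeps all of Kafka's znodes under one path. broker.id must differ on every node; one way to set it, assuming server.properties was copied from node1 (a sketch, run on node2):
$ sed -i 's/^broker.id=.*/broker.id=2/' config/server.properties    # and broker.id=3 on node3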
Start the service:
$ bin/kafka-server-start.sh config/server.properties
Test: check the broker registrations in ZooKeeper (from a zkCli.sh session):
ls /kafka/brokers/ids
Output:
[1, 2, 3]
2. Basic Command Usage
2.1 Create a Topic
$ bin/kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --create --topic hello-topic --partitions 3 --replication-factor 3
- --topic hello-topic: the topic name (here, hello-topic)
- --partitions 3: the number of partitions (this topic has 3 partitions)
- --replication-factor 3: the replication factor (each partition of this topic has 3 replicas)
2.2 List All Topics
$ bin/kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --list
This prints the full list of topics.
2.3 Describe a Topic
$ bin/kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --topic hello-topic
Output (shows each partition, its leader, its replica nodes, and its ISR nodes):
Topic: hello-topic TopicId: uX53uX8dSDW-qP-0KKreDg PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: hello-topic Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: hello-topic Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: hello-topic Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Notes:
- Partition: 0: the 0 is the partition number
- Leader: 1: the broker that is the leader for this partition
- Replicas: 1,3,2: the brokers holding replicas of this partition
- Isr: 1,3,2: the currently in-sync replica set
2.4 Produce Messages
$ bin/kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic hello-topic
An interactive prompt opens; type messages to send them to the topic:
>This is my first event
>This is my second event
>This is my third event
>
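The console producer can also send keyed messages, which is handy for checking partition routing; a sketch using its parse.key and key.separator properties:
$ bin/kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic hello-topic --property parse.key=true --property key.separator=:
>user1:first keyed event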
2.5 Consume Messages
$ bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic hello-topic --from-beginning
Messages are printed as they arrive (ordering is guaranteed only within a partition, which is why the events may appear out of order):
This is my first event
This is my third event
This is my second event
Notes: the following options control where consumption starts:
- --from-beginning: if the consumer has no established offset yet, start from the earliest message in the log
- --offset: the offset to consume from (a non-negative number), or the string "earliest" (start of the log) or "latest" (end of the log; the default). A numeric offset pinpoints a position, so --partition must also be given to name the partition explicitly, for example:
$ bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic hello-topic --offset 11 --partition 2
- --group: the consumer group to consume as
2.6 List Consumer Groups
$ bin/kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --all-groups --list
2.7 Describe a Consumer Group
$ bin/kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --group hello-group --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
hello-group hello-topic 0 14 14 0 consumer-hello-group-1-cc2ea682-4bb7-41a0-b87c-5970522de997 /192.168.206.201 consumer-hello-group-1
hello-group hello-topic 1 19 19 0 consumer-hello-group-1-cc2ea682-4bb7-41a0-b87c-5970522de997 /192.168.206.201 consumer-hello-group-1
hello-group hello-topic 2 24 24 0 consumer-hello-group-1-cc2ea682-4bb7-41a0-b87c-5970522de997 /192.168.206.201 consumer-hello-group-1
*Notes*
CURRENT-OFFSET: the group's committed consumption offset for this partition
LOG-END-OFFSET: the end offset of the partition, i.e. the highest committed message offset (CURRENT-OFFSET <= LOG-END-OFFSET); LAG is the difference between the two
2.8 Check a Topic's Total Message Count
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092,node2:9092,node3:9092 --topic hello-topic --time -1    # --time -1 queries the latest offsets
Output, in the form topic:partition:end-offset:
hello-topic:0:20
hello-topic:1:27
hello-topic:2:29
Note: in most cases the LOG-END-OFFSET column from kafka-consumer-groups.sh is enough; this command is useful when the topic has no consumers.
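To get a single total instead of per-partition counts, the per-partition end offsets can be summed; a sketch piping through awk (this equals the message count only while no log segments have been deleted):
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092,node2:9092,node3:9092 --topic hello-topic --time -1 | awk -F: '{sum += $3} END {print sum}'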
3. SDK Usage
3.1 Maven Dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
3.2 Producer Demo
package com.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Properties;

public class ProducerDemo {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerDemo.class);

    public static void main(String[] args) throws IOException {
        String topic = "hello-topic";
        String bootstrapServer = "node1:9092,node2:9092,node3:9092";
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "-1"); // wait for the in-sync replicas to acknowledge
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < 3; j++) {
                // Explicitly target partition j; key and value are plain strings
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, j, j + "", j + "-" + i + "-msg");
                producer.send(record, (metadata, exception) -> {
                    // Check the exception first: metadata may be incomplete when the send failed
                    if (exception != null) {
                        LOGGER.error("error!", exception);
                        return;
                    }
                    LOGGER.debug("partition:{},offset:{},key:{},value:{}!",
                            metadata.partition(), metadata.offset(), record.key(), record.value());
                });
            }
        }
        System.in.read(); // keep the JVM alive so the async callbacks can fire
    }
}
The demo sends messages whose key and value are both Strings, explicitly specifying the target partition for each record.
Among the settings, the acks parameter deserves attention:
- 0: do not wait for any broker acknowledgement; the send returns once the record is written to the socket
- 1: return once the partition leader has written the record; replica writes are not guaranteed
- -1 (all): return only after the leader and the in-sync replicas have written the record (recommended in a distributed environment)
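If a caller needs to block until a record is acknowledged, the Future returned by send() can be awaited instead; a minimal sketch, assuming the producer and topic from the demo above plus imports of RecordMetadata and java.util.concurrent.ExecutionException:
try {
    // get() blocks until the broker acknowledges the write (or throws on failure)
    RecordMetadata md = producer.send(new ProducerRecord<>(topic, "key", "value")).get();
    LOGGER.info("written to partition:{},offset:{}", md.partition(), md.offset());
} catch (InterruptedException | ExecutionException e) {
    LOGGER.error("send failed!", e);
}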
3.3 Consumer Demo
package com.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.*;

public class ConsumerDemo {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerDemo.class);

    public static void main(String[] args) throws IOException {
        String topic = "hello-topic";
        String bootstrapServer = "node1:9092,node2:9092,node3:9092";
        String groupId = "consumer-java-group";
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE.toString());
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // auto-commit offsets every 5s
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from the log head when no committed offset exists
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Log partition assignment changes during rebalances
        consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                partitions.forEach(e -> LOGGER.info("onPartitionsRevoked -> topic:{},partition:{}", e.topic(), e.partition()));
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                partitions.forEach(e -> LOGGER.info("onPartitionsAssigned -> topic:{},partition:{}", e.topic(), e.partition()));
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000L));
            records.forEach(r ->
                    LOGGER.debug("partition:{},offset:{},key:{},value:{}", r.partition(), r.offset(), r.key(), r.value()));
        }
    }
}
About offsets: a consumer's offsets can be maintained externally by your own code, e.g. in a database or Redis; on startup, restore the saved position with the consumer's seek(TopicPartition partition, long offset) method and start consuming from there. Absent special business requirements, Kafka's built-in mechanism is sufficient: it stores each group's offsets in the internal __consumer_offsets topic and, with auto-commit enabled, commits every 5 seconds by default. Auto-commit can also be disabled in favour of explicit calls to commitSync() or commitAsync().
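As a concrete illustration of the manual route, here is a minimal sketch of explicit commits, assuming the demo's consumer but with enable.auto.commit=false, an import of ConsumerRecord, and a hypothetical handleRecord processing method:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
    for (ConsumerRecord<String, String> r : records) {
        handleRecord(r); // hypothetical business logic
    }
    // Commit only after processing, so a crash re-reads rather than skips messages
    consumer.commitSync();
}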
4. System Architecture
//todo