创建topic(4个分区,2个副本)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
获取当前kafka的topics
kafka-topics --list --zookeeper zoo1:2181 kafka-topics --list --zookeeper zoo1:2181 --describe
查看某一个topic 的详细信息
kafka-topics --zookeeper zoo1:2181 --topic mytopic --describe
删除某一个topic
kafka-topics --zookeeper zoo1:2181 --delete --topic mytopic
消费数据
kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic mytopic–group admin --from-beginning
消费数据到某个文件
kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic mytopic–group wujing --from-beginning -> hello.txt
生产者发送数据
kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic mytopic
查看主题的情况
kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --group group --describe
主题 | 分区 | 当前位置 | 结束位置 | 剩余 | 消费者id | 主机 | 客户端id |
---|
TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID | HOST | CLIENT-ID | mytopic | 0 | 2290 | 2290 | 0 | consumer-1-2af329b2 | /192.168.1.114 | consumer-1 |
列出所有的消费者
kafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --list
kafka版本 >= 2.2
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
分区扩容
kafka版本 < 2.2
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
kafka版本 >= 2.2
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2
删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
查询集群描述
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181
topic列表查询
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
topic列表查询(支持0.9版本+)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
新消费者列表查询(支持0.9版本+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
新消费者列表查询(支持0.10版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
显示某个消费组的消费详情(0.9版本 - 0.10.1.0 之前)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
显示某个消费组的消费详情(0.10.1.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
新生产者(支持0.9版本+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
新消费者(支持0.9版本+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
kafka-verifiable-consumer.sh(消费者事件,例如:offset提交等)
bin/kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic test --group-id groupName
高级点的用法
bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234 --max-messages 10
kafka版本 <= 2.4
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
kafka新版本
bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port
|