版本:kafka_2.13-2.8.0
1、查看当前服务器中的所有topic
bin/kafka-topics.sh --list --bootstrap-server hadoop201:9092
2、创建topic
bin/kafka-topics.sh --create --bootstrap-server hadoop201:9092 --topic firstTopic
也可以在创建 topic 时,手动方式指定分区数、副本数
bin/kafka-topics.sh --create --bootstrap-server hadoop201:9092 --replication-factor 2 --partitions 3 --topic firstTopic ? 备注: ??replication-factor 2 ??备份数(2个备份) ??partitions 3 ??分区数(3个分区) ??topic firstTopic ??firstTopic为topic的名称
3、显示topic详细信息
bin/kafka-topics.sh --describe --topic firstTopic --bootstrap-server hadoop201:9092
[hadoop@hadoop201 kafka]$ bin/kafka-topics.sh --describe --topic firstTopic --bootstrap-server hadoop201:9092
Topic: firstTopic TopicId: n0-rqoLVQkqsriiCElr4mw PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: firstTopic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
4、删除topic
bin/kafka-topics.sh --delete --bootstrap-server hadoop201:9020 --topic firstTopic
- 老版本:需要server.properties中设置
delete.topic.enable=true ,否则只是标记删除或者直接重启。 - 新版本:已经没有
delete.topic.enable=true 这个配置,可以直接删除 topic 了。如果有生产者或者消费者在使用该topic,需要停掉生产者或者消费者才能进行删除)
5、命令行发送消息
bin/kafka-console-producer.sh --topic firstTopic --bootstrap-server hadoop201:9092
备注:
??kafka01:主机名(通过vi /etc/sysconfig/network,修改hostname即可,此处可填IP地址) ??orderMq:为topic的名称
6、命令行消费消息
bin/kafka-console-consumer.sh --topic firstTopic --from-beginning --bootstrap-server hadoop201:9092
备注:
--from-beginning:会把主题中以往所有的数据都读取出来。即:从头开始读
7、查看消费位置
1.使用 --all-groups查看所有组 bin/kafka-consumer-groups.sh --bootstrap-server hadoop201:9092 --describe --all-groups ? 2.使用 --group 查看指定组 bin/kafka-consumer-groups.sh --bootstrap-server hadoop201:9092 --describe --group 组名

8、修改topic的分区数
bin/kafka-topics.sh --bootstrap-server hadoop201:9092 --alter --topic firstTopic --partitions 6

|