安装和启动
参考https://kafka.apache.org/quickstart, 下载kafka: 然后进行解压:
$ tar -xzf kafka_2.13-3.0.0.tgz
$ cd kafka_2.13-3.0.0
$ bin/zookeeper-server-start.sh config/zookeeper.properties
然后开启另外一个窗口,启动Kafka Broker执行:
$ bin/kafka-server-start.sh config/server.properties
config/server.properties 中的内容:
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
Topic创建和消费
$ bin/kafka-topics.sh --create --topic hello --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
创建完topic之后,可以看到/tmp/kafka-logs 下面多了hello-0 ,hello-1 ,hello-2 这几个文件,分别代表不同的分区。
写入(开启一个新的窗口):
$ bin/kafka-console-producer.sh --topic hello --bootstrap-server localhost:9092
>msg0
>msg1
>msg2
^D
消费(开启一个新的窗口):
$ bin/kafka-console-consumer.sh --topic hello --from-beginning --bootstrap-server localhost:9092
msg1
msg0
msg2
上面消息看似乱序,实则因为它们写入的分区不同: 我们可以看到msg0,msg2写入了分区0,而msg1写入了分区2。
集群
前面提到config/server.properties 文件,用于指定broker启动的配置。其中,zookeeper.connect 指定所要连接的zk集群,所有使用同一组zk集群的broker自动成为一个kafka集群。所以,我们可以构造另外一个config/server-2.properties :
listeners=PLAINTEXT://:9093
broker.id=1
log.dirs=/tmp/kafka-logs-2
zookeeper.connect=localhost:2181
关于broker.id, 值必须显式指定,不会随机生成。如果使用broker.id=0重新启动,会产生NodeExists错误
[2021-10-19 21:31:06,199] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
at kafka.server.KafkaServer.startup(KafkaServer.scala:319)
at kafka.Kafka$.main(Kafka.scala:109)
at kafka.Kafka.main(Kafka.scala)
创建第2个topic,然后查看topic详情:
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic hello2 --partitions 4 --replication-factor 1
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic hello,hello2 --describe
从上面的结果可以看出,加入新的broker,已有的topic分区不会重新分配,hello所有的Leader都是broker-0。
关于rebalance 注意,rebalance不是指topic创建,或者broker数量变化时,为已分配的分区重新进行同步。相反,broker数量变化不会触发分区重新分配。 rebalance指的是consumer group,当consumer group中consumer的数量发生变化时,会对consumer所消费的分区进行重分配。
文章:https://medium.com/streamthoughts/apache-kafka-rebalance-protocol-or-the-magic-behind-your-streams-applications-e94baf68e4f2
分区重分配
使用bin/kafka-reassign-partitions.sh ,分为两步:1.先生成计划, 2.执行计划
$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file config/topics-to-move.json --broker-list 0,1,2 --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"hello","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"hello","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"hello","partition":2,"replicas":[0],"log_dirs":["any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"hello","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"hello","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"hello","partition":2,"replicas":[0],"log_dirs":["any"]}]}
config/topics-to-move.json 列出需要重分配的topic:
{
"version":1,
"topics":[{"topic":"hello"}]
}
将计划内容写入文件config/reassignment-json-file.json 中,然后执行:
$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file config/reassignment-json-file.json --execute
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic hello,hello2 --describe
hello已经被重新分配。
特殊的topic: __consumer_offsets
在/tmp/kafka-logs下面
|