IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka安装和使用 -> 正文阅读

[大数据]kafka安装和使用

安装和启动

参考https://kafka.apache.org/quickstart, 下载kafka:
然后进行解压:

$ tar -xzf kafka_2.13-3.0.0.tgz
$ cd kafka_2.13-3.0.0

# 在当前的版本中,Kafka还依赖zookeeper,所以,先启动zookeeper(需要java8)
$ bin/zookeeper-server-start.sh config/zookeeper.properties

然后开启另外一个窗口,启动Kafka Broker执行:

# 启动Kafka Broker
$ bin/kafka-server-start.sh config/server.properties

config/server.properties中的内容:

# broker的标识,每个broker必须都不相同
broker.id=0

# log目录
log.dirs=/tmp/kafka-logs

# zookeeper集群地址,可以配置多个
zookeeper.connect=localhost:2181

Topic创建和消费

# 创建topic
# 注意:在这个版本的kafka中,创建topic需要指定--partitions和--replication-factor参数
$ bin/kafka-topics.sh --create --topic hello --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092

创建完topic之后,可以看到/tmp/kafka-logs下面多了hello-0,hello-1,hello-2这几个文件,分别代表不同的分区。
在这里插入图片描述

写入(开启一个新的窗口):

# 写入
$ bin/kafka-console-producer.sh --topic hello --bootstrap-server localhost:9092
>msg0
>msg1
>msg2
^D

消费(开启一个新的窗口):

# 消费,这里可能乱序,参见下面的原因
$ bin/kafka-console-consumer.sh --topic hello --from-beginning --bootstrap-server localhost:9092
msg1
msg0
msg2

上面消息看似乱序,实则因为它们写入的分区不同:
在这里插入图片描述
我们可以看到msg0,msg2写入了分区0,而msg1写入了分区2。

集群

前面提到config/server.properties文件,用于指定broker启动的配置。其中,zookeeper.connect指定所要连接的zk集群,所有使用同一组zk集群的broker自动成为一个kafka集群。所以,我们可以构造另外一个config/server-2.properties:

# 需要改变监听的地址,默认是9092 
listeners=PLAINTEXT://:9093

# broker的标识,每个broker必须都不相同
broker.id=1

# log目录
log.dirs=/tmp/kafka-logs-2

# zookeeper集群地址,可以配置多个
zookeeper.connect=localhost:2181

关于broker.id, 值必须显式指定,不会随机生成。如果使用broker.id=0重新启动,会产生NodeExists错误

[2021-10-19 21:31:06,199] ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
       at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
       at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
       at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
       at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
       at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
       at kafka.server.KafkaServer.startup(KafkaServer.scala:319)
       at kafka.Kafka$.main(Kafka.scala:109)
       at kafka.Kafka.main(Kafka.scala)

创建第2个topic,然后查看topic详情:

# 创建hello2
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic hello2 --partitions 4 --replication-factor 1 

# 列出
$ bin/kafka-topics.sh  --bootstrap-server localhost:9092 --topic hello,hello2  --describe

在这里插入图片描述
从上面的结果可以看出,加入新的broker,已有的topic分区不会重新分配,hello所有的Leader都是broker-0。

关于rebalance
注意,rebalance不是指topic创建,或者broker数量变化时,为已分配的分区重新进行同步。相反,broker数量变化不会触发分区重新分配。
rebalance指的是consumer group,当consumer group中consumer的数量发生变化时,会对consumer所消费的分区进行重分配。

文章:https://medium.com/streamthoughts/apache-kafka-rebalance-protocol-or-the-magic-behind-your-streams-applications-e94baf68e4f2

分区重分配

使用bin/kafka-reassign-partitions.sh,分为两步:1.先生成计划, 2.执行计划

# 生成计划
$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file config/topics-to-move.json  --broker-list 0,1,2 --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"hello","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"hello","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"hello","partition":2,"replicas":[0],"log_dirs":["any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"hello","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"hello","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"hello","partition":2,"replicas":[0],"log_dirs":["any"]}]}

config/topics-to-move.json列出需要重分配的topic:

{
    "version":1,
    "topics":[{"topic":"hello"}]
}

将计划内容写入文件config/reassignment-json-file.json中,然后执行:

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file config/reassignment-json-file.json --execute

# 重新查看topic分配
$ bin/kafka-topics.sh  --bootstrap-server localhost:9092 --topic hello,hello2  --describe

在这里插入图片描述
hello已经被重新分配。

特殊的topic: __consumer_offsets

在/tmp/kafka-logs下面

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-20 12:33:20  更:2021-10-20 12:35:16 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 6:19:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码