Kafka
一、是什么?
二、核心概念
-
Broker 是kafka集群启动起来的一个JVM进程 -
topic 在Kafka中可以创建多个topic 每个生产者和消费者进行数据读写时,都需要指定topic -
partition 每个topic包含多个分区,分区会被均衡的创建在每一个broker上,分区本质上就是一个文件夹,分区中存储生产者发送的数据 -
replication-factor 每个topic创建时会设置分区的副本个数 副本个数指的是每个分区的数据在集群中保存的数量 topic中的相同分区的不同副本之间,会选举产生leader和follower,leader接收客户端的请求,follower同步leader数据 -
offset 每条消息在分区内部会有一个唯一的单调递增的编号称之为offset -
producer 指定topic 向Kafka集群中发送数据 producer将数据发送到leader分区 -
consumer 指定topic 从kafka中拉取数据 消费者读取数据时会记录自己的读取进度 topic-partition-offset 下次启动时根据上次的记录继续读取数据 -
consumer group 多个消费者线程如果设置为相同的group id,则他们可以共享一份offset记录
三、kafka集群搭建
3.1 搭建zookeeper
-
下载解压
- 由于目前Hadoop生态和Spark生态主流使用的是zk的3.4.x版本,所以我们选择3.4.14版本而不使用最新的3.7.x版本
-
修改配置文件
- 将zoo_sample.cfg复制并重命名为zoo.cfg
# 修改zk保存数据的目录
dataDir=/opt/zookeeper-3.4.14/data
# 集群配置
#server.A=B:C:D
#A:每个zk节点自己唯一的id
#B:每个zk节点自己的主机名
#C:集群之间数据同步使用的端口 默认2888
#D:集群之间选举使用的端口 默认3888
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888 COPY
-
分发安装包 -
在dataDir中添加myid文件 在每个zk节点的dataDir目录中创建一个文本文件,文件中填入当前机器对应的id号 node01 :echo 1 > /opt/zookeeper-3.4.14/data/myid node02 :echo 2 > /opt/zookeeper-3.4.14/data/myid node03 :echo 3 > /opt/zookeeper-3.4.14/data/myid -
配置环境变量 echo 'export ZK_HOME=/opt/zookeeper-3.4.14' >> /etc/profile
echo 'export PATH=.:$ZK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile COPY -
zk启动
- zk没有提供集群启动脚本,所以需要到每台zk节点上分别执行zkServer.sh start
3.2 zookeeper选举机制
- zk集群中每个节点启动时,会互相投票选举出一个作为leader,其他节点作为follower
- zk集群中leader负责调度所有节点进行数据同步
顺序 | leader |
---|
1 2 3 | 2 | 3 2 1 | 3 | 2 3 1 | 3 |
3.2 安装kafka
-
下载解压改名
- 关于版本:
业界主流版本: 0.10、0.11、1.1.x、2.1.x - 最新版本:2.8.0
- 当前文档中我们使用2.4.1,因为2.4.1同时提供scala2.11和scala2.12的兼容
-
修改配置文件
- server.properties
# 为每个Borker设置一个唯一的整数,作为id号
broker.id=1
# 为每个Broker设置服务的主机名和端口
listeners = PLAINTEXT://node01:9092
# 设置 消息保存的文件目录
# kafka中将生产者发送的消息,保存的文件称之为叫store log file
# 不同于通常的日志,store log file仅仅用来存储kafka缓存的消息
log.dirs=/opt/kafka-2.4.1/kafka-logs
# 设置zk集群的连接
# 因为zk集群有多个节点,所以这里添加所有节点的列表,真正连接时,只需要连接到其中一台就可以正常工作,如果连接不上,会在列表中使用下一个连接方式
zookeeper.connect=node01:2181,node02:2181,node03:2181 COPY
-
分发安装包 -
修改个性化配置
- 修改所有机器的
broker.id 和
listeners = PLAINTEXT://node01:9092
-
配置环境变量
echo 'export KFK_HOME=/opt/kafka-2.4.1' >> /etc/profile
echo 'export PATH=.:$KFK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile COPY
- 启动
kafka-server-start.sh -daemon /opt/kafka-2.4.1/config/server.properties COPY
- 测试
四、kafka命令操作
# 查看topic列表
kafka-topic.sh --zookeeper node01:2181 --list
# 创建topic
kafka-topics.sh \
--zookeeper node01:2181 \
--create \
--topic topic01 \
--partitions 3 \
--replication-factor 2
# 启动消费者测试消费
kafka-console-consumer.sh \
--bootstrap-server node01:9092 \
--topic topic01 \
--group test02 \
--from-beginning
# 启动生产者测试生产
kafka-console-producer.sh \
--broker-list e03:9092 \
--topic topic01 COPY
4.2 kafka eagle
# 设置kafka集群连接的zk集群地址
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=node01:2181,node2:2181,node03:2181
# 注释掉之前关于sqlite设置
# 将保存数据使用的数据库换成自己集群的mysql
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node01:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456 COPY
- ke要求配置环境变量
export KE_HOME=/opt/kafka-eagle-web-2.0.5
export PATH=.:$KE_HOME/bin:$PATH COPY
- 启动ke
五、Api操作
|