什么是Kafka?
Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。最新官方给Kafka的定义是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
Kafka集群的搭建
部署服务器发行版本为CentOS7 ?? 三台(已安装好Java环境),三台服务器的IP为:
- 192.168.182.4
- 192.168.182.5
- 192.168.182.6
部署Kafka版本为kafka_2.12-3.1.0
下载安装包、上传服务器
https://kafka.apache.org/downloads
将下载好的安装包上传至每台服务器的/opt/module/ 目录下后解压重命名为kafka 。
修改server.properties
分别进入三台服务器kafka的配置文件目录config ,修改服务配置文件server.properties 。
- 将broker.id分别修改为0、1、2(不可重复)
- 添加
listeners=PLAINTEXT://192.168.182.4:9092 配置,对应好每个服务器的IP地址 - 修改
log.dirs=/opt/module/kafka/datas - 修改
zookeeper.connect=192.168.182.4:2181,192.168.182.5:2181,192.168.182.6:2181
主要修改部分
...
broker.id=0
listeners=PLAINTEXT://192.168.182.4:9092
log.dirs=/opt/module/kafka/datas
zookeeper.connect=192.168.182.4:2181,192.168.182.5:2181,192.168.182.6:2181
...
...
broker.id=1
listeners=PLAINTEXT://192.168.182.5:9092
log.dirs=/opt/module/kafka/datas
zookeeper.connect=192.168.182.4:2181,192.168.182.5:2181,192.168.182.6:2181
...
...
broker.id=2
listeners=PLAINTEXT://192.168.182.6:9092
log.dirs=/opt/module/kafka/datas
zookeeper.connect=192.168.182.4:2181,192.168.182.5:2181,192.168.182.6:2181
...
修改zookeeper.properties
分别进入三台服务器kafka的配置文件目录config ,修改服务配置文件修改zookeeper.properties 。
- 修改
dataDir=/opt/module/kafka/zk-datas ,后需要手动创建该目录,并创建myid文件,三台服务器的myid不可重复,分别为0,1,2 - 添加节点的配置:
server.0=192.168.182.4:2888:3888
server.1=192.168.182.5:2888:3888
server.2=192.168.182.6:2888:3888
- 添加配置:
initLimit=10
syncLimit=5
tickTime=2000
三台服务器保持一致,像下面这样
dataDir=/opt/module/kafka/zk-datas
clientPort=2181
initLimit=10
syncLimit=5
tickTime=2000
maxClientCnxns=0
admin.enableServer=false
server.0=192.168.182.4:2888:3888
server.1=192.168.182.5:2888:3888
server.2=192.168.182.6:2888:3888
修改bin/kafka-server-stop.sh
官方的停止服务的脚本不太好用,将三台服务器的kafka-server-stop修改成下面的内容
#!/bin/sh
PIDS=$(jps -lm | grep -i "kafka.Kafka" | awk '{print $1}')
if [ -z "$PIDS" ]; then
echo "No kafka server to stop"
exit 1
else
kill -s TERM $PIDS
fi
注意:当使用ssh来远程执行此脚本的时候,可能会报jps命令找不到,但是在服务器本身上执行是没有问题,原因是你服务器的jps命令不在Path中,加上去就行,或者使用jps的绝对路径。
启动测试
一定要先启动zk再启动kafka,关闭的时候先关闭kafka,再关闭zk。
启动Zookerper
三台服务器上分别执行下面语句启动Zookerper
/opt/module/kafka/bin/zookeeper-server-start.sh /opt/module/kafka/config/zookeeper.properties
/opt/module/kafka/bin/zookeeper-server-start.sh -daemon /opt/module/kafka/config/zookeeper.properties
jps -lm 可以看到zk已经启动成功。
启动Kafka
三台服务器上分别执行下面语句启动Kafka
/opt/module/kafka/bin/kafka-server-start.sh /opt/module/kafka/config/server.properties
/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
jps -lm 可以看到kafka已经启动成功。
关闭Kafka
三台服务器上分别执行下面语句关闭Kafka
/opt/module/kafka/bin/kafka-server-stop.sh
jps -lm 可以看到kafka已经关闭成功。
关闭Zookerper
三台服务器上分别执行下面语句关闭Zookerper
/opt/module/kafka/bin/kafka-server-stop.sh
jps -lm 可以看到zookerper已经关闭成功。
编写集群启停脚本
刚才我们是分别在每台服务器上执行启动、停止的命令,这种方式不够方便。所以我们可以通过ssh远程命令来远程在集群中的一台服务器上远程执行另外两台服务器的命令,前提是需要配置好ssh免密登陆,前面的文章与有介绍。
Zookerper的集群启停脚本
创建zk.sh 文件,输入以下内容:
#!/bin/bash
case $1 in
"start"){
for i in 192.168.182.4 192.168.182.5 192.168.182.6
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
};;
"stop"){
for i in 192.168.182.4 192.168.182.5 192.168.182.6
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
done
};;
esac
赋予运行权限chmod +x zk.sh
- 启动集群:./zk.sh start
- 关闭集群:./zk.sh stop
Kafka的集群启停脚本
创建kafka.sh 文件,输入以下内容:
#!/bin/bash
case $1 in
"start"){
for i in 192.168.182.4 192.168.182.5 192.168.182.6
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
};;
"stop"){
for i in 192.168.182.4 192.168.182.5 192.168.182.6
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
done
};;
esac
赋予运行权限chmod +x kafka.sh
- 启动集群:./kafka.sh start
- 关闭集群:./kafka.sh stop
|