最近在学习Flink,不可避免的需要用到kafka消息中间件,下面介绍下单机部署流程
1.下载2.4.1版本
我的Flink版本是1.12,对应的kafka版本要使用2.4.1的 我们下载?kafka_2.11-2.4.1这个版本,2.11是scala版本
http://kafka.apache.org/downloads
2.上传解压tar包
tar -zxvf kafka_2.11-2.4.1.tgz -C .
3.配置环境变量
vi .bash_profile
JAVA_HOME=/root/software/jdk1.8.0_181
HADOOP_HOME=/root/software/hadoop-3.2.0
HIVE_HOME=/root/software/apache-hive-3.1.2-bin
SCALA_HOME=/root/software/scala-2.13.6
SPARK_HOME=/root/software/spark-3.1.2
export KAFKA_HOME=/root/software/kafka_2.11-2.4.1
export KE_HOME=/root/software/kafka-eagle-bin-2.0.6
PATH=$PATH:$HOME/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin:$KE_HOME/bin:$KAFKA_HOME/bin
export JAVA_HOME
export HADOOP_HOME
export HIVE_HOME
export SCALA_HOME
export SPARK_HOME
export KAFKA_HOME
export KE_HOME
export PATH
export HDFS_DATANODE_USER=root
export HDFS_DATANODE_SECURE_USER=root
export HDFS_NAMENODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
生效配置?
source .bash_profile?
4.修改配置文件
vim $KAFKA_HOME/config/server.properties
主要配置3个选项,监听地址,日志地址和zk地址,其它可以保持默认配置
listeners=PLAINTEXT://master104:9092
log.dirs=/root/software/kafka_2.11-2.4.1/kafka-logs
zookeeper.connect=master104:2181
?zookeeper配置,我们新建一个配置文件zoo.cfg,自带的zookeeper.properties有问题
tickTime=2000
dataDir=/root/software/kafka_2.11-2.4.1/zookeeper/data
dataLogDir=/root/software/kafka_2.11-2.4.1/zookeeper/log
clientPort=2181
上面的2个路径需要提前建好并赋给写入权限
5.启动验证
1.启动自带的zookeeper:
我们在kafka目录下执行 bin/zookeeper-server-start.sh? config/zoo.cfg
打印如下内容
?如果后台运行那么执行
bin/zookeeper-server-start.sh -daemon config/zoo.cfg
这里的-daemon为可选参数,加上是后台启动,不加就是前台启动会打印信息到控制台。
检查启动,我们就检查下端口2181
?端口在监听证明启动成功
2.启动kafka:
bin/kafka-server-start.sh config/server.properties
如下图则启动成功
?我们查看9092和2181端口,可以看到已经监听和建立链接了
??如果后台运行那么执行
bin/kafka-server-start.sh -daemon config/server.properties
这里的-daemon为可选参数,加上是后台启动,不加就是前台启动会打印信息到控制台。
接下里我们可以建2个sh文件,方便一次执行就可以启动zookeeper和kafka
vi kafkastart.allsh
#!/bin/sh
#启动zookeeper
bin/zookeeper-server-start.sh -daemon config/zoo.cfg &
sleep 5 #等5秒后执行
#启动kafka
bin/kafka-server-start.sh -daemon config/server.properties &
?停止服务 vi kafkastopall.sh
#!/bin/sh
#关闭zookeeper
./bin/zookeeper-server-stop.sh config/zoo.cfg &
sleep 5 #等5秒后执行
#关闭kafka
./bin/kafka-server-stop.sh config/server.properties &
?创建好文件后记得赋予执行权限
chmod a+x kafkastartall.sh
chmod a+x kafkastopall.sh
3.验证启动:
创建topic:
./kafka-topics.sh --create --zookeeper master104:2181 --replication-factor 1 --partitions 3 --topic sstest
这里的--replication-factors是根据有几个broker节点才可以设置最多有几个备份,因为我是单机的,所以指定的是1。
列所有topic:
./bin/kafka-topics.sh --list --zookeeper master104:2181
启动producer:
./bin/kafka-console-producer.sh --broker-list master104:9092 --topic sstest
启动consumer:
我们新开一个终端执行
./bin/kafka-console-consumer.sh --bootstrap-server master104:9092 --zookeeper master104:2181 --topic sstest
我们会发现上面的命令报错
zookeeper is not a recognized option
大家莫慌,这个不是配置问题,是因为我们的版本比较新,上面的启动命令不支持新版本了
我们改用下面的命令
./bin/kafka-console-consumer.sh --bootstrap-server master104:9092 --topic sstest --from-beginning
出现下面界面则连接成功
?我们在生产终端输入几条数据
然后把终端切换到消费端
?可以看到数据已经被消费了
查看topic信息:
我们可以再开一个终端
./bin/kafka-topics.sh --describe --zookeeper master104:2181 --topic sstest
查看消费情况:
./bin/kafka-consumer-groups.sh --describe --all-groups --all-topics --bootstrap-server master104:9092
./bin/kafka-consumer-groups.sh --describe --all-topics --bootstrap-server master104:9092 --group console-consumer-64910
重置偏移reset-topic
kafka不重要的数据,出现堆积,重新消费需要很久,不如清理一下快
./bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --bootstrap-server master104:9092 --all-topics --group console-consumer-64910 --execute
./bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --bootstrap-server master104:9092 --all-topics --group console-consumer-64910 --execute
执行报下面错误
Error: Assignments can only be reset if the group 'logstash_kafka_group' is inactive, but the current state is Stable
是因为相同的 group 有其他的 topic 正在使用和消费,那么这个 group 是?active 状态。active 状态的 group 的任何 topic 的 offset 是不能够被 reset 的
解决办法是停止消费端,再执行重置
?重置后,我们再启动消费端
发现数据又消费了一遍?
常用命令
创建topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
展示topic bin/kafka-topics.sh --list --zookeeper localhost:2181
描述topic bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
生产者: bin/kafka-console-producer.sh --broker-list 130.51.23.95:9092 --topic my-replicated-topic
消费者: bin/kafka-console-consumer.sh --zookeeper 130.51.23.95:2181 --topic test --from-beginnin
删除topic bin/kafka-topics.sh --delete --zookeeper 130.51.23.95:2181 --topic topicname
删除topic中存储的内容 在config/server.properties中找到如下的位置log.dirs=路径,删除log.dirs指定的文件目录,然后重新启动就可以了
? ? ? ? 感谢能看到这里的朋友😉
? ? ? ? 本次的分享就到这里,猫头鹰数据致力于为大家分享技术干货😎
? ? ? ? 如果以上过程中出现了任何的纰漏错误,烦请大佬们指正😅
? ? ? ? 受益的朋友或对技术感兴趣的伙伴记得点赞关注支持一波🙏
? ? ? ? 也可以搜索关注我的微信公众号【猫头鹰数据分析】,留言交流🙏
|