IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka集群部署 -> 正文阅读

[大数据]Kafka集群部署

1)解压安装包

tar -zxvf kafka_2.11-0.11.0.0.tgz

2)修改解压后的文件名称

 mv kafka_2.11-0.11.0.0/ kafka

3)在/opt/module/kafka目录下创建logs文件夹

mkdir logs

4)修改配置文件

cd config/
?
vim server.properties

输入以下内容:

注:千万不要有空格

#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径 
log.dirs=/opt/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=wt01:2181,wt02:2181,wt03:2181
192.168.20.32:2181,192.168.20.33:2181,192.168.20.34:2181

5)配置环境变量

vim /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
?
刷新环境变量
source /etc/profile

6)分发安装包

xsync kafka/

注意:分发之后记得配置其他机器的环境变量

7)分配

分别在gz005和gz006上修改配置文件vim /opt/kafka/config/server.properties中的
broker.id=1  broker.id=2

注:broker.id不得重复

8)启动集群

首先启动zookeeper集群

启动 Server
三个节点都需要执行,没有一键启动的脚本

/opt/zookeeper/bin/zkServer.sh start

依次在gz004、gz005、gz006节点上启动kafka

在kafka目录运行 

# bin/kafka-server-start.sh -daemon config/server.properties

9)关闭集群

 bin/kafka-server-stop.sh stop

Kafka命令行操作

1)查看当前服务器中的所有topic

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

查看某个topic里的数据

bin/kafka-console-consumer.sh --bootstrap-server 192.167.30.21:9092 --topic topicName --from-beginning

2)创建topic

bin/kafka-topics.sh --zookeeper 192.168.20.32:2181 --create --replication-factor 3 --partitions 1 --topic t01

选项说明:

--topic 定义topic名

--replication-factor 定义副本数

--partitions 定义分区数

3)删除topic

 bin/kafka-topics.sh --zookeeper 192.168.20.32:2181 --delete --topic t01

需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

4)发送消息

 bin/kafka-console-producer.sh \
?
--broker-list gz004:9092 --topic first
?
\>hello world

5)消费消息

 bin/kafka-console-consumer.sh \
?
--zookeeper hadoop102:2181 --from-beginning --topic first

--from-beginning:会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

6)查看某个Topic的详情

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 \

--describe --topic first ?? ? ? ? ? ? ? ? ? ? ? ? ? ? 

kafka与flume连接

1.启动zookeeper  三个主机都启动
 /opt/zookeeper/bin/zkServer.sh start
2.启动kafka 三个主机都启动  启动后不要退出,在在三台主机基础上复制三个xshell窗口进行下面的操作。
   
 /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
3.在opt目录下放入flume的安装包
4.创建一个access.log文件 在里面输入信息
5.将access.log 文件放入到 /opt下  这边的文件跟fk.conf的source路径有关

6.将fk.conf 放入到flume下的 /opt/flume/conf  目录下。
# define
?
a1.sources = r1
a1.sinks = k1
a1.channels = c1
?
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/access.log
a1.sources.r1.shell = /bin/bash -c
?
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = wt01:9092,wt02:9092,wt03:9092
a1.sinks.k1.topic = t02
a1.sinks.k1.flumeBatchSize = 20
a1.sinks.k1.producer.acks = 1
a1.sinks.k1.producer.linger.ms = 1
?
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
?
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
7.创建 topic
bin/kafka-topics.sh --zookeeper wt01:2181 --create --replication-factor 3 --partitions 1 --topic t03
8.在gz006主机中进入到t02 消费消息模式 
bin/kafka-console-consumer.sh --zookeeper wt01:2181 --from-beginning --topic t03
9.在gz004主机上运行flume配置文件
 bin/flume-ng agent -c conf -f conf/fk.conf -Dflume.root.logger=DEBUG,console -n al
10.再次复制一个gz004xshell端口,进入到/opt/kafka/logs 查看t02-0 中的日志

kafka监控工具

1)启动集群

zookeeper集群

 /opt/zookeeper/bin/zkServer.sh start

kafka集群

bin/kafka-server-start.sh config/server.properties

2)把 KafkaOffsetMonitor-assembly-0.2.0.jar 放到kafka的lib目录下

3)运行命令

java -cp /opt/kafka/libs/KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk wt01:2181 --port 9000 --refresh 10.seconds --retain 1.days

4)访问 wt01:9000

Kafka Eagle安装及使用

参考网址:blog.csdn.net/locky_lll/article/details/104840827

1)kafka-eagle-bin-1.2.4.tar.gz 放到opt下解压

tar -zxvf kafka-eagle-web-1.2.4-bin.tar.gz

2)设置全局变量 vim /etc/profile

#添加代码

export KE_HOME=/opt/kafka-eagle-web-1.2.4 
export PATH=$PATH:$KE_HOME/bin

#刷新 source /etc/profile

3)进入kafka-eagle的conf目录下修改配置文件

#修改配置文件 vim system-config.properties

#修改

kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://wt01:3306/kafka-eagle (数据库名字)
kafka.eagle.username=root
kafka.eagle.password=123456

4)文件配置完毕后开始启动 ,启动要进入bin目录下

#进入bin目录后会看到 ke.sh 文件先修改文件的权限 chmod -x ke.sh

#启动命令 ./ke.sh start 启动成功后的控制台输出 http://wt01:8048/ke/

hive要启动

 ./hive --service metastore &

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-28 09:24:49  更:2021-08-28 09:25:58 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:47:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码