1、环境介绍 操作系统:centos 7.9 jdk版本:8u291 kafka版本:2.8.0 kafka下载地址: https://kafka.apache.org/downloads
节点清单: 10.99.27.111 kafkac01.wtown.com 4核心 8G内存 500G硬盘 10.99.27.112 kafkac02.wtown.com 4核心 8G内存 500G硬盘 10.99.27.113 kafkac03.wtown.com 4核心 8G内存 500G硬盘
2、设置主机名及host文件(三台机器)
10.99.27.111 kafkac01.wtown.com
10.99.27.112 kafkac02.wtown.com
10.99.27.113 kafkac03.wtown.com
10.99.27.11 zk01.wtown.com
10.99.27.12 zk02.wtown.com
10.99.27.13 zk03.wtown.co
3、关闭防火墙和selinux(三台机器) 4、创建数据目录/data,并挂载数据盘(三台机器)
mkdir /data
https://blog.csdn.net/zyj81092211/article/details/118054000
5、配置jdk https://blog.csdn.net/zyj81092211/article/details/118055068
6、创建zookeeper集群 https://blog.csdn.net/zyj81092211/article/details/118066724
7、上传软件到服务器解压并重命名为kafka-connect(三台机器) 8、创建软连接到 /usr/local下(三台机器)
ln -s /data/kafka-connect /usr/local/kafka-connect
9、更改配置文件(三台机器) 编辑配置文件server.properties,替换文件内容为下
vi /data/kafka-connect/config/server.properties
kafakac01.wtown.com:
broker.id=111
listeners=PLAINTEXT://kafkac01.wtown.com:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk01.wtown.com:2181,zk02.wtown.com:2181,zk03.wtown.com:2181/kafka-connect01
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
kafakac02.wtown.com:
broker.id=112
listeners=PLAINTEXT://kafkac02.wtown.com:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk01.wtown.com:2181,zk02.wtown.com:2181,zk03.wtown.com:2181/kafka-connect01
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
kafakac03.wtown.com:
broker.id=113
listeners=PLAINTEXT://kafkac03.wtown.com:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk01.wtown.com:2181,zk02.wtown.com:2181,zk03.wtown.com:2181/kafka-connect01
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=
10、添加环境变量(三台机器)
export KAFKA_HOME=/data/kafka-connect
export PATH=$PATH:$KAFKA_HOME/bin
11、启动kafka集群(三台机器)
kafka-server-start.sh -daemon /data/kafka-connect/config/server.properties
12、kafka集群状态 zookeeper状态: 13、创建插件目录
mkdir /data/kafka-connect/plugins
14、修改connector配置文件connect-distributed.properties
bootstrap.servers=kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092
group.id=connect-cluster-01
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/data/kafka-connect/plugins
15、启动kafka connector
connect-distributed.sh -daemon /data/kafka-connect/config/connect-distributed.properties
16、查看kafka connector状态 查看kafka topic信息
kafka-topics.sh --list --zookeeper zk01.wtown.com:2181,zk02.wtown.com:2181,zk03.wtown.com:2181/kafka-connect01
可以看到已经自动创建了配置文件中的topic connect-configs
17、测试样例 (1)创建测试目录和文件
mkdir /data/test
touch /data/test/in.txt
touch /data/test/out.txt
(2)获取插件信息
curl http://kafkac01.wtown.com:8083/connector-plugins
可以在https://www.sojson.com/格式化json数据(或者直接使用postman请求) (3)建立source connector
curl -i -k -H "Content-type: application/json" -X POST -d '{"name":"in","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","file":"/data/test/in.txt","topic":"localfiles"}}' http://kafkac01.wtown.com:8083/connectors
(4)查看source connector
curl http://kafkac01.wtown.com:8083/connectors/in/status
注意这里,测试程序是本地file获取,所以应该上connector运行的节点上进行文件输入操作,即10.99.27.112上 (5)查看topic
kafka-topics.sh --list --zookeeper zk01.wtown.com:2181,zk02.wtown.com:2181,zk03.wtown.com:2181/kafka-connect01
(6)模拟消费者
kafka-console-consumer.sh --bootstrap-server kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092 --topic localfiles
到10.99.27.112(connector运行的节点上)输入数据到int.txt
消费者这边已经接到数据
(7)创建sink connector
curl -i -k -H "Content-type: application/json" -X POST -d '{"name":"out","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector","tasks.max":"1","topics":"localfiles","file":"/data/test/out.txt"}}' http://kafkac01.wtown.com:8083/connectors
(8)查看sink connector
curl http://kafkac01.wtown.com:8083/connectors/out/status
(9)查看out输出文件(还是要到connector运行节点上去看)
|