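1. Environment
Operating system: CentOS 7.9
JDK version: 8u291
Flume version: 1.9.0
Flume download page: http://flume.apache.org/download.html
2. Flume cluster architecture and data flow
Data-source agents send events to the agent tier (flumea01 to flumea04), the agent tier forwards them to the collector tier (flumec01 and flumec02), and the collectors write them to Kafka.
Data flow inside an agent: source, then channel, then sink.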
3. Resource planning
Collector nodes:
10.99.27.121 flumec01.wtown.com 4 cores, 8 GB RAM, 100 GB disk
10.99.27.122 flumec02.wtown.com 4 cores, 8 GB RAM, 100 GB disk
Agent nodes:
10.99.27.131 flumea01.wtown.com 4 cores, 8 GB RAM, 100 GB disk
10.99.27.132 flumea02.wtown.com 4 cores, 8 GB RAM, 100 GB disk
10.99.27.133 flumea03.wtown.com 4 cores, 8 GB RAM, 100 GB disk
10.99.27.134 flumea04.wtown.com 4 cores, 8 GB RAM, 100 GB disk
4. Disable the firewall and SELinux (all nodes)
5. Set the hostname and add the following entries to the hosts file (all nodes)
10.99.27.121 flumec01.wtown.com
10.99.27.122 flumec02.wtown.com
10.99.27.131 flumea01.wtown.com
10.99.27.132 flumea02.wtown.com
10.99.27.133 flumea03.wtown.com
10.99.27.134 flumea04.wtown.com
10.99.27.101 kafka01.wtown.com
10.99.27.102 kafka02.wtown.com
10.99.27.103 kafka03.wtown.com
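Steps 4 and 5 list no commands. The following is a minimal sketch for CentOS 7, assuming firewalld is the active firewall and that the commands run as root. The helper names `run`, `disable_firewall_selinux`, and `add_host` are made up for illustration. By default the firewall and SELinux commands are only printed; setting APPLY=1 executes them.

```shell
#!/usr/bin/env bash
# Dry run by default: print each command; set APPLY=1 to execute (as root).
run() {
  if [ "${APPLY:-0}" = "1" ]; then "$@"; else echo "+ $*"; fi
}

# Step 4: stop the firewall and switch SELinux off (assumes firewalld).
disable_firewall_selinux() {
  run systemctl stop firewalld
  run systemctl disable firewalld
  run setenforce 0    # effective immediately, lasts until the next reboot
  run sed -i 's/^SELINUX=.*/SELINUX=disabled/' /etc/selinux/config    # persists across reboots
}

# Step 5: append "IP hostname" to a hosts file unless the hostname is already listed.
add_host() {
  local ip=$1 name=$2 file=${3:-/etc/hosts}
  grep -qwF -- "$name" "$file" 2>/dev/null || echo "$ip $name" >> "$file"
}
```

On each node, the hostname itself can be set with `hostnamectl set-hostname flumec01.wtown.com` (using that node's own name), followed by one `add_host` call per entry in the list above, for example `add_host 10.99.27.121 flumec01.wtown.com`.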
6. Install the JDK (all nodes), see https://blog.csdn.net/zyj81092211/article/details/118055068
7. Set up the Kafka cluster, see https://blog.csdn.net/zyj81092211/article/details/119326105
8. Create the data directory /data (all nodes)
mkdir -p /data
9. Upload the package to /data, unpack it, and rename the directory to flume (all nodes)
cd /data
tar -xvf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin flume
10. Create a symbolic link (all nodes)
ln -s /data/flume/ /usr/local/flume
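Steps 8 to 10 can be folded into one rerunnable helper. This is a sketch only: the function name `install_flume` and its parameters are illustrative, and it assumes GNU tar and GNU ln as shipped with CentOS 7. One difference from the plain `ln -s` above: if the link already exists, running `ln -s` again creates a nested link inside /data/flume, whereas `ln -sfn` replaces the link.

```shell
#!/usr/bin/env bash
# Unpack the Flume tarball into a data directory, rename it to "flume",
# and point a symlink at it. An existing installation is left untouched.
install_flume() {
  local tarball=$1 data_dir=${2:-/data} link=${3:-/usr/local/flume}
  mkdir -p "$data_dir" || return 1
  if [ ! -d "$data_dir/flume" ]; then
    tar -xf "$tarball" -C "$data_dir" || return 1
    mv "$data_dir/apache-flume-1.9.0-bin" "$data_dir/flume" || return 1
  fi
  # -n replaces an existing link instead of creating a new one inside its target
  ln -sfn "$data_dir/flume" "$link"
}
```

With the defaults, `install_flume /data/apache-flume-1.9.0-bin.tar.gz` produces the same layout as the manual steps.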
11. Configure the Collector nodes (flumec01.wtown.com and flumec02.wtown.com)
vi /data/flume/conf/collector-to-kafka.conf
Add the following:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 58001
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.keep-alive = 60
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = kafka-test
a1.sinks.k1.kafka.bootstrap.servers = kafka01.wtown.com:9092,kafka02.wtown.com:9092,kafka03.wtown.com:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 100
a1.sinks.k1.kafka.producer.compression.type = snappy
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
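A source or sink that is declared but never bound to a channel is an easy mistake to make in files like the one above. The following sketch checks for that before the agent is started. The function name `check_bindings` is hypothetical, and the check is a plain text match on the property names used in this article, not a full parser for Flume's configuration format.

```shell
#!/usr/bin/env bash
# Report every declared source or sink of an agent that has no channel binding.
# Returns non-zero if any binding is missing.
check_bindings() {
  local conf=$1 agent=${2:-a1} rc=0 name
  for name in $(sed -n "s/^$agent\.sources *= *//p" "$conf"); do
    grep -q "^$agent\.sources\.$name\.channels *=" "$conf" ||
      { echo "source $name: no channels line"; rc=1; }
  done
  for name in $(sed -n "s/^$agent\.sinks *= *//p" "$conf"); do
    grep -q "^$agent\.sinks\.$name\.channel *=" "$conf" ||
      { echo "sink $name: no channel line"; rc=1; }
  done
  return $rc
}
```

For example, `check_bindings /data/flume/conf/collector-to-kafka.conf` prints nothing and returns 0 when r1 and k1 are both bound to c1.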
12. Configure the agent nodes
On flumea01.wtown.com and flumea02.wtown.com:
vi /data/flume/conf/agent-to-collector.conf
Add the following:
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 57001
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = flumec01.wtown.com
a1.sinks.k1.port = 58001
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = flumec02.wtown.com
a1.sinks.k2.port = 58001
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.sources.r1.channels = c1
a1.sinks.k2.channel = c1
a1.sinks.k1.channel = c1
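On flumea03.wtown.com and flumea04.wtown.com (the same file, except that the two failover priorities are swapped so that these nodes prefer flumec02):
vi /data/flume/conf/agent-to-collector.conf
Add the following: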
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 57001
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = flumec01.wtown.com
a1.sinks.k1.port = 58001
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = flumec02.wtown.com
a1.sinks.k2.port = 58001
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 1
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.sources.r1.channels = c1
a1.sinks.k2.channel = c1
a1.sinks.k1.channel = c1
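The two agent files differ only in the two priority lines. With the failover processor the sink with the larger priority value is preferred, so flumea01 and flumea02 normally send to flumec01, while flumea03 and flumea04 normally send to flumec02. A sketch that derives one file from the other instead of maintaining two copies by hand; the function name `set_priorities` and the file names in the usage note are illustrative:

```shell
#!/usr/bin/env bash
# Write a copy of an agent config with the failover priorities of k1 and k2
# set to the given values; every other line is copied unchanged.
set_priorities() {
  local src=$1 dst=$2 p1=$3 p2=$4
  sed -e "s/^\(a1\.sinkgroups\.g1\.processor\.priority\.k1\) *=.*/\1 = $p1/" \
      -e "s/^\(a1\.sinkgroups\.g1\.processor\.priority\.k2\) *=.*/\1 = $p2/" \
      "$src" > "$dst"
}
```

For example, `set_priorities agent-prefer-c01.conf agent-prefer-c02.conf 1 10` turns the flumea01/flumea02 variant into the flumea03/flumea04 variant.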
13. Configure the JDK for Flume (all nodes)
cp /data/flume/conf/flume-env.sh.template /data/flume/conf/flume-env.sh
Add the following to /data/flume/conf/flume-env.sh:
export JAVA_HOME=/usr/local/java
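The same file is also where the JVM heap can be raised. As far as I know, the flume-ng launcher in 1.9.0 starts with a very small default heap (worth confirming in bin/flume-ng), which is unlikely to hold memory channels sized at 1,000,000 or more events. The sketch below appends both settings once. The helper names are hypothetical and the heap sizes are assumptions for an 8 GB node, not values from the original setup.

```shell
#!/usr/bin/env bash
# Append a line to a file unless an identical line is already present.
append_once() {
  grep -qxF -- "$2" "$1" || echo "$2" >> "$1"
}

# Create flume-env.sh from the template if needed, then add the settings once.
configure_flume_env() {
  local conf_dir=${1:-/data/flume/conf}
  local env_file=$conf_dir/flume-env.sh
  [ -f "$env_file" ] || cp "$conf_dir/flume-env.sh.template" "$env_file" || return 1
  append_once "$env_file" 'export JAVA_HOME=/usr/local/java'
  # Heap sizes are assumptions; tune them to the event size and channel capacity.
  append_once "$env_file" 'export JAVA_OPTS="-Xms1g -Xmx4g"'
}
```

Running `configure_flume_env` a second time leaves the file unchanged.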
14. Configure the Flume environment variables (all nodes)
vi /etc/profile
Add the following:
export FLUME_HOME=/data/flume
export PATH=$PATH:$FLUME_HOME/bin
Reload the environment variables:
source /etc/profile
15. Start the Flume collectors (on flumec01.wtown.com and flumec02.wtown.com)
nohup flume-ng agent --conf $FLUME_HOME/conf/ --conf-file $FLUME_HOME/conf/collector-to-kafka.conf --name a1 -Dflume.root.logger=INFO,console >> /dev/null 2>&1 &
16. Start the Flume agents (on flumea01, flumea02, flumea03, and flumea04.wtown.com)
nohup flume-ng agent --conf $FLUME_HOME/conf/ --conf-file $FLUME_HOME/conf/agent-to-collector.conf --name a1 -Dflume.root.logger=INFO,console >> /dev/null 2>&1 &
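The start commands above log to the console and then send that output to /dev/null, so startup errors are not visible anywhere. A possible variant keeps the output in a file and records the PID. This is a sketch: the function names, the FLUME_LOG_DIR variable, and the default log directory /data/flume/logs are assumptions, not part of the original setup.

```shell
#!/usr/bin/env bash
# Start a Flume agent in the background, keep its console log in a file,
# and record the PID so the process can be stopped later.
start_flume() {
  local conf_file=$1 name=${2:-a1}
  local log_dir=${FLUME_LOG_DIR:-/data/flume/logs}    # assumed location
  local base
  base=$(basename "$conf_file" .conf)
  mkdir -p "$log_dir" || return 1
  nohup flume-ng agent --conf "$FLUME_HOME/conf/" --conf-file "$conf_file" \
    --name "$name" -Dflume.root.logger=INFO,console \
    >> "$log_dir/$base.out" 2>&1 &
  echo $! > "$log_dir/$base.pid"
}

# Stop the agent that was started from the given config file.
stop_flume() {
  local log_dir=${FLUME_LOG_DIR:-/data/flume/logs}
  local base
  base=$(basename "$1" .conf)
  kill "$(cat "$log_dir/$base.pid")" && rm -f "$log_dir/$base.pid"
}
```

On a collector this would be `start_flume $FLUME_HOME/conf/collector-to-kafka.conf`, and on an agent node `start_flume $FLUME_HOME/conf/agent-to-collector.conf`; the output then lands in collector-to-kafka.out or agent-to-collector.out under the log directory.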
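##################### The following is the configuration for the data-source clients whose data is collected #####################
17. Example data-source agent configuration
The contents of source-to-agent.conf are as follows: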
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 k3 k4
a1.sinkgroups = g1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /usr/local/flume/test.log
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = flumea01.wtown.com
a1.sinks.k1.port = 57001
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = flumea02.wtown.com
a1.sinks.k2.port = 57001
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = flumea03.wtown.com
a1.sinks.k3.port = 57001
a1.sinks.k4.type = avro
a1.sinks.k4.hostname = flumea04.wtown.com
a1.sinks.k4.port = 57001
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
a1.sinkgroups.g1.processor.selector = random
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1
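Before starting the source agent it can help to confirm that port 57001 on all four agent nodes is reachable from the client host, since the load_balance group backs off from sinks that fail. A minimal sketch using bash's built-in /dev/tcp redirection and the coreutils `timeout` command; the function names `port_open` and `check_agents` are hypothetical.

```shell
#!/usr/bin/env bash
# Succeeds if a TCP connection to host:port can be opened within 3 seconds.
# Relies on bash's /dev/tcp redirection, so it needs bash rather than plain sh.
port_open() {
  timeout 3 bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
}

# Print the reachability of one port on each given host; non-zero if any fails.
check_agents() {
  local port=$1 host rc=0
  shift
  for host in "$@"; do
    if port_open "$host" "$port"; then
      echo "$host:$port reachable"
    else
      echo "$host:$port NOT reachable"; rc=1
    fi
  done
  return $rc
}
```

Usage on the client host: `check_agents 57001 flumea01.wtown.com flumea02.wtown.com flumea03.wtown.com flumea04.wtown.com`. The source agent itself would presumably be started like the agents in step 16, with --conf-file pointing at source-to-agent.conf.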
Note: to tail several files with one TAILDIR source, define multiple filegroups as follows:
# set source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /usr/local/flume/1.log
a1.sources.r1.filegroups.f2 = /usr/local/flume/2.txt
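18. Test
On the source agent host, create the log file named in the configuration above by appending a line to it; repeat the command several times:
echo 11111 >> /usr/local/flume/test.log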
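Identical lines are hard to tell apart on the consumer side. A small sketch that appends numbered, timestamped lines instead; the function name `write_test_lines` and the line format are made up for this test.

```shell
#!/usr/bin/env bash
# Append N numbered lines to a log file so that each event is
# distinguishable when it shows up in Kafka.
write_test_lines() {
  local file=$1 count=${2:-10} i
  for i in $(seq 1 "$count"); do
    echo "flume-test $i $(date +%s)" >> "$file"
  done
}
```

For example, `write_test_lines /usr/local/flume/test.log 100` appends one hundred lines to the file that the TAILDIR source is watching.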
Start a consumer process on the Kafka cluster:
kafka-console-consumer.sh --bootstrap-server kafka01.wtown.com:9092,kafka02.wtown.com:9092,kafka03.wtown.com:9092 --topic kafka-test
The consumer prints the appended lines, which shows that the data has been collected into Kafka.
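If nothing arrives at the consumer, the TAILDIR position file on the client shows whether the source has read the log at all. The sketch below compares the recorded offset with the current file size. It assumes that position entries have the form {"inode":N,"pos":N,"file":"/path"}, which is how the file looked in the Flume versions I have seen, and the function name `taildir_status` is hypothetical. The position file is only rewritten periodically, so a short lag right after a write is normal.

```shell
#!/usr/bin/env bash
# Print "caught up" if the offset recorded for a log file in the TAILDIR
# position file equals the file's current size; otherwise print "behind"
# and return non-zero.
# Assumes entries of the form {"inode":N,"pos":N,"file":"/path"}.
taildir_status() {
  local pos_file=$1 log_file=$2 pos size
  pos=$(grep -o "\"pos\":[0-9]*,\"file\":\"$log_file\"" "$pos_file" |
        sed 's/"pos":\([0-9]*\).*/\1/' | tail -n 1)
  size=$(wc -c < "$log_file")
  if [ -n "$pos" ] && [ "$pos" -eq "$size" ]; then
    echo "caught up"
  else
    echo "behind (pos=${pos:-none}, size=$size)"
    return 1
  fi
}
```

With the paths from step 17 this would be `taildir_status /usr/local/flume/taildir_position.json /usr/local/flume/test.log`.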