1.简介
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
Flume具备3大特性:
- 1.有一个简单,灵活,基于流的数据流结构
- 2.具有负载均衡机制和故障转移机制,能保证数据采集的稳定性和可靠性。
- 3.具有一个简单可扩展的数据模型(Source,Channel和Sink)
重上图可以看到,Agent是由Source,Channel和Sink这三大组件组成的。
- Source: 数据源(类似于Logstash的Input)。
- Channel: 临时存储数据的管道。
- Sink: 目的地(类似于Logstash的Output)。
1.1.Source组件
通过Source可以指定读取哪里的数据,将数据传递给后面的Channel组件,以下为常用的数据源。
- Exec Source: 用于文件监控,可以实时监控文件中的新增内容,类似于tail -F的效果。
- NetCat TCP/UDP Source: 采集指定端口的数据,可以读取流经端口的每一行数据。
- Spooling Directory Source: 采集文件夹中新增的文件。
- Kafaka Source: 从Kafaka消息队列中采集数据。
1.2.Channel组件
接受Source发送的数据,可以把Channel理解为一个临时存储数据的管道,以下为常用的渠道类型。
- File Channel: 使用文件来作为数据的存储介质。优点是数据不会丢失,缺点是相对内存效率有的慢。
- Memory Channel: 使用内存作为数据的存储接受,优点效率高,缺点会丢失数据,会存在内存不足的情况。
- Spillable Memory Channel: 使用内存和文件作为存储介质,即内存足够把数据存内存中,不足的时候再写入到文件中。
1.3.Sink组件
Sink从Channel中读取数据并将其存储到指定目的地。以下为常见的类型:
- Logger Sink: 将数据作为日志处理,可以将其选择打印到控制台或写到文件中。
- HDFS Sink: 将数据传输到HDFS中,主要针对离线计算场景。
- Kafka Sink: 将数据传输到Kafka消息队列中,主要针对实时计算场景。
2.安装Flume
下载地址 本文下载的apache-flume-1.10.1-bin.tar.gz文件,对应的版本号是1.10.1。 上传到服务器/home/soft目录下然后解压文件。
cd /home/soft
tar -zxvf apache-flume-1.10.1-bin.tar.gz
修改环境变量配置文件名称
cd /home/soft/apache-flume-1.10.1-bin/conf
mv flume-env.sh.template flume-env.sh
3.采集数据测试
创建一个采集端口数据的配置文件
vi /home/soft/apache-flume-1.10.1-bin/conf/netcat.conf
文件内容如下
# 定义Agent内部3大组件的名称
a1.sources=r1
a1.sinks=k1
a1.channels=c1
# 配置Souce组件
#使用采集端口数据
a1.sources.r1.type=netcat
#指定当前机器Ip地址或主机名,0.0.0.0表示通用的机器ip
a1.sources.r1.bind=0.0.0.0
#指定一个未被使用的端口,用于监控当前端口的数据
a1.sources.r1.port=6666
#配置Channel组件
#使用内存存储数据
a1.channels.c1.type=memory
#event条数
a1.channels.c1.capacity=1000
#flume事务控制所需要的缓存容量600条event
a1.channels.c1.transactionCapacity=100
#配置Sink组件
#数据输出到日志中
a1.sinks.k1.type=logger
#将三大组件绑定到一起
#为名称为r1的数据源绑定输出渠道
a1.sources.r1.channels=c1
#为名称为k1的输出源绑定输入渠道
a1.sinks.k1.channel=c1
启动Agent(需要配置JAVA_HOME才能启动)
cd /home/soft/apache-flume-1.10.1-bin
bin/flume-ng agent -n a1 -c conf -f conf/netcat.conf -Dflume.root.logger=INFO,console
参数 | 说明 |
---|
agent | 启动一个Flume的Agent代理,(参数后面的a1对应着上面配置文件中的a1)。 | -n | 指定Agent的名称(等同于 --name) | -c | 指定Flume配置文件根目录(等同于 --conf) | -f | 指定Agent配置文件的位置。(等同于 --conf-file) |
控制台输出如下 启动另外一个终端查看实时日志
tail -f /home/soft/apache-flume-1.10.1-bin/flume.log
可以看到已经创建了一个Socket监听6666端口数据。
用telnet测试
telnet 127.0.0.1 6666
连接成功后输入helloword然后回车 查看/home/soft/apache-flume-1.10.1-bin/flume.log文件
cat /home/soft/apache-flume-1.10.1-bin/flume.log
可以看到日志里打印了helloword信息
4.日志汇总到HDFS中
主机名 | IP地址 |
---|
A | 192.168.239.130 | B | 192.168.239.129 | C | 192.168.239.128 |
需求: 把A和B两台机器的日志数据汇总到主机C上,通过主机C将数据输出到HDFS的指定目录下。
- 机器A和B因为要读取文件中数据,所以使用基于文件的Exec Source,Channel使用基于内存的Momory Channel,发送数据使用了Avro Sink。
- 机器C的数据使用Avro Source(Avro Sink和Avro Source是一组工具配合使用)
4.1.日志收集服务配置
编辑机器A和B共用的上传日志的配置file-to-avro.conf
- a1.sinks.k1.hostname=机器C的ip地址
- tail -F /home/soft/apache-flume-1.10.1-bin/logs/access.log 读取日志文件里的实时数据
vi /home/soft/apache-flume-1.10.1-bin/conf/file-to-avro.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /home/soft/apache-flume-1.10.1-bin/logs/access.log
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.239.128
a1.sinks.k1.port=45454
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
生成测试数据的脚本
vi /home/soft/apache-flume-1.10.1-bin/logs/generate_log.sh
while [ "1" = "1" ]
do
curr_time=`date +%s`
name=`hostname`
echo ${name}_${curr_time} >>/home/soft/apache-flume-1.10.1-bin/logs/access.log
sleep 100
done
保存退出后执行
chmod 755 generate_log.sh
./generate_log.sh
运行完可以查到/home/soft/apache-flume-1.10.1-bin/logs/access.log日志有测试数据了。
4.2.日志汇总服务配置
编辑机器C的接受数据和上传到HDFS的配置file-to-hdfs.conf
- 需要把a1.sinks.k1.hdfs.path: a1.sinks.k1.hdfs.path中的127.0.0.1换成hdfs部署机器ip地址
- 已在机器C上面安装好hadoop
vi /home/soft/apache-flume-1.10.1-bin/conf/file-to-hdfs.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=45454
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path = hdfs://127.0.0.1:9000/access/%y-%m-%d
a1.sinks.k1.hdfs.filePrefix = access
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
4.3.运行服务测试
使用scp复制flume安装包到机器A和B上面,A和B机器的/home/soft目录必须先创建好
scp -rq /home/soft/apache-flume-1.10.1-bin 192.168.239.129:/home/soft/
scp -rq /home/soft/apache-flume-1.10.1-bin 192.168.239.130:/home/soft/
在机器A和B上面运行flume服务
cd /home/soft/apache-flume-1.10.1-bin
bin/flume-ng agent -n a1 -c conf -f conf/file-to-avro.conf -Dflume.root.logger=INFO,console
A和B都需要开启另外一个终端执行生产测试数据的日志的脚本
/home/soft/apache-flume-1.10.1-bin/logs/generate_log.sh
在机器C上面运行flume服务
cd /home/soft/apache-flume-1.10.1-bin
bin/flume-ng agent -n a1 -c conf -f conf/file-to-hdfs.conf -Dflume.root.logger=INFO,console
下面报错是flume采集数据上传到hdfs报的错误, 这是因为flume内依赖的guava.jar和hadoop内的版本不一致造成的。 查看hadoop安装目录下share/hadoop/common/lib内guava.jar版本 查看flume安装目录下lib内guava.jar的版本 如果两者不一致,删除版本低的,并拷贝高版本过去。
ctrl退出机器C的flume服务,重新执行机器C启动flume的命令。
查看hdfs文件系统里是否有flume服务上传日志文件
cd /home/soft/hadoop-3.2.4/bin
./hdfs dfs -ls -R /access
查看日志文件内容
./hdfs dfs -cat /access/22-09-20/access.1663687662824
可以看到日志里面能看到两台主机生产日志合并后的数据
|