Flume(日志收集, 聚合, 传输的工具)
一, Flume 概述
1. Flume 定义
[定义]
- Flume 是 Cloudera 提供的一个高可用(available)的, 高可靠(reliable), 分布式(distribute)的
海量日志采集(collecting), 聚合(aggregating)和传输(moving)的系统. - Flume 基于流式架构, 灵活简单.
[Why Flume?]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LfMx27dN-1629794979099)(2021-08-10-14-46-44.png)]
2. Flume 基础架构
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1lCcWfc3-1629794979102)(2021-08-10-14-47-28.png)]
2.1 Agent
- Agent是一个JVM进程, 它
以事件的形式将数据从源头送至目的地 . Agent由 Source , Channel , Sink 三部分组成.
2.2 Source(接收并处理数据)
- Source 是负责
接收数据到Flume Agent 的组件, Source组件可以处理各种类型, 各种格式的日志数据 , 包括avro、thrift、exec 、jms、spooling directory 、netcat、taildir 、sequence generator、syslog、http、legacy。
2.3 Channel(缓冲区, 平衡读写速度)
- Channel是位于Source和Sink之间的缓冲区. 因此, Channel允许Source和Sink运作在不同的速率上.
- Channel是线程安全的, 可以同时处理几个Source 的写入操作和几个Sink的读取操作.
- Flume 自带两种Channel: Memory Channl, File Channel 和 Kafka Channel
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0vN0hC9P-1629794979104)(2021-08-10-14-56-44.png)]
2.4 Sink(批量写出数据)
- Sink 不断轮询Channel中的时间并并批量的移除它们, 并将这些时间批量写出到存储或索引系统, 或者被发送到另一个Flume Agent;
- Sink组件的目的地包括hdfs, file,Hbase,logger, 自定义等等.
2.5 Event
- Event 是
Flume数据传输的基本单元 , 以Event的形式将数据从源头送至目的地. Event由Header和Body两部分组成, Header用来存放该event的一些属性, 为K-V结构. Body用来存放该条数据, 形式为字节数组.
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QuV4zVz9-1629794979107)(2021-08-10-15-05-16.png)]
二, Flume 安装部署和入门案例实操
2.1 安装部署
- 下载Flume-1.7.0
- 解压到/opt/module, 并改名为flume-1.7.0
tar -zxvf apache-flume-1.7.0.tar.gz -C /opt/module/
mv apache-flume-1.7.0 flume-1.7.0
- 将 flume-1.7.0/conf 下的 flume-env.sh.template 文件修改为 flume-env.sh,并配置 flume-env.sh 文件
mv flume-env.sh.template flume-env.sh
#修改
vi flume-env.sh
#添加jdk依赖
export JAVA_HOME=/opt/module/jdk1.8
2.2 案例实操
2.2.1 监控端口数据官方案例
[需求和需求分析]
- 使用flume监听一个端口, 收集该端口数据, 并打印到控制台;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RSrXe3RR-1629794979109)(2021-08-10-16-54-55.png)]
[实现步骤]
- 安装netcat;
sudo yum install -y nc
- 判断44444端口是否被占用
sudo netstat -tunlp | grep 44444
- 创建Flume Agent 配置文件
flume-netcat-logger.conf
在flume安装目录下创建job文件夹并进入job文件夹, 然后在job文件夹下创建Flume Agent 配置文件 flume-netcat-logger.conf(配置文件是自定义命名, 按照给定模板修改.)
a1.sources = r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=44444
a1.sinks.k1.type = logger
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
对配置文件模板的说明: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3nFRa5yj-1629794979684)(2021-08-11-15-48-34.png)]
注意: 在绑定组件时, a1.sinks.k1.channel中 channel不带s, 这是因为虽然我们可以有多个sink从channel中读取数据并写出, 但是其中的每一个sink只能从唯一的一个channel中读取数据(event).
- 开启flume去监听端口
记住: flume-ng agent -n 自定义agent名称 -c 环境配置文件 -f 用户配置文件
flume-ng agent -n -c -f
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ry3uujPu-1629794979110)(2021-08-12-15-32-04.png)]
bin/flume-ng agent
--conf conf/(配置文件的目录)
--name a1(agent的名称)
--conf-file job/flume-netcat-logger.conf(配置文件的名)
-Dflume.root.logger=INFO,console(日志级别为INFO, 打印到控制台)
参数 | 说明 |
---|
–name/-n a1 | 表示给agent起名为a1 | –conf/-c conf/ | 表示配置文件存储在conf/目录 | –conf-file/-f job/flume-netcat-logger.conf | flume本次启动读取的配置文件是在job文件夹下的flume-netcat-logger.conf | -Dflume.root.logger=INFO,console | -D表示flume运行时动态修改flume.root.logger参数属性值, 并将控制日志打印级别设置为INFO级别. 日志级别包括: log,info,warn,error. |
简便写法: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qhZ5x0nG-1629794979111)(2021-08-12-15-43-47.png)]
- 使用netcat在新的终端向本地主机的44444端口发送信息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-c95BQrng-1629794979112)(2021-08-12-15-33-47.png)] 6. 在flume监听终端查看监听到的信息 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4PQAcf7Z-1629794979113)(2021-08-12-15-34-07.png)]
2.2.2 实时监控单个文件新内容
[案例需求与分析]
案例需求: 实时监控Hive日志, 并上传到HDFS中. [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GjucuhK4-1629794979114)(2021-08-16-16-09-35.png)]
[案例实操]
- Flume要想将数据输入到HDFS, 需要往
/opt/module/flume-1.7.0/lib 放入HADOOP相关的jar包. [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4df7l498-1629794979115)(2021-08-16-16-24-47.png)]
2.创建自定义配置文件flume-file-hdfs.conf
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7WfOaOfo-1629794979116)(2021-08-16-16-30-54.png)]
a2.sources = r2
a2.sinks = k2
a2.channels = c2
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://bigdata01:8020/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 100
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 30
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
tail -f 和 tail -F的区别和使用方法
tail -f , 根据文件描述符进行追踪, 当文件改名或被删除, 追踪会停止;tail -F , 根据文件名进行追踪, 并保持重试, 即该文件被删除或改名后, 如果再次创建相同的文件名, 会继续进行追踪. 比如: 在hive的logs目录中, 当天的日志会记录在hive.log中, 之前每一天的hive.log会被自动命名为以那天日期为文件名的日志, 同时新建hive.log继续记录今天的日志.我们要追踪当天最新日志, 只需使用 tail -F hive.log 即可.
- 启动flume, 然后操作hive
bin/flume-ng agent -n a2 -c conf/ -f job/flume-file-hdfs.conf -Dflume.root.logger=INFO, console
show databases;
- 查看HDFS中是否生成了指定格式的文件.
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Z6DOImEh-1629794979116)(2021-08-23-21-34-24.png)]
2.2.3 实时监控目录下新文件
[案例需求与分析]
案例需求: 使用Flume监听整个目录的文件创建情况, 并上传至HDFS [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-65SOxzNg-1629794979118)(2021-08-24-06-06-12.png)]
[案例实操]
- 在之前创建的job目录中, 创建并编写配置文件
flume-dir-hdfs.conf
a3.sources=r3
a3.sinks = k3
a3.channels = c3
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume-1.7.0/upload
a3.sources.r3.fileSuffix=.COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata01:8020/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 30
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
a3.sources.r3.channels =c3
a3.sinks.k3.channel = c3
- 启动flume
- 启动Flume(bin/flume-ng agent -n a3 -c conf/ -f job/flume-dir-hdfs.conf -Dflume.root.logger=INFO,console), 成功启动标识如下图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7H0YfIpP-1629794979119)(2021-08-24-06-30-59.png)]
- 在配置文件中指定的目录下创建新的文件, 并查看hdfs和flume控制台
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WBaMKlGn-1629794979119)(2021-08-24-06-34-46.png)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-m8BqQy3v-1629794979120)(2021-08-24-07-55-40.png)]
注意:
- 当监控目录有新文件出现时, Flume会把新文件上传到HDFS中去,
在监控目录中, 上传完成的文件会以.COMPLETED后缀结尾(在配置文件中 a1.sources.r1.fileSuffix=.COMPLETED参数中定义) . 后续的监控中, Flume会忽略这些COMPLETED后缀的文件, 即便这些文件的内容发生改变. - 在Flume把新文件上传到HDFS的过程中, 会
先在HDFS指定的目录中生成文件名.tmp的临时文件(由hdfs.inUseSuffix参数控制) , 待上传完成后去掉.tmp前缀. - Flume会
对监控的文件夹, 每500毫秒扫描一次文件变动 . 不要在监控目录中创建重名文件, 否则Flume会报错并且无法继续使用 !
2.2.4 实时追踪目录下的多个追踪文件
Source的三种主要type的区别:
type | 作用 |
---|
exec source | 适用于监控一个实时追加的文件, 但不能保证数据不丢失 | spooldir source | 能保证数据不丢失, 且能够实现断点续传, 但延迟较高, 不能实时监控 | taildir source | 即可以实现断点续传, 又可以保证数据不丢失, 还能够进行实时监控. |
- ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。 例如:通过tail -F去获取Nginx的访问日志,如果Flume挂掉,Nginx访问日志继续导入到日志文件中,那么在Flume挂掉的这段时间中,新产生的日志Flume是无法获取到的.
- 为了更好的可靠性保证,可以考虑使用Spooling Directory Source,拿实时获取Nginx访问日志来说,Spooling Directory Source虽然做不到实时,但是也可以通过日志文件的切分,做到准实时。 然而Flume只能监控单一目录(子目录无法监控)下的新文件, 无法监控老文件中的新增数据.
- 而taildir source 就更厉害了, 它能够递归的监控指定目录下的所有文件实时的新增数据.
[案例需求和分析]
案例需求: 使用Flume监听整个目录的实时追加文件, 并上传到HDFS.
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zzqGjXgY-1629794979121)(2021-08-24-09-08-25.png)]
[案例实操]
- 在之前创建的job目录中, 创建并编写配置文件
flume-taildir-hdfs.conf
#1. 给agent的各个组件命名
a3.sources=r3
a3.sinks = k3
a3.channels = c3
#2. 配置 source (spooldir source)
a3.sources.r3.type = TAILDIR
### taildir的positionsFile, 自动创建json文件, 定期记录flume读取文件的位置.
a3.sources.r3.positionFile = /opt/module/flume-1.7.0/tail_dir.json
a3.sources.r3.filegroups = f1
a3.sources.r3.filegroups.f1 = /opt/module/flume-1.7.0/files/.*.txt
# 3. 配置sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata01:8020/tailDir/upload/%Y%m%d/%H
### 上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
### 是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
### 多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
### 重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
### 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
### 积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#### 设置文件类型, 可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
### 多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
### 设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
### 文件的滚动与Event的数量无关
a2.sinks.k2.hdfs.rollCount = 0
# 4. 配置channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# 5. 把sources, sinks 分别与channel绑定
a3.sources.r3.channels =c3
a3.sinks.k3.channel = c3
- 在监控的目录中建立a.txt 和 b.txt两个文本文件, 然后启动flume, 在启动后往两个文本文件中追加内容.
- 创建监控文件
- 启动flume:
flume-ng agent -n a3 -c conf/ -f job/flume-taildir-hdfs.conf -Dflume.root.logger=INFO,console - 往文本文件中追加内容, 并观察输出信息:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cwSarN6U-1629794979122)(2021-08-24-14-26-27.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nT8IfCZL-1629794979123)(2021-08-24-14-27-55.png)]
待补充: 常用的source, sink, channel配置写法
|