一 Flume专题之组件及架构介绍
1、Flume概述
1.1、Flume定义
? Flume是一种分布式的、高可靠的和高可用的服务,用于有效地收集、聚合和移动大量日志数据框架。
1.2、Flume特性
(1)支持自定义Source
? flume 支持在日志系统中定制各类数据发送方,用于收集数据。
(2)支出数据简单处理
? flume支持对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。
(3)事件基本数据单位
? flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事件后会进行特定的格式化,然后 Source 会把事件推入(单个或多个) Channel 中。可以把 Channel 看作是一个缓冲区,它将保存事件直到 Sink 处理完该事件。
(4)高可靠性
? 当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:
- end-to-end:收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。
- Store on failure:这也是scribe采用的策略,当数据接收方crash崩溃时,将数据写到本地,待恢复后,继续发送。
- Best effort:数据发送到接收方后,不会进行确认。
1.3、Flume使用场景
? 实时监控读取服务器产生到本地磁盘的数据,将数据写入到HDFS、Kafka等下游处理流程中去。
- 1、webserver产生日志;
- 2、日志通过flume封装的Agent进行采集;
- source:接受数据源
- channel:进行数据缓存的管道;
- sink:连接目的地
- 3、将数据输出到HDFS/HBASE/KAFKA上。
2、Flume架构组成
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yFHTtXjw-1652500698450)
2.1、Event
? Event是Flume的数据传输最基本单位,flume在数据传输过程中是使用Event将数据封装起来进行传输的。
Event: { headers:{} body: 68 61 64 6F 6F 70 hadoop }
2.2、Agent
? Agent是数据传输(接受-缓存-发送)形式,Agent是一个JVM进程,flume 以 Agent 为最小的独立运行单位,包含最基本的三个组件:Source、Chanel、Sink。
2.3、Source
? Source是负责接收数据到Flume Agent的组件,Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、spooling directory、netcat、tailder…以及Custom Source。
2.4、Channel
? Channel是为了Source和Sink之间的缓冲区。Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理多个Source的写入操作、多个Sink的读取操作。
- Flume Channel官方提供:memory、jdbc、kafka、file…以及Custom Channel。
2.5、Sink
? Sink不断轮询Channel中的事件且批量移除它们,并将这些事件批量写入到存储或者索引系统、或者被发送到到另一个Flume Agent。
- Sink组件目的地:hdfs、loggeer、avro、thrift、ipc、file、hbase、solr、自定义Custom Sink
2.6、Flume NG架构
(1)每个web server产生日志文件
(2)每一台web server所对应的节点上开启一个进程,分别对应Agent1,Agent2、Agent3;
? Avro Sources将数据序列化,写入到channel,之后将数据送往Avro Sink,以Avro sink的形式将数据送往下游进行处理。
(3)下游的avro Source接受上游的Avro Sink的数据,在经过下游的Flume汇总到HDFS中。
3、Flume安装配置
3.1、下载
https://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
3.2、上传并解压
tar -xzvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/flume
3.3、配置环境变量
sudo vim /etc/profile.d/my_env.sh
export FLUME_HOME=/opt/module/flume/apache-flume-1.9.0-bin
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile.d/my_env.sh
flume-ng version
3.4、删除冗余Jar包
rm /lib/guava-11.0.2.jar
二 Flume专题之企业常用组件及案例演示
1、入门案例之打印端口数据
1.1、案例需求
? 使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。
1.2、需求分析
(1)通过netcat工具向本机的4444端口发送数据;
(2)Flume监控本机的4444端口通过Flume的Source端读取数据;
(3)Flume将获取数据通过Sink端写到控制台。
1.3、配置信息
(1)安装 netcat 工具
sudo yum install -y nc
(2)判断 44444 端口是否被占用
sudo netstat -nlp | grep 44444
(3)在 flume 目录下创建 job 文件夹并进入 job 文件夹。
mkdir job
cd job/
(4)在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf。
vim flume-netcat-logger.conf
(6)在 flume-netcat-logger.conf 文件中添加如下内容。
添加内容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(6)开启Flume监听端口
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
- 参数说明:
- –conf/-c:表示配置文件存储在 conf/目录
- –name/-n:表示给 agent 起名为 a1
- –conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf 文件。
- -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、 error。
(7)使用netcat工具向本机的4444端口发送内容
nc localhost 44444
(8)Flume监听页面结果
2、入门案例之实时监控单个追加文件
2.1、案例需求
? 实时监控Hive日志,并上传到HDFS中
2.2、需求分析
(1)创建符合条件的flume配置文件;
(2)执行配置文件,开启监控
(3)开启Hive,生成日志
(4)查看HDFS上数据
- Hive实时更新日志路径:/opt/module/hive/logs/hive.log
2.3、实现步骤
(1)配置文件编写
? 要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive 日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行 Linux 命令来读取文件。
vim flume-file-hdfs.conf
a2.sources = r2
a2.sinks = k2
a2.channels = c2
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 100
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 60
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
(2)HDFS创建Flume监控目录
hadoop fs -mkdir -p /flume
(3)运行Flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志
bin/hive
(5)查看HDFS上结果
3、实时监控目录下多个新文件
3.1、案例需求
? 使用Flume监听整个目录的文件,并上传至HDFS
3.2、需求分析
(1)创建符合条件的flume配置文件
(2)执行配置文件,开启监控
(3)向upload目录中添加文件
(4)查看HDFS上数据
(5)查看/opt/module/flume/upload目录上传的文件是否已经标记为.COMPLETED结尾;.tmp后缀结尾文件没有上传。
3.3、实现步骤
(1)创建配置文件
vim flume-dir-hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
- 说明:在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文 件;上传完成的文件会以.COMPLETED 结尾;被监控文件夹每 500 毫秒扫描一次文件变动。
(3)创建目录并添加文件
mkdir upload
touch atguigu.txt
touch atguigu.tmp
touch atguigu.log
(4)查看HDFS上的数据
4、实时监控目录下的多个追加文件
- 重点:关于Source类型
- Exec source 适用于监控一个实时追加的文件,不能实现断点续传;
- Spooldir Source 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;
- Taildir Source 适合用于监听多个实时追加的文件,并且能够实现断点续传。
4.1、案例需求
? 使用Flume监听整个目录的实时追加文件,并上传至HDFS
4.2、需求分析
4.3、实现步骤
(1)创建配置文件
vim flume-taildir-hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path =
hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
(3)向files文件夹中追加文件
mkdir files
cd files/
echo hello >> file1.txt
echo atguigu >> file2.txt
(4)查看HDFS上的数据
第四章 Flume专题之进阶概念&生产环境案例
1、进阶基础概念
1.1、Flume流程分析
(1)Source接收数据后将数据封装成Event对象。
(2)Event数据对象经过Inteceptor链对数据进行改造
(3)根据Channel selector确定Event要发送给那个Channel
(4)Channel通过SinkProcessor分发给对应Sink
1.2、Flume的数据传递过程事务
(1)推送事务流程
- doPut:把批数据写入到临时缓冲区 putList doCommit,检查Channel容量是否充足;
- 容量充足:把putList里的数据发送到Channel doRollBack中;
- 容量不足:把数据回滚到PutList中
(2)拉取事务流程
- doTake:把数据读取到临时缓冲区takeList,检查数据是否发送成功。
- 成功:把Event从takeList中移除
- 失败:将TakeList中的数据回滚到Channel中
2、Flume拓扑结构
2.1、简单串联
? 简单串联就是将多个Flume顺序串联起来,从最初的Source开始到最终的Sink传送的目的存储系统。
- 注意事项:此模式不建议桥接过多的flume数量,flume数量过多不仅会影响传输效率,而且一旦传输过程中某个节点Flume宕机,会影响整个传输系统。
2.2、复制和多路复用
? Flume支持将事件流向一个或多个目的地,即1对多模式。
- 注意事项:这种模式可以将相同数据复制到多个Channel中,或者将不同数据分发到不同Channel中,sink可以选择传送不同目的地。
2.3、负载均衡和故障转移
(1)负载均衡
? 将多个sink逻辑上分为一个sink组,sink组配合不同的SinkProcessor将数据相对均匀的分发到指定目录或者其他agent实例。
a1.sinkgroups.g1.processor.type =load_balance
a1.sinkgroups.g1.processor.backoff=true
(2)故障转移
? 有主备agent,主agent负责数据的采集、传输、落地,备用agent一直处于监听状态,一旦主agent宕机,备用agent启动,进行主agent的工作,直到主agent恢复。
2.4、聚合
? 每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析。
3、生产实战之多路复用(多channel)
3.1、案例需求
? nginx产生的日志数据需要供多个部门使用,如何处理一份数据发送两个系统来使用?
3.2、需求分析
3.3、具体实现
(1)创建目录
mkdir group1
cd group1
(2)创建flume1配置文件
- 作用:配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir。
vim flume-file-flume.conf
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
a1.sources.r1.selector.type = replicating
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
(3)创建Flume2的配置文件
- 作用:配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink。
vim flume-flume-hdfs.conf
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume2/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = flume2-
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.rollInterval = 30
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
(4)创建flume3的配置文件
- 作用:配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。
- 注意事项:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目 录。
vim flume-flume-dir.conf
a3.sources = r1
a3.sinks = k1
a3.channels = c2
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
(5)分别执行配置文件
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf
(5)启动Hive后检查文件
4、生产实战之故障转移(多Sink)
4.1、案例需求
? 使用Flume1监控一个端口,其sink组中的sink分别对接Flume2和Flume3,采用FailoverSinkProcessor,实现故障转移功能。
4.2、需求分析
4.3、具体实现
(1)创建目录
mkdir group2
cd group2
(2)创建flume1配置文件
- 作用:配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给 flume-flume-console1 和 flume-flume-console2。
vim flume-netcat-flume.conf
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
(3)创建Flume2配置文件
- 作用:配置上级 Flume 输出的 Source,输出是到本地控制台。
vim flume-flume-console1.conf
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
a2.sinks.k1.type = logger
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
(4)创建flume3配置文件
- 作用:配置上级 Flume 输出的 Source,输出是到本地控制台。
vim flume-flume-console2.conf
a3.sources = r1
a3.sinks = k1
a3.channels = c2
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
a3.sinks.k1.type = logger
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
(5)分别执行相关配置文件
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf
(6)使用netcat工具向本机的44444端口发送内容
nc localhost 44444
netstat -anp |grep [端口号]
lsof -i:[端口号]
kill -9 [进程PID]
(7)正常工作
(8)故障转移
- 作用:在flume-console2出现故障后,数据发送到flume-console1.
kill -9 pid
- flume-console1的结果输出打印
5、生产实战之聚合操作
5.1、案例需求
? 需求将来自不同服务器的日志数据采集回来存储在HDFS聚集到一起进行处理
5.2、需求分析
5.3、具体实现
待后续
三 Flume专题之常见面试题
1、上游Flume进程采集到数据后,干嘛不直接送到目的地?
(1)上游的Flume进程数十分庞大,直接送往目的地,目的地系统可能承载不了高并发的访问压力宕机;
(2)上游的Flume将采集完的数据送往下游Flume后,下游Flume进程将数据送往目的地;
(3)下游Flume数可人为控制,一般都有备份。
2、Flume结构简述
(1)Source:默认有的Avro(监听端口),thrift、Exec(执行linux命令)、JMS、SpoolingDirctory(监听目录)、第三方插件kafka;
(2)拦截器:所有events,增加头,类似json‘格式里的headers:{“keys”:“values”}:时间戳(头部插入时间戳),主机(头部插入主机名和IP),静态(头部插入指定KV),正则过滤(留下符合条件的)
(3)Channel:包括Memory、File、JDBC、Kafka等类型;
(4)拦截器:自定义拦截器,同上
(5)Sink:包括HDFS,hive,Avro,Hbase,kafka等类型
3、Flume HA机制(高可用机制)
(1)负载均衡
? flumeNG通过设置sinkgroups将多个沉潜节点分到一组,然后设置该组启用负载均衡,沉潜时会自动选择节点,如果节点宕机可选择其他节点:进程在后台运行时轮询依次查询,每个sink送出去的压力基本保持一致。
(2)事务机制
? 基于事务传输event(批量传输),使用两个独立的事务分别处理source到channel和channel到sink之间,失败时会将所有数据都回滚到source或channel进行重试。
- Put事务:source到channel之间
- Take事务:channel到sink之间
4、Flume的数据丢失&重复场景
(1)数据丢失场景
- 是Channel 采用 memoryChannel,agent 宕机导致数据 丢失,或者 Channel 存储数据已满,导致 Source 不再写入,未写入的数据丢失
(2)数据重复
- 数据已经由Sink发出,但是没有接收到响应,Sink会再次发送数据,导致数据重复
? 总的来说,Flume 不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由 Sink 发出, 但是没有接收到响应,Sink 会再次发送数据,此时可能会导致数据的重复
5、Flume的channel选择
- 注意事项:Channel被设计为event中转临时缓冲区,存储source收集并且没有被sink读取的Event
(1)memory:读写速度快,但是存储数据量小,Flume进程挂掉、服务器宕机或者重启都会导致数据丢失;
(2)File:落地到磁盘,如果sink已经提交完成的事务,则可以删除file;
(3)Kafka
6、Flume拦截器简述
(1)作用
- Source将event写入到channel之前可以使用拦截器对event进行各种形式的处理
- source和channel之间可以有多个拦截器,可以用不同规则进行定制拦截器
(2)拦截器类型:ETL拦截器、区分类型拦截器
(3)自定义拦截器步骤
- Ⅰ、实现接口intercept
- Ⅱ、重写四个方法(初始化,处理单个event,处理多个event-调用处理单个event方法,close方法资源释放-flume都会处于运行状态)
- Ⅲ、实现静态内部类builder,定制相关参数
- Ⅳ、将自定义拦截器打包,上传到flume的lib目录下
- Ⅴ、修改flume的核心配置文件
7、Flume故障方案
? 负载均衡和故障转移方案:配置sink组,同一个人sink组内有多个子sink,不同sink之间可以配置成负载均衡或故障转移。
8、Flume优化
(1)Flume内存配置为4G:实际开发中,再flume-env.sh中设置JVMheap为4G或更高
(2)FileChannel优化:DataDirs指向多个路径,每个路径对应不同硬盘,增大Flume的吞吐量:Checkpoint和backCheckpointDir也尽量配置在不同硬盘对应目录,保证checkpoint坏掉后,可快速恢复
(3)Sink优化:HDFSSink小文件处理
- 避免HDFS产生大量小文件,设置相关参数达到效果如下
- ①tmp文件达到128M时会滚动生成正式文件;
- ②tmp文件创建超10秒会滚动生成文件。
|