概念
Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单。
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
为什么选用Flume
Flume最主要的作用就是:实时读取服务器本地磁盘的数据,将数据写入到HDFS
Flume 基础架构
Flume 架构中的组件:Agent,Source,Sink, Channel,Event
Agent
Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。
Agent 主要有 3 个部分组成,Source、Channel、Sink。
Source
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat(监控端口)、sequence generator、syslog、http、legacy。
Sink
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。
Channel
Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作。
Flume 自带两种 Channel:Memory Channel 和 File Channel 以及 Kafka Channel。
Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
Event
传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组。
快速入门
Flume 安装部署
安装地址
-
Flume 官网地址 👉 http://flume.apache.org/ -
文档查看地址 👉 http://flume.apache.org/FlumeUserGuide.html -
下载地址 👉 http://archive.apache.org/dist/flume/
安装部署
-
将 apache-flume-1.7.0-bin.tar.gz 上传到 linux 的/opt/software 目录下 -
解压 apache-flume-1.7.0-bin.tar.gz 到/opt/module/目录下 tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
-
修改 apache-flume-1.7.0-bin 的名称为 flume mv apache-flume-1.7.0-bin flume
-
将 flume/conf 下的 flume-env.sh.template 文件修改为 flume-env.sh,并配置 flume-env.sh 文件 mv flume-env.sh.template flume-env.sh
export JAVA_HOME=/opt/java/jdk1.8.0_144
JAVA_HOME可以通过echo $JAVA_HOME命令查看,没有的请自行安装。
Flume 入门案例
监控端口数据官方案例
-
案例需求 使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。 -
需求分析 -
实现步骤
-
安装 netcat 工具 sudo yum install -y nc
-
判断 44444 端口是否被占用 sudo netstat -tunlp | grep 44444
-
测试 nc -lk 44444
nc hadoop2 44444
-
创建 Flume Agent 配置文件 flume-netcat-logger.conf 在 flume 目录下创建 job 文件夹并进入 job 文件夹 在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf。 在 flume-netcat-logger.conf 文件中添加如下内容:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
解析说明 多flow格式
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
-
先开启 flume 监听端口 第一种写法: bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
第二种写法: bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
参数说明: –conf/-c:表示配置文件存储在 conf/目录 –name/-n:表示给 agent 起名为 a1 –conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf文件。 -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error。 -
使用 netcat 工具向本机的 44444 端口发送内容 nc localhost 44444
-
在 Flume 监听页面观察接收数据情况
注意一点的是,在其它的机子中连接显示拒绝
注:使用 jps -ml 查看 Flume 进程。
实时监控单个追加文件
-
案例需求:实时监控 Hive 日志,并上传到 HDFS 中 -
需求分析: -
实现步骤:
-
Flume 要想将数据输出到 HDFS,须持有 Hadoop 相关 jar 包 将 commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
拷贝到/opt/module/flume/lib 文件夹下。
👉 点击下载jar包
-
创建 flume-file-hdfs.conf 文件 创建文件 vim flume-file-hdfs.conf
注:要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive 日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行 Linux命令来读取文件。 添加如下内容
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop2:9000/flume/%Y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = logs-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注意
对于所有与时间相关的转义序列,Event Header 中必须存在以 “timestamp”的 key(除hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自动添加timestamp)。
a3.sinks.k3.hdfs.useLocalTimeStamp = true
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-file-hdfs.conf
```
sbin/start-dfs.sh
sbin/start-yarn.sh
bin/hive
```
-
在 HDFS 上查看文件 持续生成日志
实时监控目录下多个新文件
-
案例需求:使用 Flume 监听整个目录的文件,并上传至 HDFS -
需求分析: -
实现步骤:
-
创建配置文件 flume-dir-hdfs.conf 创建一个文件 vim flume-dir-hdfs.conf
添加如下内容
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/module/flume/upload
a1.sources.r1.fileHeader = true
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.ignorePattern = ([^ ]*\.tmp)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop2:9000/flume/%Y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = logs-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
-
向 upload 文件夹中添加文件 在/opt/module/flume 目录下创建 upload 文件夹 创建1.txt文件,写入 hello
hello!
hello world!
-
启动监控文件夹命令 bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
说明:在使用 Spooling Directory Source 时不要在监控目录中创建并持续修改文件上传完成的文件会以.COMPLETED 结尾被监控文件夹每 500 毫秒扫描一次文件变动 -
查看 文件中的数据为: hello
hello!
hello world!
-
再增加一个新的文件2.txt 开始吧!!!
2.txt!!!!
在很短时间内增加三个文件3.txt,4.txt,5.txt
可以看到在同一分钟内新增的文件数据会进入到同一个文件中!
-
查看一下我们的upload文件夹,每个文件加了一个后缀
千万不要修改名字!!!(你可以试试😄)
实时监控目录下的多个追加文件
Exec source 适用于监控一个实时追加的文件,但不能保证数据不丢失;Spooldir Source 能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控;而 TaildirSource 既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控。
-
案例需求:使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS -
需求分析: -
实现步骤:
-
创建files文件夹,并创建file1.txt和file2.txt -
创建配置文件 flume-taildir-hdfs.conf 创建一个文件 vim flume-taildir-hdfs.conf
添加如下内容
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/position/tail_dir.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/file1.txt
a1.sources.r1.filegroups.f2 = /opt/module/flume/files/file2.txt
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop2:9000/flume/%Y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = logs-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
-
启动监控文件夹命令 ./bin/flume-ng agent -c ./conf/ -n a1 -f ./job/flume-taildir-hdfs.conf
-
向 files 文件夹中追加内容 echo hello >> file1.txt
echo world >> file2.txt
-
查看 HDFS 上的数据 -
Taildir 说明: Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File 的格式如下 [{"inode":34707633,"pos":24,"file":"/opt/module/flume/files/file1.txt"},{"inode":34707634,"pos":37,"file":"/opt/module/flume/files/file2.txt"}]
hello
hello
hello
file1
world
world
world
file2
新增数据
注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。
可使用的类型还有很多,通过上面几种基本有一定的了解了,更多的可以去官网看相关配置和案列。🤞
Flume 进阶
Flume 事务
Flume Agent 内部原理
主要组件:
-
ChannelSelector ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,分别是 Replicating(复制)和 Multiplexing(多路复用)。 ReplicatingSelector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel。 -
SinkProcessor SinkProcessor 共 有 三 种 类 型 , 分 别 是
DefaultSinkProcessor 对 应 的 是 单 个 的 Sink LoadBalancingSinkProcessor 和 FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以实现故障转移的功能。
Flume 拓扑结构
简单串联
这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量,flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。
复制和多路复用
Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地。
负载均衡和故障转移
Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。
聚合
这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析。
Flume 企业开发案例
复制和多路复用
-
案例需求 使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责data输出到 Local FileSystem。 -
需求分析 -
实现步骤
-
准备工作 在/opt/module/flume/job 目录下创建 group1 文件夹 在/opt/module/datas/目录下创建 flume3 文件夹 -
创建 flume-file-flume.conf 配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir。 编辑配置文件 vim flume-file-flume.conf
添加如下内容
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
a1.sources.r1.selector.type = replicating
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop2
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop2
a1.sinks.k2.port = 4142
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
-
创建 flume-flume-hdfs.conf 配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink。 编辑配置文件 vim flume-flume-hdfs.conf
添加如下内容
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop2
a2.sources.r1.port = 4141
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop2:9000/flume2/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = flume2-
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.rollInterval = 600
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
-
创建 flume-flume-dir.conf 配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。 编辑配置文件 vim flume-flume-dir.conf
添加如下内容
a3.sources = r1
a3.sinks = k1
a3.channels = c2
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop2
a3.sources.r1.port = 4142
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume/datas/flume3
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
**提示**:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录
-
执行配置文件 分别启动对应的 flume 进程:flume-flume-dir,flume-flume-hdfs,flume-file-flume。 ./bin/flume-ng agent --conf ./conf/ --name a1 --conf-file ./job/group1/flume-file-flume.conf
./bin/flume-ng agent --conf ./conf/ --name a3 --conf-file ./job/group1/flume-flume-dir.conf
./bin/flume-ng agent --conf ./conf/ --name a2 --conf-file ./job/group1/flume-flume-hdfs.conf
-
检查 HDFS 上数据 -
/opt/module/flume/datas/flume3
负载均衡和故障转移
-
案例需求 使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用FailoverSinkProcessor,实现故障转移的功能。 -
需求分析 -
实现步骤
-
准备工作 在/opt/module/flume/job 目录下创建 group2 文件夹 -
创建 flume-netcat-flume.conf 配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给 flume-flume-console1 和 flume-flume-console2。 配置以下内容
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop2
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop2
a1.sinks.k2.port = 4142
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.chnnels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
-
创建 flume-flume-console1.conf 配置上级 Flume 输出的 Source,输出是到本地控制台 添加一下内容
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop2
a2.sources.r1.port = 4141
a2.sinks.k1.type = logger
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
-
创建 flume-flume-console2.conf 配置上级 Flume 输出的 Source,输出是到本地控制台。 添加一下内容
a3.sources = r1
a3.sinks = k1
a3.channels = c2
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop2
a3.sources.r1.port = 4142
a3.sinks.k1.type = logger
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
-
执行配置文件 分别开启对应配置文件:flume-flume-console2,flume-flume-console1,flume-netcat-flume。 bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
-
使用 netcat 工具向本机的 44444 端口发送内容 nc localhost 44444
-
查看 Flume2 及 Flume3 的控制台打印日志 -
将 Flume2 kill,观察 Flume3 的控制台打印情况。
聚合
-
案例需求 hadoop102 上的 Flume-1 监控文件/opt/module/data/group.log, hadoop103 上的 Flume-2 监控某一个端口的数据流, Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台。 -
需求分析 -
实现步骤
在 hadoop2、hadoop3 以及 hadoop4 的/opt/module/flume/job目录下创建一个 group3文件夹。 分发 Flume xsync flume
-
创建 flume1-logger-flume.conf 配置 Source 用于监控 hive.log 文件,配置 Sink 输出数据到下一级 Flume。 在 hadoop2 上编辑配置文件 vim flume1-logger-flume.conf
添加如下内容
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop4
a1.sinks.k1.port = 4141
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
-
创建 flume2-netcat-flume.conf 配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume: 在 hadoop3 上编辑配置文件 vim flume2-netcat-flume.conf
添加如下内容
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop3
a2.sources.r1.port = 44444
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop4
a2.sinks.k1.port = 4141
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
-
创建 flume3-flume-logger.conf 配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台。 在 hadoop4 上编辑配置文件 vim flume3-flume-logger.conf
添加如下内容
a3.sources = r1
a3.sinks = k1
a3.channels = c1
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop4
a3.sources.r1.port = 4141
a3.sinks.k1.type = logger
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
-
执行配置文件 分别开启对应配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf。 bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume1-logger-flume.conf
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume2-netcat-flume.conf
-
在 hadoop3 上向/opt/module 目录下的 group.log 追加内容 -
在 hadoop2 上向 44444 端口发送数据 -
检查 hadoop4 上数据
自定义 Interceptor
-
案例需求 使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。 -
需求分析 在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的key 赋予不同的值。 在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义 interceptor 区分数字和字母,将其分别发往不同的分析系统(Channel)。 -
实现步骤
-
创建一个 maven 项目,并引入以下依赖。 <dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
-
定义 CustomInterceptor 类并实现 Interceptor 接口。 package top.codekiller.flume.interceptor;
import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.util.ArrayList; import java.util.List; import java.util.Map;
public class TypeInterceptor implements Interceptor {
private List<Event> addHeaderEvent = null;
public void initialize() {
this.addHeaderEvent = new ArrayList<Event>();
}
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
String body = new String(event.getBody());
if(body.contains("hello")){
headers.put("type","codekiller");
}else{
headers.put("type","bigdata");
}
return event;
}
public List<Event> intercept(List<Event> list) {
this.addHeaderEvent.clear();
for (Event event : list) {
this.addHeaderEvent.add(intercept(event));
}
return this.addHeaderEvent;
}
public void close() {
}
public static class Builder implements Interceptor.Builder{
public Interceptor build() {
return new TypeInterceptor();
}
public void configure(Context context) {
}
}
}
注意要创建内部类
-
编辑 flume 配置文件 为 hadoop2 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor。 创建flume-intercetor-1.conf文件,内容如下:
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop2
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = top.codekiller.flume.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.codekiller = c1
a1.sources.r1.selector.mapping.bigdata = c2
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop3
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop4
a1.sinks.k2.port = 4242
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
为 hadoop3 上的 Flume2 配置一个 avro source 和一个 logger sink。 创建flume-intercetor-2.conf文件,内容如下: a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop3
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
为 hadoop4 上的 Flume3 配置一个 avro source 和一个 logger sink。 创建flume-intercetor-3.conf文件,内容如下: a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop4
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
-
分别在 hadoop2,hadoop3,hadoop4 上启动 flume 进程,注意先后顺序。 ./bin/flume-ng agent -c ./conf/ -n a1 -f ./job/interceptor/flume-intercetor-1.conf
./bin/flume-ng agent -c ./conf/ -n a1 -f ./job/interceptor/flume-intercetor-2.conf
./bin/flume-ng agent -c ./conf/ -n a1 -f ./job/interceptor/flume-intercetor-3.conf
-
在 hadoop2 使用 netcat 向 localhost:44444 发送信息(可包含hello)。 -
观察hadoop3和hadoop4的日打印
自定义 Source
介绍
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。
官方也提供了自定义 source 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#source 根据官方说明自定义MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。
实现相应方法:
- getBackOffSleepIncrement()//暂不用
- getMaxBackOffSleepInterval()//暂不用
- configure(Context context)//初始化 context(读取配置文件内容)
- process()//获取数据封装成 event 并写入 channel,这个方法将被循环调用。
使用场景:读取 MySQL 数据或者其他文件系统
需求和设计
-
需求 使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置。 -
分析 -
编码
-
导入依赖 <dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
-
编写代码 package top.codekiller.flume.source;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.mortbay.jetty.webapp.Configuration;
public class MySource extends AbstractSource implements Configurable, PollableSource {
private String prefix;
private String suffix;
public void configure(Context context) {
this.prefix = context.getString("prefix");
this.suffix = context.getString("suffix", "后缀") ;
}
public Status process() throws EventDeliveryException {
Status status = null;
try {
for(int i=0;i<5;i++){
SimpleEvent event = new SimpleEvent();
event.setBody((this.prefix+"-----"+i+"-----"+this.suffix).getBytes());
getChannelProcessor().processEvent(event);
status = Status.READY;
}
} catch (Exception e) {
e.printStackTrace();
status = Status.BACKOFF;
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return status;
}
public long getBackOffSleepIncrement() {
return 0;
}
public long getMaxBackOffSleepInterval() {
return 0;
}
}
-
打包 将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下 -
配置文件 创建mysource.conf文件
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = top.codekiller.flume.source.MySource
a1.sources.r1.suffix = hello
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
-
开启任务 bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
自定义 Sink
介绍
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。
官方也提供了自定义 sink 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink 根据官方说明自定义
MySink 需要继承 AbstractSink 类并实现 Configurable 接口。
实现相应方法:
使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统。
需求和设计
-
需求 使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。 -
流程分析
-
编码 package top.codekiller.flume.sink;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable {
private final Logger logger = LoggerFactory.getLogger(MySink.class);
private String prefix;
private String suffix;
public void configure(Context context) {
this.prefix = context.getString("prefix");
this.suffix = context.getString("suffix", "后缀") ;
}
public Status process() throws EventDeliveryException {
Status status = null;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin();
try {
Event event = channel.take();
String body = new String(event.getBody());
logger.info(this.prefix+"-----"+body+"-----"+this.suffix);
transaction.commit();
status = Status.READY;
} catch (ChannelException e) {
e.printStackTrace();
transaction.rollback();
status = Status.BACKOFF;
} finally {
transaction.close();
}
return status;
}
}
-
打包 将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下。 -
配置文件
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = top.codekiller.flume.sink.MySink
a1.sinks.k1.prefix = codekiller
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
-
开启任务 bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
Flume 数据流监控
Ganglia 的安装与部署
-
安装 httpd 服务与 php yum -y install httpd php
-
安装其他依赖 yum -y install rrdtool perl-rrdtool rrdtool-devel
yum -y install apr-devel
-
安装 ganglia sudo rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
yum -y install ganglia-gmetad
yum -y install ganglia-web
yum -y install ganglia-gmond
Ganglia 由 gmond、gmetad 和 gweb 三部分组成。 gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用gmond,你可以很容易收集很多系统指标数据,如 CPU、内存、磁盘、网络和活跃进程的数据等。 gmetad(Ganglia Meta Daemon)整合所有信息,并将其以 RRD 格式存储至磁盘的服务。 gweb(Ganglia Web)Ganglia 可视化工具,gweb 是一种利用浏览器显示 gmetad 所存储数据的 PHP 前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据。 -
修改配置文件/etc/httpd/conf.d/ganglia.conf
vim /etc/httpd/conf.d/ganglia.conf
修改配置:
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
Order deny,allow
Allow from all
</Location>
-
修改配置文件/etc/ganglia/gmetad.conf vim /etc/ganglia/gmetad.conf
修改内容(不是注释的) data_source "hadoop2" 192.168.234.136
-
修改配置文件/etc/ganglia/gmond.conf cluster {
name = "hadoop2"
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel {
address
Without
host = 192.168.234.136
port = 8649
ttl = 1
}
udp_recv_channel {
port = 8649
bind = 192.168.234.136
retry_bind = true
really
}
-
修改配置文件/etc/selinux/config
SELINUX=disabled
SELINUXTYPE=targeted
提示:selinux 本次生效关闭必须重启,如果此时不想重启,可以临时生效之: setenforce 0
-
启动 ganglia service httpd start
service gmetad start
service gmond start
-
打开网页浏览 ganglia 页面
http://192.168.234.136/ganglia
提示:如果完成以上操作依然出现权限不足错误,请修改/var/lib/ganglia 目录的权限:
chmod -R 777 /var/lib/ganglia
操作 Flume 测试监控
- 修改/opt/module/flume/conf 目录下的 flume-env.sh 配置:
JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.234.136:8649 -Xms100m -Xmx200m"
-
启动 Flume 任务 bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf - Dflume.root.logger==INFO,console -Dflume.monitoring.type=ganglia - Dflume.monitoring.hosts=192.168.9.102:8649
-
发送数据观察 ganglia 监测图 nc localhost 44444
![](https://img-blog.csdnimg.cn/20201023151818225.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ0NzY2ODgz,size_16,color_FFFFFF,t_70#pic_center) 图例说明
企业真实面试题(重点)
-
你是如何实现 Flume 数据传输的监控的 使用第三方框架 Ganglia 实时监控 Flume。 -
Flume 的 Source,Sink,Channel 的作用?你们 Source 是什么类型?
-
作用 Source 组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy Channel 组件对采集到的数据进行缓存,可以存放在 Memory 或 File 中。 Sink 组件是用于把数据发送到目的地的组件,目的地包括 HDFS、Logger、avro、thrift、ipc、file、Hbase、solr、自定义。 -
我公司采用的 Source 类型为 监控后台日志:exec 监控后台产生日志的端口:netcat Exec spooldir -
Flume 的 Channel Selectors -
Flume 参数调优
-
Source 增加 Source 个(使用 Tair Dir Source 时可增加 FileGroups 个数)可以增大 Source 的读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个 Source 以保证 Source 有足够的能力获取到新产生的数据。 batchSize 参数决定 Source 一次批量运输到 Channel 的 event 条数,适当调大这个参数可以提高 Source 搬运 Event 到 Channel 时的性能。 -
Channel type 选择 memory 时 Channel 的性能最好,但是如果 Flume 进程意外挂掉可能会丢失数据。type 选择 file 时 Channel 的容错性更好,但是性能上会比 memory channel 差。 使用 file Channel 时 dataDirs 配置多个不同盘下的目录可以提高性能。Capacity 参数决定 Channel 可容纳最大的 event 条数。transactionCapacity 参数决定每次 Source 往 channel 里面写的最大 event 条数和每次 Sink 从 channel 里面读的最大 event条数。transactionCapacity 需要大于 Source 和 Sink 的 batchSize 参数。 -
Sink 增加 Sink 的个数可以增加 Sink 消费 event 的能力。Sink 也不是越多越好够用就行,过多的 Sink 会占用系统资源,造成系统资源不必要的浪费。 batchSize 参数决定 Sink 一次批量从 Channel 读取的 event 条数,适当调大这个参数可以提高 Sink 从 Channel 搬出 event 的性能。 -
Flume 的事务机制
Flume 的事务机制(类似数据库的事务机制):Flume 使用两个独立的事务分别负责从Soucrce 到 Channel,以及从 Channel 到 Sink 的事件传递。比如 spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到 Channel 且提交成功,那么 Soucrce 就将该文件标记为完成。同理,事务以类似的方式处理从 Channel 到 Sink 的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持到 Channel 中,等待重新传递。
-
Flume 采集数据会丢失吗? 根据 Flume 的架构原理,Flume 是不可能丢失数据的,其内部有完善的事务机制,Source 到 Channel 是事务性的,Channel 到 Sink 是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是 Channel 采用 memoryChannel,agent 宕机导致数据丢失,或者 Channel 存储数据已满,导致 Source 不再写入,未写入的数据丢失。 Flume 不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由 Sink 发出,但是没有接收到响应,Sink 会再次发送数据,此时可能会导致数据的重复。
|