IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 一 Flume入门和典型案例实操 -> 正文阅读

[大数据]一 Flume入门和典型案例实操

Flume(日志收集, 聚合, 传输的工具)

一, Flume 概述

1. Flume 定义

[定义]

  • Flume 是 Cloudera 提供的一个高可用(available)的, 高可靠(reliable), 分布式(distribute)的海量日志采集(collecting), 聚合(aggregating)和传输(moving)的系统.
  • Flume 基于流式架构, 灵活简单.

[Why Flume?]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LfMx27dN-1629794979099)(2021-08-10-14-46-44.png)]

2. Flume 基础架构

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1lCcWfc3-1629794979102)(2021-08-10-14-47-28.png)]

2.1 Agent

  • Agent是一个JVM进程, 它以事件的形式将数据从源头送至目的地. Agent由 Source, Channel, Sink 三部分组成.

2.2 Source(接收并处理数据)

  • Source 是负责接收数据到Flume Agent的组件, Source组件可以处理各种类型, 各种格式的日志数据, 包括avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy。

2.3 Channel(缓冲区, 平衡读写速度)

  • Channel是位于Source和Sink之间的缓冲区. 因此, Channel允许Source和Sink运作在不同的速率上.
  • Channel是线程安全的, 可以同时处理几个Source 的写入操作和几个Sink的读取操作.
  • Flume 自带两种Channel: Memory Channl, File ChannelKafka Channel

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0vN0hC9P-1629794979104)(2021-08-10-14-56-44.png)]

2.4 Sink(批量写出数据)

  • Sink 不断轮询Channel中的时间并并批量的移除它们, 并将这些时间批量写出到存储或索引系统, 或者被发送到另一个Flume Agent;
  • Sink组件的目的地包括hdfs, file,Hbase,logger, 自定义等等.

2.5 Event

  • Event 是Flume数据传输的基本单元, 以Event的形式将数据从源头送至目的地. Event由Header和Body两部分组成, Header用来存放该event的一些属性, 为K-V结构. Body用来存放该条数据, 形式为字节数组.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QuV4zVz9-1629794979107)(2021-08-10-15-05-16.png)]

二, Flume 安装部署和入门案例实操

2.1 安装部署

  1. 下载Flume-1.7.0
  2. 解压到/opt/module, 并改名为flume-1.7.0
tar -zxvf apache-flume-1.7.0.tar.gz -C /opt/module/

mv apache-flume-1.7.0 flume-1.7.0
  1. 将 flume-1.7.0/conf 下的 flume-env.sh.template 文件修改为 flume-env.sh,并配置 flume-env.sh 文件
mv flume-env.sh.template flume-env.sh

#修改
vi flume-env.sh

#添加jdk依赖
export JAVA_HOME=/opt/module/jdk1.8

2.2 案例实操

2.2.1 监控端口数据官方案例

[需求和需求分析]

  • 使用flume监听一个端口, 收集该端口数据, 并打印到控制台;

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RSrXe3RR-1629794979109)(2021-08-10-16-54-55.png)]

[实现步骤]

  1. 安装netcat;
    sudo yum install -y nc
  1. 判断44444端口是否被占用
    sudo netstat -tunlp | grep 44444
  1. 创建Flume Agent 配置文件 flume-netcat-logger.conf

在flume安装目录下创建job文件夹并进入job文件夹, 然后在job文件夹下创建Flume Agent 配置文件 flume-netcat-logger.conf(配置文件是自定义命名, 按照给定模板修改.)

# 1. 给agent(我们命名为a1)的各个组件(source->channel-->sink)命名
a1.sources = r1
a1.sinks=k1
a1.channels=c1

#2. 配置source的类型, 绑定主机地址, 监听端口
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=44444

#3. 配置sink的类型
a1.sinks.k1.type = logger


#4. 配置channel的存储类型, 存储event的上限, 提交事务的阈值
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#5. 绑定三个组件
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

对配置文件模板的说明:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3nFRa5yj-1629794979684)(2021-08-11-15-48-34.png)]

注意: 在绑定组件时, a1.sinks.k1.channel中 channel不带s, 这是因为虽然我们可以有多个sink从channel中读取数据并写出, 但是其中的每一个sink只能从唯一的一个channel中读取数据(event).

  1. 开启flume去监听端口

记住: flume-ng agent -n 自定义agent名称 -c 环境配置文件 -f 用户配置文件

flume-ng agent -n -c -f

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ry3uujPu-1629794979110)(2021-08-12-15-32-04.png)]

bin/flume-ng agent
--conf conf/(配置文件的目录) 
--name a1(agent的名称)
--conf-file job/flume-netcat-logger.conf(配置文件的名)
-Dflume.root.logger=INFO,console(日志级别为INFO, 打印到控制台)
  • 参数说明:
参数说明
–name/-n a1表示给agent起名为a1
–conf/-c conf/表示配置文件存储在conf/目录
–conf-file/-f job/flume-netcat-logger.confflume本次启动读取的配置文件是在job文件夹下的flume-netcat-logger.conf
-Dflume.root.logger=INFO,console-D表示flume运行时动态修改flume.root.logger参数属性值, 并将控制日志打印级别设置为INFO级别. 日志级别包括: log,info,warn,error.

简便写法:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qhZ5x0nG-1629794979111)(2021-08-12-15-43-47.png)]

  1. 使用netcat在新的终端向本地主机的44444端口发送信息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-c95BQrng-1629794979112)(2021-08-12-15-33-47.png)]
6. 在flume监听终端查看监听到的信息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4PQAcf7Z-1629794979113)(2021-08-12-15-34-07.png)]

2.2.2 实时监控单个文件新内容

[案例需求与分析]

案例需求: 实时监控Hive日志, 并上传到HDFS中.
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GjucuhK4-1629794979114)(2021-08-16-16-09-35.png)]

[案例实操]

  1. Flume要想将数据输入到HDFS, 需要往/opt/module/flume-1.7.0/lib放入HADOOP相关的jar包.
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4df7l498-1629794979115)(2021-08-16-16-24-47.png)]

2.创建自定义配置文件flume-file-hdfs.conf

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7WfOaOfo-1629794979116)(2021-08-16-16-30-54.png)]

#给agent中的各个组件命名
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# 配置sources (此处是exec类型的source)
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
a2.sources.r2.shell = /bin/bash -c


# 配置sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://bigdata01:8020/flume/%Y%m%d/%H


#### 上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-

#### 是否按照时间滚动文件夹(每隔一段时间重新创建文件并写入日志信息)
a2.sinks.k2.hdfs.round = true

#### 多少时间单位创造一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1

#### 重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour

#### 是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true

#### 积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 100

#### 设置文件类型, 可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream

#### 多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 30 

#### 设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700

#### 文件的滚动与Event的数量无关
a2.sinks.k2.hdfs.rollCount = 0


# 配置channel
a2.channels.c2.type = memory 
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# 绑定 sources, sinks 和 channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

tail -f tail -F的区别和使用方法

  1. tail -f, 根据文件描述符进行追踪, 当文件改名或被删除, 追踪会停止;
  2. tail -F, 根据文件名进行追踪, 并保持重试, 即该文件被删除或改名后, 如果再次创建相同的文件名, 会继续进行追踪. 比如: 在hive的logs目录中, 当天的日志会记录在hive.log中, 之前每一天的hive.log会被自动命名为以那天日期为文件名的日志, 同时新建hive.log继续记录今天的日志.我们要追踪当天最新日志, 只需使用 tail -F hive.log 即可.
  1. 启动flume, 然后操作hive
# 1. 启动flume(在flume的安装目录下执行启动命令)
bin/flume-ng agent -n a2 -c conf/ -f job/flume-file-hdfs.conf -Dflume.root.logger=INFO, console

# 2. 启动hive后, 随便执行一句查询
show databases;
  1. 查看HDFS中是否生成了指定格式的文件.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Z6DOImEh-1629794979116)(2021-08-23-21-34-24.png)]

2.2.3 实时监控目录下新文件

[案例需求与分析]

案例需求: 使用Flume监听整个目录的文件创建情况, 并上传至HDFS
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-65SOxzNg-1629794979118)(2021-08-24-06-06-12.png)]

[案例实操]

  1. 在之前创建的job目录中, 创建并编写配置文件flume-dir-hdfs.conf
#1. 给agent的各个组件命名
a3.sources=r3
a3.sinks = k3
a3.channels = c3


#2. 配置 source (spooldir source)
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume-1.7.0/upload
a3.sources.r3.fileSuffix=.COMPLETED
a3.sources.r3.fileHeader = true

#### 忽略所有.tmp结尾的文件, 不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)


# 3. 配置sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata01:8020/flume/upload/%Y%m%d/%H

### 上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-

### 是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true

### 多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1

### 重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour

### 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true

### 积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100

#### 设置文件类型, 可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream

### 多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 30 

### 设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700

### 文件的滚动与Event的数量无关
a2.sinks.k2.hdfs.rollCount = 0


# 4. 配置channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100


# 5. 把sources, sinks 分别与channel绑定
a3.sources.r3.channels =c3
a3.sinks.k3.channel = c3
  1. 启动flume
  • 启动Flume(bin/flume-ng agent -n a3 -c conf/ -f job/flume-dir-hdfs.conf -Dflume.root.logger=INFO,console), 成功启动标识如下图所示:
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7H0YfIpP-1629794979119)(2021-08-24-06-30-59.png)]
  1. 在配置文件中指定的目录下创建新的文件, 并查看hdfs和flume控制台

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WBaMKlGn-1629794979119)(2021-08-24-06-34-46.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-m8BqQy3v-1629794979120)(2021-08-24-07-55-40.png)]

注意:

  1. 当监控目录有新文件出现时, Flume会把新文件上传到HDFS中去, 在监控目录中, 上传完成的文件会以.COMPLETED后缀结尾(在配置文件中 a1.sources.r1.fileSuffix=.COMPLETED参数中定义). 后续的监控中, Flume会忽略这些COMPLETED后缀的文件, 即便这些文件的内容发生改变.
  2. 在Flume把新文件上传到HDFS的过程中, 会先在HDFS指定的目录中生成文件名.tmp的临时文件(由hdfs.inUseSuffix参数控制), 待上传完成后去掉.tmp前缀.
  3. Flume会对监控的文件夹, 每500毫秒扫描一次文件变动.
  4. 不要在监控目录中创建重名文件, 否则Flume会报错并且无法继续使用!

2.2.4 实时追踪目录下的多个追踪文件

Source的三种主要type的区别:

type作用
exec source适用于监控一个实时追加的文件, 但不能保证数据不丢失
spooldir source能保证数据不丢失, 且能够实现断点续传, 但延迟较高, 不能实时监控
taildir source即可以实现断点续传, 又可以保证数据不丢失, 还能够进行实时监控.
  1. ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。 例如:通过tail -F去获取Nginx的访问日志,如果Flume挂掉,Nginx访问日志继续导入到日志文件中,那么在Flume挂掉的这段时间中,新产生的日志Flume是无法获取到的.
  2. 为了更好的可靠性保证,可以考虑使用Spooling Directory Source,拿实时获取Nginx访问日志来说,Spooling Directory Source虽然做不到实时,但是也可以通过日志文件的切分,做到准实时。 然而Flume只能监控单一目录(子目录无法监控)下的新文件, 无法监控老文件中的新增数据.
  3. 而taildir source 就更厉害了, 它能够递归的监控指定目录下的所有文件实时的新增数据.

[案例需求和分析]

案例需求: 使用Flume监听整个目录的实时追加文件, 并上传到HDFS.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zzqGjXgY-1629794979121)(2021-08-24-09-08-25.png)]

[案例实操]

  1. 在之前创建的job目录中, 创建并编写配置文件flume-taildir-hdfs.conf
#1. 给agent的各个组件命名
a3.sources=r3
a3.sinks = k3
a3.channels = c3


#2. 配置 source (spooldir source)
a3.sources.r3.type = TAILDIR
### taildir的positionsFile, 自动创建json文件, 定期记录flume读取文件的位置.
a3.sources.r3.positionFile = /opt/module/flume-1.7.0/tail_dir.json

a3.sources.r3.filegroups = f1
a3.sources.r3.filegroups.f1 = /opt/module/flume-1.7.0/files/.*.txt


# 3. 配置sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata01:8020/tailDir/upload/%Y%m%d/%H

### 上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-

### 是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true

### 多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1

### 重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour

### 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true

### 积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100

#### 设置文件类型, 可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream

### 多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60

### 设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700

### 文件的滚动与Event的数量无关
a2.sinks.k2.hdfs.rollCount = 0


# 4. 配置channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100


# 5. 把sources, sinks 分别与channel绑定
a3.sources.r3.channels =c3
a3.sinks.k3.channel = c3
  1. 在监控的目录中建立a.txt 和 b.txt两个文本文件, 然后启动flume, 在启动后往两个文本文件中追加内容.
  • 创建监控文件
  • 启动flume: flume-ng agent -n a3 -c conf/ -f job/flume-taildir-hdfs.conf -Dflume.root.logger=INFO,console
  • 往文本文件中追加内容, 并观察输出信息:
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cwSarN6U-1629794979122)(2021-08-24-14-26-27.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nT8IfCZL-1629794979123)(2021-08-24-14-27-55.png)]

待补充: 常用的source, sink, channel配置写法

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-25 12:16:45  更:2021-08-25 12:18:37 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:51:29-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码