IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【云原生】简单谈谈海量数据采集组件Flume的理解 -> 正文阅读

[大数据]【云原生】简单谈谈海量数据采集组件Flume的理解

1、特点

flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力

2、Flume的可靠性:

当节点出现故障时,日志能够被传送到其他节点上而不会丢失。
Flume提供了三种级别的可靠性保障:

  1. end to end:收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。
  2. Store on failure:这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送
  3. Besteffort:数据发送到接收方后,不会进行确认

3、Flume的可恢复性:

使用FileChannel模式运行,会将数据采集的事件持久化在本地文件系统里(性能较差)

4、Flume的一些核心概念:

名称说明
Agent(代理)使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
Client(委托)生产数据,运行在一个独立的线程。
Source(来源)从Client收集数据,传递给Channel。
Sink(接收器)从Channel收集数据,运行在一个独立线程。
Channel(通道)连接 sources 和 sinks ,类似一个队列。
Events(事件)可以是日志记录、 avro 对象等。

5、部署类型:

5.1 单一流程:

Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成。

image-20210510074804950

5.2 多代理流程:

可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的Agent
的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。

image-20210510074846356

5.3 流的合并:

这种情况应用的场景比较多,比如要收集Web网站的用户行为日志, Web网站为了可用性使用的负载集群模式,每个节点都产生用户行为日志,可以为每
个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。

image-20210510074927560

5.4 多路复用流:

Flume还支持多级流,什么多级流?来举个例子,当syslog, java, nginx、 tomcat等混合在一起的日志流开始流入一个agent后,可以agent中将混杂的日志流分开,然后给每种日志建立一个自己的传输通道。

image-20210510075008759

5.5 负载平衡:

将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上。

6、Flume抓取日志类型

1. Avro Source监听Avro端口接收从外部Avro客户端发送来的数据流。如果与上一层Agent的 Avro Sink 配合使用就组成了一个分层的拓扑结构

属性默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是: avro
bind监听的服务器名hostname或者ip
port监听的端口
threads生成的最大工作线程数量
selector.type可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器的相关属性
compression-typenone可选值: nonedeflate 。这个类型必须跟Avro Source相匹配
sslfalse设置为 true 可启用SSL加密,如果为true必须指定下面的 keystorekeystore-password
keystoreSSL加密使用的Java keystore文件路径
keystore-passwordJava keystore的密码
keystore-typeJKSJava keystore的类型. 可选值有 JKSPKCS12
exclude-protocolsSSLv3指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除
ipFilterfalse设置为true可启用ip过滤(netty方式的avro)
ipFilterRulesnetty ipFilter的配置(参考下面的ipFilterRules详细介绍和例子)

配置范例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

2. Spool:Spool监测配置的目录下新增的文件,并将文件中的数据读取出来

这个Source允许你把要收集的文件放入磁盘上的某个指定目录。它会将监视这个目录中产生的新文件,并在新文件出现时从新文件中解析数据出来。数据解析逻辑是可配置的。在新文件被完全读入Channel之后会重命名该文件以示完成(也可以配置成读完后立即删除)。

与Exec Source不同,Spooling Directory Source是可靠的,即使Flume重新启动或被kill,也不会丢失数据。同时作为这种可靠性的代价,指定目录中的文件必须是不可变的、唯一命名的。Flume会自动检测避免这种情况发生,如果发现问题,则会抛出异常:

  1. 如果文件在写入完成后又被再次写入新内容,Flume将向其日志文件(这是指Flume自己logs目录下的日志文件)打印错误并停止处理。
  2. 如果在以后重新使用以前的文件名,Flume将向其日志文件打印错误并停止处理。

为了避免上述问题,生成新文件的时候文件名加上时间戳是个不错的办法。

属性名默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是: spooldir.
spoolDirFlume Source监控的文件夹目录,该目录下的文件会被Flume收集
fileSuffix.COMPLETED被Flume收集完成的文件被重命名的后缀。1.txt被Flume收集完成后会重命名为1.txt.COMPLETED
deletePolicynever是否删除已完成收集的文件,可选值: neverimmediate
fileHeaderfalse是否添加文件的绝对路径名(绝对路径+文件名)到header中。
fileHeaderKeyfile添加绝对路径名到header里面所使用的key(配合上面的fileHeader一起使用)
basenameHeaderfalse是否添加文件名(只是文件名,不包括路径)到header 中
basenameHeaderKeybasename添加文件名到header里面所使用的key(配合上面的basenameHeader一起使用)
includePattern^.*$指定会被收集的文件名正则表达式,它跟下面的ignorePattern不冲突,可以一起使用。如果一个文件名同时被这两个正则匹配到,则会被忽略,换句话说ignorePattern的优先级更高
ignorePattern^$指定要忽略的文件名称正则表达式。它可以跟 includePattern 一起使用,如果一个文件被 ignorePatternincludePattern 两个正则都匹配到,这个文件会被忽略。
trackerDir.flumespool用于存储与文件处理相关的元数据的目录。如果配置的是相对目录地址,它会在spoolDir中开始创建
consumeOrderoldest设定收集目录内文件的顺序。默认是“先来先走”(也就是最早生成的文件最先被收集),可选值有: oldestyoungestrandom 。当使用oldest和youngest这两种选项的时候,Flume会扫描整个文件夹进行对比排序,当文件夹里面有大量的文件的时候可能会运行缓慢。 当使用random时候,如果一直在产生新的文件,有一部分老文件可能会很久才会被收集
pollDelay500Flume监视目录内新文件产生的时间间隔,单位:毫秒
recursiveDirectorySearchfalse是否收集子目录下的日志文件
maxBackoff4000等待写入channel的最长退避时间,如果channel已满实例启动时会自动设定一个很低的值,当遇到ChannelException异常时会自动以指数级增加这个超时时间,直到达到设定的这个最大值为止。
batchSize100每次批量传输到channel时的size大小
inputCharsetUTF-8解析器读取文件时使用的编码(解析器会把所有文件当做文本读取)
decodeErrorPolicyFAIL当从文件读取时遇到不可解析的字符时如何处理。 FAIL :抛出异常,解析文件失败; REPLACE :替换掉这些无法解析的字符,通常是用U+FFFD; IGNORE :忽略无法解析的字符。
deserializerLINE指定一个把文件中的数据行解析成Event的解析器。默认是把每一行当做一个Event进行解析,所有解析器必须实现EventDeserializer.Builder接口
deserializer.*解析器的相关属性,根据解析器不同而不同
bufferMaxLines(已废弃)
bufferMaxLineLength5000(已废弃)每行的最大长度。改用 deserializer.maxLineLength 代替
selector.typereplicating可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器相关的属性配置

配置范例:

a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true

3. Exec:EXEC执行一个给定的命令获得源的变化。

这个source在启动时运行给定的Unix命令,并期望该进程在标准输出上连续生成数据(stderr 信息会被丢弃,除非属性 logStdErr 设置为 true )。 如果进程因任何原因退出, 则source也会退出并且不会继续生成数据。 综上来看cat [named pipe]或tail -F [file]这两个命令符合要求可以产生所需的结果,而date这种命令可能不会,因为前两个命令(tail 和 cat)能产生持续的数据流,而后者(date这种命令)只会产生单个Event并退出。

属性默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是: exec
command所使用的系统命令,一般是cat 或者tail
shell设置用于运行命令的shell。 例如 / bin / sh -c。 仅适用于依赖shell功能的命令,如通配符、后退标记、管道等。
restartThrottle10000尝试重新启动之前等待的时间(毫秒)
restartfalse如果执行命令线程挂掉,是否重启
logStdErrfalse是否会记录命令的stderr内容
batchSize20读取并向channel发送数据时单次发送的最大数量
batchTimeout3000向下游推送数据时,单次批量发送Event的最大等待时间(毫秒),如果等待了batchTimeout毫秒后未达到一次批量发送数量,则仍然执行发送操作。
selector.typereplicating可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器相关的属性配置

配置范例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

4. Multiport Syslog TCP Source

这是一个增强版的 Syslog TCP Source ,它更新、更快、支持监听多个端口。因为支持了多个端口,port参数已经改为了ports。这个Source使用了Apache mina(一个异步通信的框架,同netty类似)来实现。 提供了对RFC-3164和许多常见的RFC-5424格式消息的支持。 支持每个端口配置不同字符集。

属性默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是:multiport_syslogtcp
host要监听的hostname或者IP地址
ports一个或多个要监听的端口,多个用空格分开
eventSize2500解析成Event的每行数据的最大字节数
keepFieldsnone是否保留syslog消息头中的一些属性到Event中,可选值 allnone 或自定义指定保留的字段,如果设置为all,则会保留Priority, Timestamp 和Hostname三个属性到Event中。 也支持单独指定保留哪些属性(支持的属性有:priority, version, timestamp, hostname),用空格分开即可。现在已经不建议使用 truefalse ,建议改用 allnone 了。
portHeader如果配置了这个属性值,端口号会被存到每个Event的header里面用这个属性配置的值当key。这样就可以在拦截器或者channel选择器里面根据端口号来自定义路由Event的逻辑。
charset.defaultUTF-8解析syslog使用的默认编码
charset.port.针对具体某一个端口配置编码
batchSize100每次请求尝试处理的最大Event数量,通常用这个默认值就很好。
readBufferSize1024内部Mina通信的读取缓冲区大小,用于性能调优,通常用默认值就很好。
numProcessors(自动分配)处理消息时系统使用的处理器数量。 默认是使用Java Runtime API自动检测CPU数量。 Mina将为每个检测到的CPU核心生成2个请求处理线程,这通常是合理的。
selector.typereplicating可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器相关的属性配

配置范例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port

5. JSONHandler:接受json格式数据。

这是HTTP Source的默认解析器(handler),根据请求所使用的编码把http请求中json格式的数据解析成Flume
Event数组(不管是一个还是多个,都以数组格式进行存储),
如果未指定编码,默认使用UTF-8编码。这个handler支持UTF-8、UTF-16和UTF-32编码。

json数据格式如下

[{
  "headers" : {
             "timestamp" : "434324343",
             "host" : "random_host.example.com"
             },
  "body" : "random_body"
  },
  {
  "headers" : {
             "namenode" : "namenode.example.com",
             "datanode" : "random_datanode.example.com"
             },
  "body" : "really_random_body"
  }]

在一个POST请求中发送的所有 Event 视为一个批处理,并在一个事务中插入到 channel。

属性默认值解释
channels与Source绑定的channel,多个用空格分开
type组件类型,这个是: http
port要监听的端口
bind0.0.0.0要监听的hostname或者IP地址
handlerorg.apache.flume.source.http.JSONHandler所使用的handler,需填写handler的全限定类名
handler.*handler的一些属性配置
selector.typereplicating可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.*channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors该source所使用的拦截器,多个用空格分开
interceptors.*拦截器相关的属性配
enableSSLfalse设置为true启用SSL,HTTP Source不支持SSLv3协议
excludeProtocolsSSLv3指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除
keystorekeystore 文件的位置
keystorePasswordKeystore 的密码

6. HDFS Sink:配置Hadoop接受数据。

这个Sink将Event写入Hadoop分布式文件系统(也就是HDFS)。 目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据写入的时间、文件大小或Event数量定期滚动文件(关闭当前文件并创建新文件)。 它还可以根据Event自带的时间戳或系统时间等属性对数据进行分区。 存储文件的HDFS目录路径可以使用格式转义符,会由HDFS Sink进行动态地替换,以生成用于存储Event的目录或文件名。 使用此Sink需要安装hadoop, 以便Flume可以使用Hadoop的客户端与HDFS集群进行通信。 注意, 需要使用支持sync() 调用的Hadoop版本 。

属性名默认值解释
channel与 Sink 连接的 channel
type组件类型,这个是: hdfs
hdfs.pathHDFS目录路径(例如:hdfs://namenode/flume/webdata/)
hdfs.filePrefixFlumeDataFlume在HDFS文件夹下创建新文件的固定前缀
hdfs.fileSuffixFlume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置)
hdfs.inUsePrefixFlume正在写入的临时文件前缀,默认没有
hdfs.inUseSuffix.tmpFlume正在写入的临时文件后缀
hdfs.rollInterval30当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
hdfs.rollSize1024当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
hdfs.rollCount10当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
hdfs.idleTimeout0关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒
hdfs.batchSize100向 HDFS 写入内容时每次批量操作的 Event 数量
hdfs.codeC压缩算法。可选值:gzipbzip2lzolzop 、 ``snappy`
hdfs.fileTypeSequenceFile文件格式,目前支持: SequenceFileDataStreamCompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数
hdfs.maxOpenFiles5000允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭
hdfs.minBlockReplicas指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。
hdfs.writeFormatWritable文件写入格式。可选值: TextWritable 。在使用 Flume 创建数据文件之前设置为 Text,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件。
hdfs.callTimeout10000允许HDFS操作文件的时间,比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。(毫秒)
hdfs.threadsPoolSize10每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等)
hdfs.rollTimerPoolSize1每个HDFS Sink实例调度定时文件滚动的线程数
hdfs.kerberosPrincipal用于安全访问 HDFS 的 Kerberos 用户主体
hdfs.kerberosKeytab用于安全访问 HDFS 的 Kerberos keytab 文件
hdfs.proxyUser代理名
hdfs.roundfalse是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)
hdfs.roundValue1向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30
hdfs.roundUnitsecond向下舍入的单位,可选值: secondminutehour
hdfs.timeZoneLocal Time解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai
hdfs.useLocalTimeStampfalse使用日期时间转义符时是否使用本地时间戳(而不是使用 Event header 中自带的时间戳)
hdfs.closeTries0开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀;如果设置为0,Sink会一直尝试重命名文件直到成功为止;关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。
hdfs.retryInterval180连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。
serializerTEXTEvent 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。
serializer.*根据上面 serializer 配置的类型来根据需要添加序列化器的参数

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

7. File Roll Sink:配置本地文件接受数据。

属性默认值解释
channel与 Sink 绑定的 channel
type组件类型,这个是: file_roll.
sink.directoryEvent 将要保存的目录
sink.pathManagerDEFAULT配置使用哪个路径管理器,这个管理器的作用是按照规则生成新的存储文件名称,可选值有: defaultrolltime。default规则:prefix+当前毫秒值+“-”+文件序号+“.”+extension;rolltime规则:prefix+yyyyMMddHHmmss+“-”+文件序号+“.”+extension;注:prefix 和 extension 如果没有配置则不会附带
sink.pathManager.extension如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的扩展名
sink.pathManager.prefix如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的文件名的固定前缀
sink.rollInterval30表示每隔30秒创建一个新文件进行存储。如果设置为0,表示所有 Event 都会写到一个文件中。
sink.serializerTEXT配置 Event 序列化器,可选值有:textheader_and_textavro_event 或者自定义实现了 EventSerializer.Builder 接口的序列化器的全限定类名.。 text 只会把 Event 的 body 的文本内容序列化; header_and_text 会把 header 和 body 内容都序列化。
batchSize100每次请求批处理的 Event 数

配置范例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-07-03 10:53:18  更:2022-07-03 10:56:09 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:35:21-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码