1、特点:
flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力
2、Flume的可靠性:
当节点出现故障时,日志能够被传送到其他节点上而不会丢失。 Flume提供了三种级别的可靠性保障:
- end to end:收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。
- Store on failure:这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送
- Besteffort:数据发送到接收方后,不会进行确认
3、Flume的可恢复性:
使用FileChannel模式运行,会将数据采集的事件持久化在本地文件系统里(性能较差)
4、Flume的一些核心概念:
名称 | 说明 |
---|
Agent(代理) | 使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。 | Client(委托) | 生产数据,运行在一个独立的线程。 | Source(来源) | 从Client收集数据,传递给Channel。 | Sink(接收器) | 从Channel收集数据,运行在一个独立线程。 | Channel(通道) | 连接 sources 和 sinks ,类似一个队列。 | Events(事件) | 可以是日志记录、 avro 对象等。 |
5、部署类型:
5.1 单一流程:
Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成。
5.2 多代理流程:
可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的Agent 的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。
5.3 流的合并:
这种情况应用的场景比较多,比如要收集Web网站的用户行为日志, Web网站为了可用性使用的负载集群模式,每个节点都产生用户行为日志,可以为每 个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。
5.4 多路复用流:
Flume还支持多级流,什么多级流?来举个例子,当syslog, java, nginx、 tomcat等混合在一起的日志流开始流入一个agent后,可以agent中将混杂的日志流分开,然后给每种日志建立一个自己的传输通道。
5.5 负载平衡:
将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上。
6、Flume抓取日志类型
1. Avro Source监听Avro端口接收从外部Avro客户端发送来的数据流。如果与上一层Agent的 Avro Sink 配合使用就组成了一个分层的拓扑结构
属性 | 默认值 | 解释 |
---|
channels | – | 与Source绑定的channel,多个用空格分开 | type | – | 组件类型,这个是: avro | bind | – | 监听的服务器名hostname或者ip | port | – | 监听的端口 | threads | – | 生成的最大工作线程数量 | selector.type | | 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用 | selector.* | | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | interceptors | – | 该source所使用的拦截器,多个用空格分开 | interceptors.* | | 拦截器的相关属性 | compression-type | none | 可选值: none 或 deflate 。这个类型必须跟Avro Source相匹配 | ssl | false | 设置为 true 可启用SSL加密,如果为true必须指定下面的 keystore 和 keystore-password 。 | keystore | – | SSL加密使用的Java keystore文件路径 | keystore-password | – | Java keystore的密码 | keystore-type | JKS | Java keystore的类型. 可选值有 JKS 、 PKCS12 。 | exclude-protocols | SSLv3 | 指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除 | ipFilter | false | 设置为true可启用ip过滤(netty方式的avro) | ipFilterRules | – | netty ipFilter的配置(参考下面的ipFilterRules详细介绍和例子) |
配置范例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
2. Spool:Spool监测配置的目录下新增的文件,并将文件中的数据读取出来
这个Source允许你把要收集的文件放入磁盘上的某个指定目录。它会将监视这个目录中产生的新文件,并在新文件出现时从新文件中解析数据出来。数据解析逻辑是可配置的。在新文件被完全读入Channel之后会重命名该文件以示完成(也可以配置成读完后立即删除)。
与Exec Source不同,Spooling Directory Source是可靠的,即使Flume重新启动或被kill,也不会丢失数据。同时作为这种可靠性的代价,指定目录中的文件必须是不可变的、唯一命名的。Flume会自动检测避免这种情况发生,如果发现问题,则会抛出异常:
- 如果文件在写入完成后又被再次写入新内容,Flume将向其日志文件(这是指Flume自己logs目录下的日志文件)打印错误并停止处理。
- 如果在以后重新使用以前的文件名,Flume将向其日志文件打印错误并停止处理。
为了避免上述问题,生成新文件的时候文件名加上时间戳是个不错的办法。
属性名 | 默认值 | 解释 |
---|
channels | – | 与Source绑定的channel,多个用空格分开 | type | – | 组件类型,这个是: spooldir . | spoolDir | – | Flume Source监控的文件夹目录,该目录下的文件会被Flume收集 | fileSuffix | .COMPLETED | 被Flume收集完成的文件被重命名的后缀。1.txt被Flume收集完成后会重命名为1.txt.COMPLETED | deletePolicy | never | 是否删除已完成收集的文件,可选值: never 或 immediate | fileHeader | false | 是否添加文件的绝对路径名(绝对路径+文件名)到header中。 | fileHeaderKey | file | 添加绝对路径名到header里面所使用的key(配合上面的fileHeader一起使用) | basenameHeader | false | 是否添加文件名(只是文件名,不包括路径)到header 中 | basenameHeaderKey | basename | 添加文件名到header里面所使用的key(配合上面的basenameHeader一起使用) | includePattern | ^.*$ | 指定会被收集的文件名正则表达式,它跟下面的ignorePattern不冲突,可以一起使用。如果一个文件名同时被这两个正则匹配到,则会被忽略,换句话说ignorePattern的优先级更高 | ignorePattern | ^$ | 指定要忽略的文件名称正则表达式。它可以跟 includePattern 一起使用,如果一个文件被 ignorePattern 和 includePattern 两个正则都匹配到,这个文件会被忽略。 | trackerDir | .flumespool | 用于存储与文件处理相关的元数据的目录。如果配置的是相对目录地址,它会在spoolDir中开始创建 | consumeOrder | oldest | 设定收集目录内文件的顺序。默认是“先来先走”(也就是最早生成的文件最先被收集),可选值有: oldest 、 youngest 和 random 。当使用oldest和youngest这两种选项的时候,Flume会扫描整个文件夹进行对比排序,当文件夹里面有大量的文件的时候可能会运行缓慢。 当使用random时候,如果一直在产生新的文件,有一部分老文件可能会很久才会被收集 | pollDelay | 500 | Flume监视目录内新文件产生的时间间隔,单位:毫秒 | recursiveDirectorySearch | false | 是否收集子目录下的日志文件 | maxBackoff | 4000 | 等待写入channel的最长退避时间,如果channel已满实例启动时会自动设定一个很低的值,当遇到ChannelException异常时会自动以指数级增加这个超时时间,直到达到设定的这个最大值为止。 | batchSize | 100 | 每次批量传输到channel时的size大小 | inputCharset | UTF-8 | 解析器读取文件时使用的编码(解析器会把所有文件当做文本读取) | decodeErrorPolicy | FAIL | 当从文件读取时遇到不可解析的字符时如何处理。 FAIL :抛出异常,解析文件失败; REPLACE :替换掉这些无法解析的字符,通常是用U+FFFD; IGNORE :忽略无法解析的字符。 | deserializer | LINE | 指定一个把文件中的数据行解析成Event的解析器。默认是把每一行当做一个Event进行解析,所有解析器必须实现EventDeserializer.Builder接口 | deserializer.* | | 解析器的相关属性,根据解析器不同而不同 | bufferMaxLines | – | (已废弃) | bufferMaxLineLength | 5000 | (已废弃)每行的最大长度。改用 deserializer.maxLineLength 代替 | selector.type | replicating | 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用 | selector.* | | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | interceptors | – | 该source所使用的拦截器,多个用空格分开 | interceptors.* | | 拦截器相关的属性配置 |
配置范例:
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
3. Exec:EXEC执行一个给定的命令获得源的变化。
这个source在启动时运行给定的Unix命令,并期望该进程在标准输出上连续生成数据(stderr 信息会被丢弃,除非属性 logStdErr 设置为 true )。 如果进程因任何原因退出, 则source也会退出并且不会继续生成数据。 综上来看cat [named pipe]或tail -F [file]这两个命令符合要求可以产生所需的结果,而date这种命令可能不会,因为前两个命令(tail 和 cat)能产生持续的数据流,而后者(date这种命令)只会产生单个Event并退出。
属性 | 默认值 | 解释 |
---|
channels | – | 与Source绑定的channel,多个用空格分开 | type | – | 组件类型,这个是: exec | command | – | 所使用的系统命令,一般是cat 或者tail | shell | – | 设置用于运行命令的shell。 例如 / bin / sh -c。 仅适用于依赖shell功能的命令,如通配符、后退标记、管道等。 | restartThrottle | 10000 | 尝试重新启动之前等待的时间(毫秒) | restart | false | 如果执行命令线程挂掉,是否重启 | logStdErr | false | 是否会记录命令的stderr内容 | batchSize | 20 | 读取并向channel发送数据时单次发送的最大数量 | batchTimeout | 3000 | 向下游推送数据时,单次批量发送Event的最大等待时间(毫秒),如果等待了batchTimeout毫秒后未达到一次批量发送数量,则仍然执行发送操作。 | selector.type | replicating | 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用 | selector.* | | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | interceptors | – | 该source所使用的拦截器,多个用空格分开 | interceptors.* | | 拦截器相关的属性配置 |
配置范例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
4. Multiport Syslog TCP Source
这是一个增强版的 Syslog TCP Source ,它更新、更快、支持监听多个端口。因为支持了多个端口,port参数已经改为了ports。这个Source使用了Apache mina(一个异步通信的框架,同netty类似)来实现。 提供了对RFC-3164和许多常见的RFC-5424格式消息的支持。 支持每个端口配置不同字符集。
属性 | 默认值 | 解释 |
---|
channels | – | 与Source绑定的channel,多个用空格分开 | type | – | 组件类型,这个是:multiport_syslogtcp | host | – | 要监听的hostname或者IP地址 | ports | – | 一个或多个要监听的端口,多个用空格分开 | eventSize | 2500 | 解析成Event的每行数据的最大字节数 | keepFields | none | 是否保留syslog消息头中的一些属性到Event中,可选值 all 、none 或自定义指定保留的字段,如果设置为all,则会保留Priority, Timestamp 和Hostname三个属性到Event中。 也支持单独指定保留哪些属性(支持的属性有:priority, version, timestamp, hostname),用空格分开即可。现在已经不建议使用 true 和 false ,建议改用 all 和 none 了。 | portHeader | – | 如果配置了这个属性值,端口号会被存到每个Event的header里面用这个属性配置的值当key。这样就可以在拦截器或者channel选择器里面根据端口号来自定义路由Event的逻辑。 | charset.default | UTF-8 | 解析syslog使用的默认编码 | charset.port. | – | 针对具体某一个端口配置编码 | batchSize | 100 | 每次请求尝试处理的最大Event数量,通常用这个默认值就很好。 | readBufferSize | 1024 | 内部Mina通信的读取缓冲区大小,用于性能调优,通常用默认值就很好。 | numProcessors | (自动分配) | 处理消息时系统使用的处理器数量。 默认是使用Java Runtime API自动检测CPU数量。 Mina将为每个检测到的CPU核心生成2个请求处理线程,这通常是合理的。 | selector.type | replicating | 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用 | selector.* | – | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | interceptors | – | 该source所使用的拦截器,多个用空格分开 | interceptors.* | | 拦截器相关的属性配 |
配置范例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port
5. JSONHandler:接受json格式数据。
这是HTTP Source的默认解析器(handler),根据请求所使用的编码把http请求中json格式的数据解析成Flume Event数组(不管是一个还是多个,都以数组格式进行存储), 如果未指定编码,默认使用UTF-8编码。这个handler支持UTF-8、UTF-16和UTF-32编码。
json数据格式如下
[{
"headers" : {
"timestamp" : "434324343",
"host" : "random_host.example.com"
},
"body" : "random_body"
},
{
"headers" : {
"namenode" : "namenode.example.com",
"datanode" : "random_datanode.example.com"
},
"body" : "really_random_body"
}]
在一个POST请求中发送的所有 Event 视为一个批处理,并在一个事务中插入到 channel。
属性 | 默认值 | 解释 |
---|
channels | – | 与Source绑定的channel,多个用空格分开 | type | | 组件类型,这个是: http | port | – | 要监听的端口 | bind | 0.0.0.0 | 要监听的hostname或者IP地址 | handler | org.apache.flume.source.http.JSONHandler | 所使用的handler,需填写handler的全限定类名 | handler.* | – | handler的一些属性配置 | selector.type | replicating | 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用 | selector.* | | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | interceptors | – | 该source所使用的拦截器,多个用空格分开 | interceptors.* | | 拦截器相关的属性配 | enableSSL | false | 设置为true启用SSL,HTTP Source不支持SSLv3协议 | excludeProtocols | SSLv3 | 指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除 | keystore | | keystore 文件的位置 | keystorePassword | | Keystore 的密码 |
6. HDFS Sink:配置Hadoop接受数据。
这个Sink将Event写入Hadoop分布式文件系统(也就是HDFS)。 目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据写入的时间、文件大小或Event数量定期滚动文件(关闭当前文件并创建新文件)。 它还可以根据Event自带的时间戳或系统时间等属性对数据进行分区。 存储文件的HDFS目录路径可以使用格式转义符,会由HDFS Sink进行动态地替换,以生成用于存储Event的目录或文件名。 使用此Sink需要安装hadoop, 以便Flume可以使用Hadoop的客户端与HDFS集群进行通信。 注意, 需要使用支持sync() 调用的Hadoop版本 。
属性名 | 默认值 | 解释 |
---|
channel | – | 与 Sink 连接的 channel | type | – | 组件类型,这个是: hdfs | hdfs.path | – | HDFS目录路径(例如:hdfs://namenode/flume/webdata/) | hdfs.filePrefix | FlumeData | Flume在HDFS文件夹下创建新文件的固定前缀 | hdfs.fileSuffix | – | Flume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置) | hdfs.inUsePrefix | – | Flume正在写入的临时文件前缀,默认没有 | hdfs.inUseSuffix | .tmp | Flume正在写入的临时文件后缀 | hdfs.rollInterval | 30 | 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒 | hdfs.rollSize | 1024 | 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节 | hdfs.rollCount | 10 | 当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件) | hdfs.idleTimeout | 0 | 关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒 | hdfs.batchSize | 100 | 向 HDFS 写入内容时每次批量操作的 Event 数量 | hdfs.codeC | – | 压缩算法。可选值:gzip 、 bzip2 、 lzo 、 lzop 、 ``snappy` | hdfs.fileType | SequenceFile | 文件格式,目前支持: SequenceFile 、 DataStream 、 CompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数 | hdfs.maxOpenFiles | 5000 | 允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭 | hdfs.minBlockReplicas | – | 指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。 | hdfs.writeFormat | Writable | 文件写入格式。可选值: Text 、 Writable 。在使用 Flume 创建数据文件之前设置为 Text ,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件。 | hdfs.callTimeout | 10000 | 允许HDFS操作文件的时间,比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。(毫秒) | hdfs.threadsPoolSize | 10 | 每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等) | hdfs.rollTimerPoolSize | 1 | 每个HDFS Sink实例调度定时文件滚动的线程数 | hdfs.kerberosPrincipal | – | 用于安全访问 HDFS 的 Kerberos 用户主体 | hdfs.kerberosKeytab | – | 用于安全访问 HDFS 的 Kerberos keytab 文件 | hdfs.proxyUser | | 代理名 | hdfs.round | false | 是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符) | hdfs.roundValue | 1 | 向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30 | hdfs.roundUnit | second | 向下舍入的单位,可选值: second 、 minute 、 hour | hdfs.timeZone | Local Time | 解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai | hdfs.useLocalTimeStamp | false | 使用日期时间转义符时是否使用本地时间戳(而不是使用 Event header 中自带的时间戳) | hdfs.closeTries | 0 | 开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀;如果设置为0,Sink会一直尝试重命名文件直到成功为止;关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。 | hdfs.retryInterval | 180 | 连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。 | serializer | TEXT | Event 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。 | serializer.* | | 根据上面 serializer 配置的类型来根据需要添加序列化器的参数 |
配置范例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
7. File Roll Sink:配置本地文件接受数据。
属性 | 默认值 | 解释 |
---|
channel | – | 与 Sink 绑定的 channel | type | – | 组件类型,这个是: file_roll . | sink.directory | – | Event 将要保存的目录 | sink.pathManager | DEFAULT | 配置使用哪个路径管理器,这个管理器的作用是按照规则生成新的存储文件名称,可选值有: default 、 rolltime 。default规则:prefix+当前毫秒值+“-”+文件序号+“.”+extension;rolltime规则:prefix+yyyyMMddHHmmss+“-”+文件序号+“.”+extension;注:prefix 和 extension 如果没有配置则不会附带 | sink.pathManager.extension | – | 如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的扩展名 | sink.pathManager.prefix | – | 如果上面的 pathManager 使用默认的话,可以用这个属性配置存储文件的文件名的固定前缀 | sink.rollInterval | 30 | 表示每隔30秒创建一个新文件进行存储。如果设置为0,表示所有 Event 都会写到一个文件中。 | sink.serializer | TEXT | 配置 Event 序列化器,可选值有:text 、 header_and_text 、 avro_event 或者自定义实现了 EventSerializer.Builder 接口的序列化器的全限定类名.。 text 只会把 Event 的 body 的文本内容序列化; header_and_text 会把 header 和 body 内容都序列化。 | batchSize | 100 | 每次请求批处理的 Event 数 |
配置范例:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
|