1、Flume概述 ?? ?1、flume是什么? ?? ??? ?flume是海量日志收集、聚合、传输系统 ?? ?2、flume基础架构 ?? ??? ?Agent: JVM进程,启动flume采集数据的时候启动的就是agent进程 ?? ??? ??? ?Source: 定义从哪里采集数据 ?? ??? ??? ?Channel: Source和sink之间的缓冲区,解耦 ?? ??? ??? ??? ?Memory channel: ?? ??? ??? ??? ??? ?数据保存在内存 ?? ??? ??? ??? ??? ?读写速度快 ?? ??? ??? ??? ??? ?数据不安全 ?? ??? ??? ??? ??? ?容量较小 ?? ??? ??? ??? ?File channel:? ?? ??? ??? ??? ??? ?数据保存在磁盘 ?? ??? ??? ??? ??? ?读写速度比较慢 ?? ??? ??? ??? ??? ?数据安全 ?? ??? ??? ??? ??? ?容量较大 ?? ??? ??? ?Sink: 定义数据保存到哪里 ?? ??? ?Event:source采集数据之后会封装成Event,Event分为Header、Body两部分 ?? ??? ??? ??? ?Header: 定义数据信息[自定义信息] ?? ??? ??? ??? ?Body: 存储数据 ?? ?3、Flume入门 ?? ??? ?Source: ?? ??? ??? ?netcat tcp source【监听端口数据】常用属性: ?? ??? ??? ??? ?type: netcat ?? ??? ??? ??? ?bind: 监听哪个主机 ?? ??? ??? ??? ?port: 监听哪个端口 ?? ??? ??? ?exec source【监听单个追加文件】常用属性: ?? ??? ??? ??? ?type: exec ?? ??? ??? ??? ?command: tail -F 文件路径 ?? ??? ??? ??? ?batchSize: 定义每批次采集多少数据, 必须<=事务容量大小 ?? ??? ??? ?exec source的缺点:exec的tail -F命令只会显示文件的最后N行,如果flume在集采的过程中宕机,在宕机的过程中向文件中写入了大量的数据,后续Flume启动之后,因为只会采集最后N行的数据,所以可能出现数据丢失 ?? ??? ??? ? ?? ??? ??? ?Spooling Directory Source【监听目录下新增文件】 ?? ??? ??? ??? ?Spooling Directory Source的缺点: ?? ??? ??? ??? ??? ?1、只能监听目录下新增文件,如果某一个文件已经采集完成,后续再向该文件中写入新内容,flume不会进行采集,采集后的文件会被标记为compaired ?? ??? ??? ??? ??? ?2、不能断点续传,(没有采集完成不会被标记) ?? ??? ??? ??? ?Spooling Directory Source常用属性: ?? ??? ??? ??? ??? ?type:spooldir ?? ??? ??? ??? ??? ?spoolDir: 监听的目录 ?? ??? ??? ??? ??? ?includePattern: 通过正则表达式监听符合要求的文件 ?? ??? ??? ??? ??? ?batchSize:定义每批次采集多少数据, 必须<=事务容量大小 ?? ??? ??? ? ?? ??? ??? ?Taildir Source【监听目录下新增文件以及追加文件】(多目录) ?? ??? ??? ??? ?Taildir Source优点: ?? ??? ??? ??? ??? ?1、监听目录下新增文件以及追加文件(维护了一个断点续传的json文件) ?? ??? ??? ??? ??? ?2、能够断点续传 ?? ??? ??? ??? ? ?? ??? ??? ??? ?Taildir Source常用属性: ?? ??? ??? ??? ??? ?type: TAILDIR ?? ??? ??? ??? ??? ?filegroups: 定义文件组的名称,多个文件组名称通过空格分隔 ?? ??? ??? ??? ??? ?filegroups.<filegroupName> : 定义对应的文件组监听的目录、文件 ?? ??? ??? ??? ??? ?positionFile: 记录上一次采集到了文件的哪个位置的文件 ?? ??? ??? ??? ??? ?batchSize:定义每批次采集多少数据, 必须<=事务容量大小 ?? ??? ??? ??? ? ?? ??? ??? ?avro source[一般flume传递数据给flume的时候使用]常用属性: ?? ??? ??? ??? ??? ?type: avro ?? ??? ??? ??? ??? ?bind: 监听哪个主机 ?? ??? ??? ??? ??? ?port: 监听哪个端口 ?? ??? ??? ?avro sink[发送数据到其他的flume]常用属性: ?? ??? ??? ??? ?type: avro ?? ??? ??? ??? ?hostname: 数据发送到哪个主机 ?? ??? ??? ??? ?port: 数据发送到哪个端口 ?? ??? ?Channel: ?? ??? ??? ?memory channel 常用属性: ?? ??? ??? ??? ?type: memory ?? ??? ??? ??? ?capacity: channel的容量大小 ?? ??? ??? ??? ?transactionCapacity: 事务容量大小,必须<=capacity ?? ??? ?Sink: ?? ??? ??? ?HDFS sink[将数据保存到HDFS] 常用属性: ?? ??? ??? ??? ?type: hdfs ?? ??? ??? ??? ?hdfs.path: 指定数据保存到HDFS哪个路径 ?? ??? ??? ??? ?hdfs.filePrefix: 指定文件的前缀 ?? ??? ??? ??? ?hdfs.rollInterval: 指定间隔多久滚动生成一个新文件,后续数据项文件中写入不会再向老文件中写(往tmp文件里写,当写完时生成文件) ?? ??? ??? ??? ?hdfs.rollSize: 指定文件多大之后滚动生成一个新文件,后续数据项文件中写入不会再向老文件中写,在工作中设置的时候要比128M稍微小一点 ?? ??? ??? ??? ?hdfs.rollCount: 指定向文件中写入多少个Event之后滚动生成一个新文件,后续数据项文件中写入不会再向老文件中写,在工作中一般设置为0禁用 ?? ??? ??? ??? ?hdfs.batchSize: 定义sink每个批次从channel拉取多少数据,必须<=事务容量大小 ?? ??? ??? ??? ?hdfs.codeC: 定义数据保存到HDFS的时候使用哪种压缩[gzip, bzip2, lzo, lzop, snappy] ?? ??? ??? ??? ?hdfs.fileType: 定义数据保存到HDFS的是以哪种文件格式保存[SequenceFile:二进制文件, DataStream:文本文件, CompressedStream:压缩文件] ?? ??? ??? ??? ?hdfs.round: 指定是否按照指定的时间间隔生成文件夹 ?? ??? ??? ??? ?hdfs.roundValue: 指定时间间隔的值 ?? ??? ??? ??? ?hdfs.roundUnit: 指定时间单位 ?? ??? ??? ??? ?hdfs.userLocalTimeStamp: 是否使用本地时间戳(格式化时间的时候使用) ?? ??? ??? ? ?? ??? ??? ?Logger Sink【数据保存到日志中】: 控制台 ?? ??? ??? ??? ?type: logger ?? ??? ??? ? ?? ??? ??? ??? ? ?? ??? ??? ? ?? ??? ??? ?File Roll Sink【数据保存到本地磁盘】常用属性: ?? ??? ??? ??? ?type: file_roll ?? ??? ??? ??? ?sink.directory: 数据保存的路径,这个地址需要我们手动创建好。 ?? ??? ??? ??? ?sink.batchSize: 定义sink每个批次从channel拉取多少数据,必须<=事务容量大小(传向hdfs) ?? ??? ?3、flume进阶: ?? ??? ??? ?1、flume事务 ?? ??? ??? ??? ?原因:flume的source采集数据之后,如果不做控制,在source->channel以及channel->sink这中间是有可能出现数据丢失的。所以需要用事务进行控制 ?? ??? ??? ??? ?flume的事务分为两种: source->channel的put事务,channel->sink的take事务(channel只是作为一个缓存的单元) ?? ??? ??? ??? ??? ?put事务流程:? ?? ??? ??? ??? ??? ??? ?1、Source采集一个批次的数据 ?? ??? ??? ??? ??? ??? ?2、通过doPut方法将批次的数据放入putList[putList的大小=事务容量大小]中 ?? ??? ??? ??? ??? ??? ?3、putList有容量限制不能无限存放数据,所以当putList满了之后会将putList里面的所有数据通过doCommit保存到channel里面 ?? ??? ??? ??? ??? ??? ?4、如果channel中空间不足,此时会保存失败,失败的时候事务会回滚[会清空putList里面的所有数据,然后抛出异常,source捕获到异常之后会重新采集] ?? ??? ??? ??? ??? ?take事务流程: ?? ??? ??? ??? ??? ??? ?1、sink从channel中拉取一个批次的数据【拉取的是数据的引用 指针】 ?? ??? ??? ??? ??? ??? ?2、通过doTake方法将批次的数据写入takeList[takeList的大小=事务容量大小]里面 ?? ??? ??? ??? ??? ??? ?3、通过doCommit方法将takeList里面的数据写入存储介质,比如HDFS ?? ??? ??? ??? ??? ??? ?4、如果写入报错,此时会回滚[清空takeList里面的数据],如果写入成功,清空takeList里面的数据,同时也会清除channel里面的已经写入成功数据 ?? ??? ??? ?2、agent内部原理 ?? ??? ??? ??? ?Source-> ?? ??? ??? ??? ??? ?Channel processor-> ?? ??? ??? ??? ??? ??? ?拦截器-> ?? ??? ??? ??? ??? ??? ??? ?通过channel selector确定数据应该发到哪个channel里面->channel-> ?? ??? ??? ??? ??? ??? ??? ?通过sink processor确定数据应该发到哪个sink里面->sink ?? ??? ??? ??? ??? ?channel selector的种类:(确定将哪种配置信息发送到channel里面) ?? ??? ??? ??? ??? ??? ?1、Replicating Channel Selector: 将Source中的每个数据都向所有的channel都发送一份 ?? ??? ??? ??? ??? ??? ?2、Multiplexing Channel Selector: 将source中的数据指定发到哪个或者那几个channel中(多路复用) ?? ??? ??? ??? ??? ?sink processor的种类: ?? ??? ??? ??? ??? ??? ?一个channel对应一个sink的时候使用(a1.sinkgroups.g1.processor.type = failover) ?? ??? ??? ??? ??? ??? ??? ?1、Default Sink Processor:? ?? ??? ??? ??? ??? ??? ?一个channel对应多个sinkd的时候使用 ?? ??? ??? ??? ??? ??? ??? ?2、Failover Sink Processor: 故障转移,channel中的数据首先向某一个sink写入,直到该sink宕机才会想第二个sink写入 ?? ??? ??? ??? ??? ??? ??? ?3、Load balancing Sink Processor: 负载均衡, 数据均衡的写入不同的sink(不是完全平均的哦)
|