log4j + fastjson: generating mock logs
log4j: log levels DEBUG, INFO, WARN, ERROR, FATAL; log4j.appender.R.File={flume}  # see the Java code for details
# Run the jar on Linux: java -jar <jar file> /root/data/flume <config file name with path>
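A minimal log4j.properties sketch for the mock-log generator, assuming the appender name R from the line above and an output file under /root/data/flume; the actual Java/fastjson code is not reproduced here:
# assumed setup: write mock logs where Flume can pick them up
log4j.rootLogger=INFO,R
log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=/root/data/flume/prolog.log
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %m%n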
-----------------------flume ---------------------------------------------------------------
Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating and transporting massive volumes of log data. Flume supports custom data senders in a logging system for collecting data, and it can also perform simple processing on the data and write it to a variety of (customizable) data receivers.
System features
Log collection
Flume started as Cloudera's log collection system and is now an Apache project. Flume supports custom data senders in a logging system for collecting data. Data processing: Flume can perform simple processing on the data and write it to various (customizable) receivers. Flume can collect data from sources such as console, RPC (Thrift-RPC), text (files), tail (UNIX tail), syslog (the syslog logging system, supporting both TCP and UDP), and exec (command execution).
How it works
Flume-og used multiple Masters. To keep configuration data consistent, Flume introduced ZooKeeper to store the configuration; ZooKeeper itself guarantees consistency and high availability of that data, and it can notify the Flume Master nodes when the configuration changes. The Flume Masters synchronized data among themselves with a gossip protocol. The most visible change in Flume-ng is that the centrally managed Master and ZooKeeper were removed, turning Flume into a pure transport tool. Another major difference is that in Flume-ng the reading and writing of data are handled by separate worker threads (called Runners). In Flume-og the input thread also did the output work (apart from failure retries), so if output was slow (even without failing outright) it would block Flume's ability to accept data. The asynchronous design lets the input thread work smoothly without having to care about any downstream problems.
Event
SOURCE           => CHANNEL => SINK
NETCAT(ip+port)     memory     logger
SPOOLDIR(dir)       file       hdfs
TAILDIR(dir)                   kafka
First example - SOURCE: NETCAT, CHANNEL: MEMORY, SINK: LOGGER
vim flume01.cnf
Config file:
# declare components
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# initialize the source
a1.sources.s1.type = netcat
a1.sources.s1.bind = 192.168.92.200
a1.sources.s1.port = 6666
# the following two properties belong to the spooldir source (ignore *.bak files / suffix added to processed files); they have no effect on a netcat source
#a1.sources.s1.ignorePattern = ^(.)*\\.bak$
#a1.sources.s1.fileSuffix = .bak
# initialize the channel
a1.channels.c1.type = memory
# checkpointDir / dataDirs belong to the file channel; they have no effect on a memory channel
#a1.channels.c1.checkpointDir = /opt/software/flume190/mydata/checkpoint
#a1.channels.c1.dataDirs = /opt/software/flume190/mydata/data
a1.channels.c1.capacity =100
a1.channels.c1.transactionCapacity = 10
# initialize the sink
a1.sinks.k1.type = logger
# bind components
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Run: flume-ng agent -n a1 -c conf/ -f /root/script/flume_job/logger.cnf -Dflume.root.logger=INFO,console
In another window, connect with: nc -v 192.168.92.200 6666
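For a quick non-interactive test (the message text is arbitrary), a line can also be piped in; the logger sink prints each received event in the agent window:
echo "hello flume" | nc 192.168.92.200 6666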
------------------------------------ SPOOLDIR ------------------------------------------------------------------------------------
Config file:
# declare components
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# initialize the source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /root/script/mu
a1.sources.s1.ignorePattern = ^(.)*\\.bak$
a1.sources.s1.fileSuffix = .bak
# initialize the channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume190/mydata/data
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
# initialize the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.92.200:9820/flume/events/fakeorder/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# bind components
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Run: flume-ng agent -n a1 -c conf/ -f /root/script/flume_job/logger.cnf -Dflume.root.logger=INFO,console
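To test, copy a file into the spoolDir (the file name below is an assumption); once the agent has consumed it, the file is renamed with the configured .bak suffix and is subsequently skipped by ignorePattern:
cp /root/data/fakeorder.csv /root/script/mu/
# after processing the file appears as /root/script/mu/fakeorder.csv.bak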
------------------------------------ TAILDIR----------------------------------------------------------------------------------
TAILDIR can be thought of as a combination of the spooling directory source and the exec source: it can monitor multiple directories and use regular expressions to match file names in those directories for real-time collection.
Config file:
# declare components
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# initialize the source
a1.sources.s1.type = TAILDIR
a1.sources.s1.filegroups = f1 f2
a1.sources.s1.filegroups.f1 = /root/script/mu/tail01/prolog.*\\.log
a1.sources.s1.filegroups.f2 = /root/script/mu/tail02/prolog.*\\.log
a1.sources.s1.positionFile = /opt/software/flume190/data/taildir/taildir_position.json
# initialize the channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume190/mydata/data
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
# initialize the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.92.200:9820/flume/events/tailevent/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# bind components
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Run: flume-ng agent -n a1 -c conf/ -f /root/script/flume_job/tail01.log -Dflume.root.logger=INFO,console
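To test, append lines to a file that matches one of the filegroups (the file name and content below are assumptions); TAILDIR records its read offsets in positionFile as JSON, so collection resumes from the last position after a restart:
echo "2021-07-05 16:00:01 INFO mock event" >> /root/script/mu/tail01/prolog_test.log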
--------------------------------- avro --------------------------------------------------------------------------------------------
avro: listens on an Avro port and receives events from an external Avro client stream. Paired with the built-in Avro sink on another (previous-hop) Flume agent, it can be used to build tiered collection topologies.
flume-ng avro-client -H mch -p 7777 -c /conf -F /root/script/sqp_hive.sh
Config file:
# declare components
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# initialize the source
a1.sources.s1.type = avro
a1.sources.s1.bind=192.168.92.200
a1.sources.s1.port = 7777
a1.sources.s1.threads=5
# initialize the channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume190/mydata/data
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
# initialize the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.92.200:9820/flume/events/avroevents/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# bind components
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Start the listening agent:
flume-ng agent -name a1 -c /opt/software/flume190/conf/ -f /root/script/flume_job/logger.cnf -Dflume.root.logger=INFO,console
Then send data with the avro client:
flume-ng avro-client -H 192.168.92.200 -p 7777 -c /opt/software/flume190/conf/ -F /root/script/mu/tail01/prologhead10000.log
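To illustrate the tiered topology mentioned above, here is a minimal sketch of a hypothetical upstream agent a2 whose avro sink forwards to the avro source configured above; everything except the target host/port (192.168.92.200:7777) is an assumption:
a2.sources = s1
a2.channels = c1
a2.sinks = k1
# local source on the upstream node (netcat chosen only for illustration)
a2.sources.s1.type = netcat
a2.sources.s1.bind = localhost
a2.sources.s1.port = 5555
a2.channels.c1.type = memory
# the avro sink pairs with the downstream avro source
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = 192.168.92.200
a2.sinks.k1.port = 7777
a2.sources.s1.channels = c1
a2.sinks.k1.channel = c1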
------------------------------------ hive sink ------------------------------------------------------------------------------------------------
# The Flume hive sink requires the target Hive table to be:
#   partitioned
#   bucketed
#   stored as ORC (transactional)
# netstat -nl | grep 9083 : check that the Hive metastore service is running
# Flume depends on the Hive HCatalog jars
cp /opt/software/hive312/hcatalog/share/hcatalog/* ./
# enable Hive transaction support
set hive.support.concurrency = true;
set hive.enforce.bucketing = true;
set hive.exec.dynamic.partition.mode = nonstrict;
set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager ;
set hive.compactor.initiator.on = true;
set hive.compactor.worker.threads =1 ;
### create the table
create table familyinfo (
family_id int,
family_name string,
family_age int ,
family_gender string
)
partitioned by (intime string )
clustered by (family_gender) into 2 buckets
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as orc
tblproperties('transactional'='true' );
# manually add a partition for the current date and hour
alter table familyinfo add partition(intime ='21-07-05-16');
Config file:
# declare components
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#taildir source
a1.sources.s1.type=taildir
a1.sources.s1.filegroups=f1
a1.sources.s1.filegroups.f1=/root/script/mu/tail03/logger.log
a1.sources.s1.positionFile=/root/data/flume/taildir/taildir_positio.json
a1.sources.s1.batchSize=10
#file channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/software/flume190/mydata/q
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity=100
a1.channels.c1.transactionCapacity=10
#hive sink
a1.sinks.k1.type=hive
a1.sinks.k1.hive.metastore=thrift://192.168.92.200:9083
a1.sinks.k1.hive.database=testZZ
a1.sinks.k1.hive.table=familyinfo
a1.sinks.k1.hive.partition=%y-%m-%d-%H
a1.sinks.k1.useLocalTimeStamp=true
a1.sinks.k1.autoCreatePartitions=false
a1.sinks.k1.round=true
a1.sinks.k1.batchSize=10
a1.sinks.k1.roundValue=10
a1.sinks.k1.roundUnit=minute
a1.sinks.k1.serializer=DELIMITED
a1.sinks.k1.serializer.delimiter=","
a1.sinks.k1.serializer.serdeSeparator=','
a1.sinks.k1.serializer.fieldnames=family_id,family_name,family_age,family_gender
# bind components
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
flume-ng agent -n a1 -c conf/ -f /root/script/mu/tail1.log -Dflume.root.logger=INFO,console
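For reference, input lines in logger.log must be comma-delimited in the order given by serializer.fieldnames (the sample values below are made up); the intime partition value comes from the event timestamp formatted as %y-%m-%d-%H:
1,zhangsan,28,male
2,lisi,25,female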
-------------------------hbase sink --------------------------------------------------------------------------------------------------------------
# create the HBase table (column family 'base')
create 'test:stuflumehbasehbasesink','base'
Config file:
# declare components
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#taildir source
a1.sources.s1.type=taildir
a1.sources.s1.filegroups=f1
a1.sources.s1.filegroups.f1=/root/script/mu/tail03/logger.log
a1.sources.s1.positionFile=/root/data/flume/taildir/taildir_positio.json
a1.sources.s1.batchSize=10
#file channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/software/flume190/mydata/m
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity=100
a1.channels.c1.transactionCapacity=10
#hbase sink
a1.sinks.k1.type=hbase2
a1.sinks.k1.table = test:stuflumehbasehbasesink
a1.sinks.k1.columnFamily = base
a1.sinks.k1.serializer.regex = (.*),(.*),(.*),(.*)
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.serializer.colNames = ROW_KEY,name,age,gender
a1.sinks.k1.serializer.rowKeyIndex = 0
a1.sinks.k1.batchSize = 10
# bind components
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
flume-ng agent -n a1 -c conf/ -f /root/script/flume_job/tail03.log -Dflume.root.logger=INFO,console
flume-ng agent -name a1 -c /opt/software/flume190/conf/ -f /root/script/flume_job/tail03.log -Dflume.root.logger=INFO,console
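For reference, each input line is split by serializer.regex; with rowKeyIndex = 0 the first capture group becomes the row key and the remaining groups go into the base column family under the names in colNames (the sample line below is made up):
1001,tom,18,male
# -> rowkey=1001, base:name=tom, base:age=18, base:gender=male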