IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flume的应用场景 -> 正文阅读

[大数据]flume的应用场景

log4j +fastjson 模拟日志

log4j : 日志级别 INFO ,DEBUG,WARN,ERROR,FATAL
log4j.appender.R.File={flume}
#java 代码详情见

#lunix 环境下执行jar文件
java -jar jar包 /root/data/flume 配置文件名加路径

-----------------------flume ---------------------------------------------------------------

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

系统功能编辑

日志收集

Flume最早是Cloudera提供的日志收集系统,目前是Apache下的一个孵化项目,Flume支持在日志系统中定制各类数据发送方,用于收集数据。
数据处理
Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力 。Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统),支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力。

工作方式编辑

Flume-og采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知Flume Master节点。Flume Master间使用gossip协议同步数据。
Flume-ng最明显的改动就是取消了集中管理配置的 Master 和 Zookeeper,变为一个纯粹的传输工具。Flume-ng另一个主要的不同点是读入数据和写出数据现在由不同的工作线程处理(称为 Runner)。 在 Flume-og 中,读入线程同样做写出工作(除了故障重试)。如果写出慢的话(不是完全失败),它将阻塞 Flume 接收数据的能力。这种异步的设计使读入线程可以顺畅的工作而无需关注下游的任何问题。

Event 
SOUCER 		=> 		CHannel	 =>		SINk
NETCAT(ip+port) 	 memory 		logger
SPOOLDIR(dir)		file			hdfs
TAILDIR(dir)						kafka



SOUCER:NETACT,SHANEL:MEMORT,SINK:LOGGER

vim flume01.cnf

配置文件

#组件声明
a1.sources = s1 
a1.channels = c1
a1.sinks = k1

#初始化数据源
a1.sources.s1.type = netcat
a1.sources.s1.bind = 192.168.92.200
a1.sources.s1.port = 6666
#以bak结尾的忽略掉
a1.sources.s1.ignorePasttern = ^(.)*\\.bak$
#添加后缀
a1.sources.s1.fileSuffix = .bak

#初始化通道
a1.channels.c1.type = memory
#
a1.channels.c1.checkpointDir= /opt/software/flume190/mydata/checkpoint
a1.channels.c1.dataDirs= /opt/software/flume190/mydata/data
a1.channels.c1.capacity =100
a1.channels.c1.transactionCapacity = 10

#初始化数据槽
a1.sinks.k1.type = logger 

#关键组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1



执行命令:flume-ng agent -n a1 -c conf/ -f /root/script/flume_job/logger.cnf -Dflume.root.logger=INFO,console

再另一个窗口 输入  nc -v 192.168.92.200 6666

配置文件

#组件声明
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#初始化数据源
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /root/script/mu
a1.sources.s1.ignorePattern = ^(.)*\\.bak$
a1.sources.s1.fileSuffix = .bak

#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume190/mydata/checkpoint
a1.channels.dataDirs = /opt/software/flume190/mydata/data
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.92.200:9820/flume/events/fakeorder/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockreplicas = 1

#关键组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1




执行命令:flume-ng agent -n a1 -c conf/ -f /root/script/flume_job/logger.cnf -Dflume.root.logger=INFO,console

------------------------------------ TAILDIR----------------------------------------------------------------------------------

tailDir:可以说是spooling directory source+execSource的结合体。可以可以监控多个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。

配置文件

#组件声明
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#初始化数据源
a1.sources.s1.type = TAILDIR
a1.sources.s1.filegroups = f1 f2
a1.sources.s1.filegroups.f1 = /root/script/mu/tail01/prolog.*\\.log
a1.sources.s1.filegroups.f2 = /root/script/mu/tail02/prolog.*\\.log
a1.sources.s1.positionFile = /opt/software/flume190/data/taildir/taildir_position.json
#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume190/mydata/checkpoint
a1.channels.dataDirs = /opt/software/flume190/mydata/data
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.92.200:9820/flume/events/tailevent/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockreplicas = 1

#关键组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1


执行命令:flume-ng agent -n a1 -c conf/ -f /root/script/flume_job/tail01.log -Dflume.root.logger=INFO,console

--------------------------------- avro --------------------------------------------------------------------------------------------

avro:侦听Avro端口并从外部Avro客户端流接收事件。 当与另一个(上一跳)Flume代理上的内置Avro Sink配对时,它可以创建分层集合拓扑。
flume-ng avro-client -H mch -p 7777 -c /conf -F /root/script/sqp_hive.sh

配置文件

#组件声明
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#初始化数据源
a1.sources.s1.type = avro
a1.sources.s1.bind=192.168.92.200
a1.sources.s1.port = 7777 
a1.sources.s1.threads=5

#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume190/mydata/checkpoint
a1.channels.dataDirs = /opt/software/flume190/mydata/data
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.92.200:9820/flume/events/avroevents/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockreplicas = 1

#关键组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1



起侦听:
flume-ng agent -name a1 -c /opt/software/flume190/conf/ -f /root/script/flume_job/logger.cnf -Dflume.root.logger=INFO,console
执行命令:	
flume-ng avro-client -H 192.168.92.200 -p 7777 -c /opt/software/flume190/conf/  -F /root/script/mu/tail01/prologhead10000.log

--------------------logger sink 和 hdf sink ------------------------------------------------------------------------------------------------

#flume hvie sink 要求hive的表格式
hive table 

parition 
bucket 
orc
#netstat -nl |grep 9083 : metastore 服务


#flume 对hive hcatalog依赖 
 cp /opt/software/hive312/hcatalog/share/hcatalog/*  ./ 
# 开启hive 事务支持
set hive.support.concurrency = true; 
set hive.enforce.bucketing = true; 
set hive.exec.dynamic.partition.mode = nonstrict; 
set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager ; 
set hive.compactor.initiator.on = true; 
set hive.compactor.worker.threads =1 ; 

###建表	
create  table familyinfo (
family_id int, 
family_name string, 
family_age int ,
family_gender string 
)
partitioned by (intime string )
clustered by (family_gender) into 2 buckets 
row format delimited 
fields terminated by ','
lines terminated by '\n'
stored as orc 
tblproperties('transactional'='true' );
#根据当前日期时间手动添加分区
alter table familyinfo add partition(intime ='21-07-05-16')

配置文件

#组件声明
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#taildir source
a1.sources.s1.type=taildir
a1.sources.s1.filegroups=f1
a1.sources.s1.filegroups.f1=/root/script/mu/tail03/logger.log
a1.sources.s1.positionFile=/root/data/flume/taildir/taildir_positio.json
a1.sources.s1.batchSize=10

#file channel 
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/software/flume190/mydata/q
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity=100
a1.channels.c1.transactionCapacity=10
#hive sink
a1.sinks.k1.type=hive
a1.sinks.k1.hive.metastore=thrift://192.168.92.200:9083
a1.sinks.k1.hive.database=testZZ
a1.sinks.k1.hive.table=familyinfo
a1.sinks.k1.hive.partition=%y-%m-%d-%H
a1.sinks.k1.useLocalTimeStamp=true
a1.sinks.k1.autoCreatePartitions=false
a1.sinks.k1.round=true
a1.sinks.k1.batchSize=10
a1.sinks.k1.roundValue=10
a1.sinks.k1.roundUnit=minute
a1.sinks.k1.serializer=DELIMITED
a1.sinks.k1.serializer.delimited=','
a1.sinks.k1.serializer.serdeSeparator=','
a1.sinks.k1.serializer.fieldnames=family_id,family_name,family_age,family_gender
#关联组件
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1


flume-ng agent -n a1 -c conf/ -f /root/script/mu/tail1.log -Dflume.root.logger=INFO,console

-------------------------hbase sink --------------------------------------------------------------------------------------------------------------

#创建hbase
create 'test:stuflumehbasehbasesink','base'

配置文件

#组件声明
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#taildir source
a1.sources.s1.type=taildir
a1.sources.s1.filegroups=f1
a1.sources.s1.filegroups.f1=/root/script/mu/tail03/logger.log
a1.sources.s1.positionFile=/root/data/flume/taildir/taildir_positio.json
a1.sources.s1.batchSize=10

#file channel 
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/software/flume190/mydata/m
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity=100
a1.channels.transactionCapacity=10
#hbase sinkp
a1.sinks.k1.type=hbase2
a1.sinks.k1.table = test:stuflumehbasehbasesink
a1.sinks.k1.columnFamily = base
a1.sinks.k1.serializer.regex = (.*),(.*),(.*),(.*)
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.serializer.colNames = ROW_KEY,name,age,gender
a1.sinks.k1.serializer.rowKeyIndex = 0
a1.sinks.k1.batchSize = 10

#关联组件
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1

flume-ng agent -n a1 -c conf/ -f /root/script/flume_job/tail03.log -Dflume.root.logger=INFO,console
flume-ng agent -name a1 -c /opt/software/flume190/conf/ -f /root/script/flume_job/tail03.log -Dflume.root.logger=INFO,console
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-07 00:01:57  更:2021-07-07 00:02:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/22 12:14:08-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码