IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flume常用配置 -> 正文阅读

[大数据]flume常用配置

Flume简介:

??Apache Flume是一个分布式的、可靠的、可用的系统,用于有效地收集、聚合和将大量日志数据从许多不同的源移动到一个集中的数据存储。不仅仅局限于日志数据聚合。由于数据源是可定制的,Flume可以用于传输大量事件数据,包括但不限于网络流量数据、社交媒体生成的数据、电子邮件消息和几乎所有可能的数据源。

数据流模型:

??Event是Flume定义的一个数据流传输的最小单元。Agent就是一个Flume的实例,本质是一个JVM进程,该JVM进程控制Event数据流从外部日志生产者那里传输到目的地(或者是下一个Agent)。
??Source消耗由外部(如Web服务器)传递给它的Event。外部以Flume Source识别的格式向Flume发送Event。例如,Avro Source 可接收从Avro客户端(或其他FlumeSink)接收Avro Event。用 Thrift Source 也可以实现类似的流程,接收的Event数据可以是任何语言编写的只要符合Thrift协议即可。
??Agent中的source和sink与channel存取Event是异步的。
??Flume的Source负责消费外部传递给它的数据(比如web服务器的日志)。外部的数据生产方以Flume Source识别的格式向Flume发送Event。

常用的flume配置

flume启动命令
flume-ng agent -n a1 -c /opt/software/flume/flume190/conf/ -f /root/script/flume_job/logconf/flume04.conf -Dflume.root.logger=INFO,console
SOURCE:NETCAT,CHANNEL:MEMORY,SINK:LOGGER
Netcat Source监听从ip:6666端口,它能识别的是带换行符的文本数据

#组件声明
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#初始化数据源
a1.sources.s1.type = netcat
a1.sources.s1.bind = ip
a1.sources.s1.port = 6666

#初始化通道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10

#初始化数据槽
a1.sinks.k1.type = logger

#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
SOURCE:SPOOLDIR,CHANNEL:FILE,SINK:HDFS
Spool会将监视目录中产生的新文件,并在新文件出现时从新文件中解析数据出来
#组件声明
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#初始化数据源
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /root/data/flume
a1.sources.s1.ignorePattern = ^(.)*\\.bak$
a1.sources.s1.fileSuffix = .bak

#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ip:9820/flume/events/fakeorder/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockreplicas = 1

#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
SOURCE:TAILDIR,CHANNEL:FILE,SINK:HDFS  
Taildir Source监控指定的一些文件,并在检测到新的一行数据产生的时候几乎实时地读取它们,如果新的一行数据还没写完,Taildir Source会等到这行写完后再读取。
#组件声明
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#初始化数据源
a1.sources.s1.type = taildir
a1.sources.s1.filegroups = f1 f2
a1.sources.s1.filegroups.f1 = /root/data/flume/tail01/.*log
a1.sources.s1.filegroups.f2 = /root/data/flume/tail02/.*log
a1.sources.s1.positionFile=/opt/software/flume/flume190/data/tailder_position.conf

#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ip:9820/flume/events/tailevent/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockreplicas = 1

#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
SOURCE:AVRO,CHANNEL:FILE,SINK:HDFS
avro sources接收从外部Avro客户端发送来的数据流,需要额外的启动客户端命令如:flume-ng avro-client -H singlepower -p 7777 -c /opt/software/flume/flume190/conf/ -F /root/data/flume/prohead1000.copy
监听prohead1000.copy文件
#组件声明
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#初始化数据源
a1.sources.s1.type = avro
a1.sources.s1.bind = ip
a1.sources.s1.port = 7777
a1.sources.s1.threads = 5

#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ip:9820/flume/events/avroevent/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockreplicas = 1

#关键组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
SOURCE:TAILDIR,CHANNEL:FILE,SINK:HIVE

#flume对hive hcatalog依赖
cp /opt/software/hive/hive312/hcatalog/share/hcatalog/*.jar /opt/software/flume/flume190/lib

#开启hive事务支持
SET hive.support.concurrency = true;
SET hive.enforce.bucketing = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on = true;
SET hive.compactor.worker.threads = 1;

#建一个测试表
create table familyinfo(
family_id int,
family_name string,
family_age int,
family_gender string
)
partitioned by(intime string)
clustered by(family_gender) into 2 buckets
row format delimited 
fields terminated by ','
lines terminated by '\n'
stored as orc
tblproperties('transactional'='true');

#根据当前日期时间手动添加分区
alter table familyinfo add partition(intime='21-07-05-15');

#组件声明
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#初始化数据源
a1.sources.s1.type = taildir
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1 = /root/data/flume/tail03/.*.log
a1.sources.s1.positionFile = /opt/software/flume/flume190/data/taildr_position.conf
a1.sources.s1.batchSize = 10

#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint02
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10

#初始化数据槽
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://ip:9083
a1.sinks.k1.hive.database = test
a1.sinks.k1.hive.table = familyinfo
a1.sinks.k1.hive.partition = %y-%m-%d-%H
a1.sinks.k1.useLocalTimeStamp = true
a1.sinks.k1.autoCreatePartitions = false
a1.sinks.k1.round = true
a1.sinks.k1.batchSize = 10
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ","
a1.sinks.k1.serializer.serdeSeparator = ','
a1.sinks.k1.serializer.fieldnames = family_id,family_name,family_age,family_gender

#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
SOURCE:TAILDIR,CHANNEL:FILE,SINK:HBASE

#创建hbase表
create 'test:stuflumehbasesink','base'

#组件声明
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#初始化数据源
a1.sources.s1.type = taildir
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1 = /root/data/flume/tail04/.*.log
a1.sources.s1.positionFile = /opt/software/flume/flume190/data/taildr_position.conf
a1.sources.s1.batchSize = 10

#初始化数据槽
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint02
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10

#初始化数据槽
a1.sinks.k1.type = hbase2
a1.sinks.k1.table = test:stuflumehbasesink
a1.sinks.k1.columnFamily = base
a1.sinks.k1.serializer.regex = (.*),(.*),(.*),(.*)
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.serializer.colNames = ROW_KEY,name,age,gender
a1.sinks.k1.serializer.rowKeyIndex = 0
a1.sinks.k1.batchSize = 10

#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
SOURCE:TAILDIR,CHANNEL:FILE,SINK:KAFKA

#组件声明
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#初始化数据源
a1.sources.s1.type = taildir
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1 = /root/data/flume/prolog*.log
a1.sources.s1.positionFile = /opt/software/flume/flume190/data/taildir_position.conf
a1.sources.s1.batchSize = 10
#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#初始化数据槽
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = singlepower:9092
a1.sinks.k1.kafka.topic = prolog_01
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.linger.ms = 500
a1.sinks.k1.kafka.producer.acks = 1

#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-04 11:16:50  更:2021-08-04 11:18:16 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 19:23:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码