[Big Data] Flume

Classic Flume exercises:

Example 1: Data monitoring (netcat source):

Step 1: Create the configuration file

[root@singlery ~]# cd script/flume_job
vim flume01.cnf
# Component declarations
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# Source (netcat)
a1.sources.s1.type = netcat
a1.sources.s1.bind = 192.168.6.160
a1.sources.s1.port = 6666

# Channel (memory)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10

# Sink (logger)
a1.sinks.k1.type = logger

# Bind source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

#Step 2: Run the agent:

[root@singlery flume_job]# flume-ng agent --conf conf/ --name a1 --conf-file /root/script/flume_job/flume01.cnf -Dflume.root.logger=INFO,console

In another terminal:

nc -v 192.168.6.160 6666

Ncat: Connected to 192.168.6.160:6666.  (connection succeeded)

Type some text, such as hello world; the event will show up in the terminal where the agent is running.
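As a quick non-interactive check (a sketch; the message text is arbitrary), a line can also be piped into nc instead of typed by hand:

echo "hello world" | nc 192.168.6.160 6666

The logger sink then prints each received event, headers plus body bytes, on the agent's console.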

Example 2: Importing data into HDFS

#Step 1: Create the configuration file

[root@singlery flume190]# mkdir -p mydata/checkpoint
[root@singlery flume190]# mkdir -p mydata/data

[root@singlery flume190]# ls mydata/
checkpoint  data
[root@singlery flume190]# pwd
/opt/software/flume/flume190
[root@singlery flume190]# mkdir flume-conf-files
[root@singlery flume190]# cd flume-conf-files/
[root@singlery flume-conf-files]# vim flume_spooldir_file_hdfs.cnf
# Component declarations
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# Source (spooling directory)
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /root/script/flume/log
a1.sources.s1.ignorePattern = ^(.)*\\.bak$
a1.sources.s1.fileSuffix = .bak

# Channel (file)
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Sink (HDFS)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.6.160:9820/flume/events/fakeorder/%y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

# Bind source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

#Step 2: Remove guava-11.0.2.jar from Flume's lib directory and copy guava-27.0-jre.jar from hadoop313 into it (the old Guava bundled with Flume 1.9 conflicts with Hadoop 3 and causes a NoSuchMethodError when writing to HDFS)

[root@singlery ~]#cd /opt/software/flume/flume190/lib
[root@singlery lib]# rm -r guava-11.0.2.jar

[root@singlery flume]# find /opt/software/hadoop/hadoop313/ -name 'guava*.jar'
/opt/software/hadoop/hadoop313/share/hadoop/common/lib/guava-27.0-jre.jar
[root@singlery lib]# cp /opt/software/hadoop/hadoop313/share/hadoop/common/lib/guava-27.0-jre.jar ./

#Step 3: Run the agent

[root@singlery flume190]#flume-ng agent -n a1 -c conf/ -f flume-conf-files/flume_spooldir_file_hdfs.cnf -Dflume.root.logger=INFO,console
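A quick way to verify the flow (a sketch; the sample file name and its contents are made up): create a file in the spooling directory, then list the HDFS target path. Once Flume has consumed a file it renames it with the .bak suffix configured above, and the ignorePattern keeps it from being re-read.

echo "order001,2021-07-05,99.9" > /root/script/flume/log/test01.log
hdfs dfs -ls -R /flume/events/fakeorder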

Example 3: Avro source

#Take the first 10000 lines of prolog.log and save them as prohead1000.copy

cd /root/script/flume/log

[root@singlery log]# head -10000 prolog.log>prohead1000.copy

#Step 1: Configuration file

[root@singlery ~]# cd /opt/software/flume/flume190/flume-conf-files/
[root@singlery flume-conf-files]# vim test03_avro.cnf
# Component declarations
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# Source (Avro)
a1.sources.s1.type = avro
a1.sources.s1.bind = 192.168.6.160
a1.sources.s1.port = 7777
a1.sources.s1.threads = 5

# Channel (file)
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink (HDFS)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.6.160:9820/flume/events/avroevent/%y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

# Bind source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

#Step 2: Run the commands

1. Start the agent:
[root@singlery flume190]#flume-ng agent -n a1 -c conf/ -f flume-conf-files/test03_avro.cnf -Dflume.root.logger=INFO,console

2. Send the file with the Avro client:
[root@singlery flume190]# flume-ng avro-client -H 192.168.6.160 -p 7777 -c conf/ -F /root/script/flume/log/prohead1000.copy
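The Avro client sends the file to the agent's Avro source line by line; once it exits, the data should be visible under the configured HDFS path (a quick check, as a sketch):

hdfs dfs -ls -R /flume/events/avroevent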

Example 4: TAILDIR source

#Create the test log files

[root@singlery ~]# cd script/flume/log
[root@singlery log]#
[root@singlery log]# mkdir tail01 tail02
[root@singlery log]# ls
prohead1000.copy  prolog.log  tail01  tail02
[root@singlery log]# head -10000 prolog.log>tail01/prohead10000.log
[root@singlery log]# ls tail01
prohead10000.log
[root@singlery log]# wc tail01/*
  10000   50000 1922936 tail01/prohead10000.log
  
[root@singlery log]# tail -10000 prolog.log>tail02/prologtail10000.log
[root@singlery log]# ls tail02
prologtail10000.log
[root@singlery log]# wc tail02/*
  10000   50000 1941730 tail02/prologtail10000.log

#Step 1: Create the configuration file:

[root@singlery ~]# cd /opt/software/flume/flume190/flume-conf-files/
[root@singlery flume-conf-files]# vim test04_taildir_file_hdfs.conf
# Component declarations
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# Source (TAILDIR); the file names in each group are regular expressions
a1.sources.s1.type = TAILDIR
a1.sources.s1.filegroups = f1 f2
a1.sources.s1.filegroups.f1 = /root/script/flume/log/tail01/pro.*\\.log
a1.sources.s1.filegroups.f2 = /root/script/flume/log/tail02/prolog.*\\.log
a1.sources.s1.positionFile =/opt/software/flume/flume190/data/taildir/tail_position.json

# Channel (file)
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Sink (HDFS)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.6.160:9820/flume/events/tailevent/%y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

# Bind source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

#Step 2: Run the agent

[root@singlery flume190]# flume-ng agent -n a1 -c conf/ -f flume-conf-files/test04_taildir_file_hdfs.conf -Dflume.root.logger=INFO,console
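Because TAILDIR keeps tailing the matched files and records its progress in the position file, lines appended after the agent starts are also picked up. A quick check (a sketch; the appended text is arbitrary): append a line to one of the tracked files and list the HDFS target path. The file currently being written by the HDFS sink typically carries a .tmp suffix until it rolls.

echo "extra test line" >> /root/script/flume/log/tail01/prohead10000.log
hdfs dfs -ls -R /flume/events/tailevent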

Example 5: Importing data into a Hive table

#The Flume Hive sink requires the target Hive table to be:
a partitioned table
a bucketed table
stored as ORC (transactional)
#netstat -nl | grep 9083   : check that the Hive metastore service is listening

#The Hive sink also depends on Hive's HCatalog jars
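If port 9083 is not listening, the metastore can be started first (a sketch, assuming the standard Hive CLI is on the PATH):

nohup hive --service metastore > /tmp/metastore.log 2>&1 &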

#Step 1: Copy the HCatalog jars into Flume's lib directory

[root@singlery ~]# cd /opt/software/flume/flume190/lib
[root@singlery lib]# cp /opt/software/hive/hive312/hcatalog/share/hcatalog/*.jar ./

#Step 2: Enable Hive transaction support

SET hive.support.concurrency = true;
SET hive.enforce.bucketing = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on = true;
SET hive.compactor.worker.threads = 1;
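These SET commands only apply to the current Hive session; to make them permanent (in particular the compactor settings, which typically need to be active on the metastore side), they are usually placed in hive-site.xml instead, for example (a sketch of one entry):

<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>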

#Step 3: Create the Hive table

create table familyinfo(
family_id int,
family_name string,
family_age int,
family_gender string
)
partitioned by(intime string)
clustered by(family_gender) into 2 buckets
row format delimited 
fields terminated by ','
lines terminated by '\n'
stored as orc
tblproperties('transactional'='true');

#Step 4: Manually add a partition for the current date and hour

alter table familyinfo add partition(intime='21-07-05-20');

//the partition value is the current date and hour in yy-MM-dd-HH format, matching hive.partition below
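To confirm the partition was created, in the Hive CLI:

show partitions familyinfo;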

#Step 5: Create the Flume configuration file test05_hdfs.conf

[root@singlery ~]# cd /opt/software/flume/flume190/flume-conf-files/
[root@singlery flume-conf-files]# vim test05_hdfs.conf
#initialize
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#taildir source
a1.sources.s1.type = taildir
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1=/root/script/flume/log/tail03/.*.log
a1.sources.s1.positionFile=/opt/software/flume/flume190/data/taildir/tail_position2.json
a1.sources.s1.batchSize=10

#file channels
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint2
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#hive sink
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore =thrift://192.168.6.160:9083
a1.sinks.k1.hive.database=default
a1.sinks.k1.hive.table=familyinfo
a1.sinks.k1.hive.partition=%y-%m-%d-%H
a1.sinks.k1.useLocalTimeStamp=true
a1.sinks.k1.autoCreatePartitions=false
a1.sinks.k1.round=true
a1.sinks.k1.batchSize=10
a1.sinks.k1.roundValue=10
a1.sinks.k1.roundUnit=minute
a1.sinks.k1.serializer=DELIMITED
a1.sinks.k1.serializer.delimiter=","
a1.sinks.k1.serializer.serdeSeparator=','
a1.sinks.k1.serializer.fieldnames=family_id,family_name,family_age,family_gender
#connect to channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel =c1

#Step 6: Run the agent

[root@singlery flume190]# flume-ng agent -n a1 -c conf/ -f flume-conf-files/test05_hdfs.conf -Dflume.root.logger=INFO,console
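The source directory tail03 is not created anywhere above; a minimal way to set it up and feed the pipeline a few comma-delimited rows (the file name and values are made up for illustration, following the field order family_id,family_name,family_age,family_gender):

mkdir -p /root/script/flume/log/tail03
echo "1,Tom,25,male" >> /root/script/flume/log/tail03/family01.log
echo "2,Mary,23,female" >> /root/script/flume/log/tail03/family01.log

The rows can then be queried from the Hive CLI:

select * from familyinfo;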

Example 6: HBase sink

#Step 1: Create the table in HBase

create 'test:stuflumehbasesink','base'
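The table lives in the test namespace; if that namespace does not exist yet, create it first in the HBase shell:

create_namespace 'test'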

#Step 2: Create the configuration file test06_hdfs.conf

[root@singlery ~]# cd /opt/software/flume/flume190/flume-conf-files/
[root@singlery flume-conf-files]# vim test06_hdfs.conf
# Component declarations
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# Source (TAILDIR)
a1.sources.s1.type = taildir
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1 = /root/script/flume/log/tail03/.*.log
a1.sources.s1.positionFile = /opt/software/flume/flume190/data/taildir/taildir_position3.conf
a1.sources.s1.batchSize = 10

# Channel (file)
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint3
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10

# Sink (HBase)
a1.sinks.k1.type = hbase2
a1.sinks.k1.table = test:stuflumehbasesink
a1.sinks.k1.columnFamily = base
a1.sinks.k1.serializer.regex = (.*),(.*),(.*),(.*)
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.serializer.colNames = ROW_KEY,name,age,gender
a1.sinks.k1.serializer.rowKeyIndex = 0
a1.sinks.k1.batchSize = 10

# Bind source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

#Step 3: Run the agent:

[root@singlery flume190]# flume-ng agent -n a1 -c conf/ -f flume-conf-files/test06_hdfs.conf -Dflume.root.logger=INFO,console
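To verify end to end (a sketch; the appended row is made up and must match the four-group regex above), append a comma-separated line to the tailed directory and scan the table from the HBase shell. With rowKeyIndex = 0, the first field becomes the row key and the remaining fields go into the base column family:

echo "3,Jack,30,male" >> /root/script/flume/log/tail03/family01.log

then, inside the HBase shell:

scan 'test:stuflumehbasesink'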