
[Big Data] A Brief Look at Flume Data Migration

Flume

Version:
	hadoop >>> 3.1.3
	hive   >>> 3.1.2
	hbase  >>> 2.3.5
	flume  >>> 1.9.0

Core Concepts

Client: produces the data and runs in an independent thread.

Event: the basic unit of data, made up of a header and a body. (Events can be log records, Avro objects, etc.)
  Flow: an abstraction of an Event's journey from its source to its destination.
  Agent: an independent Flume process containing the Source, Channel, and Sink components. (An agent runs in a JVM; each machine usually runs one agent, but a single agent can contain multiple sources and sinks.)
  Source: the data-collection component. (The source collects data from the Client and hands it to the Channel.)
  Channel: temporary storage that buffers the Events passed in by the Source. (The channel connects sources and sinks, much like a queue.)
  Sink: reads Events from the Channel, removes them, and passes them to the next Agent in the flow pipeline, if there is one. (The sink collects data from the Channel and runs in an independent thread.)
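These concepts map directly onto the property naming used in the agent configuration files below; a minimal sketch (the agent and component names are arbitrary):

# one agent named a1 with one source, one channel, one sink
a1.sources  = s1
a1.channels = c1
a1.sinks    = k1
# the source writes Events into the channel; the sink reads them back out
a1.sources.s1.channels = c1
a1.sinks.k1.channel    = c1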

Core Component Types

SOURCE               CHANNEL              SINK
NETCAT (ip + port)   memory (in-memory)   logger
SPOOLDIR (dir)       file (on-disk)       hdfs
TAILDIR (dir)                             kafka
                                          hive
                                          hbase

Case 1: NETCAT

SOURCE: NETCAT || CHANNEL: MEMORY || SINK: LOGGER

vim /scrapts/flume_job/logconf/netcat/flume01.conf
---------------------------------NETCAT( ip + port )-------------------------------------
# Component declarations
a1.sources 	= s1
a1.channels = c1
a1.sinks    = k1

# Source configuration
a1.sources.s1.type = netcat
a1.sources.s1.bind = 192.168.150.150
a1.sources.s1.port = 6666

# Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10

# Sink configuration
a1.sinks.k1.type = logger

# Wire the components together
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

# Run the flume agent
flume-ng agent -n a1 -c /opt/software/flume/flume190/conf/ -f  /scrapts/flume_job/logconf/netcat/flume01.conf -Dflume.root.logger=INFO,console

# Connect to the listening port with netcat
nc -v 192.168.150.150 6666
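Once connected, any line typed into the nc session (or piped to it) should show up as an Event in the agent's console output from the logger sink, for example:

echo "hello flume" | nc 192.168.150.150 6666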

Case 2: SPOOLDIR

SOURCE: SPOOLDIR || CHANNEL: file || SINK: hdfs

vim /scripts/flume_job/logconf/spooldir/flume_spooldir_file_hdfs.cnf
---------------------------------SPOOLDIR(dir)-------------------------------------
# Component declarations
a1.sources 	= s1
a1.channels = c1
a1.sinks    = k1

# Source configuration
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /root/data/flume
a1.sources.s1.ignorePattern = ^(.)*\\.bak$
a1.sources.s1.fileSuffix = .bak

# Channel configuration
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data 
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Sink configuration
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.150.150:9820/flume/events/fake/%Y-%m-%d/%H
# Round down event timestamps (to the nearest 10 minutes, per roundValue/roundUnit)
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
# events written to HDFS per flush (100 is the default)
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

# Wire the components together
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

# Start the flume agent
flume-ng agent -n a1 -c /opt/software/flume/flume190/conf/ -f /scripts/flume_job/logconf/spooldir/flume_spooldir_file_hdfs.cnf -Dflume.root.logger=INFO,console
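To exercise this flow, drop a file into the spool directory and then check the bucketed output path on HDFS (the test file name here is just an illustration):

# create the spool directory and drop a test file into it
mkdir -p /root/data/flume
echo "spooldir test line" > /root/data/flume/demo.txt
# once processed, the file is renamed with the .bak suffix and its contents
# land under the date/hour-partitioned HDFS path from the sink config
hdfs dfs -ls /flume/events/fake/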


Case 3: AVRO

SOURCE: AVRO || CHANNEL: file || SINK: hdfs

vim /scripts/flume_job/logconf/avro/flume_avro_file_hdfs.cnf
---------------------------------AVRO-------------------------------------
# Component declarations
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# Source configuration
a1.sources.s1.type = avro
a1.sources.s1.bind = 192.168.150.150
a1.sources.s1.port = 7777
a1.sources.s1.threads = 5

# Channel configuration
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data 
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Sink configuration
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.150.150:9820/flume/events/avroevent/%Y-%m-%d/%H
# Round down event timestamps (to the nearest 10 minutes, per roundValue/roundUnit)
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
# events written to HDFS per flush (100 is the default)
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

# Wire the components together
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

# Start the flume agent with the avro source listening
flume-ng agent -n a1 -c /opt/software/flume/flume190/conf/ -f /scripts/flume_job/logconf/avro/flume_avro_file_hdfs.cnf -Dflume.root.logger=INFO,console

# Send a file to the listening host/port with the avro client
flume-ng avro-client -H 192.168.150.150 -p 7777 -c /opt/software/flume/flume190/conf/ -F /root/kb12_data/prologcopy.log
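If the client run succeeds, the file's contents should land on HDFS under the avroevent path configured in the sink, for example:

hdfs dfs -ls /flume/events/avroevent/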

Case 4: TAILDIR

Supports resumable, incremental collection: the position file records the offset reached in each tailed file, so newly appended data is picked up and tailing resumes from the last offset after a restart.

SOURCE: TAILDIR || CHANNEL: file || SINK: hdfs

vim /scripts/flume_job/logconf/tail/flume_taildir_file_hdfs.cnf
---------------------------------TAILDIR-------------------------------------
# Component declarations
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# Source configuration
a1.sources.s1.type = taildir
a1.sources.s1.filegroups = f1 f2
a1.sources.s1.filegroups.f1 = /root/kb12_data/tail01/prolog.*.log
a1.sources.s1.filegroups.f2 = /root/kb12_data/tail02/prolog.*.log
a1.sources.s1.positionFile = /root/kb12_data/taildir/taildir_position.json

# Channel configuration
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data 
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Sink configuration
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.150.150:9820/flume/events/taildirevent/%Y-%m-%d/%H
# Round down event timestamps (to the nearest 10 minutes, per roundValue/roundUnit)
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
# events written to HDFS per flush (100 is the default)
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

# Wire the components together
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

# Start the flume agent
flume-ng agent -n a1 -c /opt/software/flume/flume190/conf/ -f /scripts/flume_job/logconf/tail/flume_taildir_file_hdfs.cnf -Dflume.root.logger=INFO,console
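To test, append lines to a file matching one of the file groups and watch the position file advance (the log file name here is just an illustration):

mkdir -p /root/kb12_data/tail01 /root/kb12_data/tail02
echo "taildir test line" >> /root/kb12_data/tail01/prolog.2021-07-05.log
# the position file tracks the inode, byte offset, and path of every tailed file
cat /root/kb12_data/taildir/taildir_position.json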



Case 5: HIVE-SINK

SOURCE: TAILDIR || CHANNEL: file || SINK: HIVE

# The flume hive sink requires the target hive table to be:
#   - partitioned
#   - bucketed (clustered)
#   - stored as ORC (transactional)

# Check that the hive metastore service is listening on port 9083
netstat -nl | grep 9083
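If nothing is listening, the metastore can be started in the background first (standard hive command; the log path is just an example):

nohup hive --service metastore > /tmp/metastore.log 2>&1 &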

# Flume's hive sink depends on the hive hcatalog jars
cp /opt/software/hive/hive312/hcatalog/share/hcatalog/*.jar /opt/software/flume/flume190/lib/

# Enable hive transaction support (session-level settings)
SET hive.support.concurrency = true;
SET hive.enforce.bucketing = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on = true;
SET hive.compactor.worker.threads = 1;

# Create the hive table (partitioned, bucketed, ORC, transactional)
create table familyinfo(
family_id int,
family_name string,
family_age int,
family_gender string
)
partitioned by(intime string)
clustered by(family_gender) into 2 buckets
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as orc
tblproperties('transactional'='true');

# Manually add a partition for the current date and hour
alter table familyinfo add partition(intime='21-07-05-16');

# Create the flume configuration file for the hive sink
vim /scripts/flume_job/logconf/hive_sink/flume_hive_sink_file_hdfs.cnf
---------------------------------TAILDIR -> HIVE-------------------------------------
# Component declarations
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# Source configuration
a1.sources.s1.type = taildir
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1 = /root/kb12_data/hive_sink/.*.log
a1.sources.s1.positionFile = /root/kb12_data/taildir/taildir_position.conf
a1.sources.s1.batchSize = 10

# Channel configuration
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data 
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10

# Sink configuration
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://192.168.150.150:9083
a1.sinks.k1.hive.database = test
a1.sinks.k1.hive.table = familyinfo
a1.sinks.k1.hive.partition = %y-%m-%d-%H
a1.sinks.k1.useLocalTimeStamp = true
a1.sinks.k1.autoCreatePartitions = false
# Round down event timestamps (to the nearest 10 minutes, per roundValue/roundUnit)
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.batchSize = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ","
a1.sinks.k1.serializer.serdeSeparator = ','
a1.sinks.k1.serializer.fieldnames = family_id,family_name,family_age,family_gender

# Wire the components together
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

# Start the flume agent
flume-ng agent -n a1 -c /opt/software/flume/flume190/conf/ -f /scripts/flume_job/logconf/hive_sink/flume_hive_sink_file_hdfs.cnf -Dflume.root.logger=INFO,console


# If problems occur, clear the checkpointDir directory, switch to a new positionFile, delete the hidden .flume directory under the user's home directory, and then restart the agent
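A quick end-to-end check: append a comma-delimited row in the serializer's field order, then query the partition added above (the file name is just an illustration):

echo "1,Tom,25,male" >> /root/kb12_data/hive_sink/demo.log
hive -e "select * from test.familyinfo where intime='21-07-05-16';"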

Case 6: HBASE-SINK

SOURCE: TAILDIR || CHANNEL: file || SINK: HBASE

# Create the hbase table (column family 'base' under the 'test' namespace)
create 'test:stuflumehbasesink','base'
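The create statement runs in the hbase shell; if the 'test' namespace does not exist yet, create it first:

create_namespace 'test'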

# Create the flume configuration file for the hbase sink
vim /scripts/flume_job/logconf/hbase_sink_taildir/hbase_sink_taildir.cnf
---------------------------------TAILDIR -> HBASE-------------------------------------
# Component declarations
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# Source configuration
a1.sources.s1.type = taildir
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1 = /root/kb12_data/hbase_sink/.*.log
a1.sources.s1.positionFile = /root/kb12_data/taildir/taildir_position.conf
a1.sources.s1.batchSize = 10

# Channel configuration
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/flume190/mydata/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/flume190/mydata/data 
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10

# Sink configuration
a1.sinks.k1.type = hbase2
a1.sinks.k1.table = test:stuflumehbasesink
a1.sinks.k1.columnFamily = base
a1.sinks.k1.serializer.regex=(.*),(.*),(.*),(.*)
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.serializer.colNames = ROW_KEY,name,age,gender
a1.sinks.k1.serializer.rowKeyIndex = 0
a1.sinks.k1.batchSize = 10

# Wire the components together
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

# Start the flume agent
flume-ng agent -n a1 -c /opt/software/flume/flume190/conf/ -f /scripts/flume_job/logconf/hbase_sink_taildir/hbase_sink_taildir.cnf -Dflume.root.logger=INFO,console


# If problems occur, clear the checkpointDir directory, switch to a new positionFile, delete the hidden .flume directory under the user's home directory, and then restart the agent
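To verify, append a matching comma-delimited line (the first field becomes the row key, per rowKeyIndex = 0) and scan the table (the log file name is just an illustration):

echo "1001,Tom,25,male" >> /root/kb12_data/hbase_sink/demo.log
echo "scan 'test:stuflumehbasesink'" | hbase shell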