IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 使用Flume消费Kafka数据并落盘到HDFS -> 正文阅读

[大数据]使用Flume消费Kafka数据并落盘到HDFS

1.大体流程

在这里插入图片描述

2.具体配置

在这里插入图片描述

3.配置流程

1.配置Flume Agent

在hadoop104的/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件

[lili@hadoop104 conf]$ vim kafka-flume-hdfs.conf

文件配置内容如下:

#定义组件
#由于要分别从Kafka的两个分区中获得数据,因此我们定义两个source
#r1获取topic_start的数据,r2获取topic_event的数据
a1.sources=r1 r2
#同理定义创建两个传输通道
a1.channels=c1 c2
#定义两个Sink。
#k1将数据发送到到HDFS的opic_start中
#k2将数据发送到到HDFS的opic_event中
a1.sinks=k1 k2


#配置source1
#定义source的类型为KafkaSource
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#每次写入Channel的最大消息数
a1.sources.r1.batchSize = 5000
#延迟时间。如果一批消息数不够5000,则会在延迟时间到达时,将该批次消息写入Channel
a1.sources.r1.batchDurationMillis = 2000
#kafka集群
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
#指定消费的哪个topic。
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event

#配置channel1
#定义channel的类型为FileChannel
a1.channels.c1.type = file
#磁盘索引存储路径
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
#磁盘数据存放路径
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
#将source的数据通过putlist传输进入channel
#如果channel已经满的,putlist需要等待**一定时间**然后继续写
#putlist如果反复写入,channel一直都是满的,则将putlist清除。
#在清除之后,在重新拉取刚刚读的数据。
#因此我们希望putlist的等待时间稍微长一点,尽可能的让channel里的数据消化一部分
#在这里我们设置6秒
a1.channels.c1.keep-alive = 6

## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c2.keep-alive = 6

#配置sink1
#定义sink的属性为HDFS Sink
a1.sinks.k1.type = hdfs
#定义数据上到的HDFS地址
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
#HDFS文件名称加前缀
a1.sinks.k1.hdfs.filePrefix = logstart-

##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-

#不要产生大量小文件,生产环境rollInterval配置为3600
#配置每次滚动生成新文件的时间(单位秒)
a1.sinks.k1.hdfs.rollInterval = 10
#文件在达到128M时会滚动生成新文件
a1.sinks.k1.hdfs.rollSize = 134217728
#0表示禁止使用,文件在达到一定event的数量时会滚动生成新文件
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0


# 控制输出文件是原生文件。
#定义文件的类型为压缩流
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.fileType = CompressedStream 
#压缩方式lzop支持索引
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

# 拼装
#将数据从source放入channel中
a1.sources.r1.channels = c1
#channel将数据传入到sink中
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

2.Flume启动停止脚本

  1. 创建脚本f2.sh

    [lili@hadoop102 bin]$ vim f2.sh 
    #! /bin/bash
    case $1 in
    "start"){
            for i in hadoop104
            do
                    echo " --------启动 $i 消费flume-------"
                    ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/job/data-warehouse/flume2.log   2>&1 &"
            done
    };;
    "stop"){
            for i in hadoop104
            do
                    echo " --------停止 $i 消费flume-------"
                    ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
            done
    
    };;
    esac
    
  2. 增加权限

    [lili@hadoop102 bin]$ chmod 777 f2.sh
    
  3. 启动脚本

    [lili@hadoop102 bin]$ f2.sh start
    
  4. 停止脚本

    [lili@hadoop102 bin]$ f1.sh stop
    

4.Flume内存优化

1.抛出异常

  1. 问题描述:如果启动消费Flume抛出如下异常

    ERROR hdfs.HDFSEventSink: process failed

    java.lang.OutOfMemoryError: GC overhead limit exceeded

  2. 解决方法:

    1. 在hadoop102服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置

      [lili@hadoop102 ~]$ vim /opt/module/flume/conf/flume-env.sh
      export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
      
    2. 同步配置到hadoop103、hadoop104服务器

2.内存参数设置及优化

  1. JVM heap一般设置为4G或更高,部署在单独的服务器上(4核8线程16G内存)
  2. -Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁Full GC。
  3. -Xms表示JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发Full GC。

5.采集通道启动停止脚本

  1. 创建脚本

    [lili@hadoop102 ~]$ vim /home/lili/bin/cluster.sh
    #! /bin/bash
    
    case $1 in
    "start"){
    	echo "================    开始启动集群    ================"
    
    	echo "================    正在启动HDFS    ================"
    	/opt/module/hadoop-2.7.2/sbin/start-dfs.sh 
    	
    	echo "================    正在启动YARN    ================"
    	ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/start-yarn.sh"
    
    	#启动 Zookeeper集群
    	echo "==============    正在启动zookeeper   =============="
    	zk.sh start
    
    	sleep 4s;
    
    	#启动 Flume采集集群
    	f1.sh start
    	
    	sleep 2s;
    
    	#启动 Kafka采集集群
    	kf.sh start
    
    	sleep 6s;
    
    	#启动 Flume消费集群
    	f2.sh start
    
    };;
    "stop"){
    
      	echo "================    开始停止集群    ================"
    
       	#停止 Flume消费集群
    	f2.sh stop
    
    	sleep 2s; 
    
    	#停止 Kafka采集集群
    	kf.sh stop
    
    	sleep 6s;
    
    	#停止 Flume采集集群
    	f1.sh stop
    	
    	sleep 2s;
    	
    	echo "==============    正在停止zookeeper   =============="
    	#停止 Zookeeper集群
    	zk.sh stop
    	
    	echo "================    正在停止YARN    ================"
    	ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh"
    	
    	echo "================    正在停止HDFS    ================"
    	/opt/module/hadoop-2.7.2/sbin/stop-dfs.sh 
    
    };;
    esac
    
  2. 增加权限

    [lili@hadoop102 bin]$ chmod 777 cluster.sh
    
  3. 启动脚本

    [lili@hadoop102 bin]$ cluster.sh start
    
  4. 停止脚本

    [lili@hadoop102 bin]$ cluster.sh stop
    

6.数据传输测试

1.启动集群

[lili@hadoop102 bin]$ cluster.sh start
================    开始启动集群    ================
================    正在启动HDFS    ================
Starting namenodes on [hadoop102]
hadoop102: starting namenode, logging to /opt/module/hadoop-2.7.2/logs/hadoop-lili-namenode-hadoop102.out
hadoop103: starting datanode, logging to /opt/module/hadoop-2.7.2/logs/hadoop-lili-datanode-hadoop103.out
hadoop104: starting datanode, logging to /opt/module/hadoop-2.7.2/logs/hadoop-lili-datanode-hadoop104.out
hadoop102: starting datanode, logging to /opt/module/hadoop-2.7.2/logs/hadoop-lili-datanode-hadoop102.out
Starting secondary namenodes [hadoop104]
hadoop104: starting secondarynamenode, logging to /opt/module/hadoop-2.7.2/logs/hadoop-lili-secondarynamenode-hadoop104.out
================    正在启动YARN    ================
starting yarn daemons
starting resourcemanager, logging to /opt/module/hadoop-2.7.2/logs/yarn-lili-resourcemanager-hadoop103.out
hadoop102: starting nodemanager, logging to /opt/module/hadoop-2.7.2/logs/yarn-lili-nodemanager-hadoop102.out
hadoop104: starting nodemanager, logging to /opt/module/hadoop-2.7.2/logs/yarn-lili-nodemanager-hadoop104.out
hadoop103: starting nodemanager, logging to /opt/module/hadoop-2.7.2/logs/yarn-lili-nodemanager-hadoop103.out
==============    正在启动zookeeper   ==============
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
 --------启动 hadoop102 采集flume-------
 --------启动 hadoop103 采集flume-------
 --------启动 hadoop102 Kafka-------
 --------启动 hadoop103 Kafka-------
 --------启动 hadoop104 Kafka-------
 --------启动 hadoop104 消费flume-------
[lili@hadoop102 bin]$ 

2.生成日志数据

[lili@hadoop102 bin]$ lg.sh 
------hadoop102 生成日志-------
------hadoop103 生成日志-------
[lili@hadoop102 bin]$

3.进入HDFS的Web页面查看落盘情况

在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-04 11:16:50  更:2021-08-04 11:19:05 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/25 9:55:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码