IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 采集日志Flume快速入门 -> 正文阅读

[大数据]采集日志Flume快速入门

1.概述

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的软件。

Flume的核心是把数据从数据源(source)收集过来,再将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume在删除自己缓存的数据。

Flume支持定制各类数据发送方,用于收集各类型数据;同时,Flume支持定制各种数据接受方,用于最终存储数据。一般的采集需求,通过对flume的简单配置即可实现。针对特殊场景也具备良好的自定义扩展能力。因此,flume可以适用于大部分的日常数据采集场景。

当前Flume有两个版本。Flume 0.9X版本的统称Flume OG(original generation),Flume1.X版本的统称Flume NG(next generation)。由于Flume NG经过核心组件、核心配置以及代码架构重构,与Flume OG有很大不同,使用时请注意区分。改动的另一原因是将Flume纳入 apache 旗下,Cloudera Flume 改名为 Apache Flume。

2.运行机制

Flume系统中核心的角色是agent,agent本身是一个Java进程,一般运行在日志收集节点。

在这里插入图片描述

每一个agent相当于一个数据传递员,内部有三个组件:

Source:采集源,用于跟数据源对接,以获取数据;

Sink:下沉地,采集数据的传送目的,用于往下一级agent传递数据或者往最终存储系统传递数据;

Channel:agent内部的数据传输通道,用于从source将数据传递到sink;

在整个数据的传输的过程中,流动的是event,它是Flume内部数据传输的最基本单元。event将传输的数据进行封装。如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

一个完整的event包括:event headers、event body、event信息,其中event信息就是flume收集到的日记记录。

3.Flum采集系统构建图

3.1简单结构

单个 agent 采集数据

在这里插入图片描述

3.2复杂结构

多个 agent 之间串联

在这里插入图片描述

4.采集日志Flume快速入门

4.1安装部署

  1. 将apache-flume-1.7.0-bin.tar.gz上传到linux的/opt/software目录下

  2. 解压apache-flume-1.7.0-bin.tar.gz到/opt/module/目录

    [lili@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
    
  3. 修改apache-flume-1.7.0-bin的名称为flume

    [lili@hadoop102 module]$ mv apache-flume-1.7.0-bin flume
    
  4. 将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件

    [lili@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
    [lili@hadoop102 conf]$ vim flume-env.sh
    export JAVA_HOME=/opt/module/jdk1.8.0_144  
    

4.2Flume具体配置

  1. 在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件

    [lili@hadoop102 conf]$ vim file-flume-kafka.conf
    
  2. 配置该文件

    File—>Flume—>Kafka

    Flume官网:官网入口

    #1.定义组件
    # a1即为Agent1
    a1.sources=r1	#定义一个数据源为r1
    a1.channels=c1 c2	#定义两个传输通道为c1和c2 
    
    #2.配置source--configure source
    #定义读取数据源r1的读取方式为talidir
    a1.sources.r1.type = TAILDIR
    #设置JSON格式的文件的位置。该文件中记录tialing最后位置,实现断点续传
    a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
    #定义文件组,如果有多个文件组以空格的形式隔开
    a1.sources.r1.filegroups = f1
    #定义要读取的日志数据位置,即每个文件组的绝对路径
    a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
    #是否在event的header中添加文件的完整路径信息
    a1.sources.r1.fileHeader = true
    #指定source(数据源为r1)使用的channel(传输通道)为c1 c2
    a1.sources.r1.channels = c1 c2
    
    #配置拦截器--interceptor
    #定义source(数据源为r1)的拦截器有i1 i2
    a1.sources.r1.interceptors =  i1 i2
    #定义拦截器i1为ETL拦截器,拦截不规范的数据
    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
    #定义拦截器i2为日志类型拦截器,将启动日志与事件日志分开
    a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder
    
    #定义source(数据源为r1)的选择器类型为multiplexing(多路
    #Multiplexing 可以选择该发往哪些channel
    #Replicating 会将source过来的events发往所有channel
    a1.sources.r1.selector.type = multiplexing
    #设置选择器的键值对header的key为topic
    a1.sources.r1.selector.header = topic
    #判断选择器的键值对header的topic对应的value,如果为topic_start则数据通过c1传输,反之用c2
    a1.sources.r1.selector.mapping.topic_start = c1
    a1.sources.r1.selector.mapping.topic_event = c2
    #通过这种方法成功将启动日志与事件日志分离
    
    #配置传输通道--configure channel
    #配置传输通道的类型。
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    #kafka集群主机列表,中间用逗号隔开
    a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    #日志类型为启动日志,数据发往哪一个Kafka中去
    a1.channels.c1.kafka.topic = topic_start
    #如果为true,数据前会有FlumeEvent前缀。设置为false则没有前缀
    a1.channels.c1.parseAsFlumeEvent = false
    #设置消费者组,Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型:
    #如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;
    #如果所有实例分别属于不同的 Group,那么它实现的就是发布 / 订阅模型。
    a1.channels.c1.kafka.consumer.group.id = flume-consumer
    
    a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.channels.c2.kafka.topic = topic_event
    a1.channels.c2.parseAsFlumeEvent = false
    a1.channels.c2.kafka.consumer.group.id = flume-consumer
    

4.3Flume的ETL和分类型拦截器

ETL拦截器:主要用于,过滤时间戳不合法和Json数据不完整的日志

日志类型区分拦截器:主要用于,将启动日志和事件日志区分开来,方便发往Kafka的不同Topic。

前期准备:

  1. 创建Maven工程flume-interceptor。
  2. 创建包名com.atguigu.flume.interceptor

4.3.1配置pom.xml文件

 <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

4.3.2ETL拦截器

package com.atguigu.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {
    @Override
    public void initialize() {
        //初始化
    }

    @Override
    //判断单个event
    public Event intercept(Event event) {
        //获取数据
        byte[] body = event.getBody();
        String log = new String(body,Charset.forName("UTF-8"));

        //判断数据类型并向Header中赋值
        if (log.contains("start")) {
            if (LogUtils.validateStart(log)) {
                return event;
            }
        } else {
            if(LogUtils.validateEvent(log)) {
                return event;
            }
        }
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        ArrayList<Event> interceptors = new ArrayList<>();

        for (Event event : list) {
            Event intercept1 = intercept(event);

            if (intercept1 != null) {
                interceptors.add(intercept1);
            }
        }
        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

4.3.3日志过滤工具类

package com.atguigu.flume.interceptor;

import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {
    public static boolean validateEvent(String log) {
        //切割
        String[] logContents = log.split("\\|");
/*
1615265478494|{"cm":
{"ln":"-87.9","sv":"V2.2.2","os":"8.2.8","g":"204080W9@gmail.com","mid":"997","nw":"3G"
,"l":"en","vc":"1","hw":"640*1136","ar":"MX","uid":"997","t":"1615174356590","la":"-2.8
","md":"Huawei-12","vn":"1.2.8","ba":"Huawei","sr":"M"},"ap":"app"}
*/
        //校验
        if (logContents.length != 2) {
            return false;
        }
        //校验服务器时间

        if (logContents[0].length() != 13 || !NumberUtils.isDigits(logContents[0])) {
            return false;
        }

        //校验Json
        //trim()  去掉字符串两端的空格  不论空格多少
        if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")) {
            return false;
        }
        return true;
    }
    public static boolean validateStart(String log) {
        if (log == null) return false;
		/*{"action":"1","ar":"MX","ba":"Sumsung","detail":"","en":"start",
		"entry":"3","extend1":"","g":"YNNY6OZ7@gmail.com","hw":"640*960"}
		*/
        //校验json
        if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) return false;

        return true;
    }
}

4.3.4日志分类拦截器

package com.atguigu.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        //区分日志类型  body header
        //获取body数据
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));

        //获取header
        Map<String, String> headers = event.getHeaders();

        //判断判断数据类型并向header中赋值
        if (log.contains("start")) {
            //启动日志
            headers.put("topic" , "topic_start");
        } else {
            //事件日志
            headers.put("topic" , "topic_event");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        ArrayList<Event> interceptors = new ArrayList<>();

        for (Event event : list) {
            Event intercept1 = intercept(event);
            interceptors.add(intercept1);
        }
        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

4.3.5打包上传并启动

  1. 将不带依赖的包,上传到hadoop102的Flume的lib文件夹下。

    [lili@hadoop102 lib]$ ls | grep interceptor
    flume-interceptor-1.0-SNAPSHOT.jar
    
  2. 将flume-interceptor-1.0-SNAPSHOT.jar分发到hadoop103,hadoop104

    前期准备:xysnc分发脚本

    #!/bin/bash
    #1 获取输入参数个数,如果没有参数,直接退出
    pcount=$#
    if ((pcount==0)); then
    echo no args;
    exit;
    fi
    
    #2 获取文件名称
    p1=$1
    fname=`basename $p1`
    echo fname=$fname
    
    #3 获取上级目录的绝对路径
    pdir=`cd -P $(dirname $p1); pwd`
    echo pdir=$pdir
    
    #4 获取当前用户名称
    user=`whoami`
    
    #5 循环
    for host in hadoop102 hadoop103 hadoop104
    do
        echo ------------------- $host --------------
        rsync -av $pdir/$fname $user@$host:$pdir
    done
    

    分发flume-interceptor-1.0-SNAPSHOT.jar

    [lili@hadoop102 lib]$ xsync flume-interceptor-1.0-SNAPSHOT.jar 
    fname=flume-interceptor-1.0-SNAPSHOT.jar
    pdir=/opt/module/flume/lib
    ------------------- hadoop102 --------------
    sending incremental file list
    
    sent 72 bytes  received 12 bytes  168.00 bytes/sec
    total size is 6667  speedup is 79.37
    ------------------- hadoop103 --------------
    sending incremental file list
    flume-interceptor-1.0-SNAPSHOT.jar
    
    sent 6782 bytes  received 31 bytes  4542.00 bytes/sec
    total size is 6667  speedup is 0.98
    ------------------- hadoop104 --------------
    sending incremental file list
    flume-interceptor-1.0-SNAPSHOT.jar
    
    sent 6782 bytes  received 31 bytes  13626.00 bytes/sec
    total size is 6667  speedup is 0.98
    
  3. 分发file-flume-kafka.conf到hadoop102,hadoop103

    [lili@hadoop102 conf]$ pwd
    /opt/module/flume/conf
    [lili@hadoop102 conf]$ xsync file-flume-kafka.conf 
    fname=file-flume-kafka.conf
    pdir=/opt/module/flume/conf
    ------------------- hadoop102 --------------
    sending incremental file list
    
    sent 59 bytes  received 12 bytes  47.33 bytes/sec
    total size is 1307  speedup is 18.41
    ------------------- hadoop103 --------------
    sending incremental file list
    file-flume-kafka.conf
    
    sent 1409 bytes  received 31 bytes  2880.00 bytes/sec
    total size is 1307  speedup is 0.91
    ------------------- hadoop104 --------------
    sending incremental file list
    file-flume-kafka.conf
    
    sent 1409 bytes  received 31 bytes  960.00 bytes/sec
    total size is 1307  speedup is 0.91
    
    

5.采集日志FLume启动脚本

  1. 在/home/lili/bin目录下创建脚本f1.sh

    [lili@hadoop102 bin]$ vim f1.sh
    #!/bin/bash
    #flume启动脚本
    #脚本启动的为hadoop1与hadoop2
    #本项目是 flume -- kafka -- flume的形式
    #该脚本为第一个flume,即为采集数据输送到kafka
    case $1 in
    "start"){
    	for i in hadoop102 hadoop103
    	do
    		 echo " --------启动 $i 采集flume-------"
    		ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/job/data-warehouse/flume.log 2>&1  &"
    	done
    };;
    "stop"){
    	for i in hadoop102 hadoop103
    	do
    		echo " --------停止 $i 采集flume-------"
                    ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs kill"	
    	done
    };;
    esac
    
    # nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。
    # awk默认分隔符为空格
    # xargs表示取出前面命令运行的结果,作为后面命令的输入参数。
    
    
  2. 增加脚本权限

    [lili@hadoop102 bin]$ chmod 777 f1.sh
    
  3. Flume集群启动

    [lili@hadoop102 flume]$ f1.sh start
     --------启动 hadoop102 采集flume-------
     --------启动 hadoop103 采集flume-------
    [lili@hadoop102 bin]$ jps
    17358 Jps
    17139 Application
    
  4. Flume集群停止

    [lili@hadoop102 flume]$ f1.sh stop
     --------停止 hadoop102 采集flume-------
     --------停止 hadoop103 采集flume-------
    [lili@hadoop102 bin]$ jps
    17378 Jps
    

6.脚本补充

6.1Hadoop启动脚本

[lili@hadoop102 bin]$ vim hdp
#!/bin/bash
#1 获取参数个数,如果没有参数,直接退出
pcount=$#
if((pcount==0)); then 
echo no args; 
exit; 
fi 

case $1 in
"start" ){

	echo "================    开始启动集群    ================"

	# 开启hadoop、yarn、历史服务器
	echo "================    正在启动HDFS    ================"
	ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/start-dfs.sh'
	
	echo "================    正在启动YARN    ================"
	ssh hadoop103 '/opt/module/hadoop-2.7.2/sbin/start-yarn.sh'
	
	echo "================    正在启动JobHistoryServer    ================"
	ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/mr-jobhistory-daemon.sh start historyserver'
};;

"stop" ){

	echo "================    开始关闭集群    ================"

	# 关闭hadoop、yarn、历史服务器
	echo "================    正在关闭JobHistoryServer    ================"
	ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/mr-jobhistory-daemon.sh stop historyserver'
	
	echo "================    正在关闭YARN    ================"
        ssh hadoop103 '/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh'
	
	echo "================    正在关闭HDFS    ================"
        ssh hadoop102 '/opt/module/hadoop-2.7.2/sbin/stop-dfs.sh'
};;
esac

6.2生成日志脚本

[lili@hadoop102 bin]$ vim lg.sh
#!/bin/bash
for i in hadoop102 hadoop103
do
	echo ----------生成日志----------
	ssh $i "java -jar /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar $1 $2 >/dev/null 2>&1 &"
done

6.3zookeeper启动脚本

[lili@hadoop102 bin]$ vim zk.sh
#! /bin/bash
case $1 in
"start"){
	for i in hadoop102 hadoop103 hadoop104
	do
		ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start"
	done
};;
"stop"){
	for i in hadoop102 hadoop103 hadoop104
	do
		ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop"
	done
};;
"status"){
	for i in hadoop102 hadoop103 hadoop104
	do
		ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status"
	done
};;
esac

6.3集群命令脚本

注:对集群发出命令并返回结果,多用于进程的查看

[lili@hadoop102 bin]$ vim xcall.sh
#!/bin/bash
for i in hadoop102 hadoop103 hadoop104
do
	echo ----------$i----------
	ssh  $i "$* "
done

例如:

[lili@hadoop102 bin]$ xcall.sh jps
----------hadoop102----------
17469 Jps
----------hadoop103----------
10961 Jps
----------hadoop104----------
11583 Jps

6.4统一集群时间脚本

[lili@hadoop102 bin]$ vim dt.sh
#!/bin/bash
for i in hadoop102 hadoop103 hadoop104
do
	echo ----------$i 同步时间----------
	#当用到sudo的时候需要加  -t
	ssh -t $i "sudo date -s $1 "
done

例如:

[lili@hadoop102 bin]$ dt.sh 16:00:00
----------hadoop102 同步时间----------
[sudo] password for lili: 
2021年 07月 30日 星期五 16:00:00 CST
Connection to hadoop102 closed.
----------hadoop103 同步时间----------
[sudo] password for lili: 
2021年 07月 30日 星期五 16:00:00 CST
Connection to hadoop103 closed.
----------hadoop104 同步时间----------
[sudo] password for lili: 
2021年 07月 30日 星期五 16:00:00 CST
Connection to hadoop104 closed.
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-31 16:42:40  更:2021-07-31 16:44:03 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 18:31:34-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码