IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flume的TaildirSource配置路径匹配滚动时间(修改源码) -> 正文阅读

[大数据]Flume的TaildirSource配置路径匹配滚动时间(修改源码)

Flume的TaildirSource配置路径匹配滚动时间(修改源码)

一、背景

最近有个需求,要用flume读取腾讯云CDN的日志,由于日志是运维同学直接挂在到本地服务器上的,每次访问目录下的文件都相当于要与腾讯云之间有个交互,会产生大量的访问量,似乎这部分要收费的,所有要尽可能地少访问。
日志路径如下

/data/cos_dir/tencentcdnlog/2022/04/02/15

在这里插入图片描述
目录是按照时间实时滚动增加的,也就是说16点时02目录下会多一个16的文件夹,16文件夹里会放有那些日志文件
需求:现在要按时间每小时读取对应文件夹下的文件
原本用的采集工具是fluentd,它的采集配置本身支持滚动的时间但现在要用flume了,flume本身是不支持父目录里有通配符的,所以我们需要改下源码
fluentd配置如下

<source>
@type tail
path /data/cos_dir/tencentcdnlog/%Y/%m/%d/%H/*
pos_file /home/john/logs/td-agent/john_cdn_log.log.pos
tag john_cdn_log.bar
read_from_head true
enable_watch_timer false
<parse>
@type none
</parse>
</source>

<match john_cdn_log.bar>
@type kafka_buffered

# list of seed brokers
brokers xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092

# buffer settings
buffer_type file
buffer_path /home/john/logs/td-agent/buffer/john_cdn_log/td
buffer_chunk_limit 256m
buffer_queue_limit 64
# 拉取频率
flush_interval 1s
num_threads 2

# topic settings
default_topic john_cdn_log_input

# data type settings
output_data_type attr:message
compression_codec gzip

# producer settings
max_send_retries 1
kafka_agg_max_bytes 1000000
</match>

二、正片(修改TaildirSource源码)

修改完源码后有如下功能:
a、支持带时间通配符的目录
b、支持配置初始化时自动创建不存在的父目录

1、下载源码

本次修改的是1.9版本
官网下载源码包

https://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-src.tar.gz

git上下也行

https://github.com/apache/flume

找到taildirSource对应位置
在这里插入图片描述

2、分析源码

TaildirSource.java

start()方法,主要做三件事,一、开启一个ReliableTaildirEventReader事件监听者,二、开启个线程去校验文件什么时候关闭,三、开启一个线程去维护position位置文件。在这里插入图片描述
configure(Context context)
负责装载配置文件
在这里插入图片描述
process()
负责与channel的交互,把读取到的文件给channel
在这里插入图片描述

ReliableTaildirEventReader.java

看构造方法
会在这里创建TaildirMatcher对象,每个父路径一个对象,待会我们重点改它
在这里插入图片描述
看updateTailFiles(boolean skipToEnd)方法
这个方法很重要,会遍历每个TaildirMatcher(一个TM相当于一个父目录),更新下面的文件信息
在这里插入图片描述
TaildirSource中的process也会调用它
在这里插入图片描述

TaildirMatcher.java

看构造方法
这里,每个TaildirMatcher都会保存一个对应的父目录信息,我们要改的也主要是这里
在这里插入图片描述
这还有个getMatchingFiles()用来更新文件
在这里插入图片描述

3、修改源码

TaildirMatcher.java

主要是在创建TaildirMatcher时,把输入的路径中含有的时间通配符换成了当前时间对应的准确值,由于TaildirMathcer会在一开始判断下配置中给定的文件路径的父目录是否存在,若不存在会报错,而我测试中用的时间目录是精确到分钟的,所以为了我每次启动时不报错,就在配置中加了个配置,让它一开始检测到父目录不存在时可以自动创建父目录
在这里插入图片描述

在这里插入图片描述

ReliableTaildirEventReader.java

主要的替换操作就在这了,当需要更新文件时,会去判断TaildirMatcher中的父目录parentDir是否是当前时间的目录,如果不是,就会把parentDir替换成当前时间对应的目录
在这里插入图片描述
在这里插入图片描述

TaildirSource.java

在这里插入图片描述
在这里插入图片描述

TaildirSourceConfigurationConstants.java

在这里插入图片描述
改完后install打包,只取taildirsource包,丢到flume的lib目录下就行了
在这里插入图片描述
在这里插入图片描述

以上所有操作都是针对带时间通配符的路径的,对其他路径不会有影响,主体思路是比较简单的,就是在每次更新文件的时候去判断对应父目录是否是当前时间的,如果不是那就更新为当前时间的父目录,上面的一些其他代码主要是为了实现我的另外两个操作,一、可以加个配置在开始时自动创建不存在的时间父目录而不报错,二、在时间滚动到下一个时间时,判断下对应父目录是否存在,若不存在则不替换父目录,同时打个不存在的info,但为了避免每5秒的更新都打印这个info我又加了个缓存来记录上一次的路径,从而与当前对比,不同时才打印

三、注意

1、由于父目录会更新,导致position游标文件也会被后面目录覆盖
2、上文提到有个默认5秒的更新文件时间,假设路径中的时间最小精度是分,当父目录滚动时,在前一分钟的最后5秒(不一定是)到下一分钟这段时间的数据是否会丢失还没去验证

四、可能遇到的问题

1、打包时遇到其他组件(flume-ng-morphline-solr-sink)报错,直接从maven中干掉它不打包它就行了
在这里插入图片描述
在这里插入图片描述
2、打包时遇到checkstyle格式不符合规范报错,vm中加点参数,跳过检查就行了
在这里插入图片描述

在这里插入图片描述
3、遇到spotbugs代码错误检查报错,找到对应pom.xml稍微提高下上限就行了,因为不一定是有问题的错。
在这里插入图片描述

五、其他

自定义channel

为了方便看channel的数据,顺便自定义了个简陋的Chanenl往本地文件写接受到的source数据
pom.xml

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
    </dependencies>
package com.john.myflumechannel;

import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.flume.*;
import org.apache.flume.channel.AbstractChannel;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;

public class MyFlumeChannel1 extends AbstractChannel {
    private static final Logger LOG = LoggerFactory.getLogger(MyFlumeChannel1.class);
    // 文件的落盘位置
    String outputPath;
    File file;

    public void put(Event event) throws ChannelException {
        Transaction transaction = null;
        try{
            transaction = getTransaction();
            transaction.begin();
            String s = new String(event.getBody());
            // 追加写
            FileUtils.writeStringToFile(file, s + "\n", true);
            if(s.length() > 0){
                LOG.info("接收到一条数据:" + s);
            }
            transaction.commit();
        }catch (Exception e){
            transaction.rollback();
            LOG.info("something is error: " + e.getMessage());
        }finally {
            transaction.close();
        }
    }

    public Event take() throws ChannelException {
        return null;
    }

    public Transaction getTransaction() {
        return new BasicTransactionSemantics() {
            @Override
            protected void doPut(Event event) throws InterruptedException {

            }

            @Override
            protected Event doTake() throws InterruptedException {
                return null;
            }

            @Override
            protected void doCommit() throws InterruptedException {
            }

            @Override
            protected void doRollback() throws InterruptedException {

            }
        };
    }

    public void configure(Context context){
        String outputPath = context.getString("outputPath");
        Preconditions.checkNotNull(outputPath, "outputPath must be set!!");
        this.outputPath = outputPath;
        file = new File(outputPath);
    }
}

打包同样丢到flume的lib下就行了

测试时用配置文件


# example.conf: A single-node Flume configuration
# Name the components on this agent
mytest.sources = r1 r2
mytest.channels = c1 c2

# Describe/configure the source
mytest.sources.r1.type = TAILDIR
mytest.sources.r1.channels = c1
mytest.sources.r1.filegroups = f1 f2
mytest.sources.r1.filegroups.f1 = /root/john/%Y/%m/%d/%H/%M/.*
mytest.sources.r1.filegroups.f2 = /root/john/2022/04/02/14/05/.*
mytest.sources.r1.positionFile = /root/john/flume/flume-position/myposition1.json
mytest.sources.r1.batchSize = 100
mytest.sources.r1.initTimeParentDir = true

#mytest.channels.c1.type = file
#mytest.channels.c1.checkpointDir = /root/john/flume/checkpoint
#mytest.channels.c1.dataDirs = /root/john/flume/data

mytest.channels.c1.type = com.john.myflumechannel.MyFlumeChannel1
mytest.channels.c1.outputPath = /root/john/flume/myChannel/mydata.log

# Describe/configure the source
mytest.sources.r2.type = TAILDIR
mytest.sources.r2.channels = c2
mytest.sources.r2.filegroups = f1
mytest.sources.r2.filegroups.f1 = /root/john/flume/testlog/.*
mytest.sources.r2.positionFile = /root/john/flume/flume-position/myposition2.json
mytest.sources.r2.batchSize = 100

mytest.channels.c2.type = com.john.myflumechannel.MyFlumeChannel1
mytest.channels.c2.outputPath = /root/john/flume/myChannel/mydata.log
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-04 12:18:09  更:2022-04-04 12:22:16 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 15:47:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码