Flume的TaildirSource配置路径匹配滚动时间(修改源码)
一、背景
最近有个需求,要用flume读取腾讯云CDN的日志,由于日志是运维同学直接挂在到本地服务器上的,每次访问目录下的文件都相当于要与腾讯云之间有个交互,会产生大量的访问量,似乎这部分要收费的,所有要尽可能地少访问。 日志路径如下
/data/cos_dir/tencentcdnlog/2022/04/02/15
目录是按照时间实时滚动增加的,也就是说16点时02目录下会多一个16的文件夹,16文件夹里会放有那些日志文件 需求:现在要按时间每小时读取对应文件夹下的文件 原本用的采集工具是fluentd,它的采集配置本身支持滚动的时间但现在要用flume了,flume本身是不支持父目录里有通配符的,所以我们需要改下源码 fluentd配置如下
<source>
@type tail
path /data/cos_dir/tencentcdnlog/%Y/%m/%d/%H/*
pos_file /home/john/logs/td-agent/john_cdn_log.log.pos
tag john_cdn_log.bar
read_from_head true
enable_watch_timer false
<parse>
@type none
</parse>
</source>
<match john_cdn_log.bar>
@type kafka_buffered
brokers xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092
buffer_type file
buffer_path /home/john/logs/td-agent/buffer/john_cdn_log/td
buffer_chunk_limit 256m
buffer_queue_limit 64
flush_interval 1s
num_threads 2
default_topic john_cdn_log_input
output_data_type attr:message
compression_codec gzip
max_send_retries 1
kafka_agg_max_bytes 1000000
</match>
二、正片(修改TaildirSource源码)
修改完源码后有如下功能: a、支持带时间通配符的目录 b、支持配置初始化时自动创建不存在的父目录
1、下载源码
本次修改的是1.9版本 官网下载源码包
https://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-src.tar.gz
git上下也行
https://github.com/apache/flume
找到taildirSource对应位置
2、分析源码
TaildirSource.java
start()方法,主要做三件事,一、开启一个ReliableTaildirEventReader事件监听者,二、开启个线程去校验文件什么时候关闭,三、开启一个线程去维护position位置文件。 configure(Context context) 负责装载配置文件 process() 负责与channel的交互,把读取到的文件给channel
ReliableTaildirEventReader.java
看构造方法 会在这里创建TaildirMatcher对象,每个父路径一个对象,待会我们重点改它 看updateTailFiles(boolean skipToEnd)方法 这个方法很重要,会遍历每个TaildirMatcher(一个TM相当于一个父目录),更新下面的文件信息 TaildirSource中的process也会调用它
TaildirMatcher.java
看构造方法 这里,每个TaildirMatcher都会保存一个对应的父目录信息,我们要改的也主要是这里 这还有个getMatchingFiles()用来更新文件
3、修改源码
TaildirMatcher.java
主要是在创建TaildirMatcher时,把输入的路径中含有的时间通配符换成了当前时间对应的准确值,由于TaildirMathcer会在一开始判断下配置中给定的文件路径的父目录是否存在,若不存在会报错,而我测试中用的时间目录是精确到分钟的,所以为了我每次启动时不报错,就在配置中加了个配置,让它一开始检测到父目录不存在时可以自动创建父目录
ReliableTaildirEventReader.java
主要的替换操作就在这了,当需要更新文件时,会去判断TaildirMatcher中的父目录parentDir是否是当前时间的目录,如果不是,就会把parentDir替换成当前时间对应的目录
TaildirSource.java
TaildirSourceConfigurationConstants.java
改完后install打包,只取taildirsource包,丢到flume的lib目录下就行了
以上所有操作都是针对带时间通配符的路径的,对其他路径不会有影响,主体思路是比较简单的,就是在每次更新文件的时候去判断对应父目录是否是当前时间的,如果不是那就更新为当前时间的父目录,上面的一些其他代码主要是为了实现我的另外两个操作,一、可以加个配置在开始时自动创建不存在的时间父目录而不报错,二、在时间滚动到下一个时间时,判断下对应父目录是否存在,若不存在则不替换父目录,同时打个不存在的info,但为了避免每5秒的更新都打印这个info我又加了个缓存来记录上一次的路径,从而与当前对比,不同时才打印
三、注意
1、由于父目录会更新,导致position游标文件也会被后面目录覆盖 2、上文提到有个默认5秒的更新文件时间,假设路径中的时间最小精度是分,当父目录滚动时,在前一分钟的最后5秒(不一定是)到下一分钟这段时间的数据是否会丢失还没去验证
四、可能遇到的问题
1、打包时遇到其他组件(flume-ng-morphline-solr-sink)报错,直接从maven中干掉它不打包它就行了 2、打包时遇到checkstyle格式不符合规范报错,vm中加点参数,跳过检查就行了
3、遇到spotbugs代码错误检查报错,找到对应pom.xml稍微提高下上限就行了,因为不一定是有问题的错。
五、其他
自定义channel
为了方便看channel的数据,顺便自定义了个简陋的Chanenl往本地文件写接受到的source数据 pom.xml
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
package com.john.myflumechannel;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.flume.*;
import org.apache.flume.channel.AbstractChannel;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
public class MyFlumeChannel1 extends AbstractChannel {
private static final Logger LOG = LoggerFactory.getLogger(MyFlumeChannel1.class);
String outputPath;
File file;
public void put(Event event) throws ChannelException {
Transaction transaction = null;
try{
transaction = getTransaction();
transaction.begin();
String s = new String(event.getBody());
FileUtils.writeStringToFile(file, s + "\n", true);
if(s.length() > 0){
LOG.info("接收到一条数据:" + s);
}
transaction.commit();
}catch (Exception e){
transaction.rollback();
LOG.info("something is error: " + e.getMessage());
}finally {
transaction.close();
}
}
public Event take() throws ChannelException {
return null;
}
public Transaction getTransaction() {
return new BasicTransactionSemantics() {
@Override
protected void doPut(Event event) throws InterruptedException {
}
@Override
protected Event doTake() throws InterruptedException {
return null;
}
@Override
protected void doCommit() throws InterruptedException {
}
@Override
protected void doRollback() throws InterruptedException {
}
};
}
public void configure(Context context){
String outputPath = context.getString("outputPath");
Preconditions.checkNotNull(outputPath, "outputPath must be set!!");
this.outputPath = outputPath;
file = new File(outputPath);
}
}
打包同样丢到flume的lib下就行了
测试时用配置文件
mytest.sources = r1 r2
mytest.channels = c1 c2
mytest.sources.r1.type = TAILDIR
mytest.sources.r1.channels = c1
mytest.sources.r1.filegroups = f1 f2
mytest.sources.r1.filegroups.f1 = /root/john/%Y/%m/%d/%H/%M/.*
mytest.sources.r1.filegroups.f2 = /root/john/2022/04/02/14/05/.*
mytest.sources.r1.positionFile = /root/john/flume/flume-position/myposition1.json
mytest.sources.r1.batchSize = 100
mytest.sources.r1.initTimeParentDir = true
mytest.channels.c1.type = com.john.myflumechannel.MyFlumeChannel1
mytest.channels.c1.outputPath = /root/john/flume/myChannel/mydata.log
mytest.sources.r2.type = TAILDIR
mytest.sources.r2.channels = c2
mytest.sources.r2.filegroups = f1
mytest.sources.r2.filegroups.f1 = /root/john/flume/testlog/.*
mytest.sources.r2.positionFile = /root/john/flume/flume-position/myposition2.json
mytest.sources.r2.batchSize = 100
mytest.channels.c2.type = com.john.myflumechannel.MyFlumeChannel1
mytest.channels.c2.outputPath = /root/john/flume/myChannel/mydata.log
|