拦截器
在Flume运行过程中 ,Flume有能力在运行阶段修改/删除Event,这是通过拦截器(Interceptors)来实现的。拦截器有下面几个特点: - 拦截器需要实现org.apache.flume.interceptor.Interceptor接口。 - 拦截器可以修改或删除事件基于开发者在选择器中选择的任何条件。 - 拦截器采用了责任链模式,多个拦截器可以按指定顺序拦截。 - 一个拦截器返回的事件列表被传递给链中的下一个拦截器。 - 如果一个拦截器需要删除事件,它只需要在返回的事件集中不包含要删除的事件即可。 常用拦截器: 1. Timestamp Interceptor :时间戳拦截器,将当前时间戳(毫秒)加入到events header中,key名字为:timestamp,值为当前时间戳。用的不是很多 2. Host Interceptor:主机名拦截器。将运行Flume agent的主机名或者IP地址加入到events header中,key名字为:host(也可自定义) 3. Static Interceptor:静态拦截器,用于在events header中加入一组静态的key和value。
自定义拦截器:
案例:
需求:为了提高Flume的扩展性,用户可以自己定义一个拦截器, 对每一组的item_type和active_time都过滤出相应的HOST和USERID
代码实现
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flume</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
</dependencies>
</project>
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
public class MyJSONSplit implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
return null;
}
@Override
public List<Event> intercept(List<Event> events) {
List<Event> result=new ArrayList<>();
try {
for (int i = 0; i < events.size() ; i++) {
Event event=events.get(i);
String body=new String(event.getBody(),"utf-8");
JSONObject jsonObject = JSONObject.parseObject(body);
String host=jsonObject.getString("host");
String userID=jsonObject.getString("user_id");
JSONArray items=jsonObject.getJSONArray("items");
System.out.println("接收到Event:"+body);
for (int j = 0; j < items.size() ; j++) {
JSONObject itemsJSONObject = items.getJSONObject(j);
String itemType=itemsJSONObject.getString("item_type");
long activeTime=itemsJSONObject.getLong("active_time");
JSONObject converted=new JSONObject();
converted.put("host",host);
converted.put("user_id",userID);
converted.put("item_type",itemType);
converted.put("active_time",activeTime);
Event newEvent=new SimpleEvent();
newEvent.setBody(converted.toJSONString().getBytes());
System.out.println("拆分出新的Event:"+converted.toJSONString());
result.add(newEvent);
}
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return result;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new MyJSONSplit();
}
@Override
public void configure(Context context) {
}
}
}
使用maven将拦截器打包,然后把此包和依赖的fastjson一起上传到flume lib目录下
编写方案1:
编写 MyJSONSplit 文件
[root@sh01~]# vi MyJSONSplit.conf
#agent.source
a1.sources = r1 #指定source名
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1=/home/flume/data/.*log
a1.sources.r1.fileHeader = true
# 定义拦截器
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.al.flume.MyJSONSplit$Builder
a1.sources.r1.channels=c1
#agent.channel
# 定义channel名
a1.channels = c1
# or file
a1.channels.c1.type=memory
#agent.sink
a1.sinks=k1
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
启动agent
[root@sh01 ~]# flume-ng agent -c /usr/local/flume1.8/conf -f /root/flumeconf/MyJSONSplit.conf -n a1 -Dflume.root.logger=INFO,console#agent.source
测试
注意:测试之前要把fastjson的架包加入到flume 的lib目录下
[root@sh01 ~]# vi my.sh
#!/bin/bash
log='{
"host":"www.baidu.com",
"user_id":"13755569427",
"items":[
{
"item_type":"eat",
"active_time":156234
},
{
"item_type":"car",
"active_time":156233
}
]
}'
echo $log>> /home/flume/data/test.log
[root@sh01 ~]# bash my.sh
编写方案2:
编写 MyJSONSplit 文件
[root@sh01~]# vi MyJSONSplit.conf
#agent.source
a1.sources = r1 #指定source名
a1.sources.r1.type = http
a1.sources.r1.bind=hadoop00 #绑定ip,在此我做过映射所以可以写成hadoop00
a1.sources.r1.port=6161
# 定义拦截器
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.al.flume.MyJSONSplit$Builder
a1.sources.r1.channels=c1
#agent.channel
# 定义channel名
a1.channels = c1
# or file
a1.channels.c1.type=memory
#agent.sink
a1.sinks=k1
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
启动agent
[root@sh01 ~]# flume-ng agent -c /usr/local/flume1.8/conf -f /root/flumeconf/MyJSONSplit.conf -n a1 -Dflume.root.logger=INFO,console#agent.source
测试
在postman下创建连接以及测试数据
备注:
若出现:
错误,除了json格式不正确以外,还有可能是因为没有导入fastjson架包
注意:由于http协议需要发送请求所以要有headers与body不能直接写json:
|