IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 小白学数据仓库日记day3——flume3 -> 正文阅读

[大数据]小白学数据仓库日记day3——flume3

拦截器

在Flume运行过程中 ,Flume有能力在运行阶段修改/删除Event,这是通过拦截器(Interceptors)来实现的。拦截器有下面几个特点:
- 拦截器需要实现org.apache.flume.interceptor.Interceptor接口。
- 拦截器可以修改或删除事件基于开发者在选择器中选择的任何条件。
- 拦截器采用了责任链模式,多个拦截器可以按指定顺序拦截。
- 一个拦截器返回的事件列表被传递给链中的下一个拦截器。
- 如果一个拦截器需要删除事件,它只需要在返回的事件集中不包含要删除的事件即可。
常用拦截器:
1. Timestamp Interceptor :时间戳拦截器,将当前时间戳(毫秒)加入到events header中,key名字为:timestamp,值为当前时间戳。用的不是很多
2. Host Interceptor:主机名拦截器。将运行Flume agent的主机名或者IP地址加入到events header中,key名字为:host(也可自定义)
3. Static Interceptor:静态拦截器,用于在events header中加入一组静态的key和value。

自定义拦截器:

案例:

需求:为了提高Flume的扩展性,用户可以自己定义一个拦截器, 对每一组的item_type和active_time都过滤出相应的HOST和USERID

代码实现

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>org.example</groupId>
    <artifactId>flume</artifactId>
    <version>1.0-SNAPSHOT</version>
 
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.8.0</version>
        </dependency>
 
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
    </dependencies>
 
</project>
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;

public class MyJSONSplit implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        return null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> result=new ArrayList<>();
        try {
            for (int i = 0; i < events.size() ; i++) {
                Event event=events.get(i);
                String body=new String(event.getBody(),"utf-8");
                JSONObject jsonObject = JSONObject.parseObject(body);
                String host=jsonObject.getString("host");
                String userID=jsonObject.getString("user_id");
                JSONArray items=jsonObject.getJSONArray("items");
                System.out.println("接收到Event:"+body);
                for (int j = 0; j < items.size() ; j++) {
                    JSONObject itemsJSONObject = items.getJSONObject(j);
                    String itemType=itemsJSONObject.getString("item_type");
                    long activeTime=itemsJSONObject.getLong("active_time");

                    JSONObject converted=new JSONObject();
                    converted.put("host",host);
                    converted.put("user_id",userID);
                    converted.put("item_type",itemType);
                    converted.put("active_time",activeTime);
                    Event newEvent=new SimpleEvent();
                    newEvent.setBody(converted.toJSONString().getBytes());
                    System.out.println("拆分出新的Event:"+converted.toJSONString());
                    result.add(newEvent);
                }
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        return result;
    }

    @Override
    public void close() {

    }
    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new MyJSONSplit();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

使用maven将拦截器打包,然后把此包和依赖的fastjson一起上传到flume lib目录下

编写方案1:

编写 MyJSONSplit 文件
[root@sh01~]# vi MyJSONSplit.conf
#agent.source
a1.sources = r1   #指定source名
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1=/home/flume/data/.*log
a1.sources.r1.fileHeader = true

# 定义拦截器
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.al.flume.MyJSONSplit$Builder
a1.sources.r1.channels=c1
#agent.channel
# 定义channel名
a1.channels = c1
# or file
a1.channels.c1.type=memory

#agent.sink
a1.sinks=k1
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1

启动agent

[root@sh01 ~]# flume-ng agent -c /usr/local/flume1.8/conf -f /root/flumeconf/MyJSONSplit.conf  -n a1 -Dflume.root.logger=INFO,console#agent.source

测试

注意:测试之前要把fastjson的架包加入到flume 的lib目录下

[root@sh01 ~]# vi my.sh
#!/bin/bash
log='{
"host":"www.baidu.com",
"user_id":"13755569427",
"items":[
    {
        "item_type":"eat",
        "active_time":156234
    },
    {
        "item_type":"car",
        "active_time":156233
    }
 ]
}'
echo $log>> /home/flume/data/test.log

[root@sh01 ~]# bash my.sh

编写方案2:

编写 MyJSONSplit 文件
[root@sh01~]# vi MyJSONSplit.conf
#agent.source
a1.sources = r1   #指定source名
a1.sources.r1.type = http
a1.sources.r1.bind=hadoop00 #绑定ip,在此我做过映射所以可以写成hadoop00
a1.sources.r1.port=6161

# 定义拦截器
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.al.flume.MyJSONSplit$Builder
a1.sources.r1.channels=c1
#agent.channel
# 定义channel名
a1.channels = c1
# or file
a1.channels.c1.type=memory

#agent.sink
a1.sinks=k1
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1

启动agent

[root@sh01 ~]# flume-ng agent -c /usr/local/flume1.8/conf -f /root/flumeconf/MyJSONSplit.conf  -n a1 -Dflume.root.logger=INFO,console#agent.source

测试

在postman下创建连接以及测试数据

备注:

若出现:

错误,除了json格式不正确以外,还有可能是因为没有导入fastjson架包

注意:由于http协议需要发送请求所以要有headers与body不能直接写json:

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-22 23:00:39  更:2021-07-22 23:00:56 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/28 10:15:18-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码