IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 大数据入门--Flume(二)agent内部原理与进阶案例 -> 正文阅读

[大数据]大数据入门--Flume(二)agent内部原理与进阶案例

Flume Agent内部原理

架构图

如有问题,欢迎指正,学习中,勿喷!
在这里插入图片描述

主要组件

  • Source
  • Channel Processor
  • Interceptor
  • Channel Selector
    • Replicating Channel Selector (default,类似MQ的订阅模式)
    • Multiplexing Channel Selector(多路复用,需要配合自定义Interceptor添加自定义header,根据配置策略进行channel的选择)
    • Custom Channel Selector
  • Sink Processor
    • Default Sink Processor(只接受一个sink,即Processor下游只能有一个sink)
    • Failover Sink Processor(故障转移,可以配置多个sink,根据优先级进行故障切换,同一时刻只有一个工作者)
    • Load balancing Sink Processor(负载均衡,多个sink共同协作,同一时刻,可以并行工作)
  • Sink

其中黑体部分为重点内容

进阶案例

使用avro对flume进行拓扑连接

复制

练习重点:Channel Selector 复制选择器,也是默认情况
场景案例:
假设我们有三台机器,hadoop102、hadoop102、hadoop103。其中hadoop101需要从本地端口监听数据,并将数据分别下发给hadoop102、hadoop103

所以架构设计图为
在这里插入图片描述
问题:为什么这里我们hadoop101使用了两个Channel??
答:是因为如果我们这里使用的是一个Channel,无法在向下同时推送进入两个sink,因为Sink Processor只有failoverload balancing两种策略是可以接收多个sink的,但是他们又不是复制分发的策略。

hadoop101配置文件

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# 描述Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 4141

# 描述Sink,这里有两个
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4142

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 4143

# 描述Channel,也是两个,使用内存
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# 绑定三者关联
# 此处可以不声明(因为是默认配置),配置r1的channel selector 为复制
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

hadoop102配置文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述Source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4142

# 描述Sink
a1.sinks.k1.type = logger

# 描述Channel,也是两个,使用内存
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# 绑定三者关联
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

hadoop103配置文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述Source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4143

# 描述Sink
a1.sinks.k1.type = logger

# 描述Channel,也是两个,使用内存
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# 绑定三者关联
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

故障转移

考察重点:Sink Processor的策略
案例:需求与上述类似,本次hadoop102、hadoop103不在是接收相同的数据,而是hadoop103作为hadoo102的热备份,当hadoop102宕机时通过hadoop103来接收数据。

此案例只需要修改hadoop101配置

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# 描述Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 4141

# 描述Sink,这里有两个
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4142

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 4143

# 描述Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 绑定三者关联
# 此处可以不声明(因为是默认配置),配置r1的channel selector 为复制
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

#sink组定义
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
# backoff 最大时间,单位:毫秒
a1.sinkgroups.g1.processor.maxpenalty = 10000

修改内容主要:

  • 去掉一个channel
  • 增加sink 组定义,设置为faliover相关配置

注意:如果hadoop102下线了,那么hadoop103会顶上去,但是当hadoop102再次上线,因为优先级别高所以再次会得到接收权(前提是超过了processor.maxpenalty

负载均衡

考察重点:Sink Processor的策略
案例:这次hadoop103不在是热备份,需要hadoop103和hadoop102一起工作,分担压力

同样的还是只需要修改hadoop101的配置,将组策略调整为load balance
故:只贴出修改的内容

#sink组定义
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
# 负载均衡的策略可以是随机或者轮训:random/round_robin(default)
a1.sinkgroups.g1.processor.selector = random

多路复用

案例:hadoop101负责通过netcat收集数据,每条数据信息的第一个单词如果包含hello字符串,则需要将其sink到haoop102,否则sink到hadoop103。
可以通过channel selector为多路复用+自定义Interceptor

实现自定义Interceptor
1.pomx.xml

<dependency>
   <groupId>org.apache.flume</groupId>
   <artifactId>flume-ng-core</artifactId>
   <version>1.9.0</version>
</dependency>
  1. 实现org.apache.flume.interceptor.Interceptor来实现拦截器逻辑
  2. 实现org.apache.flume.interceptor.Interceptor.Builder来实现创建拦截器实例
  3. 打包jar放入flume/lib下
package com.xbz.study.bigdata.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class StateInterceptor implements Interceptor {
    public String key;
    public StateInterceptor(String key){
        this.key = key;
    }
    @Override
    public void initialize() {
        //初始化准备
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        try {
            String msg = new String(event.getBody(), "UTF-8");
            String[] split = msg.split("\\s+");
            if(split[0].contains("hello")){
                headers.put(key,"sink1");
            }else{
                headers.put(key,"sink2");
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        list.forEach(this::intercept);
        return list;
    }

    @Override
    public void close() {
        //关闭资源
    }

    public static class Builder implements Interceptor.Builder{
        private String key = null;
        @Override
        public Interceptor build() {
            return new StateInterceptor(this.key);
        }

        @Override
        public void configure(Context context) {
            key = context.getString("key","state");
        }
    }
}

hadoop101配置文件

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# 描述Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 4141

# 描述Sink,这里有两个
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4142

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 4143

# 描述Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100


#定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.xbz.study.bigdata.flume.StateInterceptor$Builder
a1.sources.r1.interceptors.i1.key = state

# 绑定三者关联
## channel selector 多路复用
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.channels = c1 c2
## 根据header的state字段进行分发
a1.sources.r1.selector.header = state
## header中state = sink1 分发到c1
a1.sources.r1.selector.mapping.sink1 = c1
a1.sources.r1.selector.mapping.sink2 = c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

聚合

分别通过hadoop101和hadoop102进行netcat数据收集,然后通过hadoop103进行数据的聚合
hadoop103可选则是从一个Source进行收集还是两个Source进行收集,这里我们选择使用一个sink进行收集

hadoop101配置文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 4141

# 描述Sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4143

# 描述Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 绑定三者关联
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

hadoop102配置

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4142

# 描述Sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4143

# 描述Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 绑定三者关联
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

hadoop102配置

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述Source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4143

# 描述Sink
a1.sinks.k1.type = logger

# 描述Channel,也是两个,使用内存
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 绑定三者关联
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-01 14:34:51  更:2021-08-01 14:35:26 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/3 5:42:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码