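Flume Agent Internals
Architecture diagram
If you spot any mistakes, corrections are welcome. I am still learning, so please be kind!
Main components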
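- Source
- Channel Processor
  - Interceptor
  - Channel Selector
    - Replicating Channel Selector (the default; similar to the publish/subscribe model of a message queue)
    - Multiplexing Channel Selector (multiplexing; typically paired with a custom Interceptor that adds a custom header, and the channel is then chosen according to the configured mapping)
    - Custom Channel Selector
- Sink Processor
  - Default Sink Processor (accepts a single sink only, i.e. the processor can have just one sink downstream)
  - Failover Sink Processor (failover; several sinks can be configured and are switched by priority on failure, with only one sink working at any moment)
  - Load balancing Sink Processor (load balancing; several sinks share the work and can run in parallel)
- Sink
The items in bold are the key points.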
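Advanced examples
Connecting Flume agents into a topology with Avro
Replication
Focus: the replicating Channel Selector, which is also the default.
Scenario: we have three machines, hadoop101, hadoop102 and hadoop103. hadoop101 listens for data on a local port and forwards every event to both hadoop102 and hadoop103.
The architecture diagram follows from this.
Question: why does hadoop101 use two Channels here?
Answer: with a single Channel, an event cannot be pushed to both sinks at once. Only the failover and load balancing Sink Processors accept more than one sink, and neither of them replicates events to every sink.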
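The difference between "two channels" and "one channel with two sinks" can be illustrated with a small simulation. This is only a sketch: plain Java queues stand in for Flume channels, and the class and method names (`ReplicationSketch`, `replicate`, `drainSharedChannel`) are hypothetical and not part of Flume.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model only: plain queues stand in for Flume channels.
class ReplicationSketch {

    // Replicating selector: every event is copied into every channel.
    static List<Deque<String>> replicate(List<String> events, int channelCount) {
        List<Deque<String>> channels = new ArrayList<>();
        for (int i = 0; i < channelCount; i++) {
            channels.add(new ArrayDeque<>(events));
        }
        return channels;
    }

    // Several sinks on ONE channel: each event is taken by exactly one sink.
    // The sinks take turns here to keep the demo deterministic (an assumption
    // of this sketch); real sinks simply compete for events.
    static List<List<String>> drainSharedChannel(Deque<String> channel, int sinkCount) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < sinkCount; i++) {
            received.add(new ArrayList<>());
        }
        int next = 0;
        while (!channel.isEmpty()) {
            received.get(next).add(channel.poll());
            next = (next + 1) % sinkCount;
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> events = List.of("e1", "e2", "e3", "e4");
        // Two channels: both downstream agents see all four events.
        System.out.println(replicate(events, 2));
        // One shared channel: the four events are split between the two sinks.
        System.out.println(drainSharedChannel(new ArrayDeque<>(events), 2));
    }
}
```

With two channels each downstream sink sees all four events; with one shared channel the events are split between the sinks, which is why replication needs one channel per destination.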
hadoop101 configuration file
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 4141
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4142
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 4143
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
hadoop102 configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4142
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
hadoop103 configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4143
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
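Failover
Focus: Sink Processor strategies.
Scenario: the requirement is similar to the previous one, but hadoop102 and hadoop103 no longer receive the same data. Instead, hadoop103 acts as a hot standby for hadoop102 and receives the data when hadoop102 goes down.
Only the hadoop101 configuration needs to change for this example: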
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 4141
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4142
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 4143
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000
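Main changes:
- Remove one channel
- Add a sink group definition and configure it for failover
Note: if hadoop102 goes offline, hadoop103 takes over. When hadoop102 comes back online it regains the traffic because of its higher priority, but only after its failure back-off has elapsed; processor.maxpenalty caps that back-off period (in milliseconds).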
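The priority and penalty behaviour described above can be modelled roughly as follows. This is a simplified sketch, not Flume's actual failover processor code: the class `FailoverSketch`, the method `pick` and the `penalizedUntil` map are hypothetical names chosen for illustration.

```java
import java.util.Map;

// Simplified model of failover selection; not Flume's implementation.
class FailoverSketch {

    // Returns the highest-priority sink whose penalty has expired,
    // or null when every sink is still penalized.
    static String pick(Map<String, Integer> priorities,
                       Map<String, Long> penalizedUntil,
                       long nowMillis) {
        String best = null;
        for (Map.Entry<String, Integer> entry : priorities.entrySet()) {
            // A sink that never failed has no entry and is always available.
            long until = penalizedUntil.getOrDefault(entry.getKey(), 0L);
            boolean available = nowMillis >= until;
            if (available && (best == null || entry.getValue() > priorities.get(best))) {
                best = entry.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Integer> priorities = Map.of("k1", 10, "k2", 5);
        // k1 failed and is penalized until t = 1000 ms.
        Map<String, Long> penalizedUntil = Map.of("k1", 1000L);
        System.out.println(pick(priorities, penalizedUntil, 500L));  // standby k2 is used
        System.out.println(pick(priorities, penalizedUntil, 1000L)); // k1 takes over again
    }
}
```

In this model k2 only receives events while k1 is penalized; as soon as the penalty expires, the higher priority of k1 wins again.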
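Load balancing
Focus: Sink Processor strategies.
Scenario: this time hadoop103 is no longer a hot standby; hadoop103 and hadoop102 work together and share the load.
Again only the hadoop101 configuration changes: the sink group's processor type becomes load_balance. Only the modified lines are shown: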
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
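As a rough illustration of what `selector = random` with `backoff = true` means, the sketch below picks a random sink while skipping any sink that is currently backed off. It is a simplified model under stated assumptions, not Flume's implementation; `LoadBalanceSketch`, `pick` and the `backedOff` set are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Simplified model of random load balancing with back-off; not Flume's code.
class LoadBalanceSketch {

    // Picks a random sink among those that are not currently backed off.
    static String pick(List<String> sinks, Set<String> backedOff, Random random) {
        List<String> candidates = new ArrayList<>();
        for (String sink : sinks) {
            if (!backedOff.contains(sink)) {
                candidates.add(sink);
            }
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no sink available");
        }
        return candidates.get(random.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        List<String> sinks = List.of("k1", "k2");
        Random random = new Random();
        // Both sinks healthy: either one may be chosen for each batch.
        System.out.println(pick(sinks, Set.of(), random));
        // k1 recently failed and is backed off: only k2 can be chosen.
        System.out.println(pick(sinks, Set.of("k1"), random));
    }
}
```

Unlike failover, there is no priority here: every healthy sink is an equal candidate, so both downstream agents receive a share of the events.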
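Multiplexing
Scenario: hadoop101 collects data through netcat. If the first word of a message contains the string hello, the event must be sent to hadoop102; otherwise it is sent to hadoop103. This can be done with a multiplexing channel selector plus a custom Interceptor.
Implementing the custom Interceptor
1. pom.xml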
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
- Implement org.apache.flume.interceptor.Interceptor to hold the interceptor logic
- Implement org.apache.flume.interceptor.Interceptor.Builder to create interceptor instances
- Package the jar and place it under flume/lib
package com.xbz.study.bigdata.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/** Tags each event with a header so the multiplexing selector can route it. */
public class StateInterceptor implements Interceptor {

    // Name of the header to set; must match selector.header in the agent config.
    private final String key;

    public StateInterceptor(String key) {
        this.key = key;
    }

    @Override
    public void initialize() {
        // Nothing to set up.
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        // StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException.
        String msg = new String(event.getBody(), StandardCharsets.UTF_8);
        // trim() so that leading whitespace does not yield an empty first token.
        String[] split = msg.trim().split("\\s+");
        if (split[0].contains("hello")) {
            headers.put(key, "sink1");
        } else {
            headers.put(key, "sink2");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // Tag every event of the batch in place.
        list.forEach(this::intercept);
        return list;
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    /** Flume instantiates the interceptor through this builder. */
    public static class Builder implements Interceptor.Builder {
        private String key = null;

        @Override
        public Interceptor build() {
            return new StateInterceptor(this.key);
        }

        @Override
        public void configure(Context context) {
            // Header name from the agent config; defaults to "state".
            key = context.getString("key", "state");
        }
    }
}
hadoop101 configuration file
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 4141
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4142
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 4143
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.xbz.study.bigdata.flume.StateInterceptor$Builder
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.sink1 = c1
a1.sources.r1.selector.mapping.sink2 = c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
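To make the routing in this configuration concrete, the sketch below traces one message from its body to a channel name: first the header value the interceptor would assign, then the lookup that `selector.mapping.*` describes. It is a standalone model without any Flume dependency; `MultiplexSketch`, `headerFor` and `channelFor` are hypothetical names, and trimming leading whitespace before taking the first word is an assumption of this sketch.

```java
import java.util.Map;

// Standalone model of "interceptor header + multiplexing mapping"; not Flume code.
class MultiplexSketch {

    // Header value for a message: "sink1" if the first word contains "hello",
    // otherwise "sink2". Leading whitespace is trimmed first (an assumption).
    static String headerFor(String body) {
        String[] words = body.trim().split("\\s+");
        return words[0].contains("hello") ? "sink1" : "sink2";
    }

    // Channel lookup as described by selector.mapping.<value> = <channel>;
    // defaultChannel is used when the header value has no mapping.
    static String channelFor(String headerValue, Map<String, String> mapping, String defaultChannel) {
        return mapping.getOrDefault(headerValue, defaultChannel);
    }

    public static void main(String[] args) {
        Map<String, String> mapping = Map.of("sink1", "c1", "sink2", "c2");
        for (String body : new String[] {"hello flume", "goodbye hello"}) {
            String header = headerFor(body);
            System.out.println(body + " -> state=" + header
                    + " -> " + channelFor(header, mapping, "c2"));
        }
    }
}
```

Since c1 feeds sink k1 (hadoop102) and c2 feeds sink k2 (hadoop103), a message whose first word contains hello ends up on hadoop102 and everything else on hadoop103.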
Aggregation
hadoop101 and hadoop102 each collect data through netcat, and hadoop103 aggregates it. hadoop103 could receive the data through either one Source or two; here we use a single Source.
hadoop101 configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 4141
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4143
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
hadoop102 configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4142
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4143
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
hadoop103 configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4143
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1