大数据课程——Flume综合运用
实验内容以及要求
如下图所示,在某一实际应用中,有一个的数据源(可用Source类型为Exec Source或NetCat Source的Agent a1来用模拟),为方便后期数据分析,需要记录事件的产生IP、时间(格式:年月日时分秒)以及事件类型(事件类型根据事件Body中包含WARNING:、ERROR:、INFO:来确定为WARNING、ERROR、INFO,如不包含,则无需记录事件类型),事件经处理后汇总到Agent a2。Agent a2根据事件类型,将事件分别送入不同通道进行下一步处理,并将事件类型为WARNING、ERROR写入Kafka集群,并最终被消费者和Agent a3进行消费。 请按照上述应用场景,部署Flume和Kafka,并完成相应的配置。
问题总结
本次实验其实不算非常成功,主要问题在于Agent2的选择器没能起作用,不能成功根据Header中的EventType对消息进行分流。最终找了很久也没找出原因。我自己的测试过eventType是否成功被写入Header中,但测试结果是肯定的,已经成功把eventType写入了event的Header中了。而且配置文件也检查了很多遍,没发现问题,运行时也没报错。但就是不知道为什么选择器不能成功发挥作用。
实验步骤
Centos01(Agent3)配置
Centos01作为Agent3,负责从Kafka集群里获取数据写入HDFS中。因此Source写Kafka,Sink写HDFS。
Centos02(Agent2)配置
Centos02作为Agent2,负责从Agent1接受数据,并且设置选择器,将接收到的数据进行分流,eventType为NULL或INFO的数据直接输出到Console;为ERROR或WARNING的数据输出到Kafka集群中。因此Source设置为Avro,设置selector;Sink设置为Kafka。
Centos03(Agent1)配置
Centos03作为Agent1,负责监控data.log,产生消息。并且设置拦截器,加入timestamp、host、eventType拦截器。使得从event的body中提取出消息类型,加入到header中,给body加入时间戳和Host信息等。最后发送给Agent02进行消息分流。因此,Source设置为exec,Sink设置为Avro。
运行展示
Agent1产生消息,写入带有eventType的消息。如果某条消息不带有eventType的话,则默认消息类型为NULL。 由于Agent1有Host拦截器,所以会在event的header中添加Host信息;有Timestamp拦截器,所以会在header中添加时间戳信息;有自定义的EventType拦截器,能从消息中提取出消息类型eventType,并且往header中添加eventType,并且从header中提取出Host和时间戳信息,添加到event的Body中。
在Agent2上如果开启了Kafka消费者的话,可以看到event的body发生了改变,除了原有信息外,还被添加了host、timestamp和eventType信息,说明Agent1的拦截器成功生效。并且这些信息会被Agent2进行分流:消息类型为NULL或INFO的信息被直接输出到console上;而消息类型为ERROR或WARNING的信息会被写入到Kafka的topictest话题中。
在Agent3上,Agent3会从Kafka的topictest话题中获取信息,并将这些信息全部写入到HDFS中进行存储,如上图所示。 只是很可惜,我不知道为什么我这里Agent2没能成功对消息进行分流,全部都被写入到了HDFS当中。
|