Flume HA Deployment Environment
- Base environment versions: CentOS 7, hadoop-3.2.2, flume-1.9.0, zookeeper-3.6.2, jdk1.8.0
- For the Hadoop deployment, see 《hadoop3.2.2集群搭建》 (the hadoop 3.2.2 cluster setup guide)
- VM list and role assignment:
Hostname | IP | Role |
---|---|---|
node1 | 192.168.56.114 | agent |
node2 | 192.168.56.115 | agent, collector |
node3 | 192.168.56.116 | collector |
Flume Configuration
- Extract the Flume tarball and edit ${FLUME_HOME}/conf/flume-env.sh as follows:
[root@node1 opt]
[root@node1 conf]
[root@node1 conf]
export JAVA_HOME=/opt/jdk
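A minimal sketch of these steps, assuming (hypothetically) that the tarball apache-flume-1.9.0-bin.tar.gz sits in /opt and is installed as /opt/flume-1.9.0:

```bash
cd /opt
# Assumed tarball name and install path; adjust to your environment.
tar -zxvf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin flume-1.9.0
cd flume-1.9.0/conf
# flume-env.sh is created from the bundled template, then JAVA_HOME is set.
cp flume-env.sh.template flume-env.sh
echo 'export JAVA_HOME=/opt/jdk' >> flume-env.sh
```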
Set JAVA_HOME to the local JDK path. Once this is configured, copy the Flume installation to node2 and node3. Run the following commands:
[root@node1 flume-1.9.0]
[root@node1 opt]
[root@node1 opt]
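A sketch of the copy step, assuming passwordless SSH as root and /opt as the install directory on every node:

```bash
# Copy the whole Flume directory to node2 and node3 (paths assumed).
scp -r /opt/flume-1.9.0 root@node2:/opt/
scp -r /opt/flume-1.9.0 root@node3:/opt/
```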
Create the agent configuration file (agent1.properties) as follows. The failover sink group prefers k1 (the collector on node2, priority 10) while it is healthy and switches to k2 (node3, priority 1) only when k1 fails:
[root@node1 flume-1.9.0]
[root@node1 flume-1.9.0]
a1.channels=c1
a1.sources=r1
a1.sinks=k1 k2
a1.sinkgroups=g1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sources.r1.channels=c1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F /opt/flume-1.9.0/test.log
a1.sinks.k1.channel=c1
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=node2
a1.sinks.k1.port=12345
a1.sinks.k2.channel=c1
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=node3
a1.sinks.k2.port=12345
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=failover
a1.sinkgroups.g1.processor.priority.k1=10
a1.sinkgroups.g1.processor.priority.k2=1
a1.sinkgroups.g1.processor.maxpenalty=10000
Copy the agent1.properties file to node2 with the following command:
[root@node1 opt]
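A sketch of this copy, assuming agent1.properties was created under /opt/flume-1.9.0/conf:

```bash
# Ship the agent config to node2 (path assumed).
scp /opt/flume-1.9.0/conf/agent1.properties root@node2:/opt/flume-1.9.0/conf/
```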
In this setup the agent configuration on node2 is exactly the same as on node1, so nothing needs to be modified. Next, create the collector configuration file (collector.properties) on node2:
[root@node2 flume-1.9.0]
[root@node2 flume-1.9.0]
a2.sources=r1
a2.channels=c1
a2.sinks=k1
a2.channels.c1.type=memory
a2.channels.c1.capacity=1000
a2.channels.c1.transactionCapacity=100
a2.sources.r1.type=avro
a2.sources.r1.bind=node2
a2.sources.r1.port=12345
a2.sources.r1.channels=c1
a2.sinks.k1.type=hdfs
a2.sinks.k1.hdfs.path=hdfs://mycluster/flume/input/%Y-%m-%d
a2.sinks.k1.hdfs.fileType=DataStream
a2.sinks.k1.hdfs.writeFormat=Text
a2.sinks.k1.hdfs.minBlockReplicas=1
a2.sinks.k1.hdfs.rollInterval=0
a2.sinks.k1.hdfs.rollCount=0
a2.sinks.k1.hdfs.rollSize=67108864
a2.sinks.k1.hdfs.useLocalTimeStamp=true
a2.sinks.k1.channel=c1
Note: the following three settings control file rolling; tune them to avoid producing large numbers of small files. If you roll files by rollSize alone, set the other two to 0. Choose values according to your scenario (see the example after the table).
Name | Default | Description |
---|---|---|
hdfs.rollInterval | 30 | Roll the file based on a time interval, in seconds (0: never roll based on time) |
hdfs.rollCount | 10 | Roll the file based on the number of events written (0: never roll based on event count) |
hdfs.rollSize | 1024 | Roll the file based on file size, in bytes (0: never roll based on size) |
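As an illustration of the trade-off, rolling purely by time (here every 600 seconds, an arbitrary value) instead of by size would look like this, with the other two triggers disabled:

```properties
# Roll a new HDFS file every 10 minutes; disable count- and size-based rolling.
a2.sinks.k1.hdfs.rollInterval=600
a2.sinks.k1.hdfs.rollCount=0
a2.sinks.k1.hdfs.rollSize=0
```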
Note: the hdfs.useLocalTimeStamp setting defaults to false. Set it to true here; otherwise the sink may throw an exception like the following as soon as it starts processing events:
2021-07-22 11:18:23,852 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:459)] process failed
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:251)
at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:460)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:379)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
2021-07-22 11:18:23,853 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:251)
at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:460)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:379)
... 3 more
Copy the collector.properties configuration file to node3:
[root@node2 opt]
[root@node3 opt]
[root@node3 myconf]
a2.sources=r1
a2.channels=c1
a2.sinks=k1
a2.channels.c1.type=memory
a2.channels.c1.capacity=1000
a2.channels.c1.transactionCapacity=100
a2.sources.r1.type=avro
a2.sources.r1.bind=node3
a2.sources.r1.port=12345
a2.sources.r1.channels=c1
a2.sinks.k1.type=hdfs
a2.sinks.k1.hdfs.path=hdfs://mycluster/flume/input/%Y-%m-%d
a2.sinks.k1.hdfs.fileType=DataStream
a2.sinks.k1.hdfs.writeFormat=Text
a2.sinks.k1.hdfs.minBlockReplicas=1
a2.sinks.k1.hdfs.rollInterval=0
a2.sinks.k1.hdfs.rollCount=0
a2.sinks.k1.hdfs.rollSize=67108864
a2.sinks.k1.hdfs.useLocalTimeStamp=true
a2.sinks.k1.channel=c1
Only the bind hostname needs to change: set it to the local host (node3). With that, the configuration files are complete.
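A sketch of this copy-and-edit step, assuming (hypothetically) that the config lives under /opt/flume-1.9.0/conf on both nodes and that passwordless SSH is available:

```bash
# Copy the collector config from node2 to node3 and rebind it to node3 (paths assumed).
scp /opt/flume-1.9.0/conf/collector.properties root@node3:/opt/flume-1.9.0/conf/
ssh root@node3 "sed -i 's/bind=node2/bind=node3/' /opt/flume-1.9.0/conf/collector.properties"
```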
Starting Flume HA
- First start the collectors on node2 and node3 with the following commands:
[root@node2 flume-1.9.0]
[root@node3 flume-1.9.0]
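A sketch of the collector start command, assuming the config file is conf/collector.properties and the agent name defined in it is a2:

```bash
# Run on node2 and on node3; -n must match the agent name in the properties file (a2 here).
bin/flume-ng agent -n a2 -c conf -f conf/collector.properties -Dflume.root.logger=INFO,console &
```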
- Then start the agents on node1 and node2 with the following commands:
[root@node1 flume-1.9.0]
[root@node2 flume-1.9.0]
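And a corresponding sketch for the agents, assuming conf/agent1.properties with agent name a1:

```bash
# Run on node1 and on node2; a1 matches the agent name used in agent1.properties.
bin/flume-ng agent -n a1 -c conf -f conf/agent1.properties -Dflume.root.logger=INFO,console &
```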
Startup is now complete.
Testing
On node1, create a file named test.log under $FLUME_HOME and append some log lines to it, as follows:
[root@node1 flume-1.9.0]
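An illustrative way to generate test input (the exact content does not matter, since the exec source simply tails this file):

```bash
# Append a few test lines to the file tailed by the exec source.
for i in 1 2 3; do
  echo "test message $i $(date '+%Y-%m-%d %H:%M:%S')" >> /opt/flume-1.9.0/test.log
done
```

For reference, the agent log below shows the k2 sink starting and building its Avro RPC client to node3: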
2021-07-22 15:05:59,737 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k2 started
2021-07-22 15:05:59,737 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:212)] Rpc sink k2: Building RpcClient with hostname: node3, port: 12345
2021-07-22 15:05:59,737 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:113)] Attempting to create Avro Rpc client.
2021-07-22 15:05:59,737 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.api.AbstractRpcClient.parseBatchSize(AbstractRpcClient.java:75)] Batch size string = null
2021-07-22 15:05:59,738 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:594)] Using default maxIOWorkers
2021-07-22 15:05:59,746 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:237)] Rpc sink k2: Created RpcClient: NettyAvroRpcClient { host: node3, port: 12345 }
Check the files written to HDFS (screenshot omitted). To verify high availability, kill one of the agent or collector processes; these tests are straightforward and all succeed, so they are not detailed here.
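A sketch of how to verify the result in HDFS, using the sink path configured above:

```bash
# List and sample today's output directory written by the HDFS sink.
hdfs dfs -ls /flume/input/$(date +%Y-%m-%d)
hdfs dfs -cat /flume/input/$(date +%Y-%m-%d)/* | head
```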