In the previous chapter we fetched the ranking video data, updated once per hour. Here we use Flume to monitor that file and ship it into Kafka.
Startup commands for my own reference:
cd soft/kafka_2.12-2.7.0/bin/
Start ZooKeeper: ./zookeeper-server-start.sh ../config/zookeeper.properties
Start Kafka: ./kafka-server-start.sh ../config/server.properties
Connect Flume to Kafka: /root/soft/flume-1.8.0/bin/flume-ng agent -c conf/ -n a1 -f /root/soft/flume-1.8.0/conf/flume-2-kafka.conf
Consume test data: ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic data --from-beginning
Stop the Kafka service: /root/soft/kafka_2.12-2.7.0/bin/kafka-server-stop.sh
Stop the ZooKeeper service: /root/soft/kafka_2.12-2.7.0/bin/zookeeper-server-stop.sh
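For convenience, these steps could also be wrapped in a small startup script (a sketch based on the paths above; the script name is arbitrary, and unlike the interactive sessions further down, ZooKeeper and Kafka are started in the background here with their output going to log files):
#!/bin/bash
# start-pipeline.sh - sketch of the startup sequence above (paths assumed from this setup)
KAFKA_HOME=/root/soft/kafka_2.12-2.7.0
FLUME_HOME=/root/soft/flume-1.8.0
# start ZooKeeper, then Kafka, in the background with their output captured
nohup "$KAFKA_HOME/bin/zookeeper-server-start.sh" "$KAFKA_HOME/config/zookeeper.properties" > /root/soft/zookeeper.out 2>&1 &
sleep 5
nohup "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties" > /root/soft/kafka.out 2>&1 &
sleep 5
# start the Flume agent that ships the monitored file to the "data" topic
"$FLUME_HOME/bin/flume-ng" agent -c "$FLUME_HOME/conf" -n a1 -f "$FLUME_HOME/conf/flume-2-kafka.conf"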
Flume installation
Move the downloaded Flume archive to the target location and extract it. Enter the installation directory and run the version command to confirm the install:
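For reference, unpacking might look like this (the rename to flume-1.8.0 is an assumption, chosen to match the directory listing below):
cd /root/soft
tar -zxvf apache-flume-1.8.0-bin.tar.gz
mv apache-flume-1.8.0-bin flume-1.8.0    # shorter directory name used in the rest of this post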
[root@VM-16-6-centos soft]# ls
apache-flume-1.8.0-bin.tar.gz flume-1.8.0 jdk-8u144-linux-x64.tar.gz kafka_2.12-2.7.0.tgz
data jdk1.8.0_144 kafka_2.12-2.7.0 tmp
[root@VM-16-6-centos soft]# cd flume-1.8.0/
[root@VM-16-6-centos flume-1.8.0]# ls
bin CHANGELOG conf DEVNOTES doap_Flume.rdf docs lib LICENSE NOTICE README.md RELEASE-NOTES tools
[root@VM-16-6-centos flume-1.8.0]# ./bin/flume-ng version
Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f
Compiled by denes on Fri Sep 15 14:58:00 CEST 2017
From source with checksum fbb44c8c8fb63a49be0a59e27316833d
Kafka standalone installation
Move the downloaded Kafka archive to the target location and extract it. Kafka ships with a bundled ZooKeeper; since I only have one machine, I simply run the standalone setup.
(The paths below follow the common /usr/local/kafka convention; adjust them to your actual installation directory, /root/soft/kafka_2.12-2.7.0 in this setup.)
cd /usr/local/kafka/config    # enter the config directory
vi zookeeper.properties       # edit the relevant parameter
# change the data directory, keep the rest at defaults:
dataDir=/usr/local/kafka/zookeeper
vi server.properties          # edit the relevant parameter
# change the log directory, keep the rest at defaults:
log.dirs=/usr/local/kafka/log/kafka
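Before the first start, it can help to verify the edits and create the directories they point to (a small sketch; ZooKeeper and Kafka usually create missing directories themselves, but doing it explicitly makes permission problems easier to spot):
grep '^dataDir' zookeeper.properties
grep '^log.dirs' server.properties
mkdir -p /usr/local/kafka/zookeeper /usr/local/kafka/log/kafka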
Test it:
Open a terminal window, go to the bin directory, and start ZooKeeper.
I am not starting it in the background here, so any errors show up directly in the console.
[root@VM-16-6-centos ~]# cd soft/kafka_2.12-2.7.0/
[root@VM-16-6-centos kafka_2.12-2.7.0]# cd bin/
[root@VM-16-6-centos bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
Open another window, go to the bin directory, and start Kafka:
./kafka-server-start.sh ../config/server.properties
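Before going further, it is worth a quick check that both services are listening on their default ports (2181 for ZooKeeper, 9092 for Kafka):
ss -tlnp | grep -E '2181|9092'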
Open another window, go to the bin directory, and list the existing topics:
[root@VM-16-6-centos ~]# cd soft/kafka_2.12-2.7.0/bin/
[root@VM-16-6-centos bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
Create a topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
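As a quick sanity check, the new topic can be described through the same ZooKeeper connection:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test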
Open another window and run a console test from the bin directory.
Console producer:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
Open another window, go to the bin directory, and start a console consumer:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
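Anything typed into the producer window should now appear in the consumer window. A single test message can also be sent non-interactively, for example:
echo "hello kafka" | ./kafka-console-producer.sh --broker-list localhost:9092 --topic test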
Configuring Flume and Kafka
First, create a new .conf file in Flume's conf directory that defines the following:
[root@VM-16-6-centos ~]# cd soft/flume-1.8.0/conf/
[root@VM-16-6-centos conf]# vi flume-2-kafka.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# note: Flume treats inline comments after a value as part of the value, so comments stay on their own lines
# exec source tailing the file to monitor
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /root/soft/data/data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = data
# Kafka broker (bootstrap.servers) address, not the ZooKeeper address
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
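The sink writes to the data topic. Brokers create missing topics automatically by default, but if auto-creation is disabled the topic has to exist before starting the agent, mirroring the test topic command above:
/root/soft/kafka_2.12-2.7.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic data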
Start Flume and connect it to Kafka:
/root/soft/flume-1.8.0/bin/flume-ng agent -c conf/ -n a1 -f /root/soft/flume-1.8.0/conf/flume-2-kafka.conf
An Info line shows up here; after checking, it does not affect functionality (it is just the flume-ng launch script noting that it looked for Hive libraries, which are not needed here):
Info: Including Hive libraries found via () for Hive access
+ exec /root/soft/jdk1.8.0_144/bin/java -Xmx20m -cp '/root/soft/flume-1.8.0/conf:/root/soft/flume-1.8.0/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application -n a1 -f /root/soft/flume-1.8.0/conf/flume-2-kafka.conf
Test the data by consuming it to the console.
Go to Kafka's bin directory:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic data --from-beginning
The latest data is consumed successfully.
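For an end-to-end check, appending a line to the monitored file should make it show up in the consumer within a moment (using the path from the exec source above):
echo "flume-kafka test $(date)" >> /root/soft/data/data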