?
创建kafka主题
#启动kafka服务
kafka-server-start.sh /opt/software/kafka280cala212/conf/kraft/server.properties
#创建主题
#topic主题名test01
#partitions分区数1
#replication-factor备份数量1
kafka-topics.sh --create --topic test01 --partitions 1 --replication-factor 1 --bootstrap-server 192.168.131.200:9092
#查看主题
kafka-topics.sh --list --bootstrap-server 192.168.131.200:9092
创建flume配置文件(采用KafkaSink作为kafka生产者)
#创建并编辑文件名为flume_kafka01.conf配置文件
vim /root/flume/flume_kafka01.conf
#创建flume 的三大组件sources channels sinks
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#这里选用的是taildir类型的source,支持断点续采
a1.sources.s1.type = taildir
#需要侦听的文件,支持多目录侦听
a1.sources.s1.filegroups = f1
#侦听前缀为prolog的文件
a1.sources.s1.filegroups.f1 = /root/flume_log/prolog*
#断点记录保存文件路径
a1.sources.s1.positionFile = /opt/software/flume190/data/taildir/tail_prolog_01.json
#设置采集批量
a1.sources.s1.batchSize = 10
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume190/mydata/checkpoint04
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#File Channel 的 transactionCapacity 默认值为10000(Memory Channel 的默认值为100)
#必须满足 batchSize <= transactionCapacity <= capacity
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 192.168.131.200:9092
a1.sinks.k1.kafka.topic = test01
a1.sinks.k1.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.linger.ms = 500
a1.sinks.k1.kafka.producer.acks = 1
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
创建flume配置文件(采用KafkaSource作为kafka消费者)
vim /root/flume/kafka_flume01.conf
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.batchSize = 10
a1.sources.s1.batchDurationMillis = 2000
a1.sources.s1.kafka.bootstrap.servers = 192.168.131.200:9092
a1.sources.s1.kafka.topics = test01
a1.sources.s1.kafka.consumer.group.id = first_test
a1.sources.s1.kafka.consumer.auto.offset.reset = earliest
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume190/mydata/checkpoint05
a1.channels.c1.dataDirs = /opt/software/flume190/mydata/data
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 10
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /kafka_flume/log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
启动flume消费者
flume-ng agent -n a1 -c conf/ -f /root/flume/kafka_flume01.conf -Dflume.root.logger=INFO,console
启动flume生产者
flume-ng agent -n a1 -c conf/ -f /root/flume/flume_kafka01.conf -Dflume.root.logger=INFO,console
启动控制台kafka消费者
kafka-console-consumer.sh --bootstrap-server 192.168.131.200:9092 --from-beginning --topic test01 --property print.key=true --key-deserializer org.apache.kafka.common.serialization.LongDeserializer --value-deserializer org.apache.kafka.common.serialization.StringDeserializer
|