安装前的环境准备
- 安装jdk
- 安装zk
- 官网下载kafka的压缩包:http://kafka.apache.org/downloads
- 解压缩至如下路径
/usr/local/kafka/
- 修改配置文件:/usr/local/kafka/kafka2.11-2.4/config/server.properties
broker.id= 0
listeners=PLAINTEXT://10.100.12.246:9092
log.dir=/usr/local/data/kafka-logs
zookeeper.connect= 10.100.12.246:2181
启动kafka服务器
进入到bin目录下。使用命令来启动 先启动zookeeper
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
./kafka-server-start.sh -daemon../config/server.properties
查看进程在不在
server.properties核心配置详解:
使用命令操作kafka
./kafka-topics.sh --create --zookeeper 10.100.12.246:2181 --replication-factor 1 --partitions 1 --topic test
./kafka-topics.sh --list --zookeeper 10.100.12.246:2181
- 产生消息
kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic
./kafka-console-producer.sh --broker-list 10.100.12.246:9092 --topic test
- 消费消息
对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输 出, 默认是消费最新的消息 。使用kafka的消费者消息的客户端,从指定kafka服务器的指定 topic中消费消息。
方式一:从最后一条消息的偏移量+1开始消费
./kafka-console-consumer.sh --bootstrap-server 10.100.12.246:9092 --topic test
方式二:从头开始消费
./kafka-console-consumer.sh --bootstrap-server 10.100.12.246:9092 --from-beginning --topic test
几个注意点: 消息会被存储 消息是顺序存储 消息是有偏移量的 消费时可以指明偏移量进行消费
|