kafka2.12-2.20包下载地址:链接
本文主要是kafka在两台机器中的消息传输
kafka在需要链接zk的场合,有两种方式 1、使用kafka自带的zk 2、下载zk部署 使用的过程都差不多,这里就用自带的zk举例,首先查看配置文件,主要使用的是server.properties(kafka启动的配置文件)和zookeeper.properties(zk启动的配置文件)  接下来是简单使用,zk的配置文件主要就是clientPort,指定zk的端口,用于链接kafka  然后是kafka的配置文件,主要是几个地方注意,使用最易懂的方式理解
 
broker.id 可以看做是链接两端的id,一个会话只能一个,要是启动遇到/brokers/ids的报错,可以使用下面的解决办法
如下图bin下有一个zookeeper-shell.sh,./zookeeper-shell.sh ip:port 进入zk,rmr /brokers/ids/<id>删除后重启kafka
如果是自己部署的zk,可以使用zkcli进入,之后的命令一样

listeners=PLAINTEXT://127.0.0.1:9092 #kafka链接开启的地址端口
advertised.listeners=PLAINTEXT://your.host.name:9092 #和上面不同的是,这个可以链接到其他机器的kafka
zookeeper.connect=127.0.0.1:2181 #链接的zk地址,需要zk先启动
接下来启动
先启动zk bin/zookeeper-server-start.sh conf/zookeeper.properties
在启动kafka bin/kafka-server-start.sh conf/server.properties
根据报错情况,观察zk是否启动正常,kafka链接zk是否正常
例举几个简单测试操作, 关于消费者命令的详细解释可参考https://blog.csdn.net/qq_29116427/article/details/80206125 关于生产者命令的详细解释可参考https://blog.csdn.net/qq_29116427/article/details/105912397
bin/kafka-topics.sh --list --bootstrap-server localhost:9192 #列出topic
bin/kafka-console-producer.sh --broker-list localhost:9192 --topic test #开启消费者端
bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic test --from-beginning#开启消费者端,--from-begin查看从开始的所有收到信息
接下来操作使用python在两台机器间直接链接kafka发送接收消息 这里我们将一台机器(客户端)server.proproties的配置文件从listeners注释,使用advertised.listeners,地址指向另一台机器(服务端)的地址,如192.168.173.2:9192,其他配置不变,重启kafka,重启过程可能会报/brokers/ids的错,根据上面的方法解决。 下面放一下python链接kafka的代码,
首先下载kafka库导入from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='192.168.79.131:9192')
topic = 'test'
producer.send(topic, j.encode())#注意需要转码才能发送
在另一台机器接收
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
bootstrap_servers=['127.0.0.1:9092'],
auto_offset_reset='latest',# 消费kafka中最近的数据,如果设置为earliest则消费最早的数据,不管这些数据是否消费
)
for msg in consumer:
print (msg)

python链接kafka还有其他许多参数,更详细的可百度或参考官方文档
|