首先安装kafka:
pip install kafka-python
github页面:https://github.com/dpkp/kafka-python
文档位置:https://kafka-python.readthedocs.io/en/master/
准备运行环境
首先需要启动zookeeper与kafka:
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
如果要后台运行zookeeper与kafka,可以使用:
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/kafka-server-start.sh -daemon config/server.properties
示例代码
注意:要先运行消费者,再运行生产者;或者运行消费者的时候,启动生产者的程序
生产者代码:
from kafka import KafkaProducer
import datetime
import json
producer = KafkaProducer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
my_topic = "python_test"
for i in range(5):
data = {'num': i, 'data': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
producer.send(my_topic, json.dumps(data).encode('utf-8')).get(timeout=30)
消费者使用:
from kafka import KafkaConsumer
my_topic = "python_test"
consumer = KafkaConsumer(my_topic, bootstrap_servers='localhost:9092', auto_offset_reset='latest',
api_version=(0, 10, 2))
for msg in consumer:
print(msg.value)
注意:
|