今天实验kafka的时候 生产者明明生产了一条数据 但是消费者消费了两条数据查了半天文档之类的还有思路回溯想想 原来是没加消费组 - -
class MyHeartbeatConsumer(object):
def __init__(self, app=None):
self.app = None
self.C_obj = None
if app is not None:
self.init_app(app)
def init_app(self, app):
"""
将flask对象挂在当前对象中
:param app:
:return: 返回值是当前实例化对象,将当前对象反挂在flask对象中
"""
self.app = app
self.C_obj = self.link_kafka_consumer(self)
thread = Thread(target=self.circular_msg, kwargs={"obj": self.C_obj})
thread.setDaemon(True)
thread.setName("辅助管理kafka线程")
thread.start()
return self
@error_function
def link_kafka_consumer(self, *args, **kwargs):
"""
链接kafka的消费者
"""
consumer = KafkaConsumer(
self.app.config.get("KAFKA_HEARTBEAT_TOPIC"),
bootstrap_servers=[self.app.config.get("KAFKA_HEARTBEAT")],
group_id='my-group'
)
return consumer
@error_function
def circular_msg(self, *args, **kwargs):
"""
consumer : 是一个消息队列,当后台有消息时,这个消息队列就会自动增加.所以遍历也总是会有数据,当消息队列中没有数据时,就会堵塞等待消息带来
obj : 当前链接的实体库
"""
obj = kwargs.get("obj")
print("1231312312")
for message in obj:
dic = {}
dic.update(
{
json.loads(message.key): json.loads(message.value)
}
)
print(dic)
大概的意思就是 生产者生产消息会推送到构建kafka时设置的分区里 而我就是两个 没有消费组的话就会共同消费两个分区 设置了消费组则两个分区消费同一份数据
|