python之kafka消费者——confluent-kafka
一、版本
kafka:2.5.0 python:3.6.1 confluent-kafka:1.5.0 confluent-avro:1.5.0 avro-python3:1.9.1
二、需求概述
前置条件:使用kafka connect 消费kafka 数据写入hive 表。 前端会有一个写入状态表,告诉我们什么时候写完,但是遇到问题是我们会拉取该状态表然后会再加上配置kafka connect 延迟时间去启动一个处理程序去处理hive表的数据,但由于数据激增最高峰已达到90M/S且都是写入了一个分区中,所以导致数据消费延迟,但work的启动却没能自动适配,导致交付的数据出现缺失问题,所以需要启动一个自定义的kafka 消费者去指定偏移量进行消费,判断消费内容是否消费。判断内容忽略,本篇只介绍打通kafka 消费者部分。
三、解决思路
- 使用CMAK rest api去拿到当前每个分区消费的offset信息。
- 启动一个消费者
- 指定分区offset进行消费
四、执行
由于前端写入使用的是confluent-kafka 的包,指定了avro 格式,所以下游消费也会使用该包 confluent-kafka api文档
4.1 获取每个分区消费的offset信息
config = {
'cluster_name': "cluster_name",
'consumer_name': "consumer_name",
'topic_name': "topic_name",
'host': "http://cmak",
}
base_url = "{host}/api/status/{cluster_name}/{consumer_name}/{topic_name}/KF/topicSummary".format(
**config)
def get_partition_offset(base_url):
r = requests.get(base_url, verify=False)
response = json.loads(r.content)
return response.get("partitionOffsets", None)
def get_topic_partition(topic, partition_offsets):
correct = []
i = 0
for latest_offset in partition_offsets:
correct.append(TopicPartition(topic, i, latest_offset))
i += 1
return correct
get_partition_offset 获取每个分区正在消费的offset信息,返回一个list列表! get_topic_partition 根据对应的topic & partition_offsets 信息封装TopicPartition,用于指定分区偏移量消费
4.2 启动一个消费者
topic = "topic_name"
schema_registry_url = "http://kafka-schema-registry:8081"
kafka_topics = ["topic_name"]
kafka_servers = 'host1:9092, host1:9092, host1:9092'
c = Consumer({
'bootstrap.servers': kafka_servers,
'group.id': 'test_custom_cosumer'
})
register_client = CachedSchemaRegistryClient(url=schema_registry_url)
c.subscribe(kafka_topics)
partition_offsets = get_partition_offset(base_url)
topic_partition = get_topic_partition(topic, partition_offsets)
4.3 指定offsets消费
c.assign(topic_partition)
4.4 进行消费
while True:
try:
msg = c.poll(10)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Message Value - ', unpack(msg.value()))
print('Message Key - ', unpack(msg.key()))
print('Topic - ', msg.topic())
print('Pattition - ', msg.partition())
print('Offset - ', msg.offset())
4.5 完整代码
import struct
import io
import json
import requests
from confluent_kafka import TopicPartition, Consumer
from avro.io import BinaryDecoder, DatumReader
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
MAGIC_BYTES = 0
topic = "topic_name"
schema_registry_url = "http://kafka-schema-registry:8081"
kafka_topics = ["topic_name"]
kafka_servers = 'host1:9092, host2:9092, host3:9092'
config = {
'cluster_name': "cluster_name",
'consumer_name': "consumer_name",
'topic_name': "topic_name2",
'host': "http://host",
}
base_url = "{host}/api/status/{cluster_name}/{consumer_name}/{topic_name}/KF/topicSummary".format(
**config)
def get_partition_offset(base_url):
r = requests.get(base_url, verify=False)
response = json.loads(r.content)
print(response)
return response.get("partitionOffsets", None)
def get_topic_partition(topic, partition_offsets):
correct = []
i = 0
for latest_offset in partition_offsets:
correct.append(TopicPartition(topic, i, latest_offset))
i += 1
return correct
def unpack(payload):
magic, schema_id = struct.unpack('>bi', payload[:5])
if magic == MAGIC_BYTES:
schema = register_client.get_by_id(schema_id)
reader = DatumReader(schema)
output = BinaryDecoder(io.BytesIO(payload[5:]))
content = reader.read(output)
return content
else:
return payload.decode()
c = Consumer({
'bootstrap.servers': kafka_servers,
'group.id': 'test_custom_cosumer'
})
register_client = CachedSchemaRegistryClient(url=schema_registry_url)
c.subscribe(kafka_topics)
partition_offsets = get_partition_offset(base_url)
topic_partition = get_topic_partition(topic, partition_offsets)
c.assign(topic_partition)
while True:
try:
msg = c.poll(10)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Message Value - ', unpack(msg.value()))
print('Message Key - ', unpack(msg.key()))
print('Topic - ', msg.topic())
print('Pattition - ', msg.partition())
print('Offset - ', msg.offset())
c.close()
|