IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka消费者——confluent_kafka -> 正文阅读

[大数据]kafka消费者——confluent_kafka

一、版本

kafka:2.5.0
python:3.6.1
confluent-kafka:1.5.0
confluent-avro:1.5.0
avro-python3:1.9.1

二、需求概述

前置条件:使用kafka connect 消费kafka 数据写入hive 表。
前端会有一个写入状态表,告诉我们什么时候写完,但是遇到问题是我们会拉取该状态表然后会再加上配置kafka connect 延迟时间去启动一个处理程序去处理hive表的数据,但由于数据激增最高峰已达到90M/S且都是写入了一个分区中,所以导致数据消费延迟,但work的启动却没能自动适配,导致交付的数据出现缺失问题,所以需要启动一个自定义的kafka 消费者去指定偏移量进行消费,判断消费内容是否消费。判断内容忽略,本篇只介绍打通kafka 消费者部分。

三、解决思路

  1. 使用CMAK rest api去拿到当前每个分区消费的offset信息。
  2. 启动一个消费者
  3. 指定分区offset进行消费

四、执行

由于前端写入使用的是confluent-kafka 的包,指定了avro 格式,所以下游消费也会使用该包
confluent-kafka api文档

4.1 获取每个分区消费的offset信息

config = {
    'cluster_name': "cluster_name",
    'consumer_name': "consumer_name",
    'topic_name': "topic_name",
    'host': "http://cmak",
}
base_url = "{host}/api/status/{cluster_name}/{consumer_name}/{topic_name}/KF/topicSummary".format(
    **config)

def get_partition_offset(base_url):
    r = requests.get(base_url, verify=False)
    response = json.loads(r.content)
    return response.get("partitionOffsets", None)

def get_topic_partition(topic, partition_offsets):
    correct = []
    i = 0
    for latest_offset in partition_offsets:
        correct.append(TopicPartition(topic, i, latest_offset))
        i += 1
    return correct

get_partition_offset 获取每个分区正在消费的offset信息,返回一个list列表!
get_topic_partition 根据对应的topic & partition_offsets 信息封装TopicPartition,用于指定分区偏移量消费

4.2 启动一个消费者

topic = "topic_name"
schema_registry_url = "http://kafka-schema-registry:8081"
kafka_topics = ["topic_name"]
kafka_servers = 'host1:9092, host1:9092, host1:9092'

c = Consumer({
    'bootstrap.servers': kafka_servers,
    'group.id': 'test_custom_cosumer'
})
register_client = CachedSchemaRegistryClient(url=schema_registry_url)
c.subscribe(kafka_topics)
partition_offsets = get_partition_offset(base_url)
topic_partition = get_topic_partition(topic, partition_offsets)

4.3 指定offsets消费

c.assign(topic_partition)

4.4 进行消费

while True:
    try:
        msg = c.poll(10)
    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print('Message Value - ', unpack(msg.value()))
    print('Message Key - ', unpack(msg.key()))
    print('Topic - ', msg.topic())
    print('Pattition - ', msg.partition())
    print('Offset - ', msg.offset())

4.5 完整代码

import struct

import io

import json
import requests
from confluent_kafka import TopicPartition, Consumer
from avro.io import BinaryDecoder, DatumReader
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

MAGIC_BYTES = 0

topic = "topic_name"
schema_registry_url = "http://kafka-schema-registry:8081"
kafka_topics = ["topic_name"]
kafka_servers = 'host1:9092, host2:9092, host3:9092'

config = {
    'cluster_name': "cluster_name",
    'consumer_name': "consumer_name",
    'topic_name': "topic_name2",
    'host': "http://host",
}

base_url = "{host}/api/status/{cluster_name}/{consumer_name}/{topic_name}/KF/topicSummary".format(
    **config)


def get_partition_offset(base_url):
    r = requests.get(base_url, verify=False)
    response = json.loads(r.content)
    print(response)
    return response.get("partitionOffsets", None)


def get_topic_partition(topic, partition_offsets):
    correct = []
    i = 0
    for latest_offset in partition_offsets:
        correct.append(TopicPartition(topic, i, latest_offset))
        i += 1
    return correct


def unpack(payload):
    magic, schema_id = struct.unpack('>bi', payload[:5])
    if magic == MAGIC_BYTES:
        schema = register_client.get_by_id(schema_id)
        reader = DatumReader(schema)
        output = BinaryDecoder(io.BytesIO(payload[5:]))
        content = reader.read(output)
        return content
    else:
        return payload.decode()


c = Consumer({
    'bootstrap.servers': kafka_servers,
    'group.id': 'test_custom_cosumer'
})
register_client = CachedSchemaRegistryClient(url=schema_registry_url)
c.subscribe(kafka_topics)

partition_offsets = get_partition_offset(base_url)
topic_partition = get_topic_partition(topic, partition_offsets)
c.assign(topic_partition)

while True:
    try:
        msg = c.poll(10)
    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print('Message Value - ', unpack(msg.value()))
    print('Message Key - ', unpack(msg.key()))
    print('Topic - ', msg.topic())
    print('Pattition - ', msg.partition())
    print('Offset - ', msg.offset())

c.close()

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-06 11:06:45  更:2022-05-06 11:06:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 8:04:55-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码