IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka -> 正文阅读

[大数据]kafka

作者:>
# -*- coding:utf-8 -*-
# author: cyz
# time: 2021/1/28 17:51
# https://github.com/dpkp/kafka-python
# https://kafka-python.readthedocs.io/
import os, sys

sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
# os.chdir(os.path.dirname(os.path.abspath(__file__)))


import json
from functools import wraps
from kafka import KafkaProducer, KafkaConsumer, BrokerConnection, KafkaAdminClient, KafkaClient
from kafka.admin.new_topic import NewTopic
import pandas as pd
import threading
import time
from datetime import datetime


# Kafka configuration.
# Alternative single-broker setup, kept for reference:
# KAFKA_HOST = "1.1.1.1"
# KAFKA_PORT = 1111
# bootstrap_servers = [f'''{KAFKA_HOST}:{KAFKA_PORT}''']
KAFKA_TOPIC = "abc.efg.topic"
# Message key attached to records sent by the demo at the bottom of the file.
key = b'spiderResult'
# Broker addresses ("host:port") handed to every kafka-python client.
bootstrap_servers = [
    "1.1.1.1:1111",
    "1.1.1.2:1111",
    "1.1.1.3:1111",
]

# https://github.com/dpkp/kafka-python/issues/1308


class ConnectKafka(object):
    """Convenience wrapper around the kafka-python admin/producer/consumer clients.

    Every operation opens a short-lived client against ``self.bootstrap_servers``
    and closes it again when the operation is finished.

    Attributes:
        bootstrap_servers: broker addresses passed to every client.
        compression_type: compression codec passed to ``KafkaProducer``.
        receive_queue: list that background consumers started by
            :meth:`receive` append received messages to.
    """

    def __init__(self, **kwargs):
        """Store the connection settings and verify the brokers are reachable.

        :param kwargs:
            bootstrap_servers: optional broker list; defaults to the
                module-level ``bootstrap_servers``.
            compression_type: producer compression codec, default ``"gzip"``.
        :raises ConnectionError: if the connectivity check fails.
        """
        self.bootstrap_servers = kwargs.get("bootstrap_servers", bootstrap_servers)
        self.compression_type = kwargs.get("compression_type", "gzip")
        self.receive_queue = []
        self.testConnect(is_raise=True)

    def connect(self, type: str, **kwargs):
        """Create a new kafka-python client of the requested kind.

        The caller owns the returned client and is responsible for closing it.

        :param type: ``"admin"``, ``"producer"`` or ``"consumer"``.
        :param kwargs: consumer only -- ``topic`` (default ``"test"``) and
            ``consumer_timeout_ms`` (default ``1000``).
        :return: ``KafkaAdminClient``, ``KafkaProducer`` or ``KafkaConsumer``.
        :raises ValueError: if ``type`` is not one of the supported kinds.
        """
        if type == "admin":
            return KafkaAdminClient(bootstrap_servers=self.bootstrap_servers)
        if type == "producer":
            return KafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                compression_type=self.compression_type,
            )
        if type == "consumer":
            # Use the instance's broker list (not the module global) so that
            # all three client kinds talk to the same cluster.
            return KafkaConsumer(
                kwargs.get("topic", "test"),
                bootstrap_servers=self.bootstrap_servers,
                consumer_timeout_ms=kwargs.get("consumer_timeout_ms", 1000),
                auto_offset_reset='latest',
            )
        raise ValueError(
            f"unknown client type {type!r}; expected 'admin', 'producer' or 'consumer'"
        )

    @staticmethod
    def _clean_cell(value):
        """Normalise one DataFrame cell: ``'`` -> ``"`` and ``""`` -> ``None``.

        Non-string cells are returned unchanged.
        """
        if isinstance(value, str):
            value = value.replace("'", '"')
            if value == "":
                return None
        return value

    @staticmethod
    def _build_message(client_id, source):
        """Serialise ``source`` into the tagged byte payload that is sent.

        A ``DataFrame`` is converted to a list of row dicts (without the
        ``hashlib_md5`` column, if present) with every cell normalised by
        :meth:`_clean_cell`; any other object is embedded via ``str()``.
        """
        if isinstance(source, pd.DataFrame):
            if "hashlib_md5" in source.columns:
                source = source.drop(["hashlib_md5"], axis=1)
            records = source.to_dict(orient="records")
            values = str([
                {column: ConnectKafka._clean_cell(cell) for column, cell in row.items()}
                for row in records
            ])
        else:
            values = source
        return f'''<qhdataStartInfo>{client_id}</qhdataStartInfo><qhdataInfo>{str(values)}</qhdataInfo>'''.encode()

    def send(self, client_id: str, topic: str, source, **kwargs):
        """Send one message to ``topic`` and wait for the broker's reply.

        :param client_id: identifier embedded in the ``<qhdataStartInfo>`` tag.
        :param topic: destination topic.
        :param source: payload; a ``pandas.DataFrame`` or any object with a
            usable ``str()`` representation.
        :param kwargs: ``key``, ``headers``, ``timestamp_ms`` (default ``None``)
            and ``partition`` (default ``0``), forwarded to the producer's
            ``send``.
        :return: None; the send result is printed.
        """
        key = kwargs.get("key", None)
        headers = kwargs.get("headers", None)
        partition = kwargs.get("partition", 0)
        timestamp_ms = kwargs.get("timestamp_ms", None)
        send_message = self._build_message(client_id, source)
        conn = self.connect(type="producer")
        try:
            future = conn.send(topic, key=key, value=send_message, partition=partition,
                               headers=headers, timestamp_ms=timestamp_ms)
            result = future.get(timeout=100)  # block until the send completes or times out
        finally:
            # A producer is created per call, so release it again.
            conn.close()
        print(f'{datetime.now().strftime("%B Week %w %A: [%Y-%m-%d %H:%M:%S %f]")}, kafka send msg result ->', result)
        # Pause kept from the original implementation; it spaces out successive sends.
        time.sleep(1)

    def receive(self, topic, **kwargs):
        """Consume ``topic`` in a background thread into ``self.receive_queue``.

        :param topic: topic to subscribe to.
        :param kwargs: ``consumer_timeout_ms`` -- despite its name this value
            is multiplied by 1000 before being passed to the consumer, i.e.
            it is given in seconds (default ``10``).
        :return: the started ``threading.Thread``; ``join()`` it to wait until
            the consumer has stopped iterating.
        """
        timeout_ms = kwargs.get("consumer_timeout_ms", 10) * 1000

        def _receive(topic, consumer_timeout_ms):
            conn = self.connect(type="consumer", topic=topic,
                                consumer_timeout_ms=consumer_timeout_ms)
            try:
                for msg in conn:
                    self.receive_queue.append(msg)
            finally:
                conn.close()

        thread = threading.Thread(target=_receive, args=(topic, timeout_ms))
        thread.start()
        return thread

    def getKafkaVersion(self, is_print=False):
        """Query the broker version and the supported API versions.

        :param is_print: also print both values when True.
        :return: tuple ``(broker_version, api_versions)``.
        """
        client = KafkaClient(bootstrap_servers=self.bootstrap_servers)
        try:
            broker_version = client.check_version()
            api_versions = client.get_api_versions()
        finally:
            client.close()
        if is_print:
            print("broker version: "+str(broker_version))
            print("api version: "+str(api_versions))
        return broker_version, api_versions

    def testConnect(self, is_print=False, is_raise=False):
        """Check connectivity by opening and closing a ``KafkaClient``.

        :param is_print: print the error message on failure.
        :param is_raise: raise instead of returning the error message.
        :return: ``None`` on success, otherwise a string starting with
            ``"ConnectionError: "``.
        :raises ConnectionError: on failure when ``is_raise`` is True.
        """
        try:
            client = KafkaClient(bootstrap_servers=self.bootstrap_servers)
            client.close()
        except Exception as ex:
            error_message = "ConnectionError: " + str(ex)
            if is_print:
                print(error_message)
            if is_raise:
                # Chain the cause so the original traceback is preserved.
                raise ConnectionError(str(ex)) from ex
            return error_message
        return None

    def _createTopics(self, need_create_topics: list):
        """Create new topics.

        :param need_create_topics: list of dicts, each read with these keys:
            name (str): topic name.
            num_partitions (int): number of partitions, or -1 when
                replica_assignments is given.
            replication_factor (int): replication factor, or -1 when
                replica_assignments is given.
            replica_assignments (dict of int: [int]): mapping of partition id
                to the replicas assigned to it.
            topic_configs (dict of str: str): topic configuration keys/values.
            Example: ``[{"name": "test", "num_partitions": 1, "replication_factor": 1}]``
        :return: True on success; errors from the admin client propagate.
        """
        topic_list = [
            NewTopic(name=topic.get("name"),
                     num_partitions=topic.get("num_partitions"),
                     replication_factor=topic.get("replication_factor"),
                     replica_assignments=topic.get("replica_assignments"),
                     topic_configs=topic.get("topic_configs"))
            for topic in need_create_topics
        ]
        conn = self.connect(type="admin")
        try:
            conn.create_topics(new_topics=topic_list)
        finally:
            conn.close()
        return True

    def _deleteTopics(self, need_delete_topics: list):
        """Delete topics.

        :param need_delete_topics: list of topic names (str).
        :return: True on success; errors from the admin client propagate.
        """
        conn = self.connect(type="admin")
        try:
            conn.delete_topics(need_delete_topics)
        finally:
            conn.close()
        return True

    def _listTopics(self):
        """Return the names of all topics as reported by the admin client."""
        conn = self.connect(type="admin")
        try:
            return conn.list_topics()
        finally:
            conn.close()

    def _describeTopics(self):
        """Return the admin client's ``describe_topics()`` result."""
        conn = self.connect(type="admin")
        try:
            return conn.describe_topics()
        finally:
            conn.close()

    def _describeCluster(self):
        """Return the admin client's ``describe_cluster()`` result."""
        conn = self.connect(type="admin")
        try:
            return conn.describe_cluster()
        finally:
            conn.close()


#
# # 管理者
# admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
# # admin_client.create_topics(["test"])
# admin_client.delete_topics(["test"])
# admin_client.list_topics()
# admin_client.describe_topics()
# admin_client.describe_cluster()


# 消费者
# consumer = KafkaConsumer('test1',bootstrap_servers=bootstrap_servers)
# # consumer.bootstrap_connected()
#
# for msg in consumer:
#     print("get data:",msg.key.decode(),msg.value.decode())
# consumer.topics() # 查看所有主题
# consumer.subscription()
# consumer.close()


# producer = KafkaProducer(bootstrap_servers=bootstrap_servers, compression_type='gzip')
# producer.send("test1", key=key, value=send_message, partition=0)  # topic is the first positional argument; send() has no `queue` parameter
#
# producer.bootstrap_connected()
# producer.partitions_for("test")
# producer.close()

if __name__ == "__main__":
    # Manual smoke test: connect, report broker info, publish the same demo
    # frame twice, then start a background consumer and dump its queue.
    print("初始化")
    # kafka_conn = ConnectKafka(KAFKA_HOST, KAFKA_PORT)
    kafka_conn = ConnectKafka()
    print("获取服务器信息")
    _broker_version, _api_versions = kafka_conn.getKafkaVersion(is_print=True)

    # Topic administration examples (disabled):
    # print("create topics")
    # need_create_topics = [{"name":"test1","num_partitions":1,"replication_factor":1}]
    # kafka_conn._createTopics(need_create_topics)
    # print("query topics")
    # print("list topics:")
    # print(kafka_conn._listTopics())
    # print("describe topics:")
    # print(kafka_conn._describeTopics())
    # print("describe cluster:")
    # print(kafka_conn._describeCluster())
    #
    # print("delete topics")
    # need_delete_topics = ["test1"]
    # kafka_conn._deleteTopics(need_delete_topics)
    #
    # print("query topics")
    # print("list topics:")
    # print(kafka_conn._listTopics())

    print("开始查询数据")
    demo_topic = "test1"
    # kafka_conn.receive(demo_topic, consumer_timeout_ms=10)  # simple receive
    time.sleep(1)
    demo_client_id = "1"
    demo_frame = pd.DataFrame(
        [["value11", "value12"], ["value21", "value22"]],
        columns=["col", "col1"],
    )

    # Publish the same frame twice.
    for _ in range(2):
        kafka_conn.send(demo_client_id, demo_topic, demo_frame, key=key)

    # receive() hands the consuming work to a thread it starts, so the print
    # below shows whatever has been collected in the queue so far.
    kafka_conn.receive(demo_topic, consumer_timeout_ms=10)
    print(kafka_conn.receive_queue)





  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-21 15:32:29  更:2021-08-21 15:34:19 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 18:42:20-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码