import os, sys
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
import json
from functools import wraps
from kafka import KafkaProducer, KafkaConsumer, BrokerConnection, KafkaAdminClient, KafkaClient
from kafka.admin.new_topic import NewTopic
import pandas as pd
import threading
import time
from datetime import datetime
# Default topic name. NOTE(review): unused by the demo below, which sends to
# "test1" — confirm whether this constant is consumed elsewhere.
KAFKA_TOPIC = "abc.efg.topic"
# Default message key; passed as the Kafka record key by the __main__ demo.
key = b'spiderResult'
# Broker addresses used by every client created in this module.
bootstrap_servers = ["1.1.1.1:1111","1.1.1.2:1111","1.1.1.3:1111"]
class ConnectKafka(object):
    """Thin convenience wrapper around kafka-python clients.

    Broker addresses come from the module-level ``bootstrap_servers`` list.
    A connectivity check runs on construction and raises ``ConnectionError``
    when the brokers are unreachable. Messages gathered by background
    ``receive`` threads accumulate in ``self.receive_queue``.
    """

    def __init__(self, **kwargs):
        """
        :param kwargs: optional ``compression_type`` for producers
            (default "gzip").
        :raises ConnectionError: if the brokers cannot be reached.
        """
        self.bootstrap_servers = bootstrap_servers
        self.compression_type = kwargs.get("compression_type", "gzip")
        # Assigned BEFORE the connectivity check so the attribute exists
        # even when testConnect() raises (the original set it afterwards).
        self.receive_queue = []
        self.testConnect(is_raise=True)

    def connect(self, type: str, **kwargs):
        """Create and return a fresh client of the requested kind.

        :param type: "admin", "producer" or "consumer". (Parameter name
            kept for caller compatibility although it shadows the builtin.)
        :param kwargs: consumer-only options: ``topic`` (default "test") and
            ``consumer_timeout_ms`` (default 1000).
        :return: the new client, or None for an unrecognised ``type``.
        """
        if type == "admin":
            return KafkaAdminClient(bootstrap_servers=self.bootstrap_servers)
        if type == "producer":
            return KafkaProducer(bootstrap_servers=self.bootstrap_servers,
                                 compression_type=self.compression_type)
        if type == "consumer":
            topic = kwargs.get("topic", "test")
            consumer_timeout_ms = kwargs.get("consumer_timeout_ms", 1000)
            # BUGFIX: use self.bootstrap_servers (the original read the
            # module-level global here, ignoring the instance setting).
            return KafkaConsumer(topic,
                                 bootstrap_servers=self.bootstrap_servers,
                                 consumer_timeout_ms=consumer_timeout_ms,
                                 auto_offset_reset='latest')
        return None

    def send(self, client_id: str, topic: str, source, **kwargs):
        """Serialize *source* into the qhdata envelope and publish it.

        :param client_id: identifier embedded in the message envelope.
        :param topic: target Kafka topic.
        :param source: a pandas DataFrame (converted to a stringified list of
            record dicts) or any object whose ``str()`` form is sent as-is.
        :param kwargs: optional ``key``, ``headers``, ``partition``
            (default 0) and ``timestamp_ms`` forwarded to KafkaProducer.send.
        :return: None
        """
        key = kwargs.get("key", None)
        headers = kwargs.get("headers", None)
        partition = kwargs.get("partition", 0)
        timestamp_ms = kwargs.get("timestamp_ms", None)
        if isinstance(source, pd.DataFrame):
            # Normalise quoting so the str()-serialised records stay
            # parseable downstream, and map empty strings to None.
            # NOTE(review): assumes every cell is a str — confirm callers.
            source = source.applymap(lambda x: x.replace("'", '"'))
            source = source.applymap(lambda x: None if x == "" else x)
            if "hashlib_md5" in source.columns:
                # Internal checksum column is not part of the payload.
                source = source.drop(["hashlib_md5"], axis=1)
            values = str(source.to_dict(orient="records"))
        else:
            values = source
        send_message = f'''<qhdataStartInfo>{client_id}</qhdataStartInfo><qhdataInfo>{str(values)}</qhdataInfo>'''.encode()
        conn = self.connect(type="producer")
        try:
            future = conn.send(topic, key=key, value=send_message,
                               partition=partition, headers=headers,
                               timestamp_ms=timestamp_ms)
            # Block until the broker acknowledges (or 100 s elapse).
            result = future.get(timeout=100)
            print(f'{datetime.now().strftime("%B Week %w %A: [%Y-%m-%d %H:%M:%S %f]")}, kafka send msg result ->', result)
        finally:
            # BUGFIX: flush + close the producer (the original leaked one
            # producer connection per send() call).
            conn.close()
        time.sleep(1)

    def receive(self, topic, **kwargs):
        """Consume *topic* in a background thread into ``receive_queue``.

        :param topic: topic to subscribe to.
        :param kwargs: ``consumer_timeout_ms`` in SECONDS (default 10);
            converted to milliseconds before reaching the consumer.
        :return: the started ``threading.Thread`` so callers may join() it
            (the original returned None; returning the thread is additive).
        """
        consumer_timeout_ms = kwargs.get("consumer_timeout_ms", 10) * 1000

        def _receive(topic, consumer_timeout_ms):
            conn = self.connect(type="consumer", topic=topic,
                                consumer_timeout_ms=consumer_timeout_ms)
            try:
                # Iteration ends when consumer_timeout_ms elapses with no
                # new messages.
                for msg in conn:
                    self.receive_queue.append(msg)
            finally:
                # BUGFIX: close the consumer (the original leaked it).
                conn.close()

        thread = threading.Thread(target=_receive,
                                  args=(topic, consumer_timeout_ms))
        thread.start()
        return thread

    def getKafkaVersion(self, is_print=False):
        """Probe the brokers for their version and supported API versions.

        :param is_print: also print both values when True.
        :return: (broker_version, api_versions) tuple.
        """
        client = KafkaClient(bootstrap_servers=self.bootstrap_servers)
        try:
            broker_version = client.check_version()
            api_versions = client.get_api_versions()
        finally:
            # BUGFIX: close even if the probe raises (original closed only
            # on the success path).
            client.close()
        if is_print:
            print("broker version: " + str(broker_version))
            print("api version: " + str(api_versions))
        return broker_version, api_versions

    def testConnect(self, is_print=False, is_raise=False):
        """Check broker reachability.

        :param is_print: print the error message on failure.
        :param is_raise: re-raise the failure as ``ConnectionError``.
        :return: None on success, otherwise the error-message string.
        """
        error_message = None
        try:
            client = KafkaClient(bootstrap_servers=self.bootstrap_servers)
            client.close()
            return error_message
        except Exception as ex:
            error_message = "ConnectionError: " + str(ex)
            if is_print:
                print(error_message)
            if is_raise:
                raise ConnectionError(str(ex))
            return error_message

    def _createTopics(self, need_create_topics: list):
        """Create new topics.

        :param need_create_topics: list of dicts, each with keys:
            name (str): topic name;
            num_partitions (int): partition count, or -1 when
                replica_assignments is given;
            replication_factor (int): replication factor, or -1 when
                replica_assignments is given;
            replica_assignments (dict of int: [int]): partition id ->
                replica broker ids;
            topic_configs (dict of str: str): topic configuration map.
        :return: True on success (exceptions propagate on failure).
        """
        topic_list = [NewTopic(name=topic.get("name"),
                               num_partitions=topic.get("num_partitions"),
                               replication_factor=topic.get("replication_factor"),
                               replica_assignments=topic.get("replica_assignments"),
                               topic_configs=topic.get("topic_configs"))
                      for topic in need_create_topics]
        conn = self.connect(type="admin")
        try:
            conn.create_topics(new_topics=topic_list)
        finally:
            # BUGFIX: close the admin client (original leaked it).
            conn.close()
        return True

    def _deleteTopics(self, need_delete_topics: list):
        """Delete topics.

        :param need_delete_topics: list of topic names (str).
        :return: True on success (exceptions propagate on failure).
        """
        conn = self.connect(type="admin")
        try:
            conn.delete_topics(need_delete_topics)
        finally:
            conn.close()  # BUGFIX: close the admin client (original leaked it)
        return True

    def _listTopics(self):
        """Return the list of all topic names."""
        conn = self.connect(type="admin")
        try:
            return conn.list_topics()
        finally:
            conn.close()  # BUGFIX: close the admin client (original leaked it)

    def _describeTopics(self):
        """Return metadata describing all topics."""
        conn = self.connect(type="admin")
        try:
            return conn.describe_topics()
        finally:
            conn.close()  # BUGFIX: close the admin client (original leaked it)

    def _describeCluster(self):
        """Return metadata describing the cluster."""
        conn = self.connect(type="admin")
        try:
            return conn.describe_cluster()
        finally:
            conn.close()  # BUGFIX: close the admin client (original leaked it)
if __name__ == "__main__":
    # Demo: connect, probe the broker version, send two DataFrame payloads
    # to topic "test1", then start a background consumer and dump whatever
    # has arrived so far. (BUGFIX: removed a stray trailing "|" that made
    # the module a syntax error.)
    print("初始化")
    ck = ConnectKafka()
    print("获取服务器信息")
    _, _ = ck.getKafkaVersion(is_print=True)
    print("开始查询数据")
    topic = "test1"
    time.sleep(1)
    client_id = "1"
    source = pd.DataFrame([["value11", "value12"], ["value21", "value22"]],
                          columns=["col", "col1"])
    ck.send(client_id, topic, source, key=key)
    ck.send(client_id, topic, source, key=key)
    ck.receive(topic, consumer_timeout_ms=10)
    # NOTE(review): printed immediately after starting the consumer thread,
    # so the queue is very likely still empty here — join/sleep first if
    # received messages are actually wanted.
    print(ck.receive_queue)