IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【Python】【自动化测试】kafka公共类的大概思路和数据过滤 -> 正文阅读

[大数据]【Python】【自动化测试】kafka公共类的大概思路和数据过滤

一般项目中用到kafka,要么是本系统进行数据推送(这个比较好检查)

要么是本系统对接其他系统数据,而对接的其他系统数据,可能是直接落库,也可能是进行加工处理后落库
因为上游的数据可能是有多个下游在使用,不是每条收到的数据都是本系统需要的,所以需要对数据进行有效性过滤,然后在对数据进行加工或落库

今天有个想法,想通过类似sql的where 条件对消费到的数据进行筛选,有了大概思路后 写了个DEMO(核心是jsonpath和eval),后面有时间再逐渐完善

也分享一下kafka公共类的一个结构,主要是分享下思路(怎么连接kafka可以百度一下)

import jsonpath

class KafkaOperator():

    def __init__(self,集群名,环境):
        初始化连接信息
        建议将连接信息配置到某处,这里入参为集群名和环境,根据集群名+环境拿连接信息
        try:
            获取连接信息
        except:
            提示环境信息未配置,需配置环境信息获通过setConnectInfo设置连接信息

    def setConnectInfo(self,具体的连接信息):
        设置连接信息

    def comsumer():
        连接消费者
        self.consumer = xxx

    def procedure():
        连接生产者
        self.procedure = xxx


    def getData(self,filter = "",filePath=None):
    # 消费kafka数据
    # 如果filter传了筛选条件,就根据过滤条件进行数据过滤
    # 如果传了文件路径,则将消费到的数据保存到该路径
        for msg in self.consumer:
            data = msg.value
            # msg.value是byte,可用str转为utf-8
            data = dataFilter(eval(str(data,'utf-8')),filter)
            print(data)

            if filePath:
                # 通常要保留原始数据,以便追溯问题
                添加数据到指定文件


    def dataFilter(self,data,filter=""):
        # 过滤数据,data需要为json
        # 过滤条件计划的是做成同sql的where条件,例如:filter = "satus in ('A','D') and account='123456'"
        filter = filter.replace('=', '==')
        if filter:
            if 'and' in filter:
                filter = filter.split('and')
                filter_sentence = dealFilter(filter, 'and')
            else:
                filter = filter.split('or')
                filter_sentence = dealFilter(filter, 'and')
            if eval(filter_sentence): # 执行过滤条件
                return data
        else:
            return data


    def dealFilter(self,filters,connectTyp):
    	# 将筛选条件转换为python能够执行的语句
        sentence = ''
        for n,f in enumerate(filters):
            idx = f.find(' ')
            col = f[:idx]
            condition = f[idx:]
            tmp = "jsonpath.jsonpath(data,'$..{0}')[0]".format(col) + ' '+ condition
            if n != 0:
                sentence += connectTyp
            sentence += tmp
        return sentence

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-03 11:59:18  更:2021-09-03 12:00:07 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:13:59-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码