一般项目中用到kafka,要么是本系统进行数据推送(这个比较好检查)
要么是本系统对接其他系统数据,而对接的其他系统数据,可能是直接落库,也可能是进行加工处理后落库 因为上游的数据可能是有多个下游在使用,不是每条收到的数据都是本系统需要的,所以需要对数据进行有效性过滤,然后在对数据进行加工或落库
今天有个想法,想通过类似sql的where 条件对消费到的数据进行筛选,有了大概思路后 写了个DEMO(核心是jsonpath和eval),后面有时间再逐渐完善
也分享一下kafka公共类的一个结构,主要是分享下思路(怎么连接kafka可以百度一下)
import jsonpath
class KafkaOperator():
def __init__(self,集群名,环境):
初始化连接信息
建议将连接信息配置到某处,这里入参为集群名和环境,根据集群名+环境拿连接信息
try:
获取连接信息
except:
提示环境信息未配置,需配置环境信息获通过setConnectInfo设置连接信息
def setConnectInfo(self,具体的连接信息):
设置连接信息
def comsumer():
连接消费者
self.consumer = xxx
def procedure():
连接生产者
self.procedure = xxx
def getData(self,filter = "",filePath=None):
for msg in self.consumer:
data = msg.value
data = dataFilter(eval(str(data,'utf-8')),filter)
print(data)
if filePath:
添加数据到指定文件
def dataFilter(self,data,filter=""):
filter = filter.replace('=', '==')
if filter:
if 'and' in filter:
filter = filter.split('and')
filter_sentence = dealFilter(filter, 'and')
else:
filter = filter.split('or')
filter_sentence = dealFilter(filter, 'and')
if eval(filter_sentence):
return data
else:
return data
def dealFilter(self,filters,connectTyp):
sentence = ''
for n,f in enumerate(filters):
idx = f.find(' ')
col = f[:idx]
condition = f[idx:]
tmp = "jsonpath.jsonpath(data,'$..{0}')[0]".format(col) + ' '+ condition
if n != 0:
sentence += connectTyp
sentence += tmp
return sentence
|