目录结构
mysql.py
# -*- coding: utf-8 -*-
import pymysql
import pymysql.cursors
import time
from config import Config
class MySQL(object):
    """Class-level holder for a PyMySQL connection and its DictCursor.

    The handles are stored on the class, so every caller of
    ``get_instance`` shares the most recently created pair.
    """

    # Most recently created cursor (despite the name, this is a cursor).
    _instance = None
    # Connection that ``_instance`` was created from.
    _connection = None

    def __init__(self):
        pass

    @classmethod
    def get_instance(cls):
        """Open a new connection and return ``(cursor, connection)``.

        Connection settings are read from ``Config``. Autocommit is enabled
        on the new connection.

        On ``pymysql.Error`` the failure is printed (best effort, no raise)
        and whatever handles were stored before the call are returned; that
        is ``(None, None)`` if no connection has ever succeeded.

        NOTE(review): every call opens a fresh connection and the previously
        stored one is not closed here, because other holders may still be
        using it. Call ``del_instance`` to release the stored pair.
        """
        try:
            connection = pymysql.connect(host=Config.MYSQL_HOST, port=Config.MYSQL_PORT, user=Config.MYSQL_USER,
                                         passwd=Config.MYSQL_PASSWORD, db=Config.MYSQL_DB, charset="utf8mb4",
                                         cursorclass=pymysql.cursors.DictCursor)
            connection.autocommit(True)
            cls._connection = connection
            cls._instance = connection.cursor()
        except pymysql.Error as e:
            # Keep the original log line and add the error so the cause is visible.
            print("MySQL Connection Failed\t%s" % ((time.strftime('%Y-%m-%d %X', time.localtime(time.time()))),), e)
        return cls._instance, cls._connection

    @classmethod
    def del_instance(cls):
        """Close and forget the stored cursor and connection.

        Safe to call when nothing is stored (it is then a no-op), and safe to
        call more than once.
        """
        cursor, connection = cls._instance, cls._connection
        # Clear the references first so a failing close() cannot leave
        # already-closed handles behind on the class.
        cls._instance = None
        cls._connection = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Close the connection even if closing the cursor raised.
            if connection is not None:
                connection.close()
db.py
import time, datetime
import json
import celery
from schedule.__init__ import celery_job
from datetime import date, datetime, timedelta
from service.dbMgr import DbMgr
#celery 方法
@celery_job.task
def add(v, celerytype):
    """Celery entry point: store payload ``v`` through ``DbMgr``.

    Args:
        v: payload passed unchanged to the selected ``DbMgr`` insert method.
        celerytype: ``"1"`` selects ``DbMgr.insert_1``, ``"2"`` selects
            ``DbMgr.insert_2``.

    Returns:
        Whatever the selected ``DbMgr`` insert method returns.

    Raises:
        ValueError: if ``celerytype`` is neither ``"1"`` nor ``"2"``.
            Previously this case failed with an UnboundLocalError on the
            result variable.
    """
    # Validate before constructing DbMgr so that an unsupported type does
    # not open a database connection for nothing.
    if celerytype not in ("1", "2"):
        raise ValueError("unsupported celerytype: %r" % (celerytype,))
    dbMgr = DbMgr()
    if celerytype == "1":
        return dbMgr.insert_1(v)
    return dbMgr.insert_2(v)
def git_time():
    """Return the Unix timestamp of local midnight today plus 60 seconds.

    Returns:
        int: seconds since the epoch for 00:01:00 today in local time.
    """
    today = time.strftime('%Y%m%d', time.localtime())
    midnight = time.mktime(time.strptime(today, "%Y%m%d"))
    # mktime returns a float; truncate with int() rather than slicing the
    # ".0" off its string form.
    return int(midnight) + 60
aes.py
from Crypto.Cipher import AES
from binascii import b2a_hex, a2b_hex
import time
from config import Config
import json
import hashlib
#pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pycryptodome 清华镜像
#pip install -i https://mirrors.aliyun.com/pypi/simple pycrypto
def add_to_16(text):
    """Pad ``text`` with NUL characters to a multiple of 16 UTF-8 bytes.

    The length is measured on the UTF-8 encoding of ``text``. Text whose
    encoded length is already a multiple of 16 (including the empty string)
    is returned unchanged.
    """
    remainder = len(text.encode('utf-8')) % 16
    padding = (16 - remainder) if remainder else 0
    return text + '\0' * padding
def encrypt():
    """Build the signed credential payload and AES-CBC encrypt it.

    The payload is a JSON object with ``appId``, ``agentKey``, ``timeStamp``
    (current time in milliseconds, as a string) and ``sign`` (SHA-1 hex
    digest of ``APPID-AGENTKEY-timestamp``). It is padded with ``pkcs5Pad``
    to a 16-byte block size and encrypted with ``Config.AGENTKEY`` as the key
    and ``Config.IV`` as the IV.

    Returns:
        The raw ciphertext as returned by the cipher (not hex-encoded).
    """
    # Millisecond-resolution timestamp.
    stamp = str(int(round(time.time() * 1000)))
    sign_source = Config.APPID + "-" + Config.AGENTKEY + "-" + stamp
    digest = hashlib.sha1(sign_source.encode("utf-8")).hexdigest()
    payload = json.dumps({"appId": Config.APPID, "agentKey": Config.AGENTKEY, "timeStamp": stamp, "sign": digest})
    # AES block size is 16 bytes; keys of 16, 24 or 32 bytes correspond to
    # 128-, 192- and 256-bit encryption.
    cipher = AES.new(Config.AGENTKEY.encode('utf-8'), AES.MODE_CBC, Config.IV.encode('utf-8'))
    padded = pkcs5Pad(payload, 16)
    # The ciphertext is not guaranteed to be ASCII; hex-encode it with
    # b2a_hex if it has to be stored or printed as text.
    return cipher.encrypt(padded.encode('utf-8'))
def decrypt(text):
    """AES-CBC decrypt a hex string and strip trailing NUL characters.

    Args:
        text: ciphertext encoded as hexadecimal.

    Returns:
        str: the decoded plaintext with trailing ``'\\0'`` removed.

    NOTE(review): the key and IV here are hard-coded literals, whereas
    ``encrypt`` reads them from ``Config``, pads with ``pkcs5Pad`` and
    returns raw bytes. This function therefore does not look like the
    inverse of ``encrypt`` - confirm which peer it is meant for.
    """
    cipher = AES.new('9999999999999999'.encode('utf-8'), AES.MODE_CBC, b'qqqqqqqqqqqqqqqq')
    raw = cipher.decrypt(a2b_hex(text))
    return raw.decode().rstrip('\0')
def pkcs5Pad(text, blocksize):
    """Append PKCS#5/PKCS#7-style padding to ``text``.

    The pad length is computed from the UTF-8 encoded length of ``text``,
    because the result is encoded to UTF-8 before encryption. Counting
    characters instead would produce a misaligned final block for non-ASCII
    input. For ASCII input the result is unchanged.

    Each padding character has a code point equal to the pad length, for
    example 5 missing bytes are filled with five ``\\x05`` and 12 with twelve
    ``\\x0c``. A full block of padding is added when the input is already
    aligned.

    Args:
        text: the string to pad.
        blocksize: cipher block size in bytes. It must not exceed 127, so
            that each padding character encodes to a single UTF-8 byte.

    Returns:
        str: ``text`` followed by the padding characters.
    """
    pad = blocksize - (len(text.encode('utf-8')) % blocksize)
    return text + chr(pad) * pad
dbMgr.py
import sys
import time
import json
import re
import datetime
import traceback
import redis
sys.path.append("..")
from libraries.mysql import MySQL
import pymysql
from config import Config
import math
import numpy as np
import hashlib
class DbMgr(object):
    """Insert records into MySQL, using Redis markers to skip duplicates.

    Status codes returned by the insert methods:
        1 - the row was inserted,
        2 - the insert failed (the SQL and the error text are stored in Redis),
        3 - the record was already stored.
    """

    cursor = None
    connection = None

    def timeStamp(self):
        """Return the fixed epoch value 1616688000.

        NOTE(review): the computation of today's midnight timestamp was
        commented out in favour of this constant; confirm whether the fixed
        value is still intended.
        """
        return int(1616688000)

    def timeStampms(self):
        """Return the fixed epoch value 1616688000.

        NOTE(review): the computation of the current-second timestamp was
        commented out in favour of this constant; confirm whether the fixed
        value is still intended.
        """
        return int(1616688000)

    def __init__(self):
        self.cursor, self.connection = MySQL.get_instance()

    def reconnect(self):
        """Replace the cursor and connection with a freshly opened pair."""
        self.cursor, self.connection = MySQL.get_instance()

    @staticmethod
    def _is_duplicate_error(message):
        """Return True when ``message`` contains "Duplicate entry".

        ``str.find`` must not be used as a boolean here: it returns -1
        (truthy) when the text is absent and 0 (falsy) when it is at the
        very start of the message.
        """
        return "Duplicate entry" in message

    def _insert_once(self, sql, dedup_value, key_prefix):
        """Execute ``sql`` unless a Redis marker says it was already stored.

        Args:
            sql: the statement to execute.
            dedup_value: string whose MD5 hex digest identifies the record.
            key_prefix: Redis key prefix, e.g. ``"STATUS_HOTEL_"``; the
                markers are ``<prefix>SUCCESS_<md5>`` and
                ``<prefix>FAIL_<md5>``.

        Returns:
            int: 1 inserted, 2 failed, 3 already stored.
        """
        r = redis.StrictRedis(host=Config.REDIS_HOST, port=Config.REDIS_PORT, db=13)
        sign = hashlib.md5(dedup_value.encode("utf-8")).hexdigest()
        success_key = key_prefix + "SUCCESS_" + sign
        fail_key = key_prefix + "FAIL_" + sign
        if r.exists(success_key):
            return 3  # already stored
        try:
            self.cursor.execute(sql)
            self.connection.commit()
            r.set(success_key, sql)
            # Deleting a missing key is a no-op, so no exists() check is needed.
            r.delete(fail_key)
            return 1  # inserted
        except Exception as e:
            self.reconnect()
            message = str(e)
            if self._is_duplicate_error(message):
                # The row is already in the table; record that for next time.
                r.set(success_key, sql)
                r.delete(fail_key)
                return 3  # already stored
            r.set(fail_key, sql + message)
            return 2  # insert failed

    def insert_1(self, content):
        """Insert one record, de-duplicated on ``content['123']``.

        Uses the ``STATUS_HOTEL_`` Redis markers. Returns 1, 2 or 3 as
        described on ``_insert_once``.
        """
        # Not referenced by the placeholder statement below; presumably
        # interpolated into the real SQL - TODO confirm. Prefer parameterized
        # queries when the real statement is built from ``content``.
        receive_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        sql = "sql语句"
        return self._insert_once(sql, content['123'], "STATUS_HOTEL_")

    def insert_2(self, content):
        """Insert one record, de-duplicated on ``content['456']``.

        Uses the ``STATUS_ORDER_`` Redis markers. Returns 1, 2 or 3 as
        described on ``_insert_once``.
        """
        # Not referenced by the placeholder statement below; presumably
        # interpolated into the real SQL - TODO confirm. Prefer parameterized
        # queries when the real statement is built from ``content``.
        receive_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        orderItems_str = json.dumps(content['orderItems'])
        orderItems = content['orderItems']
        sql = "sql语句"
        return self._insert_once(sql, content['456'], "STATUS_ORDER_")
config.py
from datetime import timedelta
class Config(object):
    """Static settings for MySQL, Redis, Elasticsearch, the AES helper and Celery."""
    # MySQL connection settings.
    MYSQL_HOST = '127.0.0.1'
    MYSQL_PORT = 3306
    MYSQL_USER = 'root'
    MYSQL_PASSWORD = 'root'
    MYSQL_DB = 'test'
    # Left blank here; APPID, AGENTKEY and IV are read by the AES helper.
    URL=""
    APPID=""
    AGENTID=""
    AGENTKEY=""
    IV=""
    # Redis connection settings.
    REDIS_HOST='127.0.0.1'
    REDIS_PORT=6379
    REDIS_DB=4
    REDIS_PASSWORD=''
    ES_HOST='127.0.0.1:9200'
    BROKER_URL = 'redis://:@127.0.0.1/14' # Redis as the message broker (original note: db 7 for testing, db 11 in production; the value here uses db 14)
    CELERY_RESULT_BACKEND = 'redis://:@127.0.0.1:6379/15' # task results are stored in Redis (original note: db 8 for testing, db 12 in production; the value here uses db 15)
    CELERY_TASK_SERIALIZER = 'json' # task serializer (original note mentioned msgpack; the configured value is json)
    CELERY_RESULT_SERIALIZER = 'json' # result reads are not performance-critical, so the more readable JSON is used
    CELERY_TASK_RESULT_EXPIRES = 60 * 1 # task expiry time (presumably seconds - TODO confirm)
    CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # accepted content types
    CELERYD_POOL = 'gevent'
    # # Scheduler configuration for periodic tasks
    # CELERYBEAT_SCHEDULE={
    # # task name
    # 'insert':{
    # # function the task runs
    # 'task':'schedule.insert.ds',
    # # schedule interval: every 10 seconds
    # 'schedule':timedelta(seconds=10),
    # # arguments to pass
    # #'args':(2,8)
    # }
    # }
insert.py
#!/usr/bin/env python
# encoding: utf-8
import requests
from config import Config
from service.aes import encrypt
from schedule.db import add
import time
import json
import hashlib
import base64
import requests
import math
from pymysql import NULL
from service.dbMgr import DbMgr
from celery.result import AsyncResult
# Dispatch loop: queue one task, report, wait, repeat forever.
while True:
    # Task dispatch.
    v={"id":"123","name":"你好"}
    # NOTE(review): the second argument is "", but the add task only handles
    # "1" and "2", so the worker will fail on this task - confirm the
    # intended type.
    task=add.delay(v,"")# asynchronous queue
    print("完成...")
    time.sleep(180)# pause for 3 minutes
包
    pip install celery
    pip install redis
    pip install gevent
    pip install requests
    pip install datetime
    pip install pymysql
    pip install psutil
前台执行celery 服务
celery -A schedule worker --loglevel=info -c 100 -P gevent
sudo nohup python insert.py>insert.log 2>&1 &
后台执行celery
sudo nohup celery -A schedule worker --loglevel=info -c 100 -P gevent -E>celery.log 2>&1 &
sudo nohup python insert.py>insert.log 2>&1 &
|