目录: base.py
import json
import re
from string import Template
def find(data):
    """Return the names of all ``${name}`` placeholders found in *data*.

    Args:
        data: A string to scan, or any JSON-serialisable object (dict, list,
            ``None``, ...), which is serialised with ``json.dumps`` first.

    Returns:
        A list of placeholder names in order of appearance (duplicates are
        kept); an empty list when there are none.
    """
    if not isinstance(data, str):
        # Only dicts used to be serialised, so a list or None reached
        # re.findall unchanged and raised TypeError.
        data = json.dumps(data)
    # Non-greedy so that "${a} ${b}" yields two names rather than one span.
    pattern = r"\$\{(.*?)\}"
    return re.findall(pattern, data)
def replace(ori_data, replace_data):
    """Serialise *ori_data* to JSON and fill in its ``${name}`` placeholders.

    Args:
        ori_data: JSON-serialisable object whose strings may contain
            ``${name}`` placeholders.
        replace_data: Mapping of placeholder name to replacement value.
            Placeholders without an entry are left untouched.

    Returns:
        The substituted JSON text (a ``str``, not a parsed object).
    """
    template = Template(json.dumps(ori_data))
    # Replacements land inside JSON string literals, so quotes, backslashes
    # and control characters must be escaped; otherwise a value such as
    # 'a"b' makes the result unparseable. ensure_ascii=False keeps every
    # other character exactly as it was passed in.
    escaped = {
        name: json.dumps(str(value), ensure_ascii=False)[1:-1]
        for name, value in (replace_data or {}).items()
    }
    return template.safe_substitute(escaped)
def parse_relation(var, resdata):
    """Walk *resdata* along the key path *var* and return the value found.

    Args:
        var: Sequence of keys, outermost first (e.g. ``["alias", "567"]``).
            It is left unmodified.
        resdata: Nested dict-like object to look into.

    Returns:
        The value at the end of the path, *resdata* itself when the path is
        empty, or ``None`` when a key is missing or an intermediate value
        has no ``get`` method.
    """
    current = resdata
    # Iterating (rather than deleting var[0] and recursing) keeps the
    # caller's list intact and avoids one stack frame per path segment.
    for key in var or ():
        try:
            current = current.get(key)
        except AttributeError:
            # The path continues past a missing key or a non-dict value.
            return None
    return current
if __name__ == '__main__':
    # Manual smoke run of the three helpers defined above.
    template = {"admin-token": "${token}"}
    values = {'token': 'ssseeewwwdff'}
    rendered = replace(template, values)
    print(rendered)

    placeholders = find({'id': '${id_name}', "title": 'everthing will be ok', 'alias': '${alias_name}'})
    print(placeholders)

    nested = {'id': '123', "title": 'everthing will be ok', 'alias': {'567': 'mall'}}
    key_path = ["alias", '567']
    looked_up = parse_relation(key_path, nested)
    print(looked_up)
settings.py
import os.path
# Absolute location of this settings module.
abs_path = os.path.abspath(__file__)
# Two directory levels above this file.
project_path = os.path.dirname(os.path.dirname(abs_path))

# Sub-directories located directly under project_path.
_conf_path = os.sep.join((project_path, 'config'))
_log_path = os.sep.join((project_path, "log"))
_report_path = os.sep.join((project_path, "report"))

# Connection settings for the MySQL database.
# NOTE(review): host and credentials are hard-coded in source; consider
# loading them from the environment or an untracked config file.
DB_CONFIG = dict(
    host="192.111.11.111",
    user="root",
    password="123456",
    database="test",
    port=3306,
    charset="utf8",
)
def get_log_path() -> str:
    """Return the module-level log directory path (``_log_path``)."""
    return _log_path
def get_report_path() -> str:
    """Return the module-level report directory path (``_report_path``)."""
    return _report_path
def get_config_path() -> str:
    """Return the module-level config directory path (``_conf_path``)."""
    return _conf_path
class DynamicParam:
    """Empty holder class; it defines no attributes of its own.

    Attributes are meant to be attached to it at runtime with ``setattr``
    and read back with ``getattr``.
    """
if __name__ == '__main__':
    # Running the module directly prints the resolved report directory.
    report_dir = get_report_path()
    print(report_dir)
logutil.py
import logging
import os
import time
from config.settings import get_log_path
# When true, log records are echoed to the console as well as the log file.
STREAM = True


class LogUtil:
    """Configure the shared ``"logger"`` logger with a per-day log file.

    Handlers are attached only once: instances created after the logger
    already has handlers reuse the existing configuration, so records are
    not duplicated.

    Attributes:
        logger: The ``logging.Logger`` named ``"logger"``.
        log_name: File name of today's log, e.g. ``2024_01_31.log``.
        log_path_file: ``log_name`` joined onto ``get_log_path()``.
    """

    def __init__(self):
        self.logger = logging.getLogger("logger")
        self.logger.setLevel(logging.DEBUG)
        # Set on every instance (previously only on the first one) so the
        # attributes always exist.
        self.log_name = '{}.log'.format(time.strftime("%Y_%m_%d", time.localtime()))
        self.log_path_file = os.path.join(get_log_path(), self.log_name)

        if self.logger.handlers:
            # Already configured; adding handlers again would emit every
            # record twice.
            return

        # FileHandler raises FileNotFoundError if the directory is missing.
        os.makedirs(get_log_path(), exist_ok=True)

        formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")

        # mode='w' truncates the day's file each time the logger is set up.
        fh = logging.FileHandler(self.log_path_file, encoding='utf-8', mode='w')
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)
        # The handler must stay open: a FileHandler opened with mode='w'
        # that has been closed is not reopened on emit (Python 3.10+), so
        # closing it here silently discarded every record meant for the
        # file. logging.shutdown() closes it at interpreter exit.

        if STREAM:
            fh_stream = logging.StreamHandler()
            fh_stream.setLevel(logging.DEBUG)
            fh_stream.setFormatter(formatter)
            self.logger.addHandler(fh_stream)

    def log(self):
        """Return the configured logger."""
        return self.logger
# Module-level logger object; other modules import this name.
logger = LogUtil().logger

if __name__ == '__main__':
    # Emit a single record so the handler setup can be checked by hand.
    logger.info('test')
mysqlutil.py
import pymysql
from config.settings import DB_CONFIG
from utils.logutil import logger
class MysqlUtil:
    """Small wrapper around one PyMySQL connection and one dict cursor.

    Attributes:
        db: The open connection, or ``None`` after ``close()``.
        cursor: A ``DictCursor`` on ``db``, or ``None`` after ``close()``.
    """

    def __init__(self):
        self.db = pymysql.connect(**DB_CONFIG)
        # DictCursor returns each row as a dict keyed by column name.
        self.cursor = self.db.cursor(cursor=pymysql.cursors.DictCursor)

    def get_fetchone(self, sql, args=None):
        """Run *sql* and return the first row, or ``None`` if there is none.

        Args:
            sql: Statement text; may contain ``%s`` placeholders when
                *args* is given.
            args: Optional parameters, escaped by the driver.
        """
        self.cursor.execute(sql, args)
        # Used to call fetchall(), which handed back a list of rows and
        # broke callers that index the result by column name.
        return self.cursor.fetchone()

    def get_fetchall(self, sql, args=None):
        """Run *sql* and return every row.

        Args:
            sql: Statement text; may contain ``%s`` placeholders when
                *args* is given.
            args: Optional parameters, escaped by the driver.
        """
        self.cursor.execute(sql, args)
        return self.cursor.fetchall()

    def sql_execute(self, sql, args=None):
        """Run a data-modifying statement and commit it.

        Args:
            sql: Statement text; may contain ``%s`` placeholders when
                *args* is given.
            args: Optional parameters, escaped by the driver.

        Returns:
            ``True`` when the statement was executed and committed;
            ``False`` when the connection is unavailable or the statement
            failed and was rolled back. (Success used to return ``None``,
            which callers could not tell apart from failure by truthiness.)
        """
        if not (self.db and self.cursor):
            return False
        try:
            self.cursor.execute(sql, args)
            self.db.commit()
        except Exception:
            self.db.rollback()
            # logger.exception keeps the original message and appends the
            # traceback, which was previously discarded.
            logger.exception("SQL执行错误,已执行回滚")
            return False
        return True

    def close(self):
        """Close the cursor and the connection; safe to call repeatedly.

        This was declared ``@staticmethod`` while still taking ``self``, so
        ``instance.close()`` raised TypeError. ``MysqlUtil.close(instance)``
        keeps working.
        """
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
        if self.db is not None:
            self.db.close()
            self.db = None
readmysql.py
import datetime
from utils.mysqlutil import MysqlUtil
from utils.logutil import logger
# Shared database helper used by every RdTestcase query; the connection is
# opened when this module is imported.
mysql = MysqlUtil()


class RdTestcase:
    """Read test cases and configuration from MySQL and store run results."""

    def _to_sql(self, template, args):
        """Return *template* with *args* escaped and interpolated.

        The driver's ``Cursor.mogrify`` does the quoting, so values that
        contain quotes or backslashes can neither break nor inject into the
        statement. The finished text is then passed to the ``MysqlUtil``
        methods, which take a complete SQL string.
        """
        return mysql.cursor.mogrify(template, args)

    def load_all_case(self, web):
        """Return every row of ``test_case_list`` whose ``web`` equals *web*."""
        sql = self._to_sql("select * from test_case_list where web=%s", (str(web),))
        return mysql.get_fetchall(sql)

    def is_run_data(self, web):
        """Return the cases for *web* whose ``isdel`` column equals 1."""
        return [case for case in self.load_all_case(web) if case['isdel'] == 1]

    def loadConfkey(self, web, key):
        """Return what ``get_fetchone`` yields for the *web*/*key* config row."""
        # `key` is a reserved word in MySQL and must be back-quoted; the
        # unquoted form is rejected as a syntax error.
        sql = self._to_sql(
            "select * from test_config where web=%s and `key`=%s",
            (str(web), str(key)),
        )
        return mysql.get_fetchone(sql)

    def updateResults(self, response, is_pass, case_id):
        """Insert one row into ``test_result_record`` for a finished case.

        Args:
            response: JSON-serialisable response data; stored as JSON text.
            is_pass: Outcome flag; stored as its ``str()`` form.
            case_id: Identifier of the executed case.

        Returns:
            Whatever ``mysql.sql_execute`` returns.
        """
        # Imported here because this module's own import lines (datetime,
        # MysqlUtil, logger) do not bring json into scope.
        import json

        current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        sql = self._to_sql(
            "insert into `test_result_record`(case_id,times,response,result) values (%s,%s,%s,%s)",
            (
                str(case_id),
                current_time,
                json.dumps(response, ensure_ascii=False),
                str(is_pass),
            ),
        )
        rows = mysql.sql_execute(sql)
        logger.debug(sql)
        return rows
requestsutil.py
import requests
from utils.logutil import logger
class RequestSend:
    """Send one HTTP request with ``requests`` and normalise the response."""

    # Verbs, other than GET, that are sent with a request body.
    _BODY_METHODS = ("post", "put", "patch", "delete")

    @staticmethod
    def _is_json(headers):
        """Return True when *headers* declare a JSON ``Content-Type``."""
        for name, value in (headers or {}).items():
            if str(name).lower() == "content-type":
                return "json" in str(value).lower()
        return False

    def api_run(self, url, method, data=None, headers=None, cookies=None, timeout=None):
        """Send a request and return its status, body and cookies.

        Args:
            url: Target URL.
            method: HTTP verb, matched case-insensitively.
            data: Payload. Sent as JSON when the headers declare a JSON
                content type, otherwise form-encoded.
            headers: Optional request headers.
            cookies: Optional request cookies.
            timeout: Optional timeout in seconds passed to ``requests``;
                ``None`` (the default) waits indefinitely, as before.

        Returns:
            A dict with keys ``code`` (status code), ``body`` (decoded JSON,
            or the raw text when the body is not JSON) and ``cookies``
            (response cookies as a dict).

        Raises:
            ValueError: If *method* is not a supported HTTP verb.
        """
        logger.info("请求的url为{},类型为{}".format(url, type(url)))
        logger.info("请求的method为{},类型为{}".format(method, type(method)))
        logger.info("请求的data为{},类型为{}".format(data, type(data)))
        logger.info("请求的headers为{},类型为{}".format(headers, type(headers)))
        logger.info("请求的cookies为{},类型为{}".format(cookies, type(cookies)))

        # Accept "GET"/"Post"/... as well as the lower-case spelling.
        verb = str(method).strip().lower()
        if verb == "get":
            # NOTE(review): the payload goes out as a request body (data=),
            # not as query parameters (params=); confirm this is intended.
            res = requests.get(url, data=data, headers=headers, cookies=cookies, timeout=timeout)
        elif verb in self._BODY_METHODS:
            # Headers used to be compared against two exact dicts, so any
            # extra header (or none) left the response unset and the code
            # below failed with AttributeError.
            payload = {"json": data} if self._is_json(headers) else {"data": data}
            res = requests.request(verb, url, headers=headers, cookies=cookies, timeout=timeout, **payload)
        else:
            raise ValueError("unsupported HTTP method: {!r}".format(method))

        try:
            body = res.json()
        except ValueError:
            # Not JSON; the JSON decode errors raised here subclass ValueError.
            body = res.text
        return {
            'code': res.status_code,
            'body': body,
            'cookies': res.cookies.get_dict(),
        }

    def send(self, url, method, **kwargs):
        """Forward to ``api_run``; *kwargs* are its optional parameters."""
        return self.api_run(url=url, method=method, **kwargs)
test_run.py
import datetime
import json
import pytest
import commom.base as Base
from utils.logutil import logger
from config.settings import DynamicParam
from utils.readmysql import RdTestcase
from utils.requestsutil import RequestSend
# An instance of the holder class.
attribute = DynamicParam()
# Database-backed reader for test cases and configuration.
case_data = RdTestcase()
# Cases for the 'dw' site, loaded once when this module is imported.
case_list = case_data.is_run_data('dw')
# Timestamp captured at import time.
current_time = f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}"
class TestApi:
    """Data-driven API test: one parametrised run per case in ``case_list``."""

    def setup_class(self):
        """Log the moment the class starts running."""
        started = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        logger.info(f"******开始用例执行,开始时间为:{started}*******")

    def teardown_class(self):
        """Log the moment the class finishes."""
        # Read the clock now; the module-level timestamp is taken at import
        # time and was being reported as the finish time.
        finished = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        logger.info(f"*********执行用例完成,完成时间为:{finished}*******")

    @pytest.mark.parametrize('case', case_list)
    def test_run(self, case):
        """Send the request described by *case* and check the response.

        Nothing is returned: pytest warns when a test function returns a
        value.
        """
        # NOTE(review): the base URL is read for 'zrlog' while the cases are
        # loaded for 'dw'; confirm this is intended.
        url = case_data.loadConfkey('zrlog', 'url_api')['value'] + case['url']
        method = case['method']
        # SECURITY NOTE(review): eval() executes whatever text is stored in
        # these columns. If the table can hold untrusted data, switch to
        # ast.literal_eval or json.loads once the stored format is confirmed.
        headers = eval(case['headers'])
        cookies = eval(case['cookies'])
        data = eval(case['request_body'])
        relation = eval(case['relation'])
        case_name = case['title']

        # Fill ${name} placeholders with values captured from earlier cases.
        headers = self.correlation(headers)
        cookies = self.correlation(cookies)
        data = self.correlation(data)

        try:
            logger.info("正在执行{}用例".format(case_name))
            # The keyword was misspelt "haders", so every call raised
            # TypeError and each case failed before a request was sent.
            res_data = RequestSend().send(url, method, data=data, headers=headers, cookies=cookies)
            logger.info("用例执行完成,请求的结果为{}".format(res_data))
        except Exception:
            logger.exception("用例执行失败,请查看日志找原因")
            # Re-raise so the report shows the real error, not "assert False".
            raise

        assert res_data, "request returned no response data"
        if relation and relation != "None":
            self.set_relation(relation, res_data)
        self.assert_response(case, res_data)

    def set_relation(self, relation, res_data):
        """Capture values from *res_data* onto ``DynamicParam``.

        Args:
            relation: Comma-separated ``name=dotted.path`` items, e.g.
                ``"token=body.token"``.
            res_data: Response dict to read the values from.

        Malformed items and failed lookups are logged and skipped so that a
        single bad item does not stop the remaining ones.
        """
        if not relation:
            return
        if not isinstance(relation, str):
            logger.warning("relation is not a string, ignored: %r", relation)
            return
        for item in relation.split(','):
            name, sep, path = item.partition('=')
            name, path = name.strip(), path.strip()
            if not sep or not name or not path:
                logger.warning("malformed relation item skipped: %r", item)
                continue
            try:
                value = Base.parse_relation(path.split('.'), res_data)
            except (AttributeError, TypeError):
                logger.exception("could not resolve relation item %r", item)
                continue
            logger.info(f"{name}={value}")
            setattr(DynamicParam, name, value)

    def correlation(self, data):
        """Return *data* with ``${name}`` placeholders filled in.

        Values come from attributes of ``DynamicParam``; a name with no
        attribute is replaced by the string ``'None'``. Empty input, and
        input without placeholders, is returned unchanged.
        """
        if not data:
            # Nothing to substitute (None, {}, "", ...).
            return data
        # Always hand the finder a string so list payloads work as well.
        text = data if isinstance(data, str) else json.dumps(data)
        names = Base.find(text)
        if not names:
            return data
        replace_dict = {str(name): getattr(DynamicParam, str(name), 'None') for name in names}
        return json.loads(Base.replace(data, replace_dict))

    def assert_response(self, case, res_data):
        """Compare the response's ``error`` code with the expected code.

        The outcome is stored through ``case_data.updateResults`` before
        the assertion is raised.

        Returns:
            ``True`` when the codes match (otherwise AssertionError is raised).
        """
        try:
            is_pass = int(res_data['body']['error']) == int(case['expected_code'])
        except (KeyError, IndexError, TypeError, ValueError):
            # Missing key, non-JSON body or non-numeric code: a failed check.
            is_pass = False
        logger.info("用例断言成功" if is_pass else "用例断言失败")
        case_data.updateResults(res_data, is_pass, str(case['id']))
        assert is_pass, "response did not match expected_code for case {}".format(case['id'])
        return is_pass
|