Python 接口自动化
公司项目有不少接口, 后期还会有更多的接口需要测试, 为了提升工作效率, 在新代码提交后, 尽早确保已有接口的正常工作, 所以安排上了接口自动化.
思维导图
具体实现
env config 文件
将环境数据放入yaml 文件管理, 方便在不同环境, 只需要修改配置文件就能跑用例. 考虑到以后要是有另外的接口什么的需要测试, 就写成了下面格式.
wechat_api:
base_URL: https://qyapi.weixin.qq.com/
define_path: config/api_relate/api_define/demo_wechat.yaml
test_data_path: testdata/api_cases/api_test_cases_demo.xlsx
sheet_name: WeChat_APIs
certificate_check: True
timeout: 2
API 相关
使用了类似于 UI 自动化 page object 的思维, 写了 BaseAPI 模块, 集成 BaseAPI 模块的子接口模块.
-
base api 模块实现以下功能
def run_cases(self, case_data, **kwargs):
"""
根据测试数据来发送响应的request. 做断言.
:param case_data: 测试用例数据
:param kwargs: 其他的一些可能用到的参数
:return:
"""
if isinstance(case_data, dict):
logger.debug('case data is a dict')
self.update_temp_variables(case_data)
run_api = case_data.get('api_name')
self.update_request_by_case(case_data)
getattr(self, run_api)(**kwargs)
logger.debug('self resp value is: ')
logger.debug(self.resp)
self.common_asserts.assert_response(self.resp, case_data, self.env_data)
if case_data.get('export_data'):
pass
else:
logger.debug("case data type is: ")
logger.debug(type(case_data))
```
```python
def send_request(self, api_define) -> Response:
"""
发送 request 请求
:param api_define: api 的相关定义数据
:return: 返回 response
"""
self.update_req_var_api_define(api_define)
self.req = self.replace_req_variables(self.req)
logger.info('send request as: ')
logger.info(self.req)
resp = requests.request(
url=self.req.get('url'),
method=self.req.get('method'),
params=self.req.get('params'),
data=self.req.get('data'),
json=self.req.get('json'),
files=self.req.get('files'),
auth=self.req.get('auth'),
timeout=self.req.get('timeout'),
verify=self.req.get('verify')
)
self.format_resp(resp)
self.req.clear()
return resp
-
xxx_api 模块 每一个 xxx_api 都有对应的接口, 在调用时, 会从对应的配置文件中取到接口配置信息. WeChat:
/cgi-bin/user/simplelist:
endpoint: /cgi-bin/user/simplelist
method: get
/cgi-bin/user/get:
endpoint: /cgi-bin/user/get
method: get
def simplelist(self):
self.send_request(self.api_define['WeChat'].get('/cgi-bin/user/simplelist'))
-
接口响应的断言 将接口响应的断言单独写了个模块, 在这里实现了将测试结果回写进测试数据文件(这里可能会存在一个问题, 在用例很多的时候, 并发跑用例, 可能会出现多个线程都要写入结果, 那这个时候会出现只有一个线程能更新, 其余的都会报权限错误. ) from requests import Response
import json
import jsonpath as jsonpath
import logging
from src.common.file_path_generator import GenerateFilePath
from src.common.file_operation import FileOperation
logger = logging.getLogger(__name__)
class CommonAsserts:
"""
断言类
提供各类的断言方法
"""
def __init__(self):
self.file_path = GenerateFilePath()
self.file_opr = FileOperation()
def assert_response(self, resp: Response, case_data: dict, env_data: dict):
"""
response 的断言方法.
:param resp: 接口响应
:param case_data: 测试用例数据
:param env_data: 环境数据
:return:
"""
checkpoint = case_data.get('check_point')
if checkpoint:
resp_json = resp.json()
checkpoints = self.check_status_code(resp, checkpoint, case_data, env_data)
if isinstance(checkpoints, dict):
for key in checkpoints.keys():
if isinstance(checkpoints[key], list):
check_point_list = checkpoints[key]
for value in check_point_list:
path = value.get('path')
actually_list = self.jsonpath_data(resp_json, path, case_data,
env_data)
match_type = value.get('match_type')
expect_value = value.get('expect_value')
self.validate_by(actually_list, match_type, expect_value, case_data, env_data)
else:
logger.info('check value for key: %s' % key)
path = '$..' + key
logger.debug('checkpoint key value is: ')
logger.debug(checkpoints[key])
actually_list = self.jsonpath_data(resp_json, path, case_data, env_data)
self.any_of(checkpoints[key], actually_list, case_data, env_data)
else:
logger.warning('No checkpoint defined in test data.')
msg_list = ['No checkpoint defined in test data.']
self.assert_result(False, case_data, env_data, msg_list)
assert False
def validate_by(self, actually_list, match_type, expect_value, case_data, env_data):
"""
稍微麻烦点的断言方法, 封装部分断言.
:param actually_list: 取到的实际结果
:param match_type: 匹配方法
:param expect_value: 期待结果
:param case_data: 测试用例数据
:param env_data: 环境数据
:return:
"""
getattr(self, match_type)(expect_value, actually_list, case_data, env_data)
def all_of(self, expect_value, actually_list, case_data, env_data):
"""
遍历取到的实际结果集, 将预期结果与每一个实际结果对比, 所有都匹配返回True, 其中一个不匹配返回False.
:param expect_value: 预期结果
:param actually_list: 实际结果集
:param case_data: 测试用例数据
:param env_data: 环境数据
:return:
"""
logger.info("run check point with all of type")
result = True
msg_list = []
for value in actually_list:
if expect_value != value:
msg_list.append('actually value is ' + value)
msg_list.append('except value is: ' + expect_value)
result = False
break
self.assert_result(result, case_data, env_data, msg_list)
def any_of(self, expect_value, actually_list, case_data, env_data):
"""
预期结果只要在实际结果中能找到, 则断言成功.
:param expect_value: 预期结果
:param actually_list: 实际结果集
:param case_data: 测试用例数据
:param env_data: 环境数据
:return:
"""
logger.info("run check point with any of type")
result = False
msg_list = []
if expect_value in actually_list:
result = True
else:
msg_list.append("expect value is not in actually value")
msg_list.append("expect value is %s" % expect_value)
msg_list.append('actually value is: %s' % str(actually_list))
self.assert_result(result, case_data, env_data, msg_list)
def equal_to_ignoring_case(self, expect_value, actually_list, case_data, env_data):
"""
忽略大小写的匹配
:param expect_value: 期待结果
:param actually_list: 实际结果
:param case_data: 测试用例数据
:param env_data: 环境数据
:return:
"""
result = False
for value in actually_list:
if expect_value.lower() == value.lower():
result = True
else:
result = False
break
self.assert_result(result, case_data, env_data)
def jsonpath_data(self, src_dict, path, case_data: dict, env_data: dict) -> list:
"""
通过 jsonpath 获取response 中的数据.
:param src_dict: 目标词典.
:param path: jsonpath
:param case_data: 测试用例数据
:param env_data: 环境数据
:return: 通过jsonpath 取得的数据都是list, 返回一个list集合.
"""
result_list = jsonpath.jsonpath(src_dict, path)
msg_list = []
if result_list is False:
msg_list.append("The json path: {} in test data is incorrect, please check".format(str(path)))
self.assert_result(False, case_data, env_data, msg_list)
raise TypeError("The json path: {} in test data is incorrect, please check".format(str(path)))
return result_list
def check_status_code(self, resp: Response, check_points, case_data, env_data):
"""
断言 response 的status code
:param resp: 接口的响应
:param check_points: 测试数据, 如果有status_code, 则验证指定的status_code, 如果没有就不断言.
:param case_data: 测试用例数据
:param env_data: 环境数据
:return:
"""
result = False
check_points = json.loads(check_points)
msg_list = []
if 'status_code' in check_points:
logger.info("Customize status code: %s" % check_points['status_code'])
if int(check_points['status_code']) == resp.status_code:
result = True
check_points.pop('status_code')
else:
msg_list.append('Expect status code is %s' % str(check_points['status_code']))
msg_list.append('Actually status code is %s' % str(resp.status_code))
result = False
else:
logger.info('check default status code 200.')
if resp.status_code == 200:
result = True
else:
msg_list.append('Expect status code is 200')
msg_list.append('Actually status code is %s' % str(resp.status_code))
result = False
self.assert_result(result, case_data, env_data, msg_list)
return check_points
def assert_result(self, result: bool, case_data: dict, env_data: dict, msg_list=None):
"""
对结果做校验, 并且记录测试结果到测试用例数据中.
:param result: 布尔值, Ture 则checkpoint 检查通过, False checkpoint 检查不通过.
:param env_data: 环境数据, 主要用于取得测试用例数据的位置, sheet名称
:param case_data: 测试数据
:param msg_list: 消息list
:return:
"""
try:
if msg_list:
logger.warning(msg_list)
assert result
self.update_test_result('Pass', case_data, env_data)
except AssertionError as e:
logger.info('Assert Fail, case is failed.', exc_info=True)
self.update_test_result('Failed', case_data, env_data)
raise
def update_test_result(self, finally_result: str, case_data: dict, env_data: dict):
"""
记录测试结果到测试用例数据中.
:param finally_result: assert 结果, Pass, Failed
:param env_data: 环境数据, 主要用于取得测试用例数据的位置, sheet名称
:param case_data: 测试数据
:return:
"""
test_data_path = env_data.get('test_data_path')
print('********************************path value is')
print(test_data_path)
test_data_abs_path = self.file_path.file_absolute_path(test_data_path)
case_data['result'] = finally_result
write_data_list = []
for value in case_data.values():
write_data_list.append(value)
logger.info('Write test result to data file: %s', test_data_abs_path)
logger.info(write_data_list)
self.file_opr.write_to_excel(file_path=test_data_abs_path,
sheet_name=env_data.get('sheet_name'),
case_index=case_data.get('case_id'),
data_to_write=write_data_list)
测试数据相关
-
测试数据来源 现在是将测试数据放入excel中管理的. 大致结构如下: 由api_data_provider 模块组装并提供数据集合, 指定sheet_name, 文件路径 file_path 注意: excel里面的数据如果增加了列, 那么这里的代码也需要响应的改动. class ProvideAPIData:
@staticmethod
def single_case_4_excel(src_data: tuple) -> dict:
"""
根据传入的 excel 数据生成一条测试用例. 使用 zip 的方法将两个元组压缩, 并返回词典
:param src_data: 元组类型的数据.
:return: 返回词典类型的数据
"""
key_value = (
'case_id', 'api_name', 'api_desc', 'request_url', 'method', 'priority', 'request_data_type', 'request_data',
'encode',
'check_point', 'export_data', 'relate_data', 'active', 'result')
if len(src_data) != len(key_value):
raise ValueError("len of src_data should be {}".format(str(len(key_value))))
case = zip(key_value, src_data)
return dict(case)
def cases_data_4_excel(self, excel_data: list) -> list:
"""
将 excel sheet 表中多行数据转换为
:param excel_data: 格式为 [(tuple), (tuple)] 的数据
:return: list 数据, 但是格式为 [dict, dict]
"""
cases_list = []
for index in range(len(excel_data)):
cases_list.append(self.single_case_4_excel(excel_data[index]))
return cases_list
def provide_excel_data(self, file_path: str, sheet_name: str) -> list:
"""
根据传入的 excel 文件路径, sheet 名称, 获取指定 sheet 表格的数据.
:param file_path: excel 文件路径
:param sheet_name: 指定的工作表 sheet 名称
:return: list 数据, 格式为 [dict, dict]
"""
total_cases = FileOperation.load_excel_file(file_path, sheet_name)
target_data = self.cases_data_4_excel(total_cases)
logger.info('Loading test data as: %s' % target_data)
return target_data
-
fixture 提供测试数据 使用fixture来为测试提供excel 中的测试用例数据. fixture conftest.py中写法 def case_index(fixture_value):
"""
提供 case id, 将测试数据中的 api_name 取出来作为每一个 case 的执行 ID.
:param fixture_value: 测试用例数据
:return: api_name 的value.
"""
return fixture_value.get('api_name')
def wechat_data():
"""
放在 fixture 中, 优雅的提供测试数据.
:return:
"""
print('run fixture?')
case_data_relate_path = (load_config.loading_config())['wechat_api'].get('test_data_path')
case_data_path = file_path.file_absolute_path(case_data_relate_path)
case_data = data_provider.provide_excel_data(case_data_path, "WeChat_APIs")
print('case data is?')
print(case_data)
return case_data
@pytest.fixture(scope="class", autouse=False, params=wechat_data(), ids=case_index)
def wechat_data_provider(request):
return request.param
测试套件中只需要使用pytest 的mark usefixtures 即可使用测试数据 @pytest.mark.usefixtures('wechat_data_provider')
def test_wechat_api(self, wechat_data_provider):
self.wechat.run_cases(wechat_data_provider)
关于log
由于使用的是pytest, 所以直接在项目根目录下面写了pytest.ini 文件管理log的处理.
[pytest]
log_cli = 1
log_cli_level = DEBUG
log_cli_date_format = %Y-%m-%d-%H-%M-%S
log_cli_format = %(asctime)s - %(filename)s - %(module)s - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s
log_file = ../results/logs/logs.log
log_file_level = DEBUG
log_file_date_format = %Y-%m-%d-%H-%M-%S
log_file_format = %(asctime)s - %(filename)s - %(module)s - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s
run.py文件
为了捡懒, 将执行pytest, allure 生成报告的命令放到了run.py 模块中.
import sys
import os
import shutil
import logging
import time
from typing import Union
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
sys.path.append(rootPath)
from src.common.file_path_generator import GenerateFilePath
from src.common.file_operation import FileOperation
logger = logging.getLogger(__name__)
class Run:
allure_report_path = 'results/report/allure_result'
allure_html_path = 'results/report/allure_html'
allure_cmd_path = 'results/report'
environment_properties_path = 'environment.properties'
def __init__(self):
self.generate_abs_path = GenerateFilePath()
self.file_opr = FileOperation()
def run_test_by_pytest(self, test_path, rerun=4, xdist='auto'):
allure_report_abs_path = self.generate_abs_path.file_absolute_path(self.allure_report_path)
allure_html_abs_path = self.generate_abs_path.file_absolute_path(self.allure_html_path)
env_prop_abs_path = self.generate_abs_path.file_absolute_path(self.environment_properties_path)
if os.path.exists(allure_report_abs_path):
shutil.rmtree(allure_report_abs_path)
test_path = self.generate_abs_path.file_absolute_path(test_path)
pytest_cmd = f'pytest -s --rootdir={test_path} ' \
f'-o junit_family=xunit2 --junitxml=../results/report/test_result.xml ' \
f'--alluredir={allure_report_abs_path} ' \
f'--clean-alluredir --reruns={rerun} -n={xdist} --dist=loadscope'
os.system(pytest_cmd)
if os.path.exists(allure_report_abs_path):
if os.path.exists(env_prop_abs_path):
shutil.copy(env_prop_abs_path, allure_report_abs_path)
history_trend_path = self.generate_abs_path.file_absolute_path(
self.allure_html_path + '/widgets/history-trend.json')
temp_history_trend = None
retry_trend_path = self.generate_abs_path.file_absolute_path(
self.allure_html_path + '/widgets/retry-trend.json')
temp_retry_trend = None
if os.path.exists(allure_html_abs_path):
logger.debug("allure_html_abs_path is existing")
if os.path.exists(history_trend_path):
logger.debug("history_trend_path is existing")
temp_history_trend = self.get_trend_value(history_trend_path)
logger.debug(temp_history_trend)
if os.path.exists(retry_trend_path):
temp_retry_trend = self.get_trend_value(retry_trend_path)
logger.debug(temp_retry_trend)
shutil.rmtree(allure_html_abs_path)
allure_cmd = f'allure generate {allure_report_abs_path} -o {allure_html_abs_path} --clean'
os.system(allure_cmd)
if temp_history_trend:
current_history_trend = self.get_trend_value(history_trend_path)
new_history_trend = self.replace_trend_value(current_history_trend, temp_history_trend)
self.file_opr.generate_json_file(history_trend_path,
new_history_trend)
if temp_retry_trend:
current_retry_trend = self.get_trend_value(retry_trend_path)
new_retry_trend = self.replace_trend_value(current_retry_trend, temp_retry_trend)
self.file_opr.generate_json_file(retry_trend_path, new_retry_trend)
@staticmethod
def replace_trend_value(current_tend: list, history_trend=None):
"""
拼装 *-trend.json 的数据, 将最新的放最后面.
:param current_tend:
:param history_trend:
:return:
"""
expect_trend = []
if history_trend:
expect_trend = history_trend
history_trend.append(current_tend[0])
return expect_trend
def get_trend_value(self, trend_path) -> Union[list, None]:
"""
从trend 文件中获取 trend数据.
:param trend_path: trend 文件的绝对路径
:return:
"""
return self.file_opr.load_json_file(trend_path)
@staticmethod
def back_folder(folder_to_backup_path):
"""
最开始设想将 allure_html_path 文件夹给整个备份了. 以 folder_name+time 的方式
发现这样备份后, 数据有点大. 所以将这个方法暂时禁用.
:param folder_to_backup_path:
:return:
"""
current_time = time.strftime('%Y%m%d%H%M%S')
html_report_backup_path = folder_to_backup_path + '_' + current_time
if os.path.exists(folder_to_backup_path):
if not os.path.exists(html_report_backup_path):
os.mkdir(html_report_backup_path)
for root, dirs, files in os.walk(folder_to_backup_path):
for file in files:
src_file = os.path.join(root, file)
shutil.copy(src_file, html_report_backup_path)
os.remove(folder_to_backup_path)
if __name__ == "__main__":
run_pytest = Run()
run_pytest.run_test_by_pytest('src/testsuite', rerun=6, xdist='1')
报告
allure 报告
总结
暂时先写这样了, 还有其他的要优化的后面想到了再来优化了. 一直都说会接口自动化, 到了真的要搞的时候, 才发现需要学的东西还很多.
测试数据来源这一块想了很久, 翻了很多帖子, 从最开始想用 yaml 管理测试数据, 到最后换成了 excel 管理, 中间 YY 纠结了很久. 结果发现还是要直接撸代码才来得快. 光是YY没啥卵用.
run.py的代码在网上找了大神的代码作为参考. 忘了原文是哪儿了, 就记得一个代码连接: https://gitee.com/nemo1122/pytest_demo/blob/master/run.py
|