Are you sure you want to delete this task? Once this task is deleted, it cannot be recovered.
King15800 712b2e7066 | 2 years ago | |
---|---|---|
common_utils | 2 years ago | |
config | 2 years ago | |
data | 2 years ago | |
outputs | 2 years ago | |
request_utils | 2 years ago | |
test_case | 2 years ago | |
.gitignore | 2 years ago | |
LICENSE | 2 years ago | |
README.md | 2 years ago | |
pytest.ini | 2 years ago | |
requirements.txt | 2 years ago | |
run.py | 2 years ago |
对于框架任何问题,欢迎联系我!
解决痛点:
通过session会话方式,解决了登录之后cookie关联处理
框架天然支持接口动态传参、关联灵活处理
支持Excel、Yaml文件格式编写接口用例,通过简单配置框架自动读取并执行
执行环境一键切换,解决多环境相互影响问题
支持http/https协议各种请求、传参类型接口
响应数据格式支持json、str类型的提取操作
断言方式支持等于、包含、大于、小于、不等于等方
框架可以直接交给不懂代码的功能测试人员使用,只需要安装规范编写接口用例就行
框架使用说明:
pip install -r requirements.txt
run.py
文件Excel
或者Yaml
文件里面,按照示例编写即可,也可以在test_case
目录下通过python脚本编写casejsonpath
、正则表达式
提取数据Excel
和test_case
目录下用例def assert_result(response: Response, expected: str) -> None:
""" 断言方法
:param response: 实际响应对象
:param expected: 预期响应内容,从excel中或者yaml读取、或者手动传入
return None
"""
if expected is None:
logging.info("当前用例无断言!")
return
if isinstance(expected, str):
expect_dict = eval(expected)
else:
expect_dict = expected
index = 0
for k, v in expect_dict.items():
# 获取需要断言的实际结果部分
for _k, _v in v.items():
if _k == "http_code":
actual = response.status_code
else:
if response_type(response) == "json":
actual = json_extractor(response.json(), _k)
else:
actual = re_extract(response.text, _k)
index += 1
logging.info(f'第{index}个断言数据,实际结果:{actual} | 预期结果:{_v} 断言方式:{k}')
allure_step(f'第{index}个断言数据', f'实际结果:{actual} = 预期结果:{v}')
try:
if k == "eq": # 相等
assert actual == _v
elif k == "in": # 包含关系
assert _v in actual
elif k == "gt": # 判断大于,值应该为数值型
assert actual > _v
elif k == "lt": # 判断小于,值应该为数值型
assert actual < _v
elif k == "not": # 不等于,非
assert actual != _v
else:
logging.exception(f"判断关键字: {k} 错误!")
except AssertionError:
raise AssertionError(f'第{index}个断言失败 -|- 断言方式:{k} 实际结果:{actual} || 预期结果: {_v}')
def get_case_data():
case_type = ReadYaml(config_path + "config.yaml").read_yaml["case"]
if case_type == CaseType.EXCEL.value:
cases = []
for file in [excel for excel in os.listdir(data_path) if os.path.splitext(excel)[1] == ".xlsx"]:
data = ReadExcel(data_path + file).read_excel()
name = os.path.splitext(file)[0]
class_name = name.split("_")[0].title() + name.split("_")[1].title()
gen_case(name, data, class_name)
cases.extend(data)
return cases
elif case_type == CaseType.YAML.value:
cases = []
for yaml_file in [yaml for yaml in os.listdir(data_path) if
os.path.splitext(yaml)[1] in [".yaml", "yml"]]:
data = ReadYaml(data_path + yaml_file).read_yaml
name = os.path.splitext(yaml_file)[0]
class_name = name.split("_")[0].title() + name.split("_")[1].title()
gen_case(name, data, class_name)
cases.extend(data)
return cases
else:
cases = []
for file in [excel for excel in os.listdir(data_path) if
os.path.splitext(excel)[1] in [".yaml", "yml", ".xlsx"]]:
if os.path.splitext(file)[1] == ".xlsx":
data = ReadExcel(data_path + file).read_excel()
name = os.path.splitext(file)[0]
cases.extend(data)
else:
data = ReadYaml(data_path + file).read_yaml
name = os.path.splitext(file)[0]
cases.extend(data)
class_name = name.split("_")[0].title() + name.split("_")[1].title()
gen_case(name, data, class_name)
return cases
class ReadExcel:
def __init__(self, filename):
self.filename = filename
# 打开文件
self.workbook = openpyxl.load_workbook(self.filename)
# 获取sheet
self.sheets = self.workbook.sheetnames
def read_excel(self, sheet: str = "") -> list:
case_data = []
if sheet == "":
sheets = self.sheets
else:
sheets = [sheet]
for sheet in sheets:
wb = self.workbook[sheet]
max_row = self.workbook[sheet].max_row
for i in range(2, max_row + 1):
_dict = {}
if wb.cell(row=i, column=CaseEnum.API_EXEC.value).value == '是':
_dict["id"] = wb.cell(row=i, column=CaseEnum.CASE_ID.value).value
_dict["feature"] = wb.cell(row=i, column=CaseEnum.CASE_FEATURE.value).value
_dict["title"] = wb.cell(row=i, column=CaseEnum.CASE_TITLE.value).value
_dict["url"] = wb.cell(row=i, column=CaseEnum.API_PATH.value).value
_dict["header"] = wb.cell(row=i, column=CaseEnum.API_HEADER.value).value
_dict["method"] = wb.cell(row=i, column=CaseEnum.API_METHOD.value).value
_dict["pk"] = wb.cell(row=i, column=CaseEnum.API_PK.value).value
_dict["data"] = wb.cell(row=i, column=CaseEnum.API_DATA.value).value
_dict["file"] = wb.cell(row=i, column=CaseEnum.API_FILE.value).value
_dict["extract"] = wb.cell(row=i, column=CaseEnum.API_EXTRACT.value).value
_dict["validate"] = wb.cell(row=i, column=CaseEnum.API_EXPECTED.value).value
case_data.append(_dict)
return case_data
class ReadYaml:
def __init__(self, filename):
self.filename = filename
@property
def read_yaml(self) -> object:
with open(file=self.filename, mode="r", encoding="utf-8") as fp:
case_data = yaml.safe_load(fp.read())
return case_data
# 服务器器地址
host: http://localhost:8091/
case: 1 # 0代表执行Excel和yaml两种格式的用例, 1 代表Excel用例,2 代表 yaml文件用例
日志输出目录
import logging
import time
import os
def get_log(logger_name):
"""
:param logger_name: 日志名称
:return: 返回logger handle
"""
# 创建一个logger
logger = logging.getLogger(logger_name)
logger.setLevel(logging.INFO)
# 获取本地时间,转换为设置的格式
rq = time.strftime('%Y%m%d', time.localtime(time.time()))
# 设置所有日志和错误日志的存放路径
path = os.path.dirname(os.path.abspath(__file__))
all_log_path = os.path.join(path, 'interface_logs\\All_Logs\\')
if not os.path.exists(all_log_path):
os.makedirs(all_log_path)
error_log_path = os.path.join(path, 'interface_logs\\Error_Logs\\')
if not os.path.exists(error_log_path):
os.makedirs(error_log_path)
# 设置日志文件名
all_log_name = all_log_path + rq + '.log'
error_log_name = error_log_path + rq + '.log'
if not logger.handlers:
# 创建一个handler写入所有日志
fh = logging.FileHandler(all_log_name, encoding='utf-8')
fh.setLevel(logging.INFO)
# 创建一个handler写入错误日志
eh = logging.FileHandler(error_log_name, encoding='utf-8')
eh.setLevel(logging.ERROR)
# 创建一个handler输出到控制台
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# 以时间-日志器名称-日志级别-文件名-函数行号-错误内容
all_log_formatter = logging.Formatter(
'[%(asctime)s] %(filename)s - %(levelname)s - %(lineno)s - %(message)s')
# 以时间-日志器名称-日志级别-文件名-函数行号-错误内容
error_log_formatter = logging.Formatter(
'[%(asctime)s] %(filename)s - %(levelname)s - %(lineno)s - %(message)s')
# 将定义好的输出形式添加到handler
fh.setFormatter(all_log_formatter)
ch.setFormatter(all_log_formatter)
eh.setFormatter(error_log_formatter)
# 给logger添加handler
logger.addHandler(fh)
logger.addHandler(eh)
logger.addHandler(ch)
return logger
报告目录
执行case后自动生成,执行之前自动删除
allure 数据目录
执行case后自动生成,执行之前自动删除
base_request.py 请求封装工具类
class BaseRequest:
session = None
@classmethod
def get_session(cls):
if cls.session is None:
cls.session = requests.Session()
return cls.session
@classmethod
def send_request(cls, case: dict) -> Response:
"""
处理case数据,转换成可用数据发送请求
:param case: 读取出来的每一行用例内容
return: 响应对象
"""
log.info("开始执行用例: {}".format(case.get("title")))
req_data = RequestPreDataHandle(case).to_request_data
res = cls.send_api(
url=req_data["url"],
method=req_data["method"],
pk=req_data["pk"],
header=req_data.get("header", None),
data=req_data.get("data", None),
file=req_data.get("file", None)
)
allure_step('请求响应数据', res.text)
after_extract(res, req_data.get("extract", None))
return res
@classmethod
def send_api(cls, url, method, pk, header=None, data=None, file=None) -> Response:
"""
:param method: 请求方法
:param url: 请求url
:param pk: 入参关键字, params(查询参数类型,明文传输,一般在url?参数名=参数值), data(一般用于form表单类型参数)
json(一般用于json类型请求参数)
:param data: 参数数据,默认等于None
:param file: 文件对象
:param header: 请求头
:return: 返回res对象
"""
session = cls.get_session()
pk = pk.lower()
if pk == 'params':
res = session.request(method=method, url=url, params=data, headers=header)
elif pk == 'data':
res = session.request(method=method, url=url, data=data, files=file, headers=header)
elif pk == 'json':
res = session.request(method=method, url=url, json=data, files=file, headers=header)
else:
raise ValueError('pk可选关键字为params, json, data')
return res
pre_handle_utils.py 请求前置处理工具类
def pre_expr_handle(content) -> object:
"""
:param content: 原始的字符串内容
return content: 替换表达式后的字符串
"""
if content is None:
return None
if len(content) != 0:
log.info(f"开始进行字符串替换: 替换字符串为:{content}")
content = Template(str(content)).safe_substitute(GLOBAL_VARS)
for func in re.findall('\\${(.*?)}', content):
try:
content = content.replace('${%s}' % func, exec_func(func))
except Exception as e:
log.exception(e)
log.info(f"字符串替换完成: 替换字符串后为:{content}")
return content
after_handle_utils.py 后置操作处理工具类
def after_extract(response: Response, exp: str) -> None:
"""
:param response: request 响应对象
:param exp: 需要提取的参数字典 '{"k1": "$.data"}' 或 '{"k1": "data:(.*?)$"}'
:return:
"""
if exp:
if response_type(response) == "json":
res = response.json()
for k, v in exp.items():
GLOBAL_VARS[k] = json_extractor(res, v)
else:
res = response.text
for k, v in exp.items():
GLOBAL_VARS[k] = re_extract(res, v)
@allure.feature("登录")
class TestLogin:
@allure.story("正常登录成功")
@allure.severity(allure.severity_level.BLOCKER)
def test_login(self):
allure_title("正常登录")
data = {
"url": "api/login",
"method": "post",
"pk": "data",
"data": {"userName": "king", "pwd": 123456}
}
expected = {
"$.msg": "登录成功!"
}
# 发送请求
response = BaseRequest.send_request(data)
# 断言操作
assert_result(response, expected)
def run():
# 生成case在执行
if os.path.exists(auto_gen_case_path):
shutil.rmtree(auto_gen_case_path)
get_case_data()
if os.path.exists('outputs/reports/'):
shutil.rmtree(path='outputs/reports/')
# 本地调式执行
pytest.main(args=['-s', '--alluredir=outputs/reports'])
# 自动以服务形式打开报告
# os.system('allure serve outputs/reports')
# 本地生成报告
os.system('allure generate outputs/reports -o outputs/html --clean')
shutil.rmtree(auto_gen_case_path)
if __name__ == '__main__':
run()
[2022-01-11 22:36:04,164] base_request.py - INFO - 42 - 开始执行用例: 正常登录
[2022-01-11 22:36:04,165] pre_handle_utils.py - INFO - 37 - 开始进行字符串替换: 替换字符串为:bank/api/login
[2022-01-11 22:36:04,165] pre_handle_utils.py - INFO - 44 - 字符串替换完成: 替换字符串后为:bank/api/login
[2022-01-11 22:36:04,165] pre_handle_utils.py - INFO - 68 - 处理请求前url:bank/api/login
[2022-01-11 22:36:04,165] pre_handle_utils.py - INFO - 78 - 处理请求后 url:http://localhost:8091/bank/api/login
[2022-01-11 22:36:04,165] pre_handle_utils.py - INFO - 90 - 处理请求前Data: {'password': '123456', 'userName': 'king'}
[2022-01-11 22:36:04,165] pre_handle_utils.py - INFO - 37 - 开始进行字符串替换: 替换字符串为:{'password': '123456', 'userName': 'king'}
[2022-01-11 22:36:04,166] pre_handle_utils.py - INFO - 44 - 字符串替换完成: 替换字符串后为:{'password': '123456', 'userName': 'king'}
[2022-01-11 22:36:04,166] pre_handle_utils.py - INFO - 92 - 处理请求后Data: {'password': '123456', 'userName': 'king'}
[2022-01-11 22:36:04,166] pre_handle_utils.py - INFO - 100 - 处理请求前files: None
[2022-01-11 22:36:04,175] base_request.py - INFO - 53 - 请求响应数据{"code":"0","message":"success","data":null}
[2022-01-11 22:36:04,176] data_handle.py - INFO - 29 - 提取响应内容成功,提取表达式为: $.code 提取值为 0
[2022-01-11 22:36:04,176] assert_util.py - INFO - 49 - 第1个断言数据,实际结果:0 | 预期结果:0 断言方式:eq
[2022-01-11 22:36:04,176] data_handle.py - INFO - 29 - 提取响应内容成功,提取表达式为: $.message 提取值为 success
[2022-01-11 22:36:04,176] assert_util.py - INFO - 49 - 第2个断言数据,实际结果:success | 预期结果:success 断言方式:eq
No Description
Python Text
Dear OpenI User
Thank you for your continuous support to the Openl Qizhi Community AI Collaboration Platform. In order to protect your usage rights and ensure network security, we updated the Openl Qizhi Community AI Collaboration Platform Usage Agreement in January 2024. The updated agreement specifies that users are prohibited from using intranet penetration tools. After you click "Agree and continue", you can continue to use our services. Thank you for your cooperation and understanding.
For more agreement content, please refer to the《Openl Qizhi Community AI Collaboration Platform Usage Agreement》