Python3+Requests+Excel完整接口自动化测试框架的实现
框架整体使用Python3+Requests+Excel:包含对实时token的获取
1、------base
-------runmethond.py
runmethond:对不同的请求方式进行封装
import json import requests requests.packages.urllib3.disable_warnings() class RunMethod: def post_main(self, url, data, header=None): res = None if header != None: res = requests.post(url=url, data=data, headers=header,verify=False) else: res = requests.post(url=url, data=data,verify=False) return res.json() def get_main(self, url, data=None, header=None): res = None if header != None: res = requests.get(url=url, params=data, headers=header, verify=False) else: res = requests.get(url=url, params=data, verify=False) return res.json() def run_main(self, method, url, data=None, header=None): res = None if method == 'Post': res = self.post_main(url, data, header) else: res = self.get_main(url, data, header) return json.dumps(res, indent=2, sort_keys=True, ensure_ascii=False) if __name__ == '__main__': url = 'http://httpbin.org/post' data = { 'cart': '11' } run = RunMethod() run_test = run.run_main(method="Post", url=url, data=data) print(run_test)
2、------data
------data_config.py
data_config:获取excel模块中数据
class global_val: Id = '0' request_name = '1' url = '2' run = '3' request_way = '4' header = '5' case_depend = '6' data_depend = '7' field_depend = '8' data = '9' expect = '10' result = '11' def get_id(): """获取case_id""" return global_val.Id def get_request_name(): """获取请求模块名称""" return global_val.request_name def get_url(): """获取请求url""" return global_val.url def get_run(): """获取是否运行""" return global_val.run def get_run_way(): """获取请求方式""" return global_val.request_way def get_header(): """获取是否携带header""" return global_val.header def get_case_depend(): """case依赖""" return global_val.case_depend def get_data_depend(): """依赖的返回数据""" return global_val.data_depend def get_field_depend(): """数据依赖字段""" return global_val.field_depend def get_data(): """获取请求数据""" return global_val.data def get_expect(): """获取预期结果""" return global_val.expect def get_result(): """获取返回结果""" return global_val.result
3、-----data
-----dependent_data.py
dependent_data:解决数据依赖问题
from util.operation_excel import OperationExcel from base.runmethod import RunMethod from data.get_data import GetData from jsonpath_rw import jsonpath, parse import json class DependentData: """解决数据依赖问题""" def __init__(self, case_id): self.case_id = case_id self.opera_excel = OperationExcel() self.data = GetData() def get_case_line_data(self): """ 通过case_id去获取该case_id的整行数据 :param case_id: 用例ID :return: """ rows_data = self.opera_excel.get_row_data(self.case_id) return rows_data def run_dependent(self): """ 执行依赖测试,获取结果 :return: """ run_method = RunMethod() row_num = self.opera_excel.get_row_num(self.case_id) request_data = self.data.get_data_for_json(row_num) # header = self.data.is_header(row_num) method = self.data.get_request_method(row_num) url = self.data.get_request_url(row_num) res = run_method.run_main(method, url, request_data) return json.loads(res) def get_data_for_key(self, row): """ 根据依赖的key去获取执行依赖case的响应然后返回 :return: """ depend_data = self.data.get_depend_key(row) response_data = self.run_dependent() return [match.value for match in parse(depend_data).find(response_data)][0]
4、-----data
-----get_data.py
get_data:获取excel数据
from util.operation_excel import OperationExcel from data import data_config from util.operation_json import OperationJson class GetData: """获取excel数据""" def __init__(self): self.opera_excel = OperationExcel() def get_case_lines(self): """获取excel行数,即case的个数""" return self.opera_excel.get_lines() def get_is_run(self, row): """获取是否执行""" flag = None col = int(data_config.get_run()) run_model = self.opera_excel.get_cell_value(row, col) if run_model == 'yes': flag = True else: flag = False return flag def is_header(self, row): """ 是否携带header :param row: 行号 :return: """ col = int(data_config.get_header()) header = self.opera_excel.get_cell_value(row, col) if header != '': return header else: return None def get_request_method(self, row): """ 获取请求方式 :param row: 行号 :return: """ # col 列 col = int(data_config.get_run_way()) request_method = self.opera_excel.get_cell_value(row, col) return request_method def get_request_url(self, row): """ 获取url :param row: 行号 :return: """ col = int(data_config.get_url()) url = self.opera_excel.get_cell_value(row, col) return url def get_request_data(self, row): """ 获取请求数据 :param row:行号 :return: """ col = int(data_config.get_data()) data = self.opera_excel.get_cell_value(row, col) if data == '': return None return data def get_data_for_json(self, row): """ 通过关键字拿到data数据 :param row: :return: """ opera_json = OperationJson() request_data = opera_json.get_data(self.get_request_data(row)) return request_data def get_expcet_data(self, row): """ 获取预期结果 :param row: :return: """ col = int(data_config.get_expect()) expect = self.opera_excel.get_cell_value(row, col) if expect == "": return None else: return expect def write_result(self, row, value): """ 写入结果数据 :param row: :param col: :return: """ col = int(data_config.get_result()) self.opera_excel.write_value(row, col, value) def get_depend_key(self, row): """ 获取依赖数据的key :param row:行号 :return: """ col = int(data_config.get_data_depend()) depend_key = self.opera_excel.get_cell_value(row, col) if depend_key == "": return None else: return depend_key def is_depend(self, row): """ 判断是否有case依赖 :param row:行号 :return: """ col = int(data_config.get_case_depend()) # 获取是否存在数据依赖列 depend_case_id = self.opera_excel.get_cell_value(row, col) if depend_case_id == "": return None else: return depend_case_id def get_depend_field(self, row): """ 获取依赖字段 :param row: :return: """ col = int(data_config.get_field_depend()) data = self.opera_excel.get_cell_value(row, col) if data == "": return None else: return data
5、-----dataconfig
-----case.xls
case.xls:用例数据
6、-----dataconfig
-----data.json
data.json:请求数据,根据自己实际业务,且与case层的请求数据列是关联的
{ "user": { "username": "1111111", "password": "123456" }, "filtrate": { "type_id": "2", "brand_id": "1", "model_id": "111" }, "search": { "page": "1", "keyword": "oppo", "type": "12" }, "token": { "token": "" }
7、-----dataconfig
-----token.json
token.json:实时自动将获取的token写入到该文件
{"data": {"token": "db6f0abee4e5040f5337f5c47a82879"}}
8、-----main
-----run_test.py
run_test:主运行程序
from base.runmethod import RunMethod from data.get_data import GetData from util.common_util import CommonUtil from data.dependent_data import DependentData # from util.send_email import SendEmail from util.operation_header import OperationHeader from util.operation_json import OperationJson class RunTest: def __init__(self): self.run_method = RunMethod() self.data = GetData() self.com_util = CommonUtil() # self.send_email = SendEmail() def go_on_run(self): """程序执行""" pass_count = [] fail_count = [] res = None # 获取用例数 rows_count = self.data.get_case_lines() # 第一行索引为0 for i in range(1, rows_count): is_run = self.data.get_is_run(i) if is_run: url = self.data.get_request_url(i) method = self.data.get_request_method(i) request_data = self.data.get_data_for_json(i) expect = self.data.get_expcet_data(i) header = self.data.is_header(i) depend_case = self.data.is_depend(i) if depend_case != None: self.depend_data = DependentData(depend_case) # 获取依赖的响应数据 depend_response_data = self.depend_data.get_data_for_key(i) # 获取依赖的key depend_key = self.data.get_depend_field(i) # 更新请求字段 request_data[depend_key] = depend_response_data # 如果header字段值为write则将该接口的返回的token写入到token.json文件,如果为yes则读取token.json文件 if header == "write": res = self.run_method.run_main(method, url, request_data) op_header = OperationHeader(res) op_header.write_token() elif header == 'yes': op_json = OperationJson("../dataconfig/token.json") token = op_json.get_data('data') request_data = dict(request_data, **token) # 把请求数据与登录token合并,并作为请求数据 res = self.run_method.run_main(method, url, request_data) else: res = self.run_method.run_main(method, url, request_data) if expect != None: if self.com_util.is_contain(expect, res): self.data.write_result(i, "Pass") pass_count.append(i) else: self.data.write_result(i, res) fail_count.append(i) else: print(f"用例ID:case-{i},预期结果不能为空") # 发送邮件 # self.send_email.send_main(pass_count, fail_count) print(f"通过用例数:{len(pass_count)}") print(f"失败用例数:{len(fail_count)}") if __name__ == '__main__': run = RunTest() run.go_on_run()
9、-----util
-----common_util.py
common_util:用于断言
class CommonUtil: def is_contain(self, str_one, str_two): """ 判断一个字符串是否在另一个字符串中 :param str_one: :param str_two: :return: """ flag = None if str_one in str_two: flag = True else: flag = False return flag
10、-----util
-----operation_excel.py
operation_excel:操作excel
import xlrd from xlutils.copy import copy class OperationExcel: """操作excel""" def __init__(self, file_name=None, sheet_id=None): if file_name: self.file_name = file_name self.sheet_id = sheet_id else: self.file_name ='../dataconfig/case1.xls' self.sheet_id = 0 self.data = self.get_data() def get_data(self): """ 获取sheets的内容 :return: """ data = xlrd.open_workbook(self.file_name) tables = data.sheets()[self.sheet_id] return tables def get_lines(self): """ 获取单元格行数 :return: """ tables = self.data return tables.nrows def get_cell_value(self, row, col): """ 获取单元格数据 :param row: 行 :param col: 列 :return: """ tables = self.data cell = tables.cell_value(row, col) return cell def write_value(self, row, col, value): """ 回写数据到excel :param row:行 :param col:列 :param value:值 :return: """ read_data = xlrd.open_workbook(self.file_name) write_data = copy(read_data) sheet_data = write_data.get_sheet(0) sheet_data.write(row, col, value) write_data.save(self.file_name) def get_row_data(self, case_id): """ 根据对应的case_id获取对应行的内容 :param case_id: 用例id :return: """ row_num = self.get_row_num(case_id) row_data = self.get_row_value(row_num) return row_data def get_row_num(self, case_id): """ 根据case_id获取对应行号 :param case_id: :return: """ num = 0 cols_data = self.get_cols_data() for col_data in cols_data: if case_id in col_data: return num num = num + 1 def get_row_value(self, row): """ 根据行号,找到该行的内容 :param row:行号 :return: """ tables = self.data row_data = tables.row_values(row) return row_data def get_cols_data(self, col_id=None): """ 获取某一列的内容 :param col_id:列号 :return: """ if col_id != None: cols = self.data.col_values(col_id) else: cols = self.data.col_values(0) return cols if __name__ == '__main__': opera = OperationExcel() opera.get_data() print(opera.get_data().nrows) print(opera.get_lines()) print(opera.get_cell_value(1, 2))
11、-----util
-----operation_header.py
operation_header:实时获取登录token及将token写入到token.json文件
import json from util.operation_json import OperationJson from base.runmethod import RunMethod class OperationHeader: def __init__(self, response): self.response = json.loads(response) def get_response_token(self): ''' 获取登录返回的token ''' token = {"data":{"token":self.response['data']['token']}} return token def write_token(self): op_json = OperationJson() op_json.write_data(self.get_response_token()) if __name__ == '__main__': url = "http://xxxxx" data = { "username": "1111", "password": "123456" } run_method=RunMethod() # res = json.dumps(requests.post(url, data).json()) res=run_method.run_main('Post', url, data) op = OperationHeader(res) op.write_token()
12、-----util
-----operation_json.py
operation_json:操作json文件
import json class OperationJson: """操作json文件""" def __init__(self,file_path=None): if file_path==None: self.file_path="../dataconfig/data.json" else: self.file_path=file_path self.data = self.read_data() def read_data(self): """ 读取json文件 :param file_name:文件路径 :return: """ with open(self.file_path) as fp: data = json.load(fp) return data def get_data(self, id): """根据关键字获取对应数据""" return self.data[id] # 写入json def write_data(self, data): with open("../dataconfig/token.json", 'w') as fp: fp.write(json.dumps(data)) if __name__ == '__main__': # file_path = "../dataconfig/data.json" opejson = OperationJson() print(opejson.read_data()) print(opejson.get_data('filtrate'))
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
下一篇:pandas将多个dataframe以多个sheet的形式保存到一个excel文件中