
一、工程目录

二、Case文件设计

三、基础包 base
3.1 封装get/post请求(runmethod.py)
- import requests
- import json
class RunMethod:
    """Thin wrapper around ``requests`` for sending GET and POST requests."""

    def post_main(self, url, data, header=None):
        """Send a POST request and return the decoded JSON response body.

        url: target URL.
        data: request payload, passed to ``requests.post`` as ``data``.
        header: optional dict of request headers.
        """
        # requests treats headers=None exactly like an omitted argument,
        # so a single call covers both the "with header" and "without" cases.
        res = requests.post(url=url, data=data, headers=header)
        return res.json()

    def get_main(self, url, data=None, header=None):
        """Send a GET request (certificate checks disabled) and return the JSON body.

        url: target URL.
        data: optional payload, passed to ``requests.get`` as ``data``.
        header: optional dict of request headers.
        """
        # NOTE(review): ``data=`` puts the payload in the request body; GET
        # parameters are normally sent with ``params=`` - confirm what the
        # services under test expect before changing it.
        res = requests.get(url=url, data=data, headers=header, verify=False)
        return res.json()

    def run_main(self, method, url, data=None, header=None):
        """Dispatch on ``method`` and return the response as pretty-printed JSON text.

        ``method`` is compared case-insensitively, so 'Post', 'POST' and
        'post' all send a POST; every other value sends a GET.
        """
        if str(method).strip().lower() == 'post':
            res = self.post_main(url, data, header)
        else:
            res = self.get_main(url, data, header)
        return json.dumps(res, ensure_ascii=False, sort_keys=True, indent=2)
3.2 封装mock(mock.py)
- from mock import mock
- #模拟mock 封装
def mock_test(mock_method, request_data, url, method, response_data):
    """Call a stub that always answers with ``response_data`` and return that answer.

    mock_method: accepted for interface compatibility only; the argument is
        never used, a fresh ``Mock`` takes its place.
    request_data, url, method: recorded by the stub as its call arguments.
    response_data: the canned value the stub returns.
    """
    # Prefer the standard-library mock; fall back to the external backport
    # this file imports when unittest.mock is not available.
    try:
        from unittest import mock as _mock
    except ImportError:
        from mock import mock as _mock
    stub = _mock.Mock(return_value=response_data)
    return stub(url, method, request_data)
四、数据操作包 operation_data
4.1 获取excel单元格中的内容(get_data.py)
-
- #coding:utf-8
- from tool.operation_excel import OperationExcel
- import data_config
- from tool.operation_json import OperetionJson
- from tool.connect_db import OperationMysql
class GetData:
    """Reads the individual fields of a test case from the case spreadsheet."""

    def __init__(self):
        self.opera_excel = OperationExcel()

    def _cell(self, row, col_getter):
        """Return the cell of ``row`` in the column reported by ``col_getter``."""
        return self.opera_excel.get_cell_value(row, int(col_getter()))

    def _cell_or_none(self, row, col_getter):
        """Like ``_cell`` but map an empty-string cell to ``None``."""
        value = self._cell(row, col_getter)
        if value == '':
            return None
        return value

    # Number of rows in the sheet, i.e. the number of cases (plus header row).
    def get_case_lines(self):
        return self.opera_excel.get_lines()

    # Whether the case in this row should be executed.
    def get_is_run(self, row):
        """Return True only when the run column holds exactly 'yes'."""
        return self._cell(row, data_config.get_run()) == 'yes'

    # Whether the case carries a header.
    def is_header(self, row):
        """Return the header cell value, or None when the cell is empty."""
        return self._cell_or_none(row, data_config.get_header())

    # Request method.
    def get_request_method(self, row):
        return self._cell(row, data_config.get_run_way())

    # Request URL.
    def get_request_url(self, row):
        return self._cell(row, data_config.get_url())

    # Request data key.
    def get_request_data(self, row):
        """Return the request-data cell value, or None when the cell is empty."""
        return self._cell_or_none(row, data_config.get_data())

    # Resolve the request-data key against the JSON data file.
    def get_data_for_json(self, row):
        """Return the JSON entry named by the request-data cell.

        Returns None when the cell is empty instead of looking up a None key.
        """
        key = self.get_request_data(row)
        if key is None:
            return None
        return OperetionJson().get_data(key)

    # Expected result.
    def get_expcet_data(self, row):
        """Return the expected-result cell value, or None when the cell is empty."""
        return self._cell_or_none(row, data_config.get_expect())

    # Expected result obtained by running the cell content as SQL.
    def get_expcet_data_for_mysql(self, row):
        """Run the expected-result cell as a query and return the row as JSON text.

        Returns None when the cell is empty, so no query is executed.
        """
        sql = self.get_expcet_data(row)
        if sql is None:
            return None
        op_mysql = OperationMysql()
        try:
            res = op_mysql.search_one(sql)
        finally:
            # A connection is opened per call; release it once the row is read.
            op_mysql.conn.close()
        # Only byte strings (Python 2 ``str``) have escapes to decode; text
        # strings have no ``decode`` on Python 3 and are returned unchanged.
        if isinstance(res, bytes):
            res = res.decode('unicode-escape')
        return res

    def write_result(self, row, value):
        """Write ``value`` into the result column of ``row``."""
        col = int(data_config.get_result())
        self.opera_excel.write_value(row, col, value)

    # Key of the dependent data.
    def get_depend_key(self, row):
        """Return the data-depend cell value, or None when the cell is empty."""
        return self._cell_or_none(row, data_config.get_data_depend())

    # Whether the case depends on another case.
    def is_depend(self, row):
        """Return the id of the case this row depends on, or None if there is none."""
        return self._cell_or_none(row, data_config.get_case_depend())

    # Field that receives the dependent data.
    def get_depend_field(self, row):
        """Return the field-depend cell value, or None when the cell is empty."""
        return self._cell_or_none(row, data_config.get_field_depend())
4.2 获取excel中每个列(data_config.py)
- #coding:utf-8
class global_var:
    """Zero-based column index of every field in the case spreadsheet.

    The indexes are kept as strings; the callers convert them with ``int()``.
    """
    # case id
    Id = '0'
    request_name = '1'
    url = '2'
    run = '3'
    request_way = '4'
    header = '5'
    case_depend = '6'
    data_depend = '7'
    field_depend = '8'
    data = '9'
    expect = '10'
    result = '11'


def get_id():
    """Return the column index of the case id."""
    return global_var.Id


def get_url():
    """Return the column index of the request URL."""
    return global_var.url


def get_run():
    """Return the column index of the run flag."""
    return global_var.run


def get_run_way():
    """Return the column index of the request method."""
    return global_var.request_way


def get_header():
    """Return the column index of the header flag."""
    return global_var.header


def get_case_depend():
    """Return the column index of the case dependency."""
    return global_var.case_depend


def get_data_depend():
    """Return the column index of the data dependency."""
    return global_var.data_depend


def get_field_depend():
    """Return the column index of the dependent field."""
    return global_var.field_depend


def get_data():
    """Return the column index of the request data."""
    return global_var.data


def get_expect():
    """Return the column index of the expected result."""
    return global_var.expect


def get_result():
    """Return the column index of the actual result."""
    return global_var.result


def get_header_value():
    """Return the column index of the header flag (same column as ``get_header``)."""
    return global_var.header
4.3 解决数据依赖(dependent_data.py)
(此处原为 dependent_data.py 的代码清单,共约 70 行;提取时代码内容丢失,仅残留行号。)
五、工具类包 tool
5.1 操作excel (operation_excel.py)
-
- #coding:utf-8
- import xlrd
- from xlutils.copy import copy
class OperationExcel:
    """Reads from and writes to one sheet of an ``.xls`` workbook."""

    def __init__(self, file_name=None, sheet_id=None):
        """Open ``file_name`` and select sheet ``sheet_id``.

        Without a file name the default case workbook and its first sheet are
        used. A missing ``sheet_id`` selects the first sheet.
        """
        if file_name:
            self.file_name = file_name
            # None cannot index the sheet list, so fall back to the first sheet.
            self.sheet_id = 0 if sheet_id is None else sheet_id
        else:
            self.file_name = '../dataconfig/case1.xls'
            self.sheet_id = 0
        self.data = self.get_data()

    # Load the selected sheet.
    def get_data(self):
        """Open the workbook and return the sheet at ``self.sheet_id``."""
        workbook = xlrd.open_workbook(self.file_name)
        return workbook.sheets()[self.sheet_id]

    # Number of rows in the sheet.
    def get_lines(self):
        return self.data.nrows

    # Content of a single cell.
    def get_cell_value(self, row, col):
        return self.data.cell_value(row, col)

    # Write a value.
    def write_value(self, row, col, value):
        """Write ``value`` into cell (row, col) and save the workbook in place.

        The workbook is re-read, copied into a writable one, modified and
        saved over ``self.file_name``. The sheet cached in ``self.data`` is
        not reloaded by this call.
        """
        read_data = xlrd.open_workbook(self.file_name)
        write_data = copy(read_data)
        # Write to the sheet this instance reads from, not always the first.
        sheet_data = write_data.get_sheet(self.sheet_id)
        sheet_data.write(row, col, value)
        write_data.save(self.file_name)

    # Row content for a case id.
    def get_rows_data(self, case_id):
        """Return the values of the row whose first column equals ``case_id``.

        Raises ValueError when no row carries that case id.
        """
        row_num = self.get_row_num(case_id)
        if row_num is None:
            raise ValueError("case id not found: %r" % (case_id,))
        return self.get_row_values(row_num)

    # Row number for a case id.
    def get_row_num(self, case_id):
        """Return the index of the first row whose first column equals ``case_id``.

        The comparison is exact, so an id that is merely a prefix of another
        one does not match, and numeric cells cannot raise. Returns None when
        no row matches.
        """
        for num, cell in enumerate(self.get_cols_data()):
            if cell == case_id:
                return num
        return None

    # Content of a row.
    def get_row_values(self, row):
        return self.data.row_values(row)

    # Content of a column.
    def get_cols_data(self, col_id=None):
        """Return all values of column ``col_id`` (the first column by default)."""
        return self.data.col_values(0 if col_id is None else col_id)
-
-
if __name__ == '__main__':
    # Manual smoke check: print one cell of the default workbook.
    opers = OperationExcel()
    # Function-call form of print works on both Python 2 and Python 3.
    print(opers.get_cell_value(1, 2))
5.2 判断字符串包含,判断字典是否相等(common_util.py)
-
- #coding:utf-8
- import json
class CommonUtil:
    """Comparison helpers used to check a response against the expectation."""

    def is_contain(self, str_one, str_two):
        """Return True when ``str_one`` occurs inside ``str_two``.

        str_one: the string to look for.
        str_two: the string that is searched.
        Returns False when either argument is None.
        """
        if str_one is None or str_two is None:
            return False
        # Python 2 only: turn a unicode needle into the escaped byte form
        # before searching. ``unicode`` does not exist on Python 3.
        try:
            text_type = unicode
        except NameError:
            text_type = None
        if text_type is not None and isinstance(str_one, text_type):
            str_one = str_one.encode('unicode-escape').decode('string_escape')
        return str_one in str_two

    def is_equal_dict(self, dict_one, dict_two):
        """Compare two dicts, each given as a dict or as JSON text.

        Returns 0 when they are equal and a non-zero value otherwise, so
        callers should test the result with ``== 0``.
        """
        # Cover ``str`` and ``unicode`` on Python 2, ``str`` and ``bytes`` on 3;
        # otherwise JSON text of the "other" string type is compared unparsed.
        try:
            string_types = (basestring,)
        except NameError:
            string_types = (str, bytes)
        if isinstance(dict_one, string_types):
            dict_one = json.loads(dict_one)
        if isinstance(dict_two, string_types):
            dict_two = json.loads(dict_two)
        return 0 if dict_one == dict_two else 1
5.3 操作header(operation_header.py)
-
- #coding:utf-8
- import requests
- import json
- from operation_json import OperetionJson
class OperationHeader:
    """Extracts the cookie that follows a login response and stores it."""

    def __init__(self, response):
        """Keep the login response.

        response: the response as JSON text, or an already-parsed dict.
        """
        if isinstance(response, dict):
            self.response = response
        else:
            self.response = json.loads(response)

    def get_response_url(self):
        """Return the first entry of ``response['data']['url']``.

        Raises KeyError or IndexError when the response lacks that entry.
        """
        return self.response['data']['url'][0]

    def get_cookie(self):
        """Request the URL from the login response and return the cookie jar of the reply."""
        # The callback and timestamp parameters are fixed values appended verbatim.
        url = self.get_response_url()+"&callback=jQuery21008240514814031887_1508666806688&_=1508666806689"
        return requests.get(url).cookies

    def write_cookie(self):
        """Convert the cookie jar to a plain dict and store it via ``OperetionJson``."""
        cookie = requests.utils.dict_from_cookiejar(self.get_cookie())
        op_json = OperetionJson()
        op_json.write_data(cookie)
-
if __name__ == '__main__':
    # Manual run: log in, then store the cookie returned for the login.
    # NOTE(review): the account and password are hard-coded in this listing;
    # move them to configuration before running against a real service.
    url = "http://www.jd.com/passport/user/login"
    data = {
        "username":"18513199586",
        "password":"111111",
        "verify":"",
        "referer":"https://www.jd.com"
    }
    res = json.dumps(requests.post(url,data).json())
    op_header = OperationHeader(res)
    op_header.write_cookie()
5.4 操作json文件(operation_json.py)
-
- #coding:utf-8
- import json
class OperetionJson:
    """Loads a JSON data file and serves its top-level entries by key."""

    def __init__(self, file_path=None):
        """Load ``file_path``, or the default user data file when none is given."""
        if file_path is None:
            self.file_path = '../dataconfig/user.json'
        else:
            self.file_path = file_path
        self.data = self.read_data()

    # Read the json file.
    def read_data(self):
        """Parse ``self.file_path`` as UTF-8 JSON and return the result."""
        # io.open takes an explicit encoding on both Python 2 and 3, so the
        # result no longer depends on the platform's default encoding.
        import io
        with io.open(self.file_path, encoding='utf-8') as fp:
            return json.load(fp)

    # Look up data by key.
    def get_data(self, id):
        """Return the top-level entry stored under ``id``.

        Raises KeyError when the key is absent.
        """
        return self.data[id]

    # Write json.
    def write_data(self, data, file_path=None):
        """Serialize ``data`` as JSON into ``file_path``.

        file_path: target file; defaults to the cookie file.
        """
        if file_path is None:
            file_path = '../dataconfig/cookie.json'
        with open(file_path, 'w') as fp:
            fp.write(json.dumps(data))
-
-
if __name__ == '__main__':
    # Manual smoke check: print the 'shop' entry of the default data file.
    opjson = OperetionJson()
    # Function-call form of print works on both Python 2 and Python 3.
    print(opjson.get_data('shop'))
5.5 操作数据库(connect_db.py)
- #coding:utf-8
- import MySQLdb.cursors
- import json
class OperationMysql:
    """Opens a MySQL connection and runs single-row queries on it."""

    def __init__(self, host='localhost', port=3306, user='root',
                 passwd='123456', db='le_study'):
        """Connect to the database and create a dict cursor.

        The defaults reproduce the previously hard-coded connection settings;
        pass keyword arguments to connect elsewhere.
        """
        self.conn = MySQLdb.connect(
            host=host,
            port=port,
            user=user,
            passwd=passwd,
            db=db,
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor
        )
        self.cur = self.conn.cursor()

    # Query a single row.
    def search_one(self, sql):
        """Execute ``sql`` and return the first result row as JSON text.

        NOTE(review): ``sql`` is executed verbatim; only pass trusted statements.
        """
        self.cur.execute(sql)
        return json.dumps(self.cur.fetchone())

    def close(self):
        """Release the cursor and the connection."""
        self.cur.close()
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always release the connection; never swallow the exception.
        self.close()
        return False
-
if __name__ == '__main__':
    # Manual smoke check: fetch one user row and print it.
    op_mysql = OperationMysql()
    res = op_mysql.search_one("select * from web_user where Name='ailiailan'")
    # Function-call form of print works on both Python 2 and Python 3.
    print(res)
5.6 发送报告邮件(send_email.py)
- #coding:utf-8
- import smtplib
- from email.mime.text import MIMEText
# Mail account settings, kept as module-level names as before.
# NOTE(review): the password is hard-coded; move it out of the source.
email_host = "smtp.163.com"
send_user = "jiaxiaonan666@163.com"
password = "jia_668"


class SendEmail:
    """Sends the plain-text test report by SMTP."""

    def send_mail(self, user_list, sub, content):
        """Send one plain-text mail.

        user_list: list of recipient addresses.
        sub: subject line.
        content: message body.
        """
        user = "jiaxiaonan"+"<"+send_user+">"
        message = MIMEText(content,_subtype='plain',_charset='utf-8')
        message['Subject'] = sub
        message['From'] = user
        # Addresses in a mail header are separated by commas.
        message['To'] = ",".join(user_list)
        server = smtplib.SMTP()
        try:
            server.connect(email_host)
            server.login(send_user,password)
            server.sendmail(send_user,user_list,message.as_string())
            server.quit()
        finally:
            # Release the socket even when connecting or sending failed.
            server.close()

    def send_main(self, pass_list, fail_list, user_list=None):
        """Build the summary of a test run and mail it.

        pass_list: the cases that passed.
        fail_list: the cases that failed.
        user_list: recipients; defaults to the built-in report address.
        """
        pass_num = len(pass_list)
        fail_num = len(fail_list)
        count_num = pass_num+fail_num
        if count_num:
            pass_rate = float(pass_num)/count_num*100
            fail_rate = float(fail_num)/count_num*100
        else:
            # Nothing ran: report 0% instead of dividing by zero.
            pass_rate = fail_rate = 0.0
        pass_result = "%.2f%%" %pass_rate
        fail_result = "%.2f%%" %fail_rate

        if user_list is None:
            user_list = ['609037724@qq.com']
        sub = "接口自动化测试报告"
        # Counts are integers so they print as "4", not "4.0".
        content = "此次一共运行接口个数为%s个,通过个数为%s个,失败个数为%s,通过率为%s,失败率为%s" %(count_num,pass_num,fail_num,pass_result,fail_result )
        self.send_mail(user_list,sub,content)
-
if __name__ == '__main__':
    # Manual run: mail a report for 4 passed and 6 failed sample cases.
    sen = SendEmail()
    sen.send_main([1,2,3,4],[2,3,4,5,6,7])
六、主函数
- run_test.py
-
- #coding:utf-8
- import sys
- sys.path.append("C:/Users/lxz/Desktop/InterFace_JIA")
- from base.runmethod import RunMethod
- from operation_data.get_data import GetData
- from tool.common_util import CommonUtil
- from operation_data.dependent_data import DependdentData
- from tool.send_email import SendEmail
- from tool.operation_header import OperationHeader
- from tool.operation_json import OperetionJson
class RunTest:
    """Runs every enabled case of the spreadsheet and mails a summary."""

    def __init__(self):
        self.run_method = RunMethod()
        self.data = GetData()
        self.com_util = CommonUtil()
        self.send_mai = SendEmail()

    def _send_request(self, method, url, request_data, header):
        """Send one case's request according to its header mode; return the response text.

        'write': send without headers, then store the cookie derived from the response.
        'yes': send with the stored ``apsid`` cookie.
        anything else: send without headers.
        """
        if header == 'write':
            res = self.run_method.run_main(method, url, request_data)
            OperationHeader(res).write_cookie()
            return res
        if header == 'yes':
            op_json = OperetionJson('../dataconfig/cookie.json')
            cookie = op_json.get_data('apsid')
            # run_main's fourth argument is sent as HTTP headers, so the
            # cookie has to travel as a Cookie header to arrive as a cookie.
            cookies = {
                'Cookie': 'apsid=%s' % cookie
            }
            return self.run_method.run_main(method, url, request_data, cookies)
        return self.run_method.run_main(method, url, request_data)

    # Entry point of the run.
    def go_on_run(self):
        """Execute all enabled cases, write each result back and send the report.

        A case passes when ``is_equal_dict`` returns 0 for the expectation and
        the response; otherwise the response is written as the result.
        """
        pass_count = []
        fail_count = []
        rows_count = self.data.get_case_lines()
        # Row 0 is skipped (presumably the header row - verify against the sheet).
        for i in range(1, rows_count):
            if not self.data.get_is_run(i):
                continue
            url = self.data.get_request_url(i)
            method = self.data.get_request_method(i)
            request_data = self.data.get_data_for_json(i)
            expect = self.data.get_expcet_data_for_mysql(i)
            header = self.data.is_header(i)
            depend_case = self.data.is_depend(i)
            if depend_case is not None:
                self.depend_data = DependdentData(depend_case)
                # Response data of the case this one depends on.
                depend_response_data = self.depend_data.get_data_for_key(i)
                # Request field that receives the dependent data.
                depend_key = self.data.get_depend_field(i)
                request_data[depend_key] = depend_response_data

            res = self._send_request(method, url, request_data, header)

            if self.com_util.is_equal_dict(expect, res) == 0:
                self.data.write_result(i, 'pass')
                pass_count.append(i)
            else:
                self.data.write_result(i, res)
                fail_count.append(i)
        # Only report when at least one case actually ran.
        if pass_count or fail_count:
            self.send_mai.send_main(pass_count, fail_count)
-
- #将执行判断封装
- #def get_cookie_run(self,header):
-
if __name__ == '__main__':
    # Script entry point: run every enabled case and send the report.
    run = RunTest()
    run.go_on_run()
Python接口自动化测试零基础入门到精通(2023最新版)