12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- # -*- coding: utf-8 -*-
- from filelock import FileLock
- from apirun.Log.Log import logger
- import pytest
- import logging
- import allure
- import os
- import jsonpath
- import json
- from apirun.parse.CaseParser import case_parser
- from xdist.scheduler import LoadScopeScheduling
- class CustomScopeScheduler(LoadScopeScheduling):
- """
- 自定义调度器类,用于根据测试范围分发测试用例。
- 重写该方法
- """
- def _split_scope(self, nodeid: str) -> str:
- """
- 根据节点ID分割测试范围。
- :param nodeid: 测试的唯一标识符。
- :return: 测试范围。
- """
- # 返回 nodeid 的作为测试范围
- # 根据实际需求修改这个逻辑
- return nodeid.split("--P", 1)[0]
- def pytest_xdist_make_scheduler(config, log):
- return CustomScopeScheduler(config, log)
- def pytest_addoption(parser):
- """
- 增加pytest运行的配置项
- :param parser:
- :return:
- """
- parser.addoption("--type", action="store", default="yaml", help="测试用例类型")
- parser.addoption("--cases", action="store", default="./examples", help="测试用例目录")
- def pytest_generate_tests(metafunc):
- """
- 主要用来生成测试用例的,相当于 参数化。
- :param metafunc:
- :return:
- """
- # 读取用户传过来的参数
- case_type = metafunc.config.getoption("type") # 类型
- cases_dir = metafunc.config.getoption("cases") # 路径
- # 调用方法
- data = case_parser(case_type, cases_dir)
- # 进行测试用例进行参数化,自动交给runner去进行执行执行
- if "caseinfo" in metafunc.fixturenames:
- metafunc.parametrize("caseinfo", data["case_infos"], ids=data["case_process"])
- @pytest.hookimpl(hookwrapper=True, tryfirst=True)
- def pytest_runtest_makereport(item, call):
- # 通过 out = yield 定义了一个生成器。在生成器中,res = out.get_result() 获取了测试结果对象。
- out = yield # 类似于return,但是它返回之后执行完毕会自动回来
- res = out.get_result()
- # res.when == "call":表示正在运行调用测试函数的阶段。
- if res.when == "call":
- logging.info(f"用例ID:{res.nodeid}")
- logging.info(f"测试结果:{res.outcome}")
- logging.info(f"故障表示:{res.longrepr}")
- logging.info(f"异常:{call.excinfo}")
- logging.info(f"用例耗时:{res.duration}")
- logging.info("**************************************")
- # def login():
- # pass
- # @pytest.fixture(scope="session")
- # def token_fix(tmp_path_factory, worker_id):
- # # 代表是单机运行
- # if worker_id == "master":
- # token = login()
- # print("fixture:请求登录接口,获取token", token)
- # os.environ['token'] = token
- # return token
- # # 分布式运行
- # # 获取所有子节点共享的临时目录,无需修改【不可删除、修改】
- # root_tmp_dir = tmp_path_factory.getbasetemp().parent
- # fn = root_tmp_dir / "data.json"
- # with FileLock(str(fn) + ".lock"):
- # if fn.is_file(): # 代表已经有进程执行过该fixture
- # token = json.loads(fn.read_text())
- # else: # 代表该fixture第一次被执行
- # token = login()
- # fn.write_text(json.dumps(token))
- # # 最好将后续需要保留的数据存在某个地方,比如这里是os的环境变量
- # os.environ['token'] = token
- # return token
|