# -*- coding: utf-8 -*- from filelock import FileLock from apirun.Log.Log import logger import pytest import logging import allure import os import jsonpath import json from apirun.parse.CaseParser import case_parser from xdist.scheduler import LoadScopeScheduling class CustomScopeScheduler(LoadScopeScheduling): """ 自定义调度器类,用于根据测试范围分发测试用例。 重写该方法 """ def _split_scope(self, nodeid: str) -> str: """ 根据节点ID分割测试范围。 :param nodeid: 测试的唯一标识符。 :return: 测试范围。 """ # 返回 nodeid 的作为测试范围 # 根据实际需求修改这个逻辑 return nodeid.split("--P", 1)[0] def pytest_xdist_make_scheduler(config, log): return CustomScopeScheduler(config, log) def pytest_addoption(parser): """ 增加pytest运行的配置项 :param parser: :return: """ parser.addoption("--type", action="store", default="yaml", help="测试用例类型") parser.addoption("--cases", action="store", default="./examples", help="测试用例目录") def pytest_generate_tests(metafunc): """ 主要用来生成测试用例的,相当于 参数化。 :param metafunc: :return: """ # 读取用户传过来的参数 case_type = metafunc.config.getoption("type") # 类型 cases_dir = metafunc.config.getoption("cases") # 路径 # 调用方法 data = case_parser(case_type, cases_dir) # 进行测试用例进行参数化,自动交给runner去进行执行执行 if "caseinfo" in metafunc.fixturenames: metafunc.parametrize("caseinfo", data["case_infos"], ids=data["case_process"]) @pytest.hookimpl(hookwrapper=True, tryfirst=True) def pytest_runtest_makereport(item, call): # 通过 out = yield 定义了一个生成器。在生成器中,res = out.get_result() 获取了测试结果对象。 out = yield # 类似于return,但是它返回之后执行完毕会自动回来 res = out.get_result() # res.when == "call":表示正在运行调用测试函数的阶段。 if res.when == "call": logging.info(f"用例ID:{res.nodeid}") logging.info(f"测试结果:{res.outcome}") logging.info(f"故障表示:{res.longrepr}") logging.info(f"异常:{call.excinfo}") logging.info(f"用例耗时:{res.duration}") logging.info("**************************************") # def login(): # pass # @pytest.fixture(scope="session") # def token_fix(tmp_path_factory, worker_id): # # 代表是单机运行 # if worker_id == "master": # token = login() # print("fixture:请求登录接口,获取token", token) # os.environ['token'] = token # return token # # 分布式运行 # # 获取所有子节点共享的临时目录,无需修改【不可删除、修改】 # root_tmp_dir = tmp_path_factory.getbasetemp().parent # fn = root_tmp_dir / "data.json" # with FileLock(str(fn) + ".lock"): # if fn.is_file(): # 代表已经有进程执行过该fixture # token = json.loads(fn.read_text()) # else: # 代表该fixture第一次被执行 # token = login() # fn.write_text(json.dumps(token)) # # 最好将后续需要保留的数据存在某个地方,比如这里是os的环境变量 # os.environ['token'] = token # return token