conftest.py 3.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. # -*- coding: utf-8 -*-
  2. from filelock import FileLock
  3. from apirun.Log.Log import logger
  4. import pytest
  5. import logging
  6. import allure
  7. import os
  8. import jsonpath
  9. import json
  10. from apirun.parse.CaseParser import case_parser
  11. from xdist.scheduler import LoadScopeScheduling
  12. class CustomScopeScheduler(LoadScopeScheduling):
  13. """
  14. 自定义调度器类,用于根据测试范围分发测试用例。
  15. 重写该方法
  16. """
  17. def _split_scope(self, nodeid: str) -> str:
  18. """
  19. 根据节点ID分割测试范围。
  20. :param nodeid: 测试的唯一标识符。
  21. :return: 测试范围。
  22. """
  23. # 返回 nodeid 的作为测试范围
  24. # 根据实际需求修改这个逻辑
  25. return nodeid.split("--P", 1)[0]
  26. def pytest_xdist_make_scheduler(config, log):
  27. return CustomScopeScheduler(config, log)
  28. def pytest_addoption(parser):
  29. """
  30. 增加pytest运行的配置项
  31. :param parser:
  32. :return:
  33. """
  34. parser.addoption("--type", action="store", default="yaml", help="测试用例类型")
  35. parser.addoption("--cases", action="store", default="./examples", help="测试用例目录")
  36. def pytest_generate_tests(metafunc):
  37. """
  38. 主要用来生成测试用例的,相当于 参数化。
  39. :param metafunc:
  40. :return:
  41. """
  42. # 读取用户传过来的参数
  43. case_type = metafunc.config.getoption("type") # 类型
  44. cases_dir = metafunc.config.getoption("cases") # 路径
  45. # 调用方法
  46. data = case_parser(case_type, cases_dir)
  47. # 进行测试用例进行参数化,自动交给runner去进行执行执行
  48. if "caseinfo" in metafunc.fixturenames:
  49. metafunc.parametrize("caseinfo", data["case_infos"], ids=data["case_process"])
  50. @pytest.hookimpl(hookwrapper=True, tryfirst=True)
  51. def pytest_runtest_makereport(item, call):
  52. # 通过 out = yield 定义了一个生成器。在生成器中,res = out.get_result() 获取了测试结果对象。
  53. out = yield # 类似于return,但是它返回之后执行完毕会自动回来
  54. res = out.get_result()
  55. # res.when == "call":表示正在运行调用测试函数的阶段。
  56. if res.when == "call":
  57. logging.info(f"用例ID:{res.nodeid}")
  58. logging.info(f"测试结果:{res.outcome}")
  59. logging.info(f"故障表示:{res.longrepr}")
  60. logging.info(f"异常:{call.excinfo}")
  61. logging.info(f"用例耗时:{res.duration}")
  62. logging.info("**************************************")
  63. # def login():
  64. # pass
  65. # @pytest.fixture(scope="session")
  66. # def token_fix(tmp_path_factory, worker_id):
  67. # # 代表是单机运行
  68. # if worker_id == "master":
  69. # token = login()
  70. # print("fixture:请求登录接口,获取token", token)
  71. # os.environ['token'] = token
  72. # return token
  73. # # 分布式运行
  74. # # 获取所有子节点共享的临时目录,无需修改【不可删除、修改】
  75. # root_tmp_dir = tmp_path_factory.getbasetemp().parent
  76. # fn = root_tmp_dir / "data.json"
  77. # with FileLock(str(fn) + ".lock"):
  78. # if fn.is_file(): # 代表已经有进程执行过该fixture
  79. # token = json.loads(fn.read_text())
  80. # else: # 代表该fixture第一次被执行
  81. # token = login()
  82. # fn.write_text(json.dumps(token))
  83. # # 最好将后续需要保留的数据存在某个地方,比如这里是os的环境变量
  84. # os.environ['token'] = token
  85. # return token