diff --git a/pyproject.toml b/pyproject.toml index 997fb3801..020b36cb4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,3 +35,6 @@ norecursedirs = [ 'data', '__pycache__', ] + +# Don't find files with test at the end such as run_test.py +python_files = ['test_*.py'] diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py index 121166a3e..d751bad5a 100644 --- a/src/test/regress/citus_tests/common.py +++ b/src/test/regress/citus_tests/common.py @@ -2,6 +2,7 @@ import asyncio import atexit import concurrent.futures import os +import pathlib import platform import random import re @@ -43,6 +44,8 @@ BSD = MACOS or FREEBSD or OPENBSD TIMEOUT_DEFAULT = timedelta(seconds=int(os.getenv("PG_TEST_TIMEOUT_DEFAULT", "10"))) FORCE_PORTS = os.getenv("PG_FORCE_PORTS", "NO").lower() not in ("no", "0", "n", "") +REGRESS_DIR = pathlib.Path(os.path.realpath(__file__)).parent.parent + def initialize_temp_dir(temp_dir): if os.path.exists(temp_dir): diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 9c180271f..24a9d8b36 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -9,68 +9,30 @@ import re import shutil import sys from collections import OrderedDict +from contextlib import contextmanager from typing import Optional import common +from common import REGRESS_DIR, capture, run from config import ARBITRARY_SCHEDULE_NAMES, MASTER_VERSION, CitusDefaultClusterConfig -# Returns true if given test_schedule_line is of the form: -# "test: upgrade_ ... _after .." -def schedule_line_is_upgrade_after(test_schedule_line: str) -> bool: - return ( - test_schedule_line.startswith("test: upgrade_") - and "_after" in test_schedule_line - ) +def main(): + args = parse_arguments() + + test_name = get_test_name(args) + + # All python tests start with test_ and all other tests don't. This is by + # convention. + if test_name.startswith("test_"): + run_python_test(test_name, args) + # above function never returns + else: + run_regress_test(test_name, args) -def run_python_test(test_file_name, repeat): - """Runs the test using pytest - - This function never returns as it usese os.execlp to replace the current - process with a new pytest process. - """ - test_path = regress_dir / "citus_tests" / "test" / f"{test_file_name}.py" - if not test_path.exists(): - raise Exception("Test could not be found in any schedule") - - os.execlp( - "pytest", - "pytest", - "--numprocesses", - "auto", - "--count", - str(repeat), - str(test_path), - ) - - -def run_schedule_with_python(schedule): - bindir = common.capture("pg_config --bindir").rstrip() - pgxs_path = pathlib.Path(common.capture("pg_config --pgxs").rstrip()) - - os.chdir(regress_dir) - os.environ["PATH"] = str(regress_dir / "bin") + os.pathsep + os.environ["PATH"] - os.environ["PG_REGRESS_DIFF_OPTS"] = "-dU10 -w" - os.environ["CITUS_OLD_VERSION"] = f"v{MASTER_VERSION}.0" - - args = { - "--pgxsdir": str(pgxs_path.parent.parent.parent), - "--bindir": bindir, - } - - config = CitusDefaultClusterConfig(args) - common.initialize_temp_dir(config.temp_dir) - common.initialize_citus_cluster( - config.bindir, config.datadir, config.settings, config - ) - common.run_pg_regress( - config.bindir, config.pg_srcdir, config.coordinator_port(), schedule - ) - - -if __name__ == "__main__": +def parse_arguments(): args = argparse.ArgumentParser() args.add_argument( "test_name", help="Test name (must be included in a schedule.)", nargs="?" 
@@ -106,208 +68,125 @@ if __name__ == "__main__": action="store_true", ) - args = vars(args.parse_args()) + return vars(args.parse_args()) - regress_dir = pathlib.Path( - os.path.dirname(os.path.dirname(os.path.realpath(__file__))) - ) - test_file_path = args["path"] - test_file_name = args["test_name"] - use_base_schedule = args["use_base_schedule"] - use_whole_schedule_line = args["use_whole_schedule_line"] - class TestDeps: - schedule: Optional[str] - direct_extra_tests: list[str] +class TestDeps: + schedule: Optional[str] + direct_extra_tests: list[str] - def __init__(self, schedule, extra_tests=None, repeatable=True, worker_count=2): - self.schedule = schedule - self.direct_extra_tests = extra_tests or [] - self.repeatable = repeatable - self.worker_count = worker_count + def __init__(self, schedule, extra_tests=None, repeatable=True, worker_count=2): + self.schedule = schedule + self.direct_extra_tests = extra_tests or [] + self.repeatable = repeatable + self.worker_count = worker_count - def extra_tests(self): - all_deps = OrderedDict() - for direct_dep in self.direct_extra_tests: - if direct_dep in deps: - for indirect_dep in deps[direct_dep].extra_tests(): - all_deps[indirect_dep] = True - all_deps[direct_dep] = True + def extra_tests(self): + all_deps = OrderedDict() + for direct_dep in self.direct_extra_tests: + if direct_dep in DEPS: + for indirect_dep in DEPS[direct_dep].extra_tests(): + all_deps[indirect_dep] = True + all_deps[direct_dep] = True - return list(all_deps.keys()) + return list(all_deps.keys()) - deps = { - "multi_cluster_management": TestDeps( - None, ["multi_test_helpers_superuser"], repeatable=False - ), - "create_role_propagation": TestDeps(None, ["multi_cluster_management"]), - "single_node_enterprise": TestDeps(None), - "single_node": TestDeps(None), - "single_node_truncate": TestDeps(None), - "multi_extension": TestDeps(None, repeatable=False), - "multi_test_helpers": TestDeps(None), - "multi_insert_select": TestDeps("base_schedule"), - "multi_mx_create_table": TestDeps( - None, - [ - "multi_test_helpers_superuser", - "multi_mx_node_metadata", - "multi_cluster_management", - "multi_mx_function_table_reference", - ], - ), - "background_rebalance_parallel": TestDeps( - None, - [ - "multi_test_helpers", - "multi_cluster_management", - ], - worker_count=6, - ), - "multi_mx_modifying_xacts": TestDeps(None, ["multi_mx_create_table"]), - "multi_mx_router_planner": TestDeps(None, ["multi_mx_create_table"]), - "multi_mx_copy_data": TestDeps(None, ["multi_mx_create_table"]), - "multi_mx_schema_support": TestDeps(None, ["multi_mx_copy_data"]), - "multi_simple_queries": TestDeps("base_schedule"), - } - if not (test_file_name or test_file_path): - print("FATAL: No test given.") - sys.exit(2) +DEPS = { + "multi_cluster_management": TestDeps( + None, ["multi_test_helpers_superuser"], repeatable=False + ), + "create_role_propagation": TestDeps(None, ["multi_cluster_management"]), + "single_node_enterprise": TestDeps(None), + "single_node": TestDeps(None), + "single_node_truncate": TestDeps(None), + "multi_extension": TestDeps(None, repeatable=False), + "multi_test_helpers": TestDeps(None), + "multi_insert_select": TestDeps("base_schedule"), + "multi_mx_create_table": TestDeps( + None, + [ + "multi_test_helpers_superuser", + "multi_mx_node_metadata", + "multi_cluster_management", + "multi_mx_function_table_reference", + ], + ), + "background_rebalance_parallel": TestDeps( + None, + [ + "multi_test_helpers", + "multi_cluster_management", + ], + worker_count=6, + ), + 
"multi_mx_modifying_xacts": TestDeps(None, ["multi_mx_create_table"]), + "multi_mx_router_planner": TestDeps(None, ["multi_mx_create_table"]), + "multi_mx_copy_data": TestDeps(None, ["multi_mx_create_table"]), + "multi_mx_schema_support": TestDeps(None, ["multi_mx_copy_data"]), + "multi_simple_queries": TestDeps("base_schedule"), +} - if test_file_path: - test_file_path = os.path.join(os.getcwd(), args["path"]) - if not os.path.isfile(test_file_path): - print(f"ERROR: test file '{test_file_path}' does not exist") - sys.exit(2) +def run_python_test(test_name, args): + """Runs the test using pytest - test_file_extension = pathlib.Path(test_file_path).suffix - test_file_name = pathlib.Path(test_file_path).stem - - if test_file_extension not in (".spec", ".sql", ".py"): - print( - "ERROR: Unrecognized test extension. Valid extensions are: .sql, .spec, and .py" - ) - sys.exit(1) - - test_schedule = "" - dependencies = [] - - if test_file_name.startswith("test_"): - run_python_test(test_file_name, args["repeat"]) - - # find related schedule - for schedule_file_path in sorted(regress_dir.glob("*_schedule")): - for schedule_line in open(schedule_file_path, "r"): - if re.search(r"\b" + test_file_name + r"\b", schedule_line): - test_schedule = pathlib.Path(schedule_file_path).stem - if use_whole_schedule_line: - test_schedule_line = schedule_line - else: - test_schedule_line = f"test: {test_file_name}\n" - break - else: - continue - break - else: + This function never returns as it usese os.execlp to replace the current + process with a new pytest process. + """ + test_path = REGRESS_DIR / "citus_tests" / "test" / f"{test_name}.py" + if not test_path.exists(): raise Exception("Test could not be found in any schedule") - def default_base_schedule(test_schedule): - if "isolation" in test_schedule: - return "base_isolation_schedule" - - if "failure" in test_schedule: - return "failure_base_schedule" - - if "enterprise" in test_schedule: - return "enterprise_minimal_schedule" - - if "split" in test_schedule: - return "minimal_schedule" - - if "mx" in test_schedule: - if use_base_schedule: - return "mx_base_schedule" - return "mx_minimal_schedule" - - if "operations" in test_schedule: - return "minimal_schedule" - - if "after_citus_upgrade" in test_schedule: - print( - f"WARNING: After citus upgrade schedule ({test_schedule}) is not supported." - ) - sys.exit(0) - - if "citus_upgrade" in test_schedule: - return None - - if "pg_upgrade" in test_schedule: - return "minimal_schedule" - - if test_schedule in ARBITRARY_SCHEDULE_NAMES: - print( - f"WARNING: Arbitrary config schedule ({test_schedule}) is not supported." - ) - sys.exit(0) - - if use_base_schedule: - return "base_schedule" - return "minimal_schedule" - - # we run the tests with 2 workers by default. - # If we find any dependency which requires more workers, we update the worker count. 
- def worker_count_for(test_name): - if test_name in deps: - return deps[test_name].worker_count - return 2 - - test_worker_count = max(worker_count_for(test_file_name), 2) - - if test_file_name in deps: - dependencies = deps[test_file_name] - elif schedule_line_is_upgrade_after(test_schedule_line): - dependencies = TestDeps( - default_base_schedule(test_schedule), - [test_file_name.replace("_after", "_before")], - ) - else: - dependencies = TestDeps(default_base_schedule(test_schedule)) - - if "before_" in test_schedule: - dependencies.repeatable = False - - # copy base schedule to a temp file and append test_schedule_line - # to be able to run tests in parallel (if test_schedule_line is a parallel group.) - tmp_schedule_path = os.path.join( - regress_dir, f"tmp_schedule_{ random.randint(1, 10000)}" + os.execlp( + "pytest", + "pytest", + "--numprocesses", + "auto", + "--count", + str(args["repeat"]), + str(test_path), ) - # some tests don't need a schedule to run - # e.g tests that are in the first place in their own schedule - if dependencies.schedule: - shutil.copy2( - os.path.join(regress_dir, dependencies.schedule), tmp_schedule_path - ) - with open(tmp_schedule_path, "a") as myfile: - for dependency in dependencies.extra_tests(): - myfile.write(f"test: {dependency}\n") - test_worker_count = max(worker_count_for(dependency), test_worker_count) - repetition_cnt = args["repeat"] - if repetition_cnt > 1 and not dependencies.repeatable: - repetition_cnt = 1 - print(f"WARNING: Cannot repeatably run this test: '{test_file_name}'") - for _ in range(repetition_cnt): - myfile.write(test_schedule_line) - if "upgrade" in test_schedule_line: - try: - run_schedule_with_python(pathlib.Path(tmp_schedule_path).stem) - finally: - # remove temp schedule file - os.remove(tmp_schedule_path) - sys.exit(0) +def run_regress_test(test_name, args): + original_schedule, schedule_line = find_test_schedule_and_line(test_name, args) + + dependencies = test_dependencies(test_name, original_schedule, schedule_line, args) + + with tmp_schedule(test_name, dependencies, schedule_line, args) as schedule: + if "upgrade" in original_schedule: + run_schedule_with_python(schedule) + else: + run_schedule_with_multiregress(test_name, schedule, dependencies, args) + + +def run_schedule_with_python(schedule): + bindir = capture("pg_config --bindir").rstrip() + pgxs_path = pathlib.Path(capture("pg_config --pgxs").rstrip()) + + os.chdir(REGRESS_DIR) + os.environ["PATH"] = str(REGRESS_DIR / "bin") + os.pathsep + os.environ["PATH"] + os.environ["PG_REGRESS_DIFF_OPTS"] = "-dU10 -w" + os.environ["CITUS_OLD_VERSION"] = f"v{MASTER_VERSION}.0" + + args = { + "--pgxsdir": str(pgxs_path.parent.parent.parent), + "--bindir": bindir, + } + + config = CitusDefaultClusterConfig(args) + common.initialize_temp_dir(config.temp_dir) + common.initialize_citus_cluster( + config.bindir, config.datadir, config.settings, config + ) + common.run_pg_regress( + config.bindir, config.pg_srcdir, config.coordinator_port(), schedule + ) + + +def run_schedule_with_multiregress(test_name, schedule, dependencies, args): + worker_count = needed_worker_count(test_name, dependencies) # find suitable make recipe if dependencies.schedule == "base_isolation_schedule": @@ -322,15 +201,160 @@ if __name__ == "__main__": # prepare command to run tests test_command = ( - f"make -C {regress_dir} {make_recipe} " - f"WORKERCOUNT={test_worker_count} " - f"SCHEDULE='{pathlib.Path(tmp_schedule_path).stem}'" + f"make -C {REGRESS_DIR} {make_recipe} " + 
f"WORKERCOUNT={worker_count} " + f"SCHEDULE='{schedule}'" + ) + run(test_command) + + +def default_base_schedule(test_schedule, args): + if "isolation" in test_schedule: + return "base_isolation_schedule" + + if "failure" in test_schedule: + return "failure_base_schedule" + + if "enterprise" in test_schedule: + return "enterprise_minimal_schedule" + + if "split" in test_schedule: + return "minimal_schedule" + + if "mx" in test_schedule: + if args["use_base_schedule"]: + return "mx_base_schedule" + return "mx_minimal_schedule" + + if "operations" in test_schedule: + return "minimal_schedule" + + if "after_citus_upgrade" in test_schedule: + print( + f"WARNING: After citus upgrade schedule ({test_schedule}) is not supported." + ) + sys.exit(0) + + if "citus_upgrade" in test_schedule: + return None + + if "pg_upgrade" in test_schedule: + return "minimal_schedule" + + if test_schedule in ARBITRARY_SCHEDULE_NAMES: + print(f"WARNING: Arbitrary config schedule ({test_schedule}) is not supported.") + sys.exit(0) + + if args["use_base_schedule"]: + return "base_schedule" + return "minimal_schedule" + + +# we run the tests with 2 workers by default. +# If we find any dependency which requires more workers, we update the worker count. +def worker_count_for(test_name): + if test_name in DEPS: + return DEPS[test_name].worker_count + return 2 + + +def get_test_name(args): + if args["test_name"]: + return args["test_name"] + + if not args["path"]: + print("FATAL: No test given.") + sys.exit(2) + + absolute_test_path = os.path.join(os.getcwd(), args["path"]) + + if not os.path.isfile(absolute_test_path): + print(f"ERROR: test file '{absolute_test_path}' does not exist") + sys.exit(2) + + if pathlib.Path(absolute_test_path).suffix not in (".spec", ".sql", ".py"): + print( + "ERROR: Unrecognized test extension. Valid extensions are: .sql, .spec, and .py" + ) + sys.exit(1) + + return pathlib.Path(absolute_test_path).stem + + +def find_test_schedule_and_line(test_name, args): + for schedule_file_path in sorted(REGRESS_DIR.glob("*_schedule")): + for schedule_line in open(schedule_file_path, "r"): + if re.search(r"\b" + test_name + r"\b", schedule_line): + test_schedule = pathlib.Path(schedule_file_path).stem + if args["use_whole_schedule_line"]: + return test_schedule, schedule_line + return test_schedule, f"test: {test_name}\n" + raise Exception("Test could not be found in any schedule") + + +def test_dependencies(test_name, test_schedule, schedule_line, args): + if test_name in DEPS: + return DEPS[test_name] + + if schedule_line_is_upgrade_after(schedule_line): + # upgrade_xxx_after tests always depend on upgrade_xxx_before + return TestDeps( + default_base_schedule(test_schedule, args), + [test_name.replace("_after", "_before")], + ) + + # before_ tests leave stuff around on purpose for the after tests. So they + # are not repeatable by definition. + if "before_" in test_schedule: + repeatable = False + else: + repeatable = True + + return TestDeps(default_base_schedule(test_schedule, args), repeatable=repeatable) + + +# Returns true if given test_schedule_line is of the form: +# "test: upgrade_ ... _after .." 
+def schedule_line_is_upgrade_after(test_schedule_line: str) -> bool:
+    return (
+        test_schedule_line.startswith("test: upgrade_")
+        and "_after" in test_schedule_line
     )
-    # run test command n times
+
+@contextmanager
+def tmp_schedule(test_name, dependencies, schedule_line, args):
+    tmp_schedule_path = REGRESS_DIR / f"tmp_schedule_{random.randint(1, 10000)}"
+
+    # Prefill the temporary schedule with the base schedule that this test
+    # depends on. Some tests don't need a base schedule to run though,
+    # e.g. tests that are in the first place in their own schedule
+    if dependencies.schedule:
+        shutil.copy2(REGRESS_DIR / dependencies.schedule, tmp_schedule_path)
+
+    with open(tmp_schedule_path, "a") as myfile:
+        # Add any specific dependencies
+        for dependency in dependencies.extra_tests():
+            myfile.write(f"test: {dependency}\n")
+
+        repetition_cnt = args["repeat"]
+        if repetition_cnt > 1 and not dependencies.repeatable:
+            repetition_cnt = 1
+            print(f"WARNING: Cannot repeatably run this test: '{test_name}'")
+        for _ in range(repetition_cnt):
+            myfile.write(schedule_line)
+
     try:
-        print(f"Executing.. {test_command}")
-        result = common.run(test_command)
+        yield tmp_schedule_path.stem
     finally:
-        # remove temp schedule file
         os.remove(tmp_schedule_path)
+
+
+def needed_worker_count(test_name, dependencies):
+    worker_count = worker_count_for(test_name)
+    for dependency in dependencies.extra_tests():
+        worker_count = max(worker_count_for(dependency), worker_count)
+    return worker_count
+
+
+if __name__ == "__main__":
+    main()
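
To make the DEPS table concrete, here is how the dependency resolution above plays out for one entry (worked out by hand from the dict, so treat it as illustrative rather than authoritative): multi_mx_schema_support depends on multi_mx_copy_data, which depends on multi_mx_create_table, which lists four extra tests of its own. TestDeps.extra_tests() flattens this depth-first and deduplicates via the OrderedDict, so the temporary schedule written by tmp_schedule ends up containing:

test: multi_test_helpers_superuser
test: multi_mx_node_metadata
test: multi_cluster_management
test: multi_mx_function_table_reference
test: multi_mx_create_table
test: multi_mx_copy_data
test: multi_mx_schema_support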
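
The tmp_schedule helper itself is the standard contextlib.contextmanager cleanup pattern: the temporary schedule file only exists for the duration of the with-block and is removed even if the test run raises. A minimal, self-contained sketch of that pattern (tmp_file and its argument are hypothetical names, not part of this change):

import os
import random
from contextlib import contextmanager


@contextmanager
def tmp_file(contents):
    # Write the contents to a uniquely named file, hand the path to the
    # caller, and delete the file again even if the with-block fails.
    path = f"tmp_schedule_{random.randint(1, 10000)}"
    with open(path, "w") as f:
        f.write(contents)
    try:
        yield path
    finally:
        os.remove(path)


# Usage: the file is gone after the block, whether or not the body raised.
with tmp_file("test: multi_insert_select\n") as schedule:
    print(open(schedule).read(), end="")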