Refactor run_test.py (#6816)

Over the last few months run_test.py got more and more complex. This
refactors the code in `run_test.py` to be better understandable. Mostly
this splits up separate pieces of logic into separate functions.
pull/6831/head
Jelte Fennema 2023-04-05 11:11:30 +02:00 committed by GitHub
parent d4f9de7875
commit e5e5eb35c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 277 additions and 247 deletions

View File

@ -35,3 +35,6 @@ norecursedirs = [
'data',
'__pycache__',
]
# Don't find files with test at the end such as run_test.py
python_files = ['test_*.py']

View File

@ -2,6 +2,7 @@ import asyncio
import atexit
import concurrent.futures
import os
import pathlib
import platform
import random
import re
@ -43,6 +44,8 @@ BSD = MACOS or FREEBSD or OPENBSD
TIMEOUT_DEFAULT = timedelta(seconds=int(os.getenv("PG_TEST_TIMEOUT_DEFAULT", "10")))
FORCE_PORTS = os.getenv("PG_FORCE_PORTS", "NO").lower() not in ("no", "0", "n", "")
REGRESS_DIR = pathlib.Path(os.path.realpath(__file__)).parent.parent
def initialize_temp_dir(temp_dir):
if os.path.exists(temp_dir):

View File

@ -9,68 +9,30 @@ import re
import shutil
import sys
from collections import OrderedDict
from contextlib import contextmanager
from typing import Optional
import common
from common import REGRESS_DIR, capture, run
from config import ARBITRARY_SCHEDULE_NAMES, MASTER_VERSION, CitusDefaultClusterConfig
# Returns true if given test_schedule_line is of the form:
# "test: upgrade_ ... _after .."
def schedule_line_is_upgrade_after(test_schedule_line: str) -> bool:
return (
test_schedule_line.startswith("test: upgrade_")
and "_after" in test_schedule_line
)
def main():
args = parse_arguments()
test_name = get_test_name(args)
# All python tests start with test_ and all other tests don't. This is by
# convention.
if test_name.startswith("test_"):
run_python_test(test_name, args)
# above function never returns
else:
run_regress_test(test_name, args)
def run_python_test(test_file_name, repeat):
"""Runs the test using pytest
This function never returns as it usese os.execlp to replace the current
process with a new pytest process.
"""
test_path = regress_dir / "citus_tests" / "test" / f"{test_file_name}.py"
if not test_path.exists():
raise Exception("Test could not be found in any schedule")
os.execlp(
"pytest",
"pytest",
"--numprocesses",
"auto",
"--count",
str(repeat),
str(test_path),
)
def run_schedule_with_python(schedule):
bindir = common.capture("pg_config --bindir").rstrip()
pgxs_path = pathlib.Path(common.capture("pg_config --pgxs").rstrip())
os.chdir(regress_dir)
os.environ["PATH"] = str(regress_dir / "bin") + os.pathsep + os.environ["PATH"]
os.environ["PG_REGRESS_DIFF_OPTS"] = "-dU10 -w"
os.environ["CITUS_OLD_VERSION"] = f"v{MASTER_VERSION}.0"
args = {
"--pgxsdir": str(pgxs_path.parent.parent.parent),
"--bindir": bindir,
}
config = CitusDefaultClusterConfig(args)
common.initialize_temp_dir(config.temp_dir)
common.initialize_citus_cluster(
config.bindir, config.datadir, config.settings, config
)
common.run_pg_regress(
config.bindir, config.pg_srcdir, config.coordinator_port(), schedule
)
if __name__ == "__main__":
def parse_arguments():
args = argparse.ArgumentParser()
args.add_argument(
"test_name", help="Test name (must be included in a schedule.)", nargs="?"
@ -106,208 +68,125 @@ if __name__ == "__main__":
action="store_true",
)
args = vars(args.parse_args())
return vars(args.parse_args())
regress_dir = pathlib.Path(
os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
)
test_file_path = args["path"]
test_file_name = args["test_name"]
use_base_schedule = args["use_base_schedule"]
use_whole_schedule_line = args["use_whole_schedule_line"]
class TestDeps:
schedule: Optional[str]
direct_extra_tests: list[str]
class TestDeps:
schedule: Optional[str]
direct_extra_tests: list[str]
def __init__(self, schedule, extra_tests=None, repeatable=True, worker_count=2):
self.schedule = schedule
self.direct_extra_tests = extra_tests or []
self.repeatable = repeatable
self.worker_count = worker_count
def __init__(self, schedule, extra_tests=None, repeatable=True, worker_count=2):
self.schedule = schedule
self.direct_extra_tests = extra_tests or []
self.repeatable = repeatable
self.worker_count = worker_count
def extra_tests(self):
all_deps = OrderedDict()
for direct_dep in self.direct_extra_tests:
if direct_dep in deps:
for indirect_dep in deps[direct_dep].extra_tests():
all_deps[indirect_dep] = True
all_deps[direct_dep] = True
def extra_tests(self):
all_deps = OrderedDict()
for direct_dep in self.direct_extra_tests:
if direct_dep in DEPS:
for indirect_dep in DEPS[direct_dep].extra_tests():
all_deps[indirect_dep] = True
all_deps[direct_dep] = True
return list(all_deps.keys())
return list(all_deps.keys())
deps = {
"multi_cluster_management": TestDeps(
None, ["multi_test_helpers_superuser"], repeatable=False
),
"create_role_propagation": TestDeps(None, ["multi_cluster_management"]),
"single_node_enterprise": TestDeps(None),
"single_node": TestDeps(None),
"single_node_truncate": TestDeps(None),
"multi_extension": TestDeps(None, repeatable=False),
"multi_test_helpers": TestDeps(None),
"multi_insert_select": TestDeps("base_schedule"),
"multi_mx_create_table": TestDeps(
None,
[
"multi_test_helpers_superuser",
"multi_mx_node_metadata",
"multi_cluster_management",
"multi_mx_function_table_reference",
],
),
"background_rebalance_parallel": TestDeps(
None,
[
"multi_test_helpers",
"multi_cluster_management",
],
worker_count=6,
),
"multi_mx_modifying_xacts": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_router_planner": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_copy_data": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_schema_support": TestDeps(None, ["multi_mx_copy_data"]),
"multi_simple_queries": TestDeps("base_schedule"),
}
if not (test_file_name or test_file_path):
print("FATAL: No test given.")
sys.exit(2)
DEPS = {
"multi_cluster_management": TestDeps(
None, ["multi_test_helpers_superuser"], repeatable=False
),
"create_role_propagation": TestDeps(None, ["multi_cluster_management"]),
"single_node_enterprise": TestDeps(None),
"single_node": TestDeps(None),
"single_node_truncate": TestDeps(None),
"multi_extension": TestDeps(None, repeatable=False),
"multi_test_helpers": TestDeps(None),
"multi_insert_select": TestDeps("base_schedule"),
"multi_mx_create_table": TestDeps(
None,
[
"multi_test_helpers_superuser",
"multi_mx_node_metadata",
"multi_cluster_management",
"multi_mx_function_table_reference",
],
),
"background_rebalance_parallel": TestDeps(
None,
[
"multi_test_helpers",
"multi_cluster_management",
],
worker_count=6,
),
"multi_mx_modifying_xacts": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_router_planner": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_copy_data": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_schema_support": TestDeps(None, ["multi_mx_copy_data"]),
"multi_simple_queries": TestDeps("base_schedule"),
}
if test_file_path:
test_file_path = os.path.join(os.getcwd(), args["path"])
if not os.path.isfile(test_file_path):
print(f"ERROR: test file '{test_file_path}' does not exist")
sys.exit(2)
def run_python_test(test_name, args):
"""Runs the test using pytest
test_file_extension = pathlib.Path(test_file_path).suffix
test_file_name = pathlib.Path(test_file_path).stem
if test_file_extension not in (".spec", ".sql", ".py"):
print(
"ERROR: Unrecognized test extension. Valid extensions are: .sql, .spec, and .py"
)
sys.exit(1)
test_schedule = ""
dependencies = []
if test_file_name.startswith("test_"):
run_python_test(test_file_name, args["repeat"])
# find related schedule
for schedule_file_path in sorted(regress_dir.glob("*_schedule")):
for schedule_line in open(schedule_file_path, "r"):
if re.search(r"\b" + test_file_name + r"\b", schedule_line):
test_schedule = pathlib.Path(schedule_file_path).stem
if use_whole_schedule_line:
test_schedule_line = schedule_line
else:
test_schedule_line = f"test: {test_file_name}\n"
break
else:
continue
break
else:
This function never returns as it usese os.execlp to replace the current
process with a new pytest process.
"""
test_path = REGRESS_DIR / "citus_tests" / "test" / f"{test_name}.py"
if not test_path.exists():
raise Exception("Test could not be found in any schedule")
def default_base_schedule(test_schedule):
if "isolation" in test_schedule:
return "base_isolation_schedule"
if "failure" in test_schedule:
return "failure_base_schedule"
if "enterprise" in test_schedule:
return "enterprise_minimal_schedule"
if "split" in test_schedule:
return "minimal_schedule"
if "mx" in test_schedule:
if use_base_schedule:
return "mx_base_schedule"
return "mx_minimal_schedule"
if "operations" in test_schedule:
return "minimal_schedule"
if "after_citus_upgrade" in test_schedule:
print(
f"WARNING: After citus upgrade schedule ({test_schedule}) is not supported."
)
sys.exit(0)
if "citus_upgrade" in test_schedule:
return None
if "pg_upgrade" in test_schedule:
return "minimal_schedule"
if test_schedule in ARBITRARY_SCHEDULE_NAMES:
print(
f"WARNING: Arbitrary config schedule ({test_schedule}) is not supported."
)
sys.exit(0)
if use_base_schedule:
return "base_schedule"
return "minimal_schedule"
# we run the tests with 2 workers by default.
# If we find any dependency which requires more workers, we update the worker count.
def worker_count_for(test_name):
if test_name in deps:
return deps[test_name].worker_count
return 2
test_worker_count = max(worker_count_for(test_file_name), 2)
if test_file_name in deps:
dependencies = deps[test_file_name]
elif schedule_line_is_upgrade_after(test_schedule_line):
dependencies = TestDeps(
default_base_schedule(test_schedule),
[test_file_name.replace("_after", "_before")],
)
else:
dependencies = TestDeps(default_base_schedule(test_schedule))
if "before_" in test_schedule:
dependencies.repeatable = False
# copy base schedule to a temp file and append test_schedule_line
# to be able to run tests in parallel (if test_schedule_line is a parallel group.)
tmp_schedule_path = os.path.join(
regress_dir, f"tmp_schedule_{ random.randint(1, 10000)}"
os.execlp(
"pytest",
"pytest",
"--numprocesses",
"auto",
"--count",
str(args["repeat"]),
str(test_path),
)
# some tests don't need a schedule to run
# e.g tests that are in the first place in their own schedule
if dependencies.schedule:
shutil.copy2(
os.path.join(regress_dir, dependencies.schedule), tmp_schedule_path
)
with open(tmp_schedule_path, "a") as myfile:
for dependency in dependencies.extra_tests():
myfile.write(f"test: {dependency}\n")
test_worker_count = max(worker_count_for(dependency), test_worker_count)
repetition_cnt = args["repeat"]
if repetition_cnt > 1 and not dependencies.repeatable:
repetition_cnt = 1
print(f"WARNING: Cannot repeatably run this test: '{test_file_name}'")
for _ in range(repetition_cnt):
myfile.write(test_schedule_line)
if "upgrade" in test_schedule_line:
try:
run_schedule_with_python(pathlib.Path(tmp_schedule_path).stem)
finally:
# remove temp schedule file
os.remove(tmp_schedule_path)
sys.exit(0)
def run_regress_test(test_name, args):
original_schedule, schedule_line = find_test_schedule_and_line(test_name, args)
dependencies = test_dependencies(test_name, original_schedule, schedule_line, args)
with tmp_schedule(test_name, dependencies, schedule_line, args) as schedule:
if "upgrade" in original_schedule:
run_schedule_with_python(schedule)
else:
run_schedule_with_multiregress(test_name, schedule, dependencies, args)
def run_schedule_with_python(schedule):
bindir = capture("pg_config --bindir").rstrip()
pgxs_path = pathlib.Path(capture("pg_config --pgxs").rstrip())
os.chdir(REGRESS_DIR)
os.environ["PATH"] = str(REGRESS_DIR / "bin") + os.pathsep + os.environ["PATH"]
os.environ["PG_REGRESS_DIFF_OPTS"] = "-dU10 -w"
os.environ["CITUS_OLD_VERSION"] = f"v{MASTER_VERSION}.0"
args = {
"--pgxsdir": str(pgxs_path.parent.parent.parent),
"--bindir": bindir,
}
config = CitusDefaultClusterConfig(args)
common.initialize_temp_dir(config.temp_dir)
common.initialize_citus_cluster(
config.bindir, config.datadir, config.settings, config
)
common.run_pg_regress(
config.bindir, config.pg_srcdir, config.coordinator_port(), schedule
)
def run_schedule_with_multiregress(test_name, schedule, dependencies, args):
worker_count = needed_worker_count(test_name, dependencies)
# find suitable make recipe
if dependencies.schedule == "base_isolation_schedule":
@ -322,15 +201,160 @@ if __name__ == "__main__":
# prepare command to run tests
test_command = (
f"make -C {regress_dir} {make_recipe} "
f"WORKERCOUNT={test_worker_count} "
f"SCHEDULE='{pathlib.Path(tmp_schedule_path).stem}'"
f"make -C {REGRESS_DIR} {make_recipe} "
f"WORKERCOUNT={worker_count} "
f"SCHEDULE='{schedule}'"
)
run(test_command)
def default_base_schedule(test_schedule, args):
if "isolation" in test_schedule:
return "base_isolation_schedule"
if "failure" in test_schedule:
return "failure_base_schedule"
if "enterprise" in test_schedule:
return "enterprise_minimal_schedule"
if "split" in test_schedule:
return "minimal_schedule"
if "mx" in test_schedule:
if args["use_base_schedule"]:
return "mx_base_schedule"
return "mx_minimal_schedule"
if "operations" in test_schedule:
return "minimal_schedule"
if "after_citus_upgrade" in test_schedule:
print(
f"WARNING: After citus upgrade schedule ({test_schedule}) is not supported."
)
sys.exit(0)
if "citus_upgrade" in test_schedule:
return None
if "pg_upgrade" in test_schedule:
return "minimal_schedule"
if test_schedule in ARBITRARY_SCHEDULE_NAMES:
print(f"WARNING: Arbitrary config schedule ({test_schedule}) is not supported.")
sys.exit(0)
if args["use_base_schedule"]:
return "base_schedule"
return "minimal_schedule"
# we run the tests with 2 workers by default.
# If we find any dependency which requires more workers, we update the worker count.
def worker_count_for(test_name):
if test_name in DEPS:
return DEPS[test_name].worker_count
return 2
def get_test_name(args):
if args["test_name"]:
return args["test_name"]
if not args["path"]:
print("FATAL: No test given.")
sys.exit(2)
absolute_test_path = os.path.join(os.getcwd(), args["path"])
if not os.path.isfile(absolute_test_path):
print(f"ERROR: test file '{absolute_test_path}' does not exist")
sys.exit(2)
if pathlib.Path(absolute_test_path).suffix not in (".spec", ".sql", ".py"):
print(
"ERROR: Unrecognized test extension. Valid extensions are: .sql, .spec, and .py"
)
sys.exit(1)
return pathlib.Path(absolute_test_path).stem
def find_test_schedule_and_line(test_name, args):
for schedule_file_path in sorted(REGRESS_DIR.glob("*_schedule")):
for schedule_line in open(schedule_file_path, "r"):
if re.search(r"\b" + test_name + r"\b", schedule_line):
test_schedule = pathlib.Path(schedule_file_path).stem
if args["use_whole_schedule_line"]:
return test_schedule, schedule_line
return test_schedule, f"test: {test_name}\n"
raise Exception("Test could not be found in any schedule")
def test_dependencies(test_name, test_schedule, schedule_line, args):
if test_name in DEPS:
return DEPS[test_name]
if schedule_line_is_upgrade_after(schedule_line):
# upgrade_xxx_after tests always depend on upgrade_xxx_before
return TestDeps(
default_base_schedule(test_schedule, args),
[test_name.replace("_after", "_before")],
)
# before_ tests leave stuff around on purpose for the after tests. So they
# are not repeatable by definition.
if "before_" in test_schedule:
repeatable = False
else:
repeatable = True
return TestDeps(default_base_schedule(test_schedule, args), repeatable=repeatable)
# Returns true if given test_schedule_line is of the form:
# "test: upgrade_ ... _after .."
def schedule_line_is_upgrade_after(test_schedule_line: str) -> bool:
return (
test_schedule_line.startswith("test: upgrade_")
and "_after" in test_schedule_line
)
# run test command n times
@contextmanager
def tmp_schedule(test_name, dependencies, schedule_line, args):
tmp_schedule_path = REGRESS_DIR / f"tmp_schedule_{random.randint(1, 10000)}"
# Prefill the temporary schedule with the base schedule that this test
# depends on. Some tests don't need a base schedule to run though,
# e.g tests that are in the first place in their own schedule
if dependencies.schedule:
shutil.copy2(REGRESS_DIR / dependencies.schedule, tmp_schedule_path)
with open(tmp_schedule_path, "a") as myfile:
# Add any specific dependencies
for dependency in dependencies.extra_tests():
myfile.write(f"test: {dependency}\n")
repetition_cnt = args["repeat"]
if repetition_cnt > 1 and not dependencies.repeatable:
repetition_cnt = 1
print(f"WARNING: Cannot repeatably run this test: '{test_name}'")
for _ in range(repetition_cnt):
myfile.write(schedule_line)
try:
print(f"Executing.. {test_command}")
result = common.run(test_command)
yield tmp_schedule_path.stem
finally:
# remove temp schedule file
os.remove(tmp_schedule_path)
def needed_worker_count(test_name, dependencies):
worker_count = worker_count_for(test_name)
for dependency in dependencies.extra_tests():
worker_count = max(worker_count_for(dependency), worker_count)
if __name__ == "__main__":
main()