diff --git a/src/test/regress/citus_tests/arbitrary_configs/citus_arbitrary_configs.py b/src/test/regress/citus_tests/arbitrary_configs/citus_arbitrary_configs.py index b5f71f945..53c6750f4 100755 --- a/src/test/regress/citus_tests/arbitrary_configs/citus_arbitrary_configs.py +++ b/src/test/regress/citus_tests/arbitrary_configs/citus_arbitrary_configs.py @@ -55,7 +55,6 @@ def run_for_config(config, lock, sql_schedule_name): if config.user == cfg.REGULAR_USER_NAME: common.create_role( config.bindir, - config.coordinator_port(), config.node_name_to_ports.values(), config.user, ) diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py index 49cb1bfae..c6e551fa1 100644 --- a/src/test/regress/citus_tests/common.py +++ b/src/test/regress/citus_tests/common.py @@ -3,6 +3,7 @@ import shutil import sys import subprocess import atexit +import concurrent.futures import utils from utils import USER, cd @@ -24,9 +25,19 @@ def initialize_temp_dir_if_not_exists(temp_dir): os.chmod(temp_dir, 0o777) +def parallel_run(function, items, *args, **kwargs): + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [ + executor.submit(function, item, *args, **kwargs) + for item in items + ] + for future in futures: + future.result() + def initialize_db_for_cluster(pg_path, rel_data_path, settings, node_names): subprocess.run(["mkdir", rel_data_path], check=True) - for node_name in node_names: + + def initialize(node_name): abs_data_path = os.path.abspath(os.path.join(rel_data_path, node_name)) command = [ os.path.join(pg_path, "initdb"), @@ -38,6 +49,8 @@ def initialize_db_for_cluster(pg_path, rel_data_path, settings, node_names): subprocess.run(command, check=True) add_settings(abs_data_path, settings) + parallel_run(initialize, node_names) + def add_settings(abs_data_path, settings): conf_path = os.path.join(abs_data_path, "postgresql.conf") @@ -49,8 +62,8 @@ def add_settings(abs_data_path, settings): conf_file.write(setting) -def create_role(pg_path, port, node_ports, user_name): - for port in node_ports: +def create_role(pg_path, node_ports, user_name): + def create(port): command = "SELECT worker_create_or_alter_role('{}', 'CREATE ROLE {} WITH LOGIN CREATEROLE CREATEDB;', NULL)".format( user_name, user_name ) @@ -58,6 +71,8 @@ def create_role(pg_path, port, node_ports, user_name): command = "GRANT CREATE ON DATABASE postgres to {}".format(user_name) utils.psql(pg_path, port, command) + parallel_run(create, node_ports) + def coordinator_should_haveshards(pg_path, port): command = "SELECT citus_set_node_property('localhost', {}, 'shouldhaveshards', true)".format( @@ -67,7 +82,7 @@ def coordinator_should_haveshards(pg_path, port): def start_databases(pg_path, rel_data_path, node_name_to_ports, logfile_prefix, env_variables): - for node_name in node_name_to_ports.keys(): + def start(node_name): abs_data_path = os.path.abspath(os.path.join(rel_data_path, node_name)) node_port = node_name_to_ports[node_name] command = [ @@ -89,6 +104,11 @@ def start_databases(pg_path, rel_data_path, node_name_to_ports, logfile_prefix, subprocess.run(command, check=True) + parallel_run(start, node_name_to_ports.keys()) + + # We don't want parallel shutdown here because that will fail when it's + # tried in this atexit call with an error like: + # cannot schedule new futures after interpreter shutdown atexit.register( stop_databases, pg_path, @@ -96,13 +116,16 @@ def start_databases(pg_path, rel_data_path, node_name_to_ports, logfile_prefix, node_name_to_ports, logfile_prefix, no_output=True, + parallel=False, ) def create_citus_extension(pg_path, node_ports): - for port in node_ports: + def create(port): utils.psql(pg_path, port, "CREATE EXTENSION citus;") + parallel_run(create, node_ports) + def run_pg_regress(pg_path, pg_srcdir, port, schedule): should_exit = True @@ -215,9 +238,9 @@ def logfile_name(logfile_prefix, node_name): def stop_databases( - pg_path, rel_data_path, node_name_to_ports, logfile_prefix, no_output=False + pg_path, rel_data_path, node_name_to_ports, logfile_prefix, no_output=False, parallel=True ): - for node_name in node_name_to_ports.keys(): + def stop(node_name): abs_data_path = os.path.abspath(os.path.join(rel_data_path, node_name)) node_port = node_name_to_ports[node_name] command = [ @@ -239,6 +262,12 @@ def stop_databases( else: subprocess.call(command) + if parallel: + parallel_run(stop, node_name_to_ports.keys()) + else: + for node_name in node_name_to_ports.keys(): + stop(node_name) + def initialize_citus_cluster(bindir, datadir, settings, config): # In case there was a leftover from previous runs, stop the databases