Parallelize cluster setup in arbitrary config tests

Cluster setup takes a significant amount of time in the arbitrary config
tests. We can parallelize it a bit more.

Runtime of the following command decreases from ~25 seconds to ~22
seconds on my machine with this change:

```
make -C src/test/regress/ check-arbitrary-base CONFIGS=CitusDefaultClusterConfig EXTRA_TESTS=prepared_statements_1
```
Jelte Fennema 2022-02-23 13:06:28 +01:00
parent 242c4a3feb
commit 3bc953404e
1 changed file with 36 additions and 7 deletions
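
The diff below applies one pattern throughout: the body of each sequential `for node in ...` loop is moved into a small local function, which is then handed to a new `parallel_run` helper built on `concurrent.futures.ThreadPoolExecutor`. A minimal standalone sketch of that pattern follows; `parallel_run` is copied from the diff, while the `initialize` placeholder and the node names are made up for illustration:

```python
import concurrent.futures


def parallel_run(function, items, *args, **kwargs):
    # Submit function(item, *args, **kwargs) for every item to a thread pool,
    # then call result() on each future so any exception is re-raised here.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(function, item, *args, **kwargs)
            for item in items
        ]
        for future in futures:
            future.result()


def initialize(node_name):
    # Placeholder for the real per-node work (initdb, pg_ctl, psql, ...).
    print("setting up", node_name)


# Instead of `for node_name in node_names: initialize(node_name)`:
parallel_run(initialize, ["coordinator", "worker1", "worker2"])
```

Threads are sufficient here because the per-node work is dominated by blocking `subprocess` and `psql` calls, so the workers spend most of their time waiting on external processes rather than holding the GIL.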


```diff
@@ -3,6 +3,7 @@ import shutil
 import sys
 import subprocess
 import atexit
+import concurrent.futures
 
 import utils
 from utils import USER, cd
@@ -24,9 +25,19 @@ def initialize_temp_dir_if_not_exists(temp_dir):
     os.chmod(temp_dir, 0o777)
 
 
+def parallel_run(function, items, *args, **kwargs):
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        futures = [
+            executor.submit(function, item, *args, **kwargs)
+            for item in items
+        ]
+        for future in futures:
+            future.result()
+
+
 def initialize_db_for_cluster(pg_path, rel_data_path, settings, node_names):
     subprocess.run(["mkdir", rel_data_path], check=True)
-    for node_name in node_names:
+    def initialize(node_name):
         abs_data_path = os.path.abspath(os.path.join(rel_data_path, node_name))
         command = [
             os.path.join(pg_path, "initdb"),
@@ -38,6 +49,8 @@ def initialize_db_for_cluster(pg_path, rel_data_path, settings, node_names):
         subprocess.run(command, check=True)
         add_settings(abs_data_path, settings)
 
+    parallel_run(initialize, node_names)
+
 
 def add_settings(abs_data_path, settings):
     conf_path = os.path.join(abs_data_path, "postgresql.conf")
@@ -49,8 +62,8 @@ def add_settings(abs_data_path, settings):
             conf_file.write(setting)
 
 
-def create_role(pg_path, port, node_ports, user_name):
-    for port in node_ports:
+def create_role(pg_path, coordinator_port, node_ports, user_name):
+    def create(port):
         command = "SELECT worker_create_or_alter_role('{}', 'CREATE ROLE {} WITH LOGIN CREATEROLE CREATEDB;', NULL)".format(
             user_name, user_name
         )
@@ -58,6 +71,8 @@ def create_role(pg_path, port, node_ports, user_name):
         command = "GRANT CREATE ON DATABASE postgres to {}".format(user_name)
         utils.psql(pg_path, port, command)
 
+    parallel_run(create, node_ports)
+
 
 def coordinator_should_haveshards(pg_path, port):
     command = "SELECT citus_set_node_property('localhost', {}, 'shouldhaveshards', true)".format(
@@ -67,7 +82,7 @@ def coordinator_should_haveshards(pg_path, port):
 
 
 def start_databases(pg_path, rel_data_path, node_name_to_ports, logfile_prefix, env_variables):
-    for node_name in node_name_to_ports.keys():
+    def start(node_name):
         abs_data_path = os.path.abspath(os.path.join(rel_data_path, node_name))
         node_port = node_name_to_ports[node_name]
         command = [
@@ -89,6 +104,11 @@ def start_databases(pg_path, rel_data_path, node_name_to_ports, logfile_prefix,
 
         subprocess.run(command, check=True)
 
+    parallel_run(start, node_name_to_ports.keys())
+
+    # We don't want parallel shutdown here, because that will fail when it's
+    # tried in this atexit call with an error like:
+    # cannot schedule new futures after interpreter shutdown
     atexit.register(
         stop_databases,
         pg_path,
@@ -96,13 +116,16 @@ def start_databases(pg_path, rel_data_path, node_name_to_ports, logfile_prefix,
         node_name_to_ports,
         logfile_prefix,
         no_output=True,
+        parallel=False,
     )
 
 
 def create_citus_extension(pg_path, node_ports):
-    for port in node_ports:
+    def create(port):
         utils.psql(pg_path, port, "CREATE EXTENSION citus;")
 
+    parallel_run(create, node_ports)
+
 
 def run_pg_regress(pg_path, pg_srcdir, port, schedule):
     should_exit = True
@@ -215,9 +238,9 @@ def logfile_name(logfile_prefix, node_name):
 
 
 def stop_databases(
-    pg_path, rel_data_path, node_name_to_ports, logfile_prefix, no_output=False
+    pg_path, rel_data_path, node_name_to_ports, logfile_prefix, no_output=False, parallel=True
 ):
-    for node_name in node_name_to_ports.keys():
+    def stop(node_name):
         abs_data_path = os.path.abspath(os.path.join(rel_data_path, node_name))
         node_port = node_name_to_ports[node_name]
         command = [
@@ -239,6 +262,12 @@ def stop_databases(
         else:
             subprocess.call(command)
 
+    if parallel:
+        parallel_run(stop, node_name_to_ports.keys())
+    else:
+        for node_name in node_name_to_ports.keys():
+            stop(node_name)
+
 
 def initialize_citus_cluster(bindir, datadir, settings, config):
     # In case there was a leftover from previous runs, stop the databases
```
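
One note on the shutdown comment in the diff above: the atexit-registered `stop_databases` call keeps the sequential path (`parallel=False`) because by the time atexit callbacks run, the interpreter is already finalizing and `ThreadPoolExecutor.submit` refuses new work. A minimal sketch that should reproduce the quoted error on recent Python versions (3.9 or later); the handler here is a made-up stand-in, not code from the test suite:

```python
import atexit
import concurrent.futures


def shutdown_handler():
    # Runs during interpreter shutdown; submitting work at this point raises
    # "RuntimeError: cannot schedule new futures after interpreter shutdown".
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.submit(print, "stopping node").result()


atexit.register(shutdown_handler)
```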