Parallelize cluster setup in arbitrary config tests (#5738)
Cluster setup time is significant in arbitrary configs, and we can parallelize it a bit more. With this change, the runtime of the following command decreases from ~25 seconds to ~22 seconds on my machine:

```
make -C src/test/regress/ check-arbitrary-base CONFIGS=CitusDefaultClusterConfig EXTRA_TESTS=prepared_statements_1
```

Currently only different configs can run in parallel. However, when working on a feature or trying to fix a bug this is not important: in those cases you simply want to run a single test file on a single config, and you want to rerun it after every code change you think fixes the issue. When you run the command above, nothing runs in parallel today.

This PR makes it possible to run shell commands in parallel, so `initdb` and `pg_ctl start` are run concurrently for all nodes in the cluster instead of each node waiting for the previous one. After this PR, cluster setup for the command above runs in parallel.
parent 5063257252
commit 41c6393e82
The change is one hunk in the arbitrary-config test runner (the caller of `common.create_role`, updated for its new signature) followed by a series of hunks in the `common` helper module:

```diff
@@ -55,7 +55,6 @@ def run_for_config(config, lock, sql_schedule_name):
     if config.user == cfg.REGULAR_USER_NAME:
         common.create_role(
             config.bindir,
-            config.coordinator_port(),
             config.node_name_to_ports.values(),
             config.user,
         )
```
In the `common` module, `concurrent.futures` is imported and the new `parallel_run` helper is added; the body of the per-node `initdb` loop becomes an `initialize` closure:

```diff
@@ -3,6 +3,7 @@ import shutil
 import sys
 import subprocess
 import atexit
+import concurrent.futures
 
 import utils
 from utils import USER, cd
@@ -24,9 +25,19 @@ def initialize_temp_dir_if_not_exists(temp_dir):
     os.chmod(temp_dir, 0o777)
 
 
+def parallel_run(function, items, *args, **kwargs):
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        futures = [
+            executor.submit(function, item, *args, **kwargs) for item in items
+        ]
+        for future in futures:
+            future.result()
+
+
 def initialize_db_for_cluster(pg_path, rel_data_path, settings, node_names):
     subprocess.run(["mkdir", rel_data_path], check=True)
-    for node_name in node_names:
+
+    def initialize(node_name):
         abs_data_path = os.path.abspath(os.path.join(rel_data_path, node_name))
         command = [
             os.path.join(pg_path, "initdb"),
```
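`parallel_run` submits one task per item to a thread pool and then waits on every future in order; `future.result()` re-raises any exception from its worker, so a failed `initdb` on any node still fails the whole setup rather than being swallowed. Here is a self-contained sketch of the same pattern; the node names, the `fake_initdb` task, and the `/tmp` path are hypothetical, only there to make the sketch runnable:

```python
import concurrent.futures
import subprocess


def parallel_run(function, items, *args, **kwargs):
    # Same shape as the helper in the diff above: one task per item,
    # then wait on each future; result() re-raises worker exceptions.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(function, item, *args, **kwargs) for item in items
        ]
        for future in futures:
            future.result()


def fake_initdb(node_name, base_dir):
    # Stand-in for the real initdb invocation (POSIX-only mkdir -p).
    subprocess.run(["mkdir", "-p", f"{base_dir}/{node_name}"], check=True)


parallel_run(fake_initdb, ["coordinator", "worker0", "worker1"], "/tmp/demo-cluster")
```

Threads are a good fit here despite the GIL, because each task spends its time blocked in a subprocess rather than executing Python code.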
`initialize` is then fanned out over all node names, and `create_role` gets the same treatment: its `port` parameter, which the old loop immediately shadowed, is dropped, and the loop body becomes a `create` closure:

```diff
@@ -38,6 +49,8 @@ def initialize_db_for_cluster(pg_path, rel_data_path, settings, node_names):
         subprocess.run(command, check=True)
         add_settings(abs_data_path, settings)
 
+    parallel_run(initialize, node_names)
+
 
 def add_settings(abs_data_path, settings):
     conf_path = os.path.join(abs_data_path, "postgresql.conf")
@@ -49,8 +62,8 @@ def add_settings(abs_data_path, settings):
             conf_file.write(setting)
 
 
-def create_role(pg_path, port, node_ports, user_name):
-    for port in node_ports:
+def create_role(pg_path, node_ports, user_name):
+    def create(port):
         command = "SELECT worker_create_or_alter_role('{}', 'CREATE ROLE {} WITH LOGIN CREATEROLE CREATEDB;', NULL)".format(
             user_name, user_name
         )
```
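This signature change is what forces the caller-side edit in the first hunk: the coordinator port no longer needs to be passed separately, since every port in `node_ports` is handled by the `create` closure. The updated call shape, as it appears there:

```python
# Updated caller from the first hunk; coordinator_port() is gone because
# create_role now derives everything it needs from node_ports.
common.create_role(
    config.bindir,
    config.node_name_to_ports.values(),
    config.user,
)
```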
`create` is fanned out as well, and `start_databases` follows the same pattern, with a comment explaining why the registered shutdown stays serial:

```diff
@@ -58,6 +71,8 @@ def create_role(pg_path, port, node_ports, user_name):
         command = "GRANT CREATE ON DATABASE postgres to {}".format(user_name)
         utils.psql(pg_path, port, command)
 
+    parallel_run(create, node_ports)
+
 
 def coordinator_should_haveshards(pg_path, port):
     command = "SELECT citus_set_node_property('localhost', {}, 'shouldhaveshards', true)".format(
@@ -67,7 +82,7 @@ def coordinator_should_haveshards(pg_path, port):
 
 
 def start_databases(pg_path, rel_data_path, node_name_to_ports, logfile_prefix, env_variables):
-    for node_name in node_name_to_ports.keys():
+    def start(node_name):
         abs_data_path = os.path.abspath(os.path.join(rel_data_path, node_name))
         node_port = node_name_to_ports[node_name]
         command = [
@@ -89,6 +104,11 @@ def start_databases(pg_path, rel_data_path, node_name_to_ports, logfile_prefix, env_variables):
 
         subprocess.run(command, check=True)
 
+    parallel_run(start, node_name_to_ports.keys())
+
+    # We don't want parallel shutdown here because that will fail when it's
+    # tried in this atexit call with an error like:
+    # cannot schedule new futures after interpreter shutdown
     atexit.register(
         stop_databases,
         pg_path,
```
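The shutdown comment is worth unpacking: regular `atexit` callbacks run during interpreter shutdown, by which point `concurrent.futures` refuses new work, so trying to use a thread pool there raises the quoted `RuntimeError`. A minimal sketch of the failure mode; the behavior described matches CPython 3.9+ and should be treated as an assumption for other versions:

```python
import atexit
import concurrent.futures


def cleanup():
    # Runs during interpreter shutdown. On CPython 3.9+ the submit() call
    # raises RuntimeError: cannot schedule new futures after interpreter shutdown
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.submit(print, "stopping node")


atexit.register(cleanup)
```

Hence the `parallel=False` argument and the `parallel` flag on `stop_databases` in the remaining hunks.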
The `atexit`-registered teardown therefore passes `parallel=False`, extension creation is parallelized, and `stop_databases` grows a `parallel` flag that defaults to `True`:

```diff
@@ -96,13 +116,16 @@
         node_name_to_ports,
         logfile_prefix,
         no_output=True,
+        parallel=False,
     )
 
 
 def create_citus_extension(pg_path, node_ports):
-    for port in node_ports:
+    def create(port):
         utils.psql(pg_path, port, "CREATE EXTENSION citus;")
 
+    parallel_run(create, node_ports)
+
 
 def run_pg_regress(pg_path, pg_srcdir, port, schedule):
     should_exit = True
@@ -215,9 +238,9 @@ def logfile_name(logfile_prefix, node_name):
 
 
 def stop_databases(
-    pg_path, rel_data_path, node_name_to_ports, logfile_prefix, no_output=False
+    pg_path, rel_data_path, node_name_to_ports, logfile_prefix, no_output=False, parallel=True
 ):
-    for node_name in node_name_to_ports.keys():
+    def stop(node_name):
         abs_data_path = os.path.abspath(os.path.join(rel_data_path, node_name))
         node_port = node_name_to_ports[node_name]
         command = [
@@ -239,6 +262,12 @@ def stop_databases(
         else:
             subprocess.call(command)
 
+    if parallel:
+        parallel_run(stop, node_name_to_ports.keys())
+    else:
+        for node_name in node_name_to_ports.keys():
+            stop(node_name)
+
 
 def initialize_citus_cluster(bindir, datadir, settings, config):
     # In case there was a leftover from previous runs, stop the databases
```
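With these pieces in place, explicit teardown stops all nodes concurrently while the exit-time path stays serial; roughly, with the call shape taken from the hunks above and the argument values as placeholders:

```python
# Normal teardown: parallel by default.
stop_databases(pg_path, rel_data_path, node_name_to_ports, logfile_prefix)

# Exit-time teardown, as registered in start_databases: serial, because the
# thread pool cannot accept new work during interpreter shutdown.
stop_databases(
    pg_path, rel_data_path, node_name_to_ports, logfile_prefix,
    no_output=True, parallel=False,
)
```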