mirror of https://github.com/citusdata/citus.git
Test a schedule under different setups
parent
a4944a2102
commit
77a1ee93c8
|
@ -6,6 +6,7 @@
|
|||
/tmp_upgrade/
|
||||
/tmp_citus_upgrade/
|
||||
/tmp_citus_tarballs/
|
||||
/tmp_citus_test/
|
||||
/build/
|
||||
/results/
|
||||
/log/
|
||||
|
|
|
@ -29,6 +29,7 @@ MULTI_REGRESS_OPTS = --inputdir=$(citus_abs_srcdir) $(pg_regress_locale_flags) -
|
|||
|
||||
pg_upgrade_check = $(citus_abs_srcdir)/upgrade/pg_upgrade_test.py
|
||||
citus_upgrade_check = $(citus_abs_srcdir)/upgrade/citus_upgrade_test.py
|
||||
custom_citus_check = $(citus_abs_srcdir)/upgrade/custom_citus.py
|
||||
|
||||
template_isolation_files = $(shell find $(citus_abs_srcdir)/spec/ -name '*.spec')
|
||||
generated_isolation_files = $(patsubst $(citus_abs_srcdir)/spec/%,$(citus_abs_srcdir)/build/specs/%,$(template_isolation_files))
|
||||
|
@ -197,6 +198,9 @@ check-failure-base: all
|
|||
check-pg-upgrade:
|
||||
$(pg_upgrade_check) --old-bindir=$(old-bindir) --new-bindir=$(new-bindir) --pgxsdir=$(pgxsdir)
|
||||
|
||||
check-custom-citus:
|
||||
${custom_citus_check} --bindir=$(bindir) --pgxsdir=$(pgxsdir)
|
||||
|
||||
check-citus-upgrade:
|
||||
$(citus_upgrade_check) \
|
||||
--bindir=$(bindir) \
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
test: custom_citus_test
|
|
@ -0,0 +1,22 @@
|
|||
CREATE TABLE dist(a int, b int);
|
||||
SELECT create_distributed_table('dist', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE table local(a int, b int);
|
||||
INSERT INTO dist SELECT *,* FROM generate_series(1,100);
|
||||
INSERT INTO local SELECT *,* FROM generate_series(1,100);
|
||||
SELECT COUNT(*) FROM dist join local USING (a);
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
100
|
||||
(1 row)
|
||||
|
||||
SELECT COUNT(*) FROM dist join local USING (a) WHERE dist.a =5;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
CREATE TABLE dist(a int, b int);
|
||||
SELECT create_distributed_table('dist', 'a');
|
||||
|
||||
CREATE table local(a int, b int);
|
||||
|
||||
INSERT INTO dist SELECT *,* FROM generate_series(1,100);
|
||||
INSERT INTO local SELECT *,* FROM generate_series(1,100);
|
||||
|
||||
SELECT COUNT(*) FROM dist join local USING (a);
|
||||
SELECT COUNT(*) FROM dist join local USING (a) WHERE dist.a =5;
|
|
@ -37,7 +37,7 @@ def main(config):
|
|||
install_citus(config.pre_tar_path)
|
||||
common.initialize_temp_dir(config.temp_dir)
|
||||
common.initialize_citus_cluster(
|
||||
config.bindir, config.datadir, config.settings)
|
||||
config.bindir, config.datadir, config.settings, config)
|
||||
|
||||
report_initial_version(config)
|
||||
before_upgrade_schedule = get_before_upgrade_schedule(config.mixed_mode)
|
||||
|
|
|
@ -1,9 +1,12 @@
|
|||
from os.path import expanduser
|
||||
import upgrade_common as common
|
||||
|
||||
|
||||
BEFORE_PG_UPGRADE_SCHEDULE = './before_pg_upgrade_schedule'
|
||||
AFTER_PG_UPGRADE_SCHEDULE = './after_pg_upgrade_schedule'
|
||||
|
||||
CUSTOM_CITUS_SCHEDULE = './custom_citus_schedule'
|
||||
|
||||
AFTER_CITUS_UPGRADE_COORD_SCHEDULE = './after_citus_upgrade_coord_schedule'
|
||||
BEFORE_CITUS_UPGRADE_COORD_SCHEDULE = './before_citus_upgrade_coord_schedule'
|
||||
MIXED_BEFORE_CITUS_UPGRADE_SCHEDULE = './mixed_before_citus_upgrade_schedule'
|
||||
|
@ -26,6 +29,7 @@ class CitusUpgradeConfig():
|
|||
self.pre_tar_path = arguments['--citus-pre-tar']
|
||||
self.post_tar_path = arguments['--citus-post-tar']
|
||||
self.pg_srcdir = arguments['--pgxsdir']
|
||||
self.worker_amount = 2
|
||||
self.temp_dir = './tmp_citus_upgrade'
|
||||
self.datadir = self.temp_dir + '/data'
|
||||
self.settings = {
|
||||
|
@ -35,6 +39,84 @@ class CitusUpgradeConfig():
|
|||
}
|
||||
self.mixed_mode = arguments['--mixed']
|
||||
|
||||
|
||||
class CitusBaseClusterConfig():
|
||||
def __init__(self, arguments):
|
||||
self.bindir = arguments['--bindir']
|
||||
self.pg_srcdir = arguments['--pgxsdir']
|
||||
self.temp_dir = './tmp_citus_test'
|
||||
self.worker_amount = 2
|
||||
self.datadir = self.temp_dir + '/data'
|
||||
self.settings = {
|
||||
'shared_preload_libraries': 'citus',
|
||||
'citus.node_conninfo': 'sslmode=prefer',
|
||||
}
|
||||
|
||||
def setup_steps(self):
|
||||
pass
|
||||
|
||||
class CitusDefaultClusterConfig(CitusBaseClusterConfig):
|
||||
pass
|
||||
|
||||
class CitusSingleNodeClusterConfig(CitusBaseClusterConfig):
|
||||
|
||||
def __init__(self, arguments):
|
||||
super().__init__(arguments)
|
||||
self.worker_amount = 0
|
||||
|
||||
|
||||
class CitusSingleNodeSingleShardClusterConfig(CitusBaseClusterConfig):
|
||||
|
||||
def __init__(self, arguments):
|
||||
super().__init__(arguments)
|
||||
self.worker_amount = 0
|
||||
self.new_settings = {
|
||||
'citus.shard_count': 1
|
||||
}
|
||||
self.settings.update(self.new_settings)
|
||||
|
||||
class CitusSingleShardClusterConfig(CitusBaseClusterConfig):
|
||||
|
||||
def __init__(self, arguments):
|
||||
super().__init__(arguments)
|
||||
self.new_settings = {
|
||||
'citus.shard_count': 1
|
||||
}
|
||||
self.settings.update(self.new_settings)
|
||||
|
||||
class CitusMxClusterConfig(CitusBaseClusterConfig):
|
||||
def __init__(self, arguments):
|
||||
super().__init__(arguments)
|
||||
|
||||
def setup_steps(self):
|
||||
common.sync_metadata_to_workers(self.bindir)
|
||||
|
||||
class CitusManyShardsClusterConfig(CitusBaseClusterConfig):
|
||||
def __init__(self, arguments):
|
||||
super().__init__(arguments)
|
||||
self.new_settings = {
|
||||
'citus.shard_count': 500
|
||||
}
|
||||
self.settings.update(self.new_settings)
|
||||
|
||||
class CitusSingleNodeSingleConnectionClusterConfig(CitusBaseClusterConfig):
|
||||
def __init__(self, arguments):
|
||||
super().__init__(arguments)
|
||||
self.new_settings = {
|
||||
'citus.max_adaptive_executor_pool_size': 1
|
||||
}
|
||||
self.settings.update(self.new_settings)
|
||||
|
||||
class CitusSingleNodeSingleSharedPoolSizeClusterConfig(CitusBaseClusterConfig):
|
||||
def __init__(self, arguments):
|
||||
super().__init__(arguments)
|
||||
self.new_settings = {
|
||||
'citus.max_shared_pool_size': 1
|
||||
}
|
||||
self.settings.update(self.new_settings)
|
||||
|
||||
|
||||
|
||||
class PGUpgradeConfig():
|
||||
def __init__(self, arguments):
|
||||
self.old_bindir = arguments['--old-bindir']
|
||||
|
@ -43,6 +125,8 @@ class PGUpgradeConfig():
|
|||
self.temp_dir = './tmp_upgrade'
|
||||
self.old_datadir = self.temp_dir + '/oldData'
|
||||
self.new_datadir = self.temp_dir + '/newData'
|
||||
self.worker_amount = 2
|
||||
|
||||
self.settings = {
|
||||
'shared_preload_libraries': 'citus',
|
||||
'citus.node_conninfo': 'sslmode=prefer'
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
"""custom_citus
|
||||
Usage:
|
||||
custom_citus --bindir=<bindir> --pgxsdir=<pgxsdir>
|
||||
|
||||
Options:
|
||||
--bindir=<bindir> The PostgreSQL executable directory(ex: '~/.pgenv/pgsql-11.3/bin')
|
||||
--pgxsdir=<pgxsdir> Path to the PGXS directory(ex: ~/.pgenv/src/postgresql-11.3)
|
||||
"""
|
||||
|
||||
import upgrade_common as common
|
||||
import atexit
|
||||
from docopt import docopt
|
||||
|
||||
from config import (
|
||||
CitusDefaultClusterConfig, CitusSingleNodeClusterConfig,
|
||||
CitusSingleNodeSingleShardClusterConfig,
|
||||
CitusSingleShardClusterConfig,
|
||||
CitusMxClusterConfig,
|
||||
CitusManyShardsClusterConfig,
|
||||
CitusSingleNodeSingleSharedPoolSizeClusterConfig,
|
||||
CitusSingleNodeSingleConnectionClusterConfig,
|
||||
USER, NODE_PORTS,
|
||||
NODE_NAMES, DBNAME, COORDINATOR_NAME,
|
||||
WORKER_PORTS, CUSTOM_CITUS_SCHEDULE
|
||||
)
|
||||
|
||||
testResults = {}
|
||||
|
||||
def run_for_config(config, name):
|
||||
print ('Running test for: {}'.format(name))
|
||||
common.initialize_temp_dir(config.temp_dir)
|
||||
common.initialize_citus_cluster(config.bindir, config.datadir, config.settings, config)
|
||||
|
||||
exitCode = common.run_pg_regress_without_exit(config.bindir, config.pg_srcdir,
|
||||
NODE_PORTS[COORDINATOR_NAME], CUSTOM_CITUS_SCHEDULE)
|
||||
testResults[name] = "success" if exitCode == 0 else "fail"
|
||||
common.stop_databases(config.bindir, config.datadir)
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
docoptRes = docopt(__doc__)
|
||||
configs = [
|
||||
(CitusDefaultClusterConfig(docoptRes), "default citus cluster"),
|
||||
(CitusSingleNodeClusterConfig(docoptRes), "single node citus cluster"),
|
||||
(CitusSingleNodeSingleShardClusterConfig(docoptRes), "single node single shard cluster"),
|
||||
(CitusSingleShardClusterConfig(docoptRes), "single shard multiple workers cluster"),
|
||||
(CitusMxClusterConfig(docoptRes), "mx citus cluster"),
|
||||
(CitusManyShardsClusterConfig(docoptRes), "citus cluster with many shards"),
|
||||
(CitusSingleNodeSingleSharedPoolSizeClusterConfig(docoptRes), "citus single node single shared pool cluster"),
|
||||
(CitusSingleNodeSingleConnectionClusterConfig(docoptRes), "citus single node single connection cluster"),
|
||||
|
||||
|
||||
]
|
||||
for config, testName in configs:
|
||||
run_for_config(config, testName)
|
||||
|
||||
for testName, testResult in testResults.items():
|
||||
print('{}: {}'.format(testName, testResult))
|
|
@ -60,7 +60,7 @@ def stop_all_databases(old_bindir, new_bindir, old_datadir, new_datadir):
|
|||
|
||||
def main(config):
|
||||
common.initialize_temp_dir(config.temp_dir)
|
||||
common.initialize_citus_cluster(config.old_bindir, config.old_datadir, config.settings)
|
||||
common.initialize_citus_cluster(config.old_bindir, config.old_datadir, config.settings, config)
|
||||
common.run_pg_regress(config.old_bindir, config.pg_srcdir,
|
||||
NODE_PORTS[COORDINATOR_NAME], BEFORE_PG_UPGRADE_SCHEDULE)
|
||||
common.run_pg_regress(config.old_bindir, config.pg_srcdir,
|
||||
|
|
|
@ -6,7 +6,7 @@ import subprocess
|
|||
|
||||
import utils
|
||||
|
||||
from config import NODE_NAMES, NODE_PORTS, COORDINATOR_NAME, USER, WORKER_PORTS, DBNAME
|
||||
from config import NODE_NAMES, NODE_PORTS, COORDINATOR_NAME, USER, WORKER_PORTS, DBNAME, CitusBaseClusterConfig
|
||||
|
||||
|
||||
def initialize_temp_dir(temp_dir):
|
||||
|
@ -63,6 +63,14 @@ def create_citus_extension(pg_path):
|
|||
utils.psql(pg_path, port, "CREATE EXTENSION citus;")
|
||||
|
||||
def run_pg_regress(pg_path, pg_srcdir, port, schedule):
|
||||
should_exit = True
|
||||
_run_pg_regress(pg_path, pg_srcdir, port, schedule, should_exit)
|
||||
|
||||
def run_pg_regress_without_exit(pg_path, pg_srcdir, port, schedule):
|
||||
should_exit = False
|
||||
return _run_pg_regress(pg_path, pg_srcdir, port, schedule, should_exit)
|
||||
|
||||
def _run_pg_regress(pg_path, pg_srcdir, port, schedule, should_exit):
|
||||
command = [
|
||||
os.path.join(pg_srcdir, 'src/test/regress/pg_regress'),
|
||||
'--port', str(port),
|
||||
|
@ -73,11 +81,19 @@ def run_pg_regress(pg_path, pg_srcdir, port, schedule):
|
|||
'--use-existing'
|
||||
]
|
||||
exit_code = subprocess.call(command)
|
||||
subprocess.run('bin/copy_modified', check=True)
|
||||
if exit_code != 0:
|
||||
# subprocess.run('bin/copy_modified', check=True)
|
||||
if should_exit and exit_code != 0:
|
||||
sys.exit(exit_code)
|
||||
return exit_code
|
||||
|
||||
|
||||
def sync_metadata_to_workers(pg_path):
|
||||
for port in WORKER_PORTS:
|
||||
command = "SELECT * from start_metadata_sync_to_node('localhost', {port});".format(
|
||||
port=port)
|
||||
utils.psql(pg_path, NODE_PORTS[COORDINATOR_NAME], command)
|
||||
|
||||
|
||||
def add_workers(pg_path):
|
||||
for port in WORKER_PORTS:
|
||||
command = "SELECT * from master_add_node('localhost', {port});".format(
|
||||
|
@ -97,8 +113,11 @@ def stop_databases(pg_path, rel_data_path):
|
|||
subprocess.call(command)
|
||||
|
||||
|
||||
def initialize_citus_cluster(old_bindir, old_datadir, settings):
|
||||
def initialize_citus_cluster(old_bindir, old_datadir, settings, config):
|
||||
initialize_db_for_cluster(old_bindir, old_datadir, settings)
|
||||
start_databases(old_bindir, old_datadir)
|
||||
create_citus_extension(old_bindir)
|
||||
add_workers(old_bindir)
|
||||
if config.worker_amount > 0:
|
||||
add_workers(old_bindir)
|
||||
if isinstance(config, CitusBaseClusterConfig):
|
||||
config.setup_steps()
|
||||
|
|
Loading…
Reference in New Issue