diff --git a/.circleci/config.yml b/.circleci/config.yml index 995832515..d5eadd94f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,7 +6,7 @@ orbs: parameters: image_suffix: type: string - default: '-v4b2ae97' + default: '-v087ecd7' pg13_version: type: string default: '13.10' @@ -269,6 +269,41 @@ jobs: - coverage: flags: 'test_<< parameters.old_pg_major >>_<< parameters.new_pg_major >>,upgrade' + test-pytest: + description: Runs pytest based tests + parameters: + pg_major: + description: 'postgres major version' + type: integer + image: + description: 'docker image to use as for the tests' + type: string + default: citus/failtester + image_tag: + description: 'docker image tag to use' + type: string + docker: + - image: '<< parameters.image >>:<< parameters.image_tag >><< pipeline.parameters.image_suffix >>' + working_directory: /home/circleci/project + steps: + - checkout + - attach_workspace: + at: . + - install_extension: + pg_major: << parameters.pg_major >> + - configure + - enable_core + - run: + name: 'Run pytest' + command: | + gosu circleci \ + make -C src/test/regress check-pytest + no_output_timeout: 2m + - stack_trace + - coverage: + flags: 'test_<< parameters.pg_major >>,pytest' + + test-arbitrary-configs: description: Runs tests on arbitrary configs parallelism: 6 @@ -558,7 +593,7 @@ jobs: testForDebugging="<< parameters.test >>" if [ -z "$testForDebugging" ]; then - detected_changes=$(git diff origin/main... --name-only --diff-filter=AM | (grep 'src/test/regress/sql/.*.sql\|src/test/regress/spec/.*.spec' || true)) + detected_changes=$(git diff origin/main... 
--name-only --diff-filter=AM | (grep 'src/test/regress/sql/.*\.sql\|src/test/regress/spec/.*\.spec\|src/test/regress/citus_tests/test/test_.*\.py' || true)) tests=${detected_changes} else tests=$testForDebugging; @@ -861,42 +896,30 @@ workflows: image: citus/failtester make: check-failure - - tap-test-citus: &tap-test-citus-13 - name: 'test-13_tap-recovery' - suite: recovery + - test-pytest: + name: 'test-13_pytest' pg_major: 13 image_tag: '<< pipeline.parameters.pg13_version >>' requires: [build-13] - - tap-test-citus: - <<: *tap-test-citus-13 - name: 'test-13_tap-columnar-freezing' - suite: columnar_freezing - - tap-test-citus: &tap-test-citus-14 - name: 'test-14_tap-recovery' - suite: recovery + - test-pytest: + name: 'test-14_pytest' pg_major: 14 image_tag: '<< pipeline.parameters.pg14_version >>' requires: [build-14] - - tap-test-citus: - <<: *tap-test-citus-14 - name: 'test-14_tap-columnar-freezing' - suite: columnar_freezing - - tap-test-citus: &tap-test-citus-15 - name: 'test-15_tap-recovery' - suite: recovery + - test-pytest: + name: 'test-15_pytest' pg_major: 15 image_tag: '<< pipeline.parameters.pg15_version >>' requires: [build-15] + - tap-test-citus: - <<: *tap-test-citus-15 - name: 'test-15_tap-columnar-freezing' - suite: columnar_freezing - - tap-test-citus: - <<: *tap-test-citus-15 name: 'test-15_tap-cdc' suite: cdc + pg_major: 15 + image_tag: '<< pipeline.parameters.pg15_version >>' + requires: [build-15] - test-arbitrary-configs: name: 'test-13_check-arbitrary-configs' @@ -947,8 +970,6 @@ workflows: - test-13_check-follower-cluster - test-13_check-columnar - test-13_check-columnar-isolation - - test-13_tap-recovery - - test-13_tap-columnar-freezing - test-13_check-failure - test-13_check-enterprise - test-13_check-enterprise-isolation @@ -967,8 +988,6 @@ workflows: - test-14_check-follower-cluster - test-14_check-columnar - test-14_check-columnar-isolation - - test-14_tap-recovery - - test-14_tap-columnar-freezing - test-14_check-failure - 
test-14_check-enterprise - test-14_check-enterprise-isolation @@ -987,8 +1006,6 @@ workflows: - test-15_check-follower-cluster - test-15_check-columnar - test-15_check-columnar-isolation - - test-15_tap-recovery - - test-15_tap-columnar-freezing - test-15_check-failure - test-15_check-enterprise - test-15_check-enterprise-isolation diff --git a/.editorconfig b/.editorconfig index 8091a1f57..698e94d2b 100644 --- a/.editorconfig +++ b/.editorconfig @@ -17,7 +17,7 @@ trim_trailing_whitespace = true insert_final_newline = unset trim_trailing_whitespace = unset -[*.{sql,sh,py}] +[*.{sql,sh,py,toml}] indent_style = space indent_size = 4 tab_width = 4 diff --git a/pyproject.toml b/pyproject.toml index a470b2d92..997fb3801 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,3 +3,35 @@ profile = 'black' [tool.black] include = '(src/test/regress/bin/diff-filter|\.pyi?|\.ipynb)$' + +[tool.pytest.ini_options] +addopts = [ + "--import-mode=importlib", + "--showlocals", + "--tb=short", +] +pythonpath = 'src/test/regress/citus_tests' +asyncio_mode = 'auto' + +# Make test discovery quicker from the root dir of the repo +testpaths = ['src/test/regress/citus_tests/test'] + +# Make test discovery quicker from other directories than root directory +norecursedirs = [ + '*.egg', + '.*', + 'build', + 'venv', + 'ci', + 'vendor', + 'backend', + 'bin', + 'include', + 'tmp_*', + 'results', + 'expected', + 'sql', + 'spec', + 'data', + '__pycache__', +] diff --git a/src/test/columnar_freezing/Makefile b/src/test/columnar_freezing/Makefile deleted file mode 100644 index cd364cdbc..000000000 --- a/src/test/columnar_freezing/Makefile +++ /dev/null @@ -1,56 +0,0 @@ -#------------------------------------------------------------------------- -# -# Makefile for src/test/columnar_freezing -# -# Test that columnar freezing works. -# -#------------------------------------------------------------------------- - -subdir = src/test/columnar_freezing -top_builddir = ../../.. 
-include $(top_builddir)/Makefile.global - -# In PG15, Perl test modules have been moved to a new namespace -# new() and get_new_node() methods have been unified to 1 method: new() -# Relevant PG commits 201a76183e2056c2217129e12d68c25ec9c559c8 -# b3b4d8e68ae83f432f43f035c7eb481ef93e1583 -pg_version = $(shell $(PG_CONFIG) --version 2>/dev/null) -pg_whole_version = $(shell echo "$(pg_version)"| sed -e 's/^PostgreSQL \([0-9]*\)\(\.[0-9]*\)\{0,1\}\(.*\)/\1\2/') -pg_major_version = $(shell echo "$(pg_whole_version)"| sed -e 's/^\([0-9]\{2\}\)\(.*\)/\1/') - -# for now, we only have a single test file -# due to the above explanation, we ended up separating the test paths for -# different versions. If you need to add new test files, be careful to add both versions -ifeq ($(pg_major_version),13) - test_path = t_pg13_pg14/*.pl -else ifeq ($(pg_major_version),14) - test_path = t_pg13_pg14/*.pl -else - test_path = t/*.pl -endif - -# copied from pgxs/Makefile.global to use postgres' abs build dir for pg_regress -ifeq ($(enable_tap_tests),yes) - -define citus_prove_installcheck -rm -rf '$(CURDIR)'/tmp_check -$(MKDIR_P) '$(CURDIR)'/tmp_check -cd $(srcdir) && \ -TESTDIR='$(CURDIR)' \ -PATH="$(bindir):$$PATH" \ -PGPORT='6$(DEF_PGPORT)' \ -top_builddir='$(CURDIR)/$(top_builddir)' \ -PG_REGRESS='$(pgxsdir)/src/test/regress/pg_regress' \ -TEMP_CONFIG='$(CURDIR)'/postgresql.conf \ -$(PROVE) $(PG_PROVE_FLAGS) $(PROVE_FLAGS) $(if $(PROVE_TESTS),$(PROVE_TESTS),$(test_path)) -endef - -else -citus_prove_installcheck = @echo "TAP tests not enabled when postgres was compiled" -endif - -installcheck: - $(citus_prove_installcheck) - -clean distclean maintainer-clean: - rm -rf tmp_check diff --git a/src/test/columnar_freezing/postgresql.conf b/src/test/columnar_freezing/postgresql.conf deleted file mode 100644 index 39521cc33..000000000 --- a/src/test/columnar_freezing/postgresql.conf +++ /dev/null @@ -1,7 +0,0 @@ -shared_preload_libraries=citus -shared_preload_libraries='citus' 
-vacuum_freeze_min_age = 50000 -vacuum_freeze_table_age = 50000 -synchronous_commit = off -fsync = off - diff --git a/src/test/columnar_freezing/t/001_columnar_freezing.pl b/src/test/columnar_freezing/t/001_columnar_freezing.pl deleted file mode 100644 index 01e8346cf..000000000 --- a/src/test/columnar_freezing/t/001_columnar_freezing.pl +++ /dev/null @@ -1,52 +0,0 @@ -# Minimal test testing streaming replication -use strict; -use warnings; -use PostgreSQL::Test::Cluster; -use PostgreSQL::Test::Utils; -use Test::More tests => 2; - -# Initialize single node -my $node_one = PostgreSQL::Test::Cluster->new('node_one'); -$node_one->init(); -$node_one->start; - -# initialize the citus extension -$node_one->safe_psql('postgres', "CREATE EXTENSION citus;"); - -# create columnar table and insert simple data to verify the data survives a crash -$node_one->safe_psql('postgres', " -CREATE TABLE test_row(i int); -INSERT INTO test_row VALUES (1); -CREATE TABLE test_columnar_freeze(i int) USING columnar WITH(autovacuum_enabled=false); -INSERT INTO test_columnar_freeze VALUES (1); -"); - -my $ten_thousand_updates = ""; - -foreach (1..10000) { - $ten_thousand_updates .= "UPDATE test_row SET i = i + 1;\n"; -} - -# 70K updates -foreach (1..7) { - $node_one->safe_psql('postgres', $ten_thousand_updates); -} - -my $result = $node_one->safe_psql('postgres', " -select age(relfrozenxid) < 70000 as was_frozen - from pg_class where relname='test_columnar_freeze'; -"); -print "node one count: $result\n"; -is($result, qq(f), 'columnar table was not frozen'); - -$node_one->safe_psql('postgres', 'VACUUM FREEZE test_columnar_freeze;'); - -$result = $node_one->safe_psql('postgres', " -select age(relfrozenxid) < 70000 as was_frozen - from pg_class where relname='test_columnar_freeze'; -"); -print "node one count: $result\n"; -is($result, qq(t), 'columnar table was frozen'); - -$node_one->stop('fast'); - diff --git a/src/test/columnar_freezing/t_pg13_pg14/001_columnar_freezing_pg13_pg14.pl 
b/src/test/columnar_freezing/t_pg13_pg14/001_columnar_freezing_pg13_pg14.pl deleted file mode 100644 index 1985da2a5..000000000 --- a/src/test/columnar_freezing/t_pg13_pg14/001_columnar_freezing_pg13_pg14.pl +++ /dev/null @@ -1,52 +0,0 @@ -# Minimal test testing streaming replication -use strict; -use warnings; -use PostgresNode; -use TestLib; -use Test::More tests => 2; - -# Initialize single node -my $node_one = get_new_node('node_one'); -$node_one->init(); -$node_one->start; - -# initialize the citus extension -$node_one->safe_psql('postgres', "CREATE EXTENSION citus;"); - -# create columnar table and insert simple data to verify the data survives a crash -$node_one->safe_psql('postgres', " -CREATE TABLE test_row(i int); -INSERT INTO test_row VALUES (1); -CREATE TABLE test_columnar_freeze(i int) USING columnar WITH(autovacuum_enabled=false); -INSERT INTO test_columnar_freeze VALUES (1); -"); - -my $ten_thousand_updates = ""; - -foreach (1..10000) { - $ten_thousand_updates .= "UPDATE test_row SET i = i + 1;\n"; -} - -# 70K updates -foreach (1..7) { - $node_one->safe_psql('postgres', $ten_thousand_updates); -} - -my $result = $node_one->safe_psql('postgres', " -select age(relfrozenxid) < 70000 as was_frozen - from pg_class where relname='test_columnar_freeze'; -"); -print "node one count: $result\n"; -is($result, qq(f), 'columnar table was not frozen'); - -$node_one->safe_psql('postgres', 'VACUUM FREEZE test_columnar_freeze;'); - -$result = $node_one->safe_psql('postgres', " -select age(relfrozenxid) < 70000 as was_frozen - from pg_class where relname='test_columnar_freeze'; -"); -print "node one count: $result\n"; -is($result, qq(t), 'columnar table was frozen'); - -$node_one->stop('fast'); - diff --git a/src/test/recovery/.gitignore b/src/test/recovery/.gitignore deleted file mode 100644 index 871e943d5..000000000 --- a/src/test/recovery/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -# Generated by test suite -/tmp_check/ diff --git a/src/test/recovery/Makefile 
b/src/test/recovery/Makefile deleted file mode 100644 index 03e18fa0c..000000000 --- a/src/test/recovery/Makefile +++ /dev/null @@ -1,58 +0,0 @@ -#------------------------------------------------------------------------- -# -# Makefile for src/test/recovery -# -# Losely based on the makefile found in postgres' src/test/recovery. -# We need to define our own invocation of prove to pass the correct path -# to pg_regress and include citus in the shared preload libraries. -# -#------------------------------------------------------------------------- - -subdir = src/test/recovery -top_builddir = ../../.. -include $(top_builddir)/Makefile.global - -# In PG15, Perl test modules have been moved to a new namespace -# new() and get_new_node() methods have been unified to 1 method: new() -# Relevant PG commits 201a76183e2056c2217129e12d68c25ec9c559c8 -# b3b4d8e68ae83f432f43f035c7eb481ef93e1583 -pg_version = $(shell $(PG_CONFIG) --version 2>/dev/null) -pg_whole_version = $(shell echo "$(pg_version)"| sed -e 's/^PostgreSQL \([0-9]*\)\(\.[0-9]*\)\{0,1\}\(.*\)/\1\2/') -pg_major_version = $(shell echo "$(pg_whole_version)"| sed -e 's/^\([0-9]\{2\}\)\(.*\)/\1/') - -# for now, we only have a single test file -# due to the above explanation, we ended up separating the test paths for -# different versions. 
If you need to add new test files, be careful to add both versions -ifeq ($(pg_major_version),13) - test_path = t_pg13_pg14/*.pl -else ifeq ($(pg_major_version),14) - test_path = t_pg13_pg14/*.pl -else - test_path = t/*.pl -endif - -# copied from pgxs/Makefile.global to use postgres' abs build dir for pg_regress -ifeq ($(enable_tap_tests),yes) - -define citus_prove_installcheck -rm -rf '$(CURDIR)'/tmp_check -$(MKDIR_P) '$(CURDIR)'/tmp_check -cd $(srcdir) && \ -TESTDIR='$(CURDIR)' \ -PATH="$(bindir):$$PATH" \ -PGPORT='6$(DEF_PGPORT)' \ -top_builddir='$(CURDIR)/$(top_builddir)' \ -PG_REGRESS='$(pgxsdir)/src/test/regress/pg_regress' \ -TEMP_CONFIG='$(CURDIR)'/postgresql.conf \ -$(PROVE) $(PG_PROVE_FLAGS) $(PROVE_FLAGS) $(if $(PROVE_TESTS),$(PROVE_TESTS),$(test_path)) -endef - -else -citus_prove_installcheck = @echo "TAP tests not enabled when postgres was compiled" -endif - -installcheck: - $(citus_prove_installcheck) - -clean distclean maintainer-clean: - rm -rf tmp_check diff --git a/src/test/recovery/postgresql.conf b/src/test/recovery/postgresql.conf deleted file mode 100644 index f9d205b38..000000000 --- a/src/test/recovery/postgresql.conf +++ /dev/null @@ -1 +0,0 @@ -shared_preload_libraries=citus diff --git a/src/test/recovery/t/001_columnar_crash_recovery.pl b/src/test/recovery/t/001_columnar_crash_recovery.pl deleted file mode 100644 index 7dee21dd1..000000000 --- a/src/test/recovery/t/001_columnar_crash_recovery.pl +++ /dev/null @@ -1,98 +0,0 @@ -# Minimal test testing streaming replication -use strict; -use warnings; -use PostgreSQL::Test::Cluster; -use PostgreSQL::Test::Utils; -use Test::More tests => 6; - -# Initialize single node -my $node_one = PostgreSQL::Test::Cluster->new('node_one'); -$node_one->init(); -$node_one->start; - -# initialize the citus extension -$node_one->safe_psql('postgres', "CREATE EXTENSION citus;"); - -# create columnar table and insert simple data to verify the data survives a crash -$node_one->safe_psql('postgres', " -BEGIN; 
-CREATE TABLE t1 (a int, b text) USING columnar; -INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1002) AS a; -COMMIT; -"); - -# simulate crash -$node_one->stop('immediate'); -$node_one->start; - -my $result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;"); -print "node one count: $result\n"; -is($result, qq(1002), 'columnar recovered data from before crash'); - - -# truncate the table to verify the truncation survives a crash -$node_one->safe_psql('postgres', " -TRUNCATE t1; -"); - -# simulate crash -$node_one->stop('immediate'); -$node_one->start; - -$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;"); -print "node one count: $result\n"; -is($result, qq(0), 'columnar recovered truncation'); - -# test crashing while having an open transaction -$node_one->safe_psql('postgres', " -BEGIN; -INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1003) AS a; -"); - -# simulate crash -$node_one->stop('immediate'); -$node_one->start; - -$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;"); -print "node one count: $result\n"; -is($result, qq(0), 'columnar crash during uncommitted transaction'); - -# test crashing while having a prepared transaction -$node_one->safe_psql('postgres', " -BEGIN; -INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1004) AS a; -PREPARE TRANSACTION 'prepared_xact_crash'; -"); - -# simulate crash -$node_one->stop('immediate'); -$node_one->start; - -$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;"); -print "node one count: $result\n"; -is($result, qq(0), 'columnar crash during prepared transaction (before commit)'); - -$node_one->safe_psql('postgres', " -COMMIT PREPARED 'prepared_xact_crash'; -"); - -$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;"); -print "node one count: $result\n"; -is($result, qq(1004), 'columnar crash during prepared transaction (after commit)'); - -# test crash recovery with copied 
data -$node_one->safe_psql('postgres', " -\\copy t1 FROM stdin delimiter ',' -1,a -2,b -3,c -\\. -"); - -# simulate crash -$node_one->stop('immediate'); -$node_one->start; - -$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;"); -print "node one count: $result\n"; -is($result, qq(1007), 'columnar crash after copy command'); diff --git a/src/test/recovery/t_pg13_pg14/001_columnar_crash_recovery_pg13_pg14.pl b/src/test/recovery/t_pg13_pg14/001_columnar_crash_recovery_pg13_pg14.pl deleted file mode 100644 index 9ea87835f..000000000 --- a/src/test/recovery/t_pg13_pg14/001_columnar_crash_recovery_pg13_pg14.pl +++ /dev/null @@ -1,98 +0,0 @@ -# Minimal test testing streaming replication -use strict; -use warnings; -use PostgresNode; -use TestLib; -use Test::More tests => 6; - -# Initialize single node -my $node_one = get_new_node('node_one'); -$node_one->init(); -$node_one->start; - -# initialize the citus extension -$node_one->safe_psql('postgres', "CREATE EXTENSION citus;"); - -# create columnar table and insert simple data to verify the data survives a crash -$node_one->safe_psql('postgres', " -BEGIN; -CREATE TABLE t1 (a int, b text) USING columnar; -INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1002) AS a; -COMMIT; -"); - -# simulate crash -$node_one->stop('immediate'); -$node_one->start; - -my $result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;"); -print "node one count: $result\n"; -is($result, qq(1002), 'columnar recovered data from before crash'); - - -# truncate the table to verify the truncation survives a crash -$node_one->safe_psql('postgres', " -TRUNCATE t1; -"); - -# simulate crash -$node_one->stop('immediate'); -$node_one->start; - -$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;"); -print "node one count: $result\n"; -is($result, qq(0), 'columnar recovered truncation'); - -# test crashing while having an open transaction -$node_one->safe_psql('postgres', " -BEGIN; -INSERT INTO t1 
SELECT a, 'hello world ' || a FROM generate_series(1,1003) AS a; -"); - -# simulate crash -$node_one->stop('immediate'); -$node_one->start; - -$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;"); -print "node one count: $result\n"; -is($result, qq(0), 'columnar crash during uncommitted transaction'); - -# test crashing while having a prepared transaction -$node_one->safe_psql('postgres', " -BEGIN; -INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1004) AS a; -PREPARE TRANSACTION 'prepared_xact_crash'; -"); - -# simulate crash -$node_one->stop('immediate'); -$node_one->start; - -$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;"); -print "node one count: $result\n"; -is($result, qq(0), 'columnar crash during prepared transaction (before commit)'); - -$node_one->safe_psql('postgres', " -COMMIT PREPARED 'prepared_xact_crash'; -"); - -$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;"); -print "node one count: $result\n"; -is($result, qq(1004), 'columnar crash during prepared transaction (after commit)'); - -# test crash recovery with copied data -$node_one->safe_psql('postgres', " -\\copy t1 FROM stdin delimiter ',' -1,a -2,b -3,c -\\. -"); - -# simulate crash -$node_one->stop('immediate'); -$node_one->start; - -$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;"); -print "node one count: $result\n"; -is($result, qq(1007), 'columnar crash after copy command'); diff --git a/src/test/regress/Makefile b/src/test/regress/Makefile index 2de83ec57..b801f33ff 100644 --- a/src/test/regress/Makefile +++ b/src/test/regress/Makefile @@ -44,7 +44,7 @@ vanilla_diffs_file = $(citus_abs_srcdir)/pg_vanilla_outputs/$(MAJORVERSION)/regr # intermediate, for muscle memory backward compatibility. 
check: check-full check-enterprise-full # check-full triggers all tests that ought to be run routinely -check-full: check-multi check-multi-mx check-multi-1 check-operations check-follower-cluster check-isolation check-failure check-split check-vanilla check-columnar check-columnar-isolation check-pg-upgrade check-arbitrary-configs check-citus-upgrade check-citus-upgrade-mixed check-citus-upgrade-local check-citus-upgrade-mixed-local +check-full: check-multi check-multi-mx check-multi-1 check-operations check-follower-cluster check-isolation check-failure check-split check-vanilla check-columnar check-columnar-isolation check-pg-upgrade check-arbitrary-configs check-citus-upgrade check-citus-upgrade-mixed check-citus-upgrade-local check-citus-upgrade-mixed-local check-pytest # check-enterprise-full triggers all enterprise specific tests check-enterprise-full: check-enterprise check-enterprise-isolation check-enterprise-failure check-enterprise-isolation-logicalrep-1 check-enterprise-isolation-logicalrep-2 check-enterprise-isolation-logicalrep-3 @@ -259,6 +259,9 @@ check-arbitrary-configs: all check-arbitrary-base: all ${arbitrary_config_check} --bindir=$(bindir) --pgxsdir=$(pgxsdir) --parallel=$(parallel) --configs=$(CONFIGS) --seed=$(seed) --base +check-pytest: + pytest -n auto + check-citus-upgrade: all $(citus_upgrade_check) \ --bindir=$(bindir) \ diff --git a/src/test/regress/Pipfile b/src/test/regress/Pipfile index 16da96f21..663785a3d 100644 --- a/src/test/regress/Pipfile +++ b/src/test/regress/Pipfile @@ -14,6 +14,7 @@ filelock = "*" pytest-asyncio = "*" pytest-timeout = "*" pytest-xdist = "*" +pytest-repeat = "*" [dev-packages] black = "*" diff --git a/src/test/regress/Pipfile.lock b/src/test/regress/Pipfile.lock index 709254d77..4a86e09a8 100644 --- a/src/test/regress/Pipfile.lock +++ b/src/test/regress/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "456a43ce06df947ccbf02db7fcbfd654999acaae25911990d4d74fc04b10c77e" + "sha256": 
"eb9ca3a7b05e76c7ac60179a1755f89600dfb215e02bf08c258d548df1d96025" }, "pipfile-spec": 6, "requires": { @@ -614,6 +614,14 @@ "index": "pypi", "version": "==0.20.3" }, + "pytest-repeat": { + "hashes": [ + "sha256:4474a7d9e9137f6d8cc8ae297f8c4168d33c56dd740aa78cfffe562557e6b96e", + "sha256:5cd3289745ab3156d43eb9c8e7f7d00a926f3ae5c9cf425bec649b2fe15bad5b" + ], + "index": "pypi", + "version": "==0.9.1" + }, "pytest-timeout": { "hashes": [ "sha256:c07ca07404c612f8abbe22294b23c368e2e5104b521c1790195561f37e1ac3d9", diff --git a/src/test/regress/README.md b/src/test/regress/README.md index ce6b0b5a6..9793a5d1d 100644 --- a/src/test/regress/README.md +++ b/src/test/regress/README.md @@ -106,10 +106,18 @@ Adding a new test file is quite simple: See [`src/test/regress/spec/README.md`](https://github.com/citusdata/citus/blob/master/src/test/regress/spec/README.md) +## Pytest testing + +See [`src/test/regress/citus_tests/test/README.md`](https://github.com/citusdata/citus/blob/master/src/test/regress/citus_tests/test/README.md) + ## Upgrade testing See [`src/test/regress/citus_tests/upgrade/README.md`](https://github.com/citusdata/citus/blob/master/src/test/regress/citus_tests/upgrade/README.md) + +## Arbitrary configs testing + +See [`src/test/regress/citus_tests/arbitrary_configs/README.md`](https://github.com/citusdata/citus/blob/master/src/test/regress/citus_tests/arbitrary_configs/README.md) + ## Failure testing See [`src/test/regress/mitmscripts/README.md`](https://github.com/citusdata/citus/blob/master/src/test/regress/mitmscripts/README.md) diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py index 1654905cb..2a4adad44 100644 --- a/src/test/regress/citus_tests/common.py +++ b/src/test/regress/citus_tests/common.py @@ -1,13 +1,48 @@ +import asyncio import atexit import concurrent.futures import os +import platform +import random +import re import shutil +import socket import subprocess import sys +import time +import typing 
+from abc import ABC, abstractmethod +from contextlib import asynccontextmanager, closing, contextmanager +from datetime import datetime, timedelta +from pathlib import Path +from tempfile import gettempdir +import filelock +import psycopg +import psycopg.sql import utils +from psycopg import sql from utils import USER +LINUX = False +MACOS = False +FREEBSD = False +OPENBSD = False + +if platform.system() == "Linux": + LINUX = True +elif platform.system() == "Darwin": + MACOS = True +elif platform.system() == "FreeBSD": + FREEBSD = True +elif platform.system() == "OpenBSD": + OPENBSD = True + +BSD = MACOS or FREEBSD or OPENBSD + +TIMEOUT_DEFAULT = timedelta(seconds=int(os.getenv("PG_TEST_TIMEOUT_DEFAULT", "10"))) +FORCE_PORTS = os.getenv("PG_FORCE_PORTS", "NO").lower() not in ("no", "0", "n", "") + def initialize_temp_dir(temp_dir): if os.path.exists(temp_dir): @@ -315,8 +350,834 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) -def run(command, *args, shell=True, **kwargs): +def run(command, *args, check=True, shell=True, silent=False, **kwargs): """run runs the given command and prints it to stderr""" - eprint(f"+ {command} ") - return subprocess.run(command, *args, check=True, shell=shell, **kwargs) + if not silent: + eprint(f"+ {command} ") + if silent: + kwargs.setdefault("stdout", subprocess.DEVNULL) + return subprocess.run(command, *args, check=check, shell=shell, **kwargs) + + +def sudo(command, *args, shell=True, **kwargs): + """ + A version of run that prefixes the command with sudo when the process is + not already run as root + """ + effective_user_id = os.geteuid() + if effective_user_id == 0: + return run(command, *args, shell=shell, **kwargs) + if shell: + return run(f"sudo {command}", *args, shell=shell, **kwargs) + else: + return run(["sudo", *command]) + + +# this is out of ephemeral port range for many systems hence +# it is a lower chance that it will conflict with "in-use" ports +PORT_LOWER_BOUND = 10200 + +# ephemeral 
port start on many Linux systems +PORT_UPPER_BOUND = 32768 + +next_port = PORT_LOWER_BOUND + + +def cleanup_test_leftovers(nodes): + """ + Cleaning up test leftovers needs to be done in a specific order, because + some of these leftovers depend on others having been removed. They might + even depend on leftovers on other nodes being removed. So this takes a list + of nodes, so that we can clean up all test leftovers globally in the + correct order. + """ + for node in nodes: + node.cleanup_subscriptions() + + for node in nodes: + node.cleanup_publications() + + for node in nodes: + node.cleanup_logical_replication_slots() + + for node in nodes: + node.cleanup_schemas() + + for node in nodes: + node.cleanup_users() + + +class PortLock: + """PortLock allows you to take a lock an a specific port. + + While a port is locked by one process, other processes using PortLock won't + get the same port. + """ + + def __init__(self): + global next_port + first_port = next_port + while True: + next_port += 1 + if next_port >= PORT_UPPER_BOUND: + next_port = PORT_LOWER_BOUND + + # avoid infinite loop + if first_port == next_port: + raise Exception("Could not find port") + + self.lock = filelock.FileLock(Path(gettempdir()) / f"port-{next_port}.lock") + try: + self.lock.acquire(timeout=0) + except filelock.Timeout: + continue + + if FORCE_PORTS: + self.port = next_port + break + + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + try: + s.bind(("127.0.0.1", next_port)) + self.port = next_port + break + except Exception: + self.lock.release() + continue + + def release(self): + """Call release when you are done with the port. + + This way other processes can use it again. + """ + self.lock.release() + + +class QueryRunner(ABC): + """A subclassable interface class that can be used to run queries. + + This is mostly useful to be generic across differnt types of things that + implement the Postgres interface, such as Postgres, PgBouncer, or a Citus + cluster. 
+ + This implements some helpers send queries in a simpler manner than psycopg + allows by default. + """ + + @abstractmethod + def set_default_connection_options(self, options: dict[str, typing.Any]): + """Sets the default connection options on the given options dictionary + + This is the only method that the class that subclasses QueryRunner + needs to implement. + """ + ... + + def make_conninfo(self, **kwargs) -> str: + self.set_default_connection_options(kwargs) + return psycopg.conninfo.make_conninfo(**kwargs) + + def conn(self, *, autocommit=True, **kwargs): + """Open a psycopg connection to this server""" + self.set_default_connection_options(kwargs) + return psycopg.connect( + autocommit=autocommit, + **kwargs, + ) + + def aconn(self, *, autocommit=True, **kwargs): + """Open an asynchronous psycopg connection to this server""" + self.set_default_connection_options(kwargs) + return psycopg.AsyncConnection.connect( + autocommit=autocommit, + **kwargs, + ) + + @contextmanager + def cur(self, autocommit=True, **kwargs): + """Open an psycopg cursor to this server + + The connection and the cursors automatically close once you leave the + "with" block + """ + with self.conn( + autocommit=autocommit, + **kwargs, + ) as conn: + with conn.cursor() as cur: + yield cur + + @asynccontextmanager + async def acur(self, **kwargs): + """Open an asynchronous psycopg cursor to this server + + The connection and the cursors automatically close once you leave the + "async with" block + """ + async with await self.aconn(**kwargs) as conn: + async with conn.cursor() as cur: + yield cur + + def sql(self, query, params=None, **kwargs): + """Run an SQL query + + This opens a new connection and closes it once the query is done + """ + with self.cur(**kwargs) as cur: + cur.execute(query, params=params) + + def sql_value(self, query, params=None, allow_empty_result=False, **kwargs): + """Run an SQL query that returns a single cell and return this value + + This opens a new connection 
and closes it once the query is done + """ + with self.cur(**kwargs) as cur: + cur.execute(query, params=params) + result = cur.fetchall() + + if allow_empty_result and len(result) == 0: + return None + + assert len(result) == 1 + assert len(result[0]) == 1 + value = result[0][0] + return value + + def asql(self, query, **kwargs): + """Run an SQL query in asynchronous task + + This opens a new connection and closes it once the query is done + """ + return asyncio.ensure_future(self.asql_coroutine(query, **kwargs)) + + async def asql_coroutine( + self, query, params=None, **kwargs + ) -> typing.Optional[typing.List[typing.Any]]: + async with self.acur(**kwargs) as cur: + await cur.execute(query, params=params) + try: + return await cur.fetchall() + except psycopg.ProgrammingError as e: + if "the last operation didn't produce a result" == str(e): + return None + raise + + def psql(self, query, **kwargs): + """Run an SQL query using psql instead of psycopg + + This opens a new connection and closes it once the query is done + """ + + conninfo = self.make_conninfo(**kwargs) + + run( + ["psql", "-X", f"{conninfo}", "-c", query], + shell=False, + silent=True, + ) + + def poll_query_until(self, query, params=None, expected=True, **kwargs): + """Run query repeatedly until it returns the expected result""" + start = datetime.now() + result = None + + while start + TIMEOUT_DEFAULT > datetime.now(): + result = self.sql_value( + query, params=params, allow_empty_result=True, **kwargs + ) + if result == expected: + return + + time.sleep(0.1) + + raise Exception( + f"Timeout reached while polling query, last result was: {result}" + ) + + @contextmanager + def transaction(self, **kwargs): + with self.cur(**kwargs) as cur: + with cur.connection.transaction(): + yield cur + + def sleep(self, duration=3, **kwargs): + """Run pg_sleep""" + return self.sql(f"select pg_sleep({duration})", **kwargs) + + def asleep(self, duration=3, times=1, sequentially=False, **kwargs): + """Run 
pg_sleep asynchronously in a task. + + times: + You can create a single task that opens multiple connections, which + run pg_sleep concurrently. The asynchronous task will only complete + once all these pg_sleep calls are finished. + sequentially: + Instead of running all pg_sleep calls spawned by providing + times > 1 concurrently, this will run them sequentially. + """ + return asyncio.ensure_future( + self.asleep_coroutine( + duration=duration, times=times, sequentially=sequentially, **kwargs + ) + ) + + async def asleep_coroutine(self, duration=3, times=1, sequentially=False, **kwargs): + """This is the coroutine that the asleep task runs internally""" + if not sequentially: + await asyncio.gather( + *[ + self.asql(f"select pg_sleep({duration})", **kwargs) + for _ in range(times) + ] + ) + else: + for _ in range(times): + await self.asql(f"select pg_sleep({duration})", **kwargs) + + def test(self, **kwargs): + """Test if you can connect""" + return self.sql("select 1", **kwargs) + + def atest(self, **kwargs): + """Test if you can connect asynchronously""" + return self.asql("select 1", **kwargs) + + def psql_test(self, **kwargs): + """Test if you can connect with psql instead of psycopg""" + return self.psql("select 1", **kwargs) + + def debug(self): + print("Connect manually to:\n ", repr(self.make_conninfo())) + print("Press Enter to continue running the test...") + input() + + def psql_debug(self, **kwargs): + conninfo = self.make_conninfo(**kwargs) + run( + ["psql", f"{conninfo}"], + shell=False, + silent=True, + ) + + +class Postgres(QueryRunner): + """A class that represents a Postgres instance on this machine + + You can query it by using the interface provided by QueryRunner or use many + of the helper methods. 
+ """ + + def __init__(self, pgdata): + self.port_lock = PortLock() + + # These values should almost never be changed after initialization + self.host = "127.0.0.1" + self.port = self.port_lock.port + + # These values can be changed when needed + self.dbname = "postgres" + self.user = "postgres" + self.schema = None + + self.pgdata = pgdata + self.log_path = self.pgdata / "pg.log" + + # Used to track objects that we want to clean up at the end of a test + self.subscriptions = set() + self.publications = set() + self.logical_replication_slots = set() + self.schemas = set() + self.users = set() + + def set_default_connection_options(self, options): + options.setdefault("host", self.host) + options.setdefault("port", self.port) + options.setdefault("dbname", self.dbname) + options.setdefault("user", self.user) + if self.schema is not None: + options.setdefault("options", f"-c search_path={self.schema}") + options.setdefault("connect_timeout", 3) + # needed for Ubuntu 18.04 + options.setdefault("client_encoding", "UTF8") + + def initdb(self): + run( + f"initdb -A trust --nosync --username postgres --pgdata {self.pgdata} --allow-group-access --encoding UTF8 --locale POSIX", + stdout=subprocess.DEVNULL, + ) + + with self.conf_path.open(mode="a") as pgconf: + # Allow connecting over unix sockets + pgconf.write("unix_socket_directories = '/tmp'\n") + + # Useful logs for debugging issues + pgconf.write("log_replication_commands = on\n") + # The following to are also useful for debugging, but quite noisy. + # So better to enable them manually by uncommenting. 
+ # pgconf.write("log_connections = on\n") + # pgconf.write("log_disconnections = on\n") + + # Enable citus + pgconf.write("shared_preload_libraries = 'citus'\n") + + # Allow CREATE SUBSCRIPTION to work + pgconf.write("wal_level = 'logical'\n") + # Faster logical replication status update so tests with logical replication + # run faster + pgconf.write("wal_receiver_status_interval = 1\n") + + # Faster logical replication apply worker launch so tests with logical + # replication run faster. This is used in ApplyLauncherMain in + # src/backend/replication/logical/launcher.c. + pgconf.write("wal_retrieve_retry_interval = '250ms'\n") + + # Make sure there's enough logical replication resources for most + # of our tests + pgconf.write("max_logical_replication_workers = 50\n") + pgconf.write("max_wal_senders = 50\n") + pgconf.write("max_worker_processes = 50\n") + pgconf.write("max_replication_slots = 50\n") + + # We need to make the log go to stderr so that the tests can + # check what is being logged. This should be the default, but + # some packagings change the default configuration. + pgconf.write("log_destination = stderr\n") + # We don't need the logs anywhere else than stderr + pgconf.write("logging_collector = off\n") + + # This makes tests run faster and we don't care about crash safety + # of our test data. + pgconf.write("fsync = false\n") + + # conservative settings to ensure we can run multiple postmasters: + pgconf.write("shared_buffers = 1MB\n") + # limit disk space consumption, too: + pgconf.write("max_wal_size = 128MB\n") + + # don't restart after crashes to make it obvious that a crash + # happened + pgconf.write("restart_after_crash = off\n") + + os.truncate(self.hba_path, 0) + self.ssl_access("all", "trust") + self.nossl_access("all", "trust") + self.commit_hba() + + def init_with_citus(self): + self.initdb() + self.start() + self.sql("CREATE EXTENSION citus") + + # Manually turn on ssl, so that we can safely truncate + # postgresql.auto.conf later. 
We can only do this after creating the + # citus extension because that creates the self signed certificates. + with self.conf_path.open(mode="a") as pgconf: + pgconf.write("ssl = on\n") + + def pgctl(self, command, **kwargs): + run(f"pg_ctl -w --pgdata {self.pgdata} {command}", **kwargs) + + def apgctl(self, command, **kwargs): + return asyncio.create_subprocess_shell( + f"pg_ctl -w --pgdata {self.pgdata} {command}", **kwargs + ) + + def start(self): + try: + self.pgctl(f'-o "-p {self.port}" -l {self.log_path} start') + except Exception: + print(f"\n\nPG_LOG: {self.pgdata}\n") + with self.log_path.open() as f: + print(f.read()) + raise + + def stop(self, mode="fast"): + self.pgctl(f"-m {mode} stop", check=False) + + def cleanup(self): + self.stop() + self.port_lock.release() + + def restart(self): + self.stop() + self.start() + + def reload(self): + self.pgctl("reload") + # Sadly UNIX signals are asynchronous, so we sleep a bit and hope that + # Postgres actually processed the SIGHUP signal after the sleep. 
+ time.sleep(0.1) + + async def arestart(self): + process = await self.apgctl("-m fast restart") + await process.communicate() + + def nossl_access(self, dbname, auth_type): + """Prepends a local non-SSL access to the HBA file""" + with self.hba_path.open() as pghba: + old_contents = pghba.read() + with self.hba_path.open(mode="w") as pghba: + pghba.write(f"local {dbname} all {auth_type}\n") + pghba.write(f"hostnossl {dbname} all 127.0.0.1/32 {auth_type}\n") + pghba.write(f"hostnossl {dbname} all ::1/128 {auth_type}\n") + pghba.write(old_contents) + + def ssl_access(self, dbname, auth_type): + """Prepends a local SSL access rule to the HBA file""" + with self.hba_path.open() as pghba: + old_contents = pghba.read() + with self.hba_path.open(mode="w") as pghba: + pghba.write(f"hostssl {dbname} all 127.0.0.1/32 {auth_type}\n") + pghba.write(f"hostssl {dbname} all ::1/128 {auth_type}\n") + pghba.write(old_contents) + + @property + def hba_path(self): + return self.pgdata / "pg_hba.conf" + + @property + def conf_path(self): + return self.pgdata / "postgresql.conf" + + def commit_hba(self): + """Mark the current HBA contents as non-resetable by reset_hba""" + with self.hba_path.open() as pghba: + old_contents = pghba.read() + with self.hba_path.open(mode="w") as pghba: + pghba.write("# committed-rules\n") + pghba.write(old_contents) + + def reset_hba(self): + """Remove any HBA rules that were added after the last call to commit_hba""" + with self.hba_path.open() as f: + hba_contents = f.read() + committed = hba_contents[hba_contents.find("# committed-rules\n") :] + with self.hba_path.open("w") as f: + f.write(committed) + + def prepare_reset(self): + """Prepares all changes to reset Postgres settings and objects + + To actually apply the prepared changes a restart might still be needed. 
+ """ + self.reset_hba() + os.truncate(self.pgdata / "postgresql.auto.conf", 0) + + def reset(self): + """Resets any changes to Postgres settings from previous tests""" + self.prepare_reset() + self.restart() + + async def delayed_start(self, delay=1): + """Start Postgres after a delay + + NOTE: The sleep is asynchronous, but while waiting for Postgres to + start the pg_ctl start command will block the event loop. This is + currently acceptable for our usage of this method in the existing + tests and this way it was easiest to implement. However, it seems + totally reasonable to change this behaviour in the future if necessary. + """ + await asyncio.sleep(delay) + self.start() + + def configure(self, *configs): + """Configure specific Postgres settings using ALTER SYSTEM SET + + NOTE: after configuring a call to reload or restart is needed for the + settings to become effective. + """ + for config in configs: + self.sql(f"alter system set {config}") + + def log_handle(self): + """Returns the opened logfile at the current end of the log + + By later calling read on this file you can read the contents that were + written from this moment on. + + IMPORTANT: This handle should be closed once it's not needed anymore + """ + f = self.log_path.open() + f.seek(0, os.SEEK_END) + return f + + @contextmanager + def log_contains(self, re_string, times=None): + """Checks if during this with block the log matches re_string + + re_string: + The regex to search for. + times: + If None, any number of matches is accepted. If a number, only that + specific number of matches is accepted. 
+ """ + with self.log_handle() as f: + yield + content = f.read() + if times is None: + assert re.search(re_string, content) + else: + match_count = len(re.findall(re_string, content)) + assert match_count == times + + def create_user(self, name, args: typing.Optional[psycopg.sql.Composable] = None): + self.users.add(name) + if args is None: + args = sql.SQL("") + self.sql(sql.SQL("CREATE USER {} {}").format(sql.Identifier(name), args)) + + def create_schema(self, name): + self.schemas.add(name) + self.sql(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(name))) + + def create_publication(self, name: str, args: psycopg.sql.Composable): + self.publications.add(name) + self.sql(sql.SQL("CREATE PUBLICATION {} {}").format(sql.Identifier(name), args)) + + def create_logical_replication_slot( + self, name, plugin, temporary=False, twophase=False + ): + self.logical_replication_slots.add(name) + self.sql( + "SELECT pg_catalog.pg_create_logical_replication_slot(%s,%s,%s,%s)", + (name, plugin, temporary, twophase), + ) + + def create_subscription(self, name: str, args: psycopg.sql.Composable): + self.subscriptions.add(name) + self.sql( + sql.SQL("CREATE SUBSCRIPTION {} {}").format(sql.Identifier(name), args) + ) + + def cleanup_users(self): + for user in self.users: + self.sql(sql.SQL("DROP USER IF EXISTS {}").format(sql.Identifier(user))) + + def cleanup_schemas(self): + for schema in self.schemas: + self.sql( + sql.SQL("DROP SCHEMA IF EXISTS {} CASCADE").format( + sql.Identifier(schema) + ) + ) + + def cleanup_publications(self): + for publication in self.publications: + self.sql( + sql.SQL("DROP PUBLICATION IF EXISTS {}").format( + sql.Identifier(publication) + ) + ) + + def cleanup_logical_replication_slots(self): + for slot in self.logical_replication_slots: + self.sql( + "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = %s", + (slot,), + ) + + def cleanup_subscriptions(self): + for subscription in self.subscriptions: + try: + 
self.sql( + sql.SQL("ALTER SUBSCRIPTION {} DISABLE").format( + sql.Identifier(subscription) + ) + ) + except psycopg.errors.UndefinedObject: + # Subscription didn't exist already + continue + self.sql( + sql.SQL("ALTER SUBSCRIPTION {} SET (slot_name = NONE)").format( + sql.Identifier(subscription) + ) + ) + self.sql( + sql.SQL("DROP SUBSCRIPTION {}").format(sql.Identifier(subscription)) + ) + + def lsn(self, mode): + """Returns the lsn for the given mode""" + queries = { + "insert": "SELECT pg_current_wal_insert_lsn()", + "flush": "SELECT pg_current_wal_flush_lsn()", + "write": "SELECT pg_current_wal_lsn()", + "receive": "SELECT pg_last_wal_receive_lsn()", + "replay": "SELECT pg_last_wal_replay_lsn()", + } + return self.sql_value(queries[mode]) + + def wait_for_catchup(self, subscription_name, mode="replay", target_lsn=None): + """Waits until the subscription has caught up""" + if target_lsn is None: + target_lsn = self.lsn("write") + + # Before release 12 walreceiver just set the application name to + # "walreceiver" + self.poll_query_until( + sql.SQL( + """ + SELECT {} <= {} AND state = 'streaming' + FROM pg_catalog.pg_stat_replication + WHERE application_name IN ({}, 'walreceiver') + """ + ).format(target_lsn, sql.Identifier(f"{mode}_lsn"), subscription_name) + ) + + @contextmanager + def _enable_firewall(self): + """Enables the firewall for the platform that you are running + + Normally this should not be called directly, and instead drop_traffic + or reject_traffic should be used. 
+ """ + fw_token = None + if BSD: + if MACOS: + command_stderr = sudo( + "pfctl -E", stderr=subprocess.PIPE, text=True + ).stderr + match = re.search(r"^Token : (\d+)", command_stderr, flags=re.MULTILINE) + assert match is not None + fw_token = match.group(1) + sudo( + 'bash -c "' + f"echo 'anchor \\\"port_{self.port}\\\"'" + f' | pfctl -a citus_test -f -"' + ) + try: + yield + finally: + if MACOS: + sudo(f"pfctl -X {fw_token}") + + @contextmanager + def drop_traffic(self): + """Drops all TCP packets to this query runner""" + with self._enable_firewall(): + if LINUX: + sudo( + "iptables --append OUTPUT " + "--protocol tcp " + f"--destination {self.host} " + f"--destination-port {self.port} " + "--jump DROP " + ) + elif BSD: + sudo( + "bash -c '" + f'echo "block drop out proto tcp from any to {self.host} port {self.port}"' + f"| pfctl -a citus_test/port_{self.port} -f -'" + ) + else: + raise Exception("This OS cannot run this test") + try: + yield + finally: + if LINUX: + sudo( + "iptables --delete OUTPUT " + "--protocol tcp " + f"--destination {self.host} " + f"--destination-port {self.port} " + "--jump DROP " + ) + elif BSD: + sudo(f"pfctl -a citus_test/port_{self.port} -F all") + + @contextmanager + def reject_traffic(self): + """Rejects all traffic to this query runner with a TCP RST message""" + with self._enable_firewall(): + if LINUX: + sudo( + "iptables --append OUTPUT " + "--protocol tcp " + f"--destination {self.host} " + f"--destination-port {self.port} " + "--jump REJECT " + "--reject-with tcp-reset" + ) + elif BSD: + sudo( + "bash -c '" + f'echo "block return-rst out out proto tcp from any to {self.host} port {self.port}"' + f"| pfctl -a citus_test/port_{self.port} -f -'" + ) + else: + raise Exception("This OS cannot run this test") + try: + yield + finally: + if LINUX: + sudo( + "iptables --delete OUTPUT " + "--protocol tcp " + f"--destination {self.host} " + f"--destination-port {self.port} " + "--jump REJECT " + "--reject-with tcp-reset" + ) + elif 
BSD: + sudo(f"pfctl -a citus_test/port_{self.port} -F all") + + +class CitusCluster(QueryRunner): + """A class that represents a Citus cluster on this machine + + The nodes in the cluster can be accessed directly using the coordinator, + workers, and nodes properties. + + If it doesn't matter which of the nodes in the cluster is used to run a + query, then you can use the methods provided by QueryRunner directly on the + cluster. In that case a random node will be chosen to run your query. + """ + + def __init__(self, basedir: Path, worker_count: int): + self.coordinator = Postgres(basedir / "coordinator") + self.workers = [Postgres(basedir / f"worker{i}") for i in range(worker_count)] + self.nodes = [self.coordinator] + self.workers + self._schema = None + self.failed_reset = False + + parallel_run(Postgres.init_with_citus, self.nodes) + with self.coordinator.cur() as cur: + cur.execute( + "SELECT pg_catalog.citus_set_coordinator_host(%s, %s)", + (self.coordinator.host, self.coordinator.port), + ) + for worker in self.workers: + cur.execute( + "SELECT pg_catalog.citus_add_node(%s, %s)", + (worker.host, worker.port), + ) + + def set_default_connection_options(self, options): + random.choice(self.nodes).set_default_connection_options(options) + + @property + def schema(self): + return self._schema + + @schema.setter + def schema(self, value): + self._schema = value + for node in self.nodes: + node.schema = value + + def reset(self): + """Resets any changes to Postgres settings from previous tests""" + parallel_run(Postgres.prepare_reset, self.nodes) + parallel_run(Postgres.restart, self.nodes) + + def cleanup(self): + parallel_run(Postgres.cleanup, self.nodes) + + def debug(self): + """Print information to stdout to help with debugging your cluster""" + print("The nodes in this cluster and their connection strings are:") + for node in self.nodes: + print(f"{node.pgdata}:\n ", repr(node.make_conninfo())) + print("Press Enter to continue running the test...") + 
input() diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index d686a71c5..b2d33eb56 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -9,7 +9,6 @@ import re import shutil import sys from collections import OrderedDict -from glob import glob from typing import Optional import common @@ -26,6 +25,27 @@ def schedule_line_is_upgrade_after(test_schedule_line: str) -> bool: ) + +def run_python_test(test_file_name, repeat): + """Runs the test using pytest + + This function never returns as it uses os.execlp to replace the current + process with a new pytest process. + """ + test_path = regress_dir / "citus_tests" / "test" / f"{test_file_name}.py" + if not test_path.exists(): + raise Exception("Test could not be found in any schedule") + + os.execlp( + "pytest", + "pytest", + "--numprocesses", + "auto", + "--count", + str(repeat), + str(test_path), + ) + + if __name__ == "__main__": args = argparse.ArgumentParser() args.add_argument( @@ -64,7 +84,9 @@ if __name__ == "__main__": args = vars(args.parse_args()) - regress_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) + regress_dir = pathlib.Path( + os.path.dirname(os.path.dirname(os.path.realpath(__file__))) + ) test_file_path = args["path"] test_file_name = args["test_name"] use_base_schedule = args["use_base_schedule"] @@ -138,17 +160,20 @@ if __name__ == "__main__": test_file_extension = pathlib.Path(test_file_path).suffix test_file_name = pathlib.Path(test_file_path).stem - if test_file_extension not in ".spec.sql": + if test_file_extension not in (".spec", ".sql", ".py"): print( - "ERROR: Unrecognized test extension. Valid extensions are: .sql and .spec" + "ERROR: Unrecognized test extension. 
Valid extensions are: .sql, .spec, and .py" ) sys.exit(1) test_schedule = "" dependencies = [] + if test_file_name.startswith("test_"): + run_python_test(test_file_name, args["repeat"]) + # find related schedule - for schedule_file_path in sorted(glob(os.path.join(regress_dir, "*_schedule"))): + for schedule_file_path in sorted(regress_dir.glob("*_schedule")): for schedule_line in open(schedule_file_path, "r"): if re.search(r"\b" + test_file_name + r"\b", schedule_line): test_schedule = pathlib.Path(schedule_file_path).stem diff --git a/src/test/regress/citus_tests/test/README.md b/src/test/regress/citus_tests/test/README.md new file mode 100644 index 000000000..6aac98e49 --- /dev/null +++ b/src/test/regress/citus_tests/test/README.md @@ -0,0 +1,171 @@ +# Pytest based tests + +## Usage + +Run all tests in parallel: + +```bash +pytest -n auto +``` + +Run all tests sequentially: +```bash +pytest +``` + +Run a specific test: +```bash +pytest test/test_columnar.py::test_recovery +``` + +Run a specific test file in parallel: +```bash +pytest -n auto test/test_columnar.py +``` + +Run any test that contains a certain string in the name: +```bash +pytest -k recovery +``` + +Run tests without it capturing stdout/stderr. This can be useful to see the +logs of a passing test: +```bash +pytest -s test/test_columnar.py::test_recovery +``` + +## General info + +Our other tests work by comparing output of a sequence of SQL commands that's +executed by `psql` to an expected output. If there's a difference between the +expected and actual output, then the tests fails. This works fine for many +cases, but certain types of tests are hard to write and a lot of care usually +has to be taken to make sure output is completely identical in every run. + +The tests in this directory use a different approach and use +[`pytest`][pytest-docs] to run tests that are written in the Python programming +language. 
This idea is similar to TAP tests that are part of Postgres, with the +important difference that those are written in Perl. + +In the sections below you can find most stuff you'll need to know about +`pytest` to run and write such tests, but if you want more detailed info some +useful references are: +- [A blog with pytest tips and tricks][pytest-tips] +- [The official pytest docs][pytest-docs] + +[pytest-docs]: https://docs.pytest.org/en/stable/ +[pytest-tips]: https://pythontest.com/pytest-tips-tricks/ + +## Adding a new test + +Tests are automatically discovered by `pytest` using a simple but effective +heuristic. In this directory (`src/test/regress/citus_tests/test`) it finds +all of the files that are named `test_{some name}.py`. Those files +are then searched for function names starting with the `test_` prefix. All those +functions are considered tests by `pytest`. + + +### Fixtures aka Dependency Injection aka Teardown/Cleanup + +An important part of tests is that they have some dependencies. The most +important dependency for us is usually a running Citus cluster. These +dependencies are provided by what `pytest` calls [fixtures]. Fixtures are +functions that `yield` a value. Anything before the `yield` is done during setup +and anything after the yield is done during teardown of the test (or whole +session). All our fixtures are defined in `conftest.py`. + + +Using a fixture in a test is very easy, but looks like a lot of magic. All you +have to do is make sure your test function has an argument with the same name as +the name of the fixture. 
For example: + +```python +def test_some_query(cluster): + cluster.coordinator.sql("SELECT 1") +``` + +If you need a cluster of a specific size you can use the `cluster_factory` +fixture: +```python +def test_with_100_workers(cluster_factory): + cluster = cluster_factory(100) +``` + +If you want more details on how fixtures work a few useful pages of the pytest +docs are: +- [About fixtures][fixtures] +- [How to use fixtures][fixtures-how-to] +- [Fixtures reference][fixtures-reference] + +[fixtures]: https://docs.pytest.org/en/stable/explanation/fixtures.html +[fixtures-how-to]: https://docs.pytest.org/en/stable/how-to/fixtures.html +[fixtures-reference]: https://docs.pytest.org/en/stable/reference/fixtures.html +## Connecting to a test postgres + +Sometimes your test is failing in an unexpected way and the easiest way to find +out why is to connect to Postgres at a certain point interactively. + +### Using `psql_debug` +The easiest way is to use the `psql_debug()` method of your `Cluster` or +`Postgres` instance. +```python +def test_something(cluster): + # working stuff + + cluster.coordinator.psql_debug() + + # unexpectedly failing test +``` + +Then run this test with stdout/stderr capturing disabled (`-s`) and it will show +you an interactive `psql` prompt right at that point in the test: +```bash +$ pytest -s test/test_your_thing.py::test_something + +... + +psql (15.2) +SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, compression: off) +Type "help" for help. + +127.0.0.1 postgres@postgres:10201-20016= +> select 1; + +``` + + +### Debug manually + +Sometimes you need to connect to more than one node though. For that you can use +a `Cluster` its `debug` method instead. 
+ +```python +def test_something(cluster): + # working stuff + + cluster.debug() + + # unexpectedly failing test +``` + + +Then run this test with stdout/stderr capturing disabled (`-s`) and it will show +you the connection string for each of the nodes in the cluster: +```bash +$ PG_FORCE_PORTS=true pytest -s test/test_your_thing.py::test_something +... + +The nodes in this cluster and their connection strings are: +/tmp/pytest-of-jelte/pytest-752/cluster2-0/coordinator: + "host=127.0.0.1 port=10202 dbname=postgres user=postgres options='-c search_path=test_recovery' connect_timeout=3 client_encoding=UTF8" +/tmp/pytest-of-jelte/pytest-752/cluster2-0/worker0: + "host=127.0.0.1 port=10203 dbname=postgres user=postgres options='-c search_path=test_recovery' connect_timeout=3 client_encoding=UTF8" +/tmp/pytest-of-jelte/pytest-752/cluster2-0/worker1: + "host=127.0.0.1 port=10204 dbname=postgres user=postgres options='-c search_path=test_recovery' connect_timeout=3 client_encoding=UTF8" +Press Enter to continue running the test... +``` + +Then in another terminal you can manually connect to as many of them as you want. +Using `PG_FORCE_PORTS` is recommended here, to make sure that the ports will +stay the same across runs of the tests. That way you can reuse the connection +strings that you got from a previous run, if you need to debug again. 
diff --git a/src/test/regress/citus_tests/test/__init__.py b/src/test/regress/citus_tests/test/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/test/regress/citus_tests/test/conftest.py b/src/test/regress/citus_tests/test/conftest.py new file mode 100644 index 000000000..467f253f6 --- /dev/null +++ b/src/test/regress/citus_tests/test/conftest.py @@ -0,0 +1,95 @@ +import pytest +from common import CitusCluster, Postgres, cleanup_test_leftovers, parallel_run + + +@pytest.fixture(scope="session") +def cluster_factory_session(tmp_path_factory): + """The session level pytest fixture that creates and caches citus clusters + + IMPORTANT: This should not be used directly, but only indirectly through + the cluster_factory fixture. + """ + clusters = {} + + def _make_or_get_cluster(worker_count: int): + if worker_count not in clusters: + clusters[worker_count] = CitusCluster( + tmp_path_factory.mktemp(f"cluster{worker_count}-"), worker_count + ) + return clusters[worker_count] + + yield _make_or_get_cluster + + parallel_run(CitusCluster.cleanup, clusters.values()) + + +@pytest.fixture +def cluster_factory(cluster_factory_session, request): + """The pytest fixture that creates and caches citus clusters + + When the function provided by the factory is called, it returns a cluster + with the given worker count. This cluster is cached across tests, so that + future invocations with the same worker count don't need to create a + cluster again, but can reuse the previously created one. + + To try and make sure that tests don't depend on each other this tries very + hard to clean up anything that is created during the test. + + It also prints the Postgres logs that were produced during the test to + stdout. Normally these will be hidden, but when a test fails pytest will + show all stdout output produced during the test. Thus showing the Postgres + logs in that case makes it easier to debug. 
+ """ + + log_handles = [] + clusters = [] + nodes = [] + + def _make_or_get_cluster(worker_count: int): + nonlocal log_handles + nonlocal nodes + cluster = cluster_factory_session(worker_count) + if cluster.failed_reset: + cluster.reset() + cluster.failed_reset = False + clusters.append(cluster) + log_handles += [(node, node.log_handle()) for node in cluster.nodes] + nodes += cluster.nodes + + # Create a dedicated schema for the test and use it by default + cluster.coordinator.create_schema(request.node.originalname) + cluster.schema = request.node.originalname + + return cluster + + yield _make_or_get_cluster + + try: + # We clean up test leftovers on all nodes together, instead of per + # cluster. The reason for this is that some subscriptions/publication + # pairs might be between different clusters. And by cleaning them up + # all together, the ordering of the DROPs is easy to make correct. + cleanup_test_leftovers(nodes) + parallel_run(Postgres.prepare_reset, nodes) + parallel_run(Postgres.restart, nodes) + except Exception: + for cluster in clusters: + cluster.failed_reset = True + raise + finally: + for node, log in log_handles: + print(f"\n\nPG_LOG: {node.pgdata}\n") + print(log.read()) + log.close() + + +@pytest.fixture(name="coord") +def coordinator(cluster_factory): + """Sets up a clean single-node Citus cluster for this test""" + yield cluster_factory(0).coordinator + + +@pytest.fixture +def cluster(cluster_factory): + """Sets up a clean 2-worker Citus cluste for this test""" + yield cluster_factory(2) diff --git a/src/test/regress/citus_tests/test/test_columnar.py b/src/test/regress/citus_tests/test/test_columnar.py new file mode 100644 index 000000000..7366cd432 --- /dev/null +++ b/src/test/regress/citus_tests/test/test_columnar.py @@ -0,0 +1,117 @@ +import psycopg +import pytest + + +def test_freezing(coord): + coord.configure("vacuum_freeze_min_age = 50000", "vacuum_freeze_table_age = 50000") + coord.restart() + + # create columnar table and insert 
simple data to verify the data survives + # a crash + coord.sql("CREATE TABLE test_row(i int)") + coord.sql("INSERT INTO test_row VALUES (1) ") + coord.sql( + "CREATE TABLE test_columnar_freeze(i int) USING columnar WITH(autovacuum_enabled=false)" + ) + coord.sql("INSERT INTO test_columnar_freeze VALUES (1)") + + for _ in range(0, 7): + with coord.cur() as cur: + for _ in range(0, 10_000): + cur.execute("UPDATE test_row SET i = i + 1") + + frozen_age = coord.sql_value( + """ + select age(relfrozenxid) + from pg_class where relname='test_columnar_freeze'; + """ + ) + + assert frozen_age > 70_000, "columnar table was frozen" + coord.sql("VACUUM FREEZE test_columnar_freeze") + + frozen_age = coord.sql_value( + """ + select age(relfrozenxid) + from pg_class where relname='test_columnar_freeze'; + """ + ) + assert frozen_age < 70_000, "columnar table was not frozen" + + +def test_recovery(coord): + # create columnar table and insert simple data to verify the data survives a crash + coord.sql("CREATE TABLE t1 (a int, b text) USING columnar") + coord.sql( + "INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1002) AS a" + ) + + # simulate crash + coord.stop("immediate") + coord.start() + + row_count = coord.sql_value("SELECT count(*) FROM t1") + assert row_count == 1002, "columnar didn't recover data before crash correctly" + + # truncate the table to verify the truncation survives a crash + coord.sql("TRUNCATE t1") + # simulate crash + coord.stop("immediate") + coord.start() + + row_count = coord.sql_value("SELECT count(*) FROM t1") + assert row_count == 0, "columnar didn't recover the truncate correctly" + + # test crashing while having an open transaction + with pytest.raises( + psycopg.OperationalError, + match="server closed the connection unexpectedly|consuming input failed: EOF detected", + ): + with coord.transaction() as cur: + cur.execute( + "INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1003) AS a" + ) + # simulate crash + 
coord.stop("immediate") + + coord.start() + + row_count = coord.sql_value("SELECT count(*) FROM t1") + assert row_count == 0, "columnar didn't recover uncommited transaction" + + # test crashing while having a prepared transaction + with pytest.raises( + psycopg.OperationalError, + match="server closed the connection unexpectedly|consuming input failed: EOF detected", + ): + with coord.transaction() as cur: + cur.execute( + "INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1004) AS a" + ) + cur.execute("PREPARE TRANSACTION 'prepared_xact_crash'") + # simulate crash + coord.stop("immediate") + + coord.start() + + row_count = coord.sql_value("SELECT count(*) FROM t1") + assert row_count == 0, "columnar didn't recover uncommitted prepared transaction" + + coord.sql("COMMIT PREPARED 'prepared_xact_crash'") + + row_count = coord.sql_value("SELECT count(*) FROM t1") + assert row_count == 1004, "columnar didn't recover committed transaction" + + # test crash recovery with copied data + with coord.cur() as cur: + with cur.copy("COPY t1 FROM STDIN") as copy: + copy.write_row((1, "a")) + copy.write_row((2, "b")) + copy.write_row((3, "c")) + + # simulate crash + coord.stop("immediate") + coord.start() + + row_count = coord.sql_value("SELECT count(*) FROM t1") + assert row_count == 1007, "columnar didn't recover after copy"