Convert columnar tap tests to pytest (#6720)

Having as little Perl as possible in our repo seems a worthy goal. Sadly,
Postgres' Perl-based TAP infrastructure was the only way in which we could
run tests that were hard to express using only SQL commands. This change adds
infrastructure to run such "application style tests" using Python and
converts all our existing Perl TAP tests to this new infrastructure.

Some of the helper functions that are added in this PR are currently
unused. Most of these will be used by the CDC PR that depends on this.
Some others are there because they were needed by the PgBouncer test
framework that this is based on, and they seemed useful enough for Citus
testing to keep.

The main features of the test suite are:
1. Application style tests using a programming language that our
developers know how to write (see the sketch after this list).
2. Caching of Citus clusters in-between tests using the ["fixture"
pattern][fixture] from `pytest` to achieve speedy tests. To make this
work in practice, any changes made during a test are automatically
undone: schemas, replication slots, subscriptions, and publications are
dropped at the end of each test, and any changes made by `ALTER SYSTEM`
or by manually editing `pg_hba.conf` are undone too.
3. Automatic parallel execution of tests using the `-n auto` flag that's
added by `pytest-xdist`. This greatly improved test speed in the similar
test framework I created for PgBouncer. Right now it doesn't help much
yet though, since this PR only adds two tests (one of which takes ~10
times longer than the other).
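
To give a feel for this style of testing, here's a minimal sketch of such a
test (not itself part of this PR). It uses the `coord` fixture and the
`sql`/`sql_value` helpers that this PR adds; the table name is made up:

```python
def test_insert_roundtrip(coord):
    # coord is a single-node Citus coordinator provided by a fixture;
    # the schema the table is created in is dropped automatically at
    # the end of the test
    coord.sql("CREATE TABLE counters (i int)")
    coord.sql("INSERT INTO counters VALUES (1), (2)")
    assert coord.sql_value("SELECT sum(i) FROM counters") == 3
```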

Possible future improvements are:
1. Clean up even more things at the end of each test (e.g. users that
were created). These are fairly easy to add, but I have not done so
since they were not needed for this PR or the CDC PR, so I would not be
able to test the cleanup easily.
2. Support for query block detection similar to what we can now do using
isolation tests (a hypothetical sketch of a manual approach follows
below).
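
For illustration, here is a hypothetical sketch (not part of this PR) of how
a blocked query could already be detected manually with the async helpers
added to `common.py`; built-in support would presumably make this less
verbose. The table name is made up:

```python
import asyncio


async def test_detect_blocked_query(coord):
    coord.sql("CREATE TABLE blocked_test (i int)")
    with coord.transaction() as cur:
        cur.execute("LOCK TABLE blocked_test IN ACCESS EXCLUSIVE MODE")
        # start a conflicting insert on a second connection as an async task
        task = coord.asql("INSERT INTO blocked_test VALUES (1)")
        # yield control so the task can connect and send its query
        await asyncio.sleep(1)
        assert not task.done(), "insert should still be waiting for the lock"
    # leaving the block commits the transaction and releases the lock
    await task
    assert coord.sql_value("SELECT count(*) FROM blocked_test") == 1
```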

[fixture]: https://docs.pytest.org/en/6.2.x/fixture.html
Jelte Fennema 2023-03-31 12:25:19 +02:00 committed by GitHub
parent 01ea5f58a9
commit 7b60cdd13b
22 changed files with 1379 additions and 465 deletions

View File: .circleci/config.yml

@@ -6,7 +6,7 @@ orbs:
parameters:
image_suffix:
type: string
default: '-v4b2ae97'
default: '-v087ecd7'
pg13_version:
type: string
default: '13.10'
@@ -269,6 +269,41 @@ jobs:
- coverage:
flags: 'test_<< parameters.old_pg_major >>_<< parameters.new_pg_major >>,upgrade'
test-pytest:
description: Runs pytest based tests
parameters:
pg_major:
description: 'postgres major version'
type: integer
image:
description: 'docker image to use for the tests'
type: string
default: citus/failtester
image_tag:
description: 'docker image tag to use'
type: string
docker:
- image: '<< parameters.image >>:<< parameters.image_tag >><< pipeline.parameters.image_suffix >>'
working_directory: /home/circleci/project
steps:
- checkout
- attach_workspace:
at: .
- install_extension:
pg_major: << parameters.pg_major >>
- configure
- enable_core
- run:
name: 'Run pytest'
command: |
gosu circleci \
make -C src/test/regress check-pytest
no_output_timeout: 2m
- stack_trace
- coverage:
flags: 'test_<< parameters.pg_major >>,pytest'
test-arbitrary-configs:
description: Runs tests on arbitrary configs
parallelism: 6
@@ -558,7 +593,7 @@ jobs:
testForDebugging="<< parameters.test >>"
if [ -z "$testForDebugging" ]; then
detected_changes=$(git diff origin/main... --name-only --diff-filter=AM | (grep 'src/test/regress/sql/.*.sql\|src/test/regress/spec/.*.spec' || true))
detected_changes=$(git diff origin/main... --name-only --diff-filter=AM | (grep 'src/test/regress/sql/.*\.sql\|src/test/regress/spec/.*\.spec\|src/test/regress/citus_tests/test/test_.*\.py' || true))
tests=${detected_changes}
else
tests=$testForDebugging;
@@ -861,42 +896,30 @@ workflows:
image: citus/failtester
make: check-failure
- tap-test-citus: &tap-test-citus-13
name: 'test-13_tap-recovery'
suite: recovery
- test-pytest:
name: 'test-13_pytest'
pg_major: 13
image_tag: '<< pipeline.parameters.pg13_version >>'
requires: [build-13]
- tap-test-citus:
<<: *tap-test-citus-13
name: 'test-13_tap-columnar-freezing'
suite: columnar_freezing
- tap-test-citus: &tap-test-citus-14
name: 'test-14_tap-recovery'
suite: recovery
- test-pytest:
name: 'test-14_pytest'
pg_major: 14
image_tag: '<< pipeline.parameters.pg14_version >>'
requires: [build-14]
- tap-test-citus:
<<: *tap-test-citus-14
name: 'test-14_tap-columnar-freezing'
suite: columnar_freezing
- tap-test-citus: &tap-test-citus-15
name: 'test-15_tap-recovery'
suite: recovery
- test-pytest:
name: 'test-15_pytest'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
requires: [build-15]
- tap-test-citus:
<<: *tap-test-citus-15
name: 'test-15_tap-columnar-freezing'
suite: columnar_freezing
- tap-test-citus:
<<: *tap-test-citus-15
name: 'test-15_tap-cdc'
suite: cdc
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
requires: [build-15]
- test-arbitrary-configs:
name: 'test-13_check-arbitrary-configs'
@@ -947,8 +970,6 @@ workflows:
- test-13_check-follower-cluster
- test-13_check-columnar
- test-13_check-columnar-isolation
- test-13_tap-recovery
- test-13_tap-columnar-freezing
- test-13_check-failure
- test-13_check-enterprise
- test-13_check-enterprise-isolation
@@ -967,8 +988,6 @@ workflows:
- test-14_check-follower-cluster
- test-14_check-columnar
- test-14_check-columnar-isolation
- test-14_tap-recovery
- test-14_tap-columnar-freezing
- test-14_check-failure
- test-14_check-enterprise
- test-14_check-enterprise-isolation
@@ -987,8 +1006,6 @@ workflows:
- test-15_check-follower-cluster
- test-15_check-columnar
- test-15_check-columnar-isolation
- test-15_tap-recovery
- test-15_tap-columnar-freezing
- test-15_check-failure
- test-15_check-enterprise
- test-15_check-enterprise-isolation

View File: .editorconfig

@@ -17,7 +17,7 @@ trim_trailing_whitespace = true
insert_final_newline = unset
trim_trailing_whitespace = unset
[*.{sql,sh,py}]
[*.{sql,sh,py,toml}]
indent_style = space
indent_size = 4
tab_width = 4

View File: pyproject.toml

@@ -3,3 +3,35 @@ profile = 'black'
[tool.black]
include = '(src/test/regress/bin/diff-filter|\.pyi?|\.ipynb)$'
[tool.pytest.ini_options]
addopts = [
"--import-mode=importlib",
"--showlocals",
"--tb=short",
]
pythonpath = 'src/test/regress/citus_tests'
asyncio_mode = 'auto'
# Make test discovery quicker from the root dir of the repo
testpaths = ['src/test/regress/citus_tests/test']
# Make test discovery quicker from other directories than root directory
norecursedirs = [
'*.egg',
'.*',
'build',
'venv',
'ci',
'vendor',
'backend',
'bin',
'include',
'tmp_*',
'results',
'expected',
'sql',
'spec',
'data',
'__pycache__',
]

View File: src/test/columnar_freezing/Makefile

@@ -1,56 +0,0 @@
#-------------------------------------------------------------------------
#
# Makefile for src/test/columnar_freezing
#
# Test that columnar freezing works.
#
#-------------------------------------------------------------------------
subdir = src/test/columnar_freezing
top_builddir = ../../..
include $(top_builddir)/Makefile.global
# In PG15, Perl test modules have been moved to a new namespace
# new() and get_new_node() methods have been unified to 1 method: new()
# Relevant PG commits 201a76183e2056c2217129e12d68c25ec9c559c8
# b3b4d8e68ae83f432f43f035c7eb481ef93e1583
pg_version = $(shell $(PG_CONFIG) --version 2>/dev/null)
pg_whole_version = $(shell echo "$(pg_version)"| sed -e 's/^PostgreSQL \([0-9]*\)\(\.[0-9]*\)\{0,1\}\(.*\)/\1\2/')
pg_major_version = $(shell echo "$(pg_whole_version)"| sed -e 's/^\([0-9]\{2\}\)\(.*\)/\1/')
# for now, we only have a single test file
# due to the above explanation, we ended up separating the test paths for
# different versions. If you need to add new test files, be careful to add both versions
ifeq ($(pg_major_version),13)
test_path = t_pg13_pg14/*.pl
else ifeq ($(pg_major_version),14)
test_path = t_pg13_pg14/*.pl
else
test_path = t/*.pl
endif
# copied from pgxs/Makefile.global to use postgres' abs build dir for pg_regress
ifeq ($(enable_tap_tests),yes)
define citus_prove_installcheck
rm -rf '$(CURDIR)'/tmp_check
$(MKDIR_P) '$(CURDIR)'/tmp_check
cd $(srcdir) && \
TESTDIR='$(CURDIR)' \
PATH="$(bindir):$$PATH" \
PGPORT='6$(DEF_PGPORT)' \
top_builddir='$(CURDIR)/$(top_builddir)' \
PG_REGRESS='$(pgxsdir)/src/test/regress/pg_regress' \
TEMP_CONFIG='$(CURDIR)'/postgresql.conf \
$(PROVE) $(PG_PROVE_FLAGS) $(PROVE_FLAGS) $(if $(PROVE_TESTS),$(PROVE_TESTS),$(test_path))
endef
else
citus_prove_installcheck = @echo "TAP tests not enabled when postgres was compiled"
endif
installcheck:
$(citus_prove_installcheck)
clean distclean maintainer-clean:
rm -rf tmp_check

View File: src/test/columnar_freezing/postgresql.conf

@@ -1,7 +0,0 @@
shared_preload_libraries=citus
shared_preload_libraries='citus'
vacuum_freeze_min_age = 50000
vacuum_freeze_table_age = 50000
synchronous_commit = off
fsync = off

View File: src/test/columnar_freezing/t/*.pl (PG 15 version)

@@ -1,52 +0,0 @@
# Minimal test testing streaming replication
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More tests => 2;
# Initialize single node
my $node_one = PostgreSQL::Test::Cluster->new('node_one');
$node_one->init();
$node_one->start;
# initialize the citus extension
$node_one->safe_psql('postgres', "CREATE EXTENSION citus;");
# create columnar table and insert simple data to verify the data survives a crash
$node_one->safe_psql('postgres', "
CREATE TABLE test_row(i int);
INSERT INTO test_row VALUES (1);
CREATE TABLE test_columnar_freeze(i int) USING columnar WITH(autovacuum_enabled=false);
INSERT INTO test_columnar_freeze VALUES (1);
");
my $ten_thousand_updates = "";
foreach (1..10000) {
$ten_thousand_updates .= "UPDATE test_row SET i = i + 1;\n";
}
# 70K updates
foreach (1..7) {
$node_one->safe_psql('postgres', $ten_thousand_updates);
}
my $result = $node_one->safe_psql('postgres', "
select age(relfrozenxid) < 70000 as was_frozen
from pg_class where relname='test_columnar_freeze';
");
print "node one count: $result\n";
is($result, qq(f), 'columnar table was not frozen');
$node_one->safe_psql('postgres', 'VACUUM FREEZE test_columnar_freeze;');
$result = $node_one->safe_psql('postgres', "
select age(relfrozenxid) < 70000 as was_frozen
from pg_class where relname='test_columnar_freeze';
");
print "node one count: $result\n";
is($result, qq(t), 'columnar table was frozen');
$node_one->stop('fast');

View File: src/test/columnar_freezing/t_pg13_pg14/*.pl (PG 13/14 version)

@@ -1,52 +0,0 @@
# Minimal test testing streaming replication
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 2;
# Initialize single node
my $node_one = get_new_node('node_one');
$node_one->init();
$node_one->start;
# initialize the citus extension
$node_one->safe_psql('postgres', "CREATE EXTENSION citus;");
# create columnar table and insert simple data to verify the data survives a crash
$node_one->safe_psql('postgres', "
CREATE TABLE test_row(i int);
INSERT INTO test_row VALUES (1);
CREATE TABLE test_columnar_freeze(i int) USING columnar WITH(autovacuum_enabled=false);
INSERT INTO test_columnar_freeze VALUES (1);
");
my $ten_thousand_updates = "";
foreach (1..10000) {
$ten_thousand_updates .= "UPDATE test_row SET i = i + 1;\n";
}
# 70K updates
foreach (1..7) {
$node_one->safe_psql('postgres', $ten_thousand_updates);
}
my $result = $node_one->safe_psql('postgres', "
select age(relfrozenxid) < 70000 as was_frozen
from pg_class where relname='test_columnar_freeze';
");
print "node one count: $result\n";
is($result, qq(f), 'columnar table was not frozen');
$node_one->safe_psql('postgres', 'VACUUM FREEZE test_columnar_freeze;');
$result = $node_one->safe_psql('postgres', "
select age(relfrozenxid) < 70000 as was_frozen
from pg_class where relname='test_columnar_freeze';
");
print "node one count: $result\n";
is($result, qq(t), 'columnar table was frozen');
$node_one->stop('fast');

View File: src/test/recovery/.gitignore

@@ -1,2 +0,0 @@
# Generated by test suite
/tmp_check/

View File: src/test/recovery/Makefile

@@ -1,58 +0,0 @@
#-------------------------------------------------------------------------
#
# Makefile for src/test/recovery
#
# Losely based on the makefile found in postgres' src/test/recovery.
# We need to define our own invocation of prove to pass the correct path
# to pg_regress and include citus in the shared preload libraries.
#
#-------------------------------------------------------------------------
subdir = src/test/recovery
top_builddir = ../../..
include $(top_builddir)/Makefile.global
# In PG15, Perl test modules have been moved to a new namespace
# new() and get_new_node() methods have been unified to 1 method: new()
# Relevant PG commits 201a76183e2056c2217129e12d68c25ec9c559c8
# b3b4d8e68ae83f432f43f035c7eb481ef93e1583
pg_version = $(shell $(PG_CONFIG) --version 2>/dev/null)
pg_whole_version = $(shell echo "$(pg_version)"| sed -e 's/^PostgreSQL \([0-9]*\)\(\.[0-9]*\)\{0,1\}\(.*\)/\1\2/')
pg_major_version = $(shell echo "$(pg_whole_version)"| sed -e 's/^\([0-9]\{2\}\)\(.*\)/\1/')
# for now, we only have a single test file
# due to the above explanation, we ended up separating the test paths for
# different versions. If you need to add new test files, be careful to add both versions
ifeq ($(pg_major_version),13)
test_path = t_pg13_pg14/*.pl
else ifeq ($(pg_major_version),14)
test_path = t_pg13_pg14/*.pl
else
test_path = t/*.pl
endif
# copied from pgxs/Makefile.global to use postgres' abs build dir for pg_regress
ifeq ($(enable_tap_tests),yes)
define citus_prove_installcheck
rm -rf '$(CURDIR)'/tmp_check
$(MKDIR_P) '$(CURDIR)'/tmp_check
cd $(srcdir) && \
TESTDIR='$(CURDIR)' \
PATH="$(bindir):$$PATH" \
PGPORT='6$(DEF_PGPORT)' \
top_builddir='$(CURDIR)/$(top_builddir)' \
PG_REGRESS='$(pgxsdir)/src/test/regress/pg_regress' \
TEMP_CONFIG='$(CURDIR)'/postgresql.conf \
$(PROVE) $(PG_PROVE_FLAGS) $(PROVE_FLAGS) $(if $(PROVE_TESTS),$(PROVE_TESTS),$(test_path))
endef
else
citus_prove_installcheck = @echo "TAP tests not enabled when postgres was compiled"
endif
installcheck:
$(citus_prove_installcheck)
clean distclean maintainer-clean:
rm -rf tmp_check

View File: src/test/recovery/postgresql.conf

@@ -1 +0,0 @@
shared_preload_libraries=citus

View File: src/test/recovery/t/*.pl (PG 15 version)

@@ -1,98 +0,0 @@
# Minimal test testing streaming replication
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More tests => 6;
# Initialize single node
my $node_one = PostgreSQL::Test::Cluster->new('node_one');
$node_one->init();
$node_one->start;
# initialize the citus extension
$node_one->safe_psql('postgres', "CREATE EXTENSION citus;");
# create columnar table and insert simple data to verify the data survives a crash
$node_one->safe_psql('postgres', "
BEGIN;
CREATE TABLE t1 (a int, b text) USING columnar;
INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1002) AS a;
COMMIT;
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
my $result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(1002), 'columnar recovered data from before crash');
# truncate the table to verify the truncation survives a crash
$node_one->safe_psql('postgres', "
TRUNCATE t1;
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(0), 'columnar recovered truncation');
# test crashing while having an open transaction
$node_one->safe_psql('postgres', "
BEGIN;
INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1003) AS a;
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(0), 'columnar crash during uncommitted transaction');
# test crashing while having a prepared transaction
$node_one->safe_psql('postgres', "
BEGIN;
INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1004) AS a;
PREPARE TRANSACTION 'prepared_xact_crash';
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(0), 'columnar crash during prepared transaction (before commit)');
$node_one->safe_psql('postgres', "
COMMIT PREPARED 'prepared_xact_crash';
");
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(1004), 'columnar crash during prepared transaction (after commit)');
# test crash recovery with copied data
$node_one->safe_psql('postgres', "
\\copy t1 FROM stdin delimiter ','
1,a
2,b
3,c
\\.
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(1007), 'columnar crash after copy command');

View File: src/test/recovery/t_pg13_pg14/*.pl (PG 13/14 version)

@@ -1,98 +0,0 @@
# Minimal test testing streaming replication
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 6;
# Initialize single node
my $node_one = get_new_node('node_one');
$node_one->init();
$node_one->start;
# initialize the citus extension
$node_one->safe_psql('postgres', "CREATE EXTENSION citus;");
# create columnar table and insert simple data to verify the data survives a crash
$node_one->safe_psql('postgres', "
BEGIN;
CREATE TABLE t1 (a int, b text) USING columnar;
INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1002) AS a;
COMMIT;
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
my $result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(1002), 'columnar recovered data from before crash');
# truncate the table to verify the truncation survives a crash
$node_one->safe_psql('postgres', "
TRUNCATE t1;
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(0), 'columnar recovered truncation');
# test crashing while having an open transaction
$node_one->safe_psql('postgres', "
BEGIN;
INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1003) AS a;
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(0), 'columnar crash during uncommitted transaction');
# test crashing while having a prepared transaction
$node_one->safe_psql('postgres', "
BEGIN;
INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1004) AS a;
PREPARE TRANSACTION 'prepared_xact_crash';
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(0), 'columnar crash during prepared transaction (before commit)');
$node_one->safe_psql('postgres', "
COMMIT PREPARED 'prepared_xact_crash';
");
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(1004), 'columnar crash during prepared transaction (after commit)');
# test crash recovery with copied data
$node_one->safe_psql('postgres', "
\\copy t1 FROM stdin delimiter ','
1,a
2,b
3,c
\\.
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(1007), 'columnar crash after copy command');

View File: src/test/regress/Makefile

@@ -44,7 +44,7 @@ vanilla_diffs_file = $(citus_abs_srcdir)/pg_vanilla_outputs/$(MAJORVERSION)/regr
# intermediate, for muscle memory backward compatibility.
check: check-full check-enterprise-full
# check-full triggers all tests that ought to be run routinely
check-full: check-multi check-multi-mx check-multi-1 check-operations check-follower-cluster check-isolation check-failure check-split check-vanilla check-columnar check-columnar-isolation check-pg-upgrade check-arbitrary-configs check-citus-upgrade check-citus-upgrade-mixed check-citus-upgrade-local check-citus-upgrade-mixed-local
check-full: check-multi check-multi-mx check-multi-1 check-operations check-follower-cluster check-isolation check-failure check-split check-vanilla check-columnar check-columnar-isolation check-pg-upgrade check-arbitrary-configs check-citus-upgrade check-citus-upgrade-mixed check-citus-upgrade-local check-citus-upgrade-mixed-local check-pytest
# check-enterprise-full triggers all enterprise specific tests
check-enterprise-full: check-enterprise check-enterprise-isolation check-enterprise-failure check-enterprise-isolation-logicalrep-1 check-enterprise-isolation-logicalrep-2 check-enterprise-isolation-logicalrep-3
@@ -259,6 +259,9 @@ check-arbitrary-configs: all
check-arbitrary-base: all
${arbitrary_config_check} --bindir=$(bindir) --pgxsdir=$(pgxsdir) --parallel=$(parallel) --configs=$(CONFIGS) --seed=$(seed) --base
check-pytest:
pytest -n auto
check-citus-upgrade: all
$(citus_upgrade_check) \
--bindir=$(bindir) \

View File: src/test/regress/Pipfile

@@ -14,6 +14,7 @@ filelock = "*"
pytest-asyncio = "*"
pytest-timeout = "*"
pytest-xdist = "*"
pytest-repeat = "*"
[dev-packages]
black = "*"

View File: src/test/regress/Pipfile.lock

@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "456a43ce06df947ccbf02db7fcbfd654999acaae25911990d4d74fc04b10c77e"
"sha256": "eb9ca3a7b05e76c7ac60179a1755f89600dfb215e02bf08c258d548df1d96025"
},
"pipfile-spec": 6,
"requires": {
@@ -614,6 +614,14 @@
"index": "pypi",
"version": "==0.20.3"
},
"pytest-repeat": {
"hashes": [
"sha256:4474a7d9e9137f6d8cc8ae297f8c4168d33c56dd740aa78cfffe562557e6b96e",
"sha256:5cd3289745ab3156d43eb9c8e7f7d00a926f3ae5c9cf425bec649b2fe15bad5b"
],
"index": "pypi",
"version": "==0.9.1"
},
"pytest-timeout": {
"hashes": [
"sha256:c07ca07404c612f8abbe22294b23c368e2e5104b521c1790195561f37e1ac3d9",

View File: src/test/regress/README.md

@@ -106,10 +106,18 @@ Adding a new test file is quite simple:
See [`src/test/regress/spec/README.md`](https://github.com/citusdata/citus/blob/master/src/test/regress/spec/README.md)
## Pytest testing
See [`src/test/regress/citus_tests/test/README.md`](https://github.com/citusdata/citus/blob/master/src/test/regress/citus_tests/test/README.md)
## Upgrade testing
See [`src/test/regress/citus_tests/upgrade/README.md`](https://github.com/citusdata/citus/blob/master/src/test/regress/citus_tests/upgrade/README.md)
## Arbitrary configs testing
See [`src/test/regress/citus_tests/arbitrary_configs/README.md`](https://github.com/citusdata/citus/blob/master/src/test/regress/citus_tests/arbitrary_configs/README.md)
## Failure testing
See [`src/test/regress/mitmscripts/README.md`](https://github.com/citusdata/citus/blob/master/src/test/regress/mitmscripts/README.md)

View File: src/test/regress/citus_tests/common.py

@@ -1,13 +1,48 @@
import asyncio
import atexit
import concurrent.futures
import os
import platform
import random
import re
import shutil
import socket
import subprocess
import sys
import time
import typing
from abc import ABC, abstractmethod
from contextlib import asynccontextmanager, closing, contextmanager
from datetime import datetime, timedelta
from pathlib import Path
from tempfile import gettempdir
import filelock
import psycopg
import psycopg.sql
import utils
from psycopg import sql
from utils import USER
LINUX = False
MACOS = False
FREEBSD = False
OPENBSD = False
if platform.system() == "Linux":
LINUX = True
elif platform.system() == "Darwin":
MACOS = True
elif platform.system() == "FreeBSD":
FREEBSD = True
elif platform.system() == "OpenBSD":
OPENBSD = True
BSD = MACOS or FREEBSD or OPENBSD
TIMEOUT_DEFAULT = timedelta(seconds=int(os.getenv("PG_TEST_TIMEOUT_DEFAULT", "10")))
FORCE_PORTS = os.getenv("PG_FORCE_PORTS", "NO").lower() not in ("no", "0", "n", "")
def initialize_temp_dir(temp_dir):
if os.path.exists(temp_dir):
@@ -315,8 +350,834 @@ def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
def run(command, *args, shell=True, **kwargs):
def run(command, *args, check=True, shell=True, silent=False, **kwargs):
"""run runs the given command and prints it to stderr"""
eprint(f"+ {command} ")
return subprocess.run(command, *args, check=True, shell=shell, **kwargs)
if not silent:
eprint(f"+ {command} ")
if silent:
kwargs.setdefault("stdout", subprocess.DEVNULL)
return subprocess.run(command, *args, check=check, shell=shell, **kwargs)
def sudo(command, *args, shell=True, **kwargs):
"""
A version of run that prefixes the command with sudo when the process is
not already run as root
"""
effective_user_id = os.geteuid()
if effective_user_id == 0:
return run(command, *args, shell=shell, **kwargs)
if shell:
return run(f"sudo {command}", *args, shell=shell, **kwargs)
else:
return run(["sudo", *command])
# this is out of the ephemeral port range for many systems, hence
# there is a lower chance that it will conflict with "in-use" ports
PORT_LOWER_BOUND = 10200
# ephemeral port start on many Linux systems
PORT_UPPER_BOUND = 32768
next_port = PORT_LOWER_BOUND
def cleanup_test_leftovers(nodes):
"""
Cleaning up test leftovers needs to be done in a specific order, because
some of these leftovers depend on others having been removed. They might
even depend on leftovers on other nodes being removed. So this takes a list
of nodes, so that we can clean up all test leftovers globally in the
correct order.
"""
for node in nodes:
node.cleanup_subscriptions()
for node in nodes:
node.cleanup_publications()
for node in nodes:
node.cleanup_logical_replication_slots()
for node in nodes:
node.cleanup_schemas()
for node in nodes:
node.cleanup_users()
class PortLock:
"""PortLock allows you to take a lock an a specific port.
While a port is locked by one process, other processes using PortLock won't
get the same port.
"""
def __init__(self):
global next_port
first_port = next_port
while True:
next_port += 1
if next_port >= PORT_UPPER_BOUND:
next_port = PORT_LOWER_BOUND
# avoid infinite loop
if first_port == next_port:
raise Exception("Could not find port")
self.lock = filelock.FileLock(Path(gettempdir()) / f"port-{next_port}.lock")
try:
self.lock.acquire(timeout=0)
except filelock.Timeout:
continue
if FORCE_PORTS:
self.port = next_port
break
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
try:
s.bind(("127.0.0.1", next_port))
self.port = next_port
break
except Exception:
self.lock.release()
continue
def release(self):
"""Call release when you are done with the port.
This way other processes can use it again.
"""
self.lock.release()
class QueryRunner(ABC):
"""A subclassable interface class that can be used to run queries.
This is mostly useful to be generic across different types of things that
implement the Postgres interface, such as Postgres, PgBouncer, or a Citus
cluster.
This implements some helpers to send queries in a simpler manner than psycopg
allows by default.
"""
@abstractmethod
def set_default_connection_options(self, options: dict[str, typing.Any]):
"""Sets the default connection options on the given options dictionary
This is the only method that the class that subclasses QueryRunner
needs to implement.
"""
...
def make_conninfo(self, **kwargs) -> str:
self.set_default_connection_options(kwargs)
return psycopg.conninfo.make_conninfo(**kwargs)
def conn(self, *, autocommit=True, **kwargs):
"""Open a psycopg connection to this server"""
self.set_default_connection_options(kwargs)
return psycopg.connect(
autocommit=autocommit,
**kwargs,
)
def aconn(self, *, autocommit=True, **kwargs):
"""Open an asynchronous psycopg connection to this server"""
self.set_default_connection_options(kwargs)
return psycopg.AsyncConnection.connect(
autocommit=autocommit,
**kwargs,
)
@contextmanager
def cur(self, autocommit=True, **kwargs):
"""Open an psycopg cursor to this server
The connection and the cursors automatically close once you leave the
"with" block
"""
with self.conn(
autocommit=autocommit,
**kwargs,
) as conn:
with conn.cursor() as cur:
yield cur
@asynccontextmanager
async def acur(self, **kwargs):
"""Open an asynchronous psycopg cursor to this server
The connection and the cursors automatically close once you leave the
"async with" block
"""
async with await self.aconn(**kwargs) as conn:
async with conn.cursor() as cur:
yield cur
def sql(self, query, params=None, **kwargs):
"""Run an SQL query
This opens a new connection and closes it once the query is done
"""
with self.cur(**kwargs) as cur:
cur.execute(query, params=params)
def sql_value(self, query, params=None, allow_empty_result=False, **kwargs):
"""Run an SQL query that returns a single cell and return this value
This opens a new connection and closes it once the query is done
"""
with self.cur(**kwargs) as cur:
cur.execute(query, params=params)
result = cur.fetchall()
if allow_empty_result and len(result) == 0:
return None
assert len(result) == 1
assert len(result[0]) == 1
value = result[0][0]
return value
def asql(self, query, **kwargs):
"""Run an SQL query in asynchronous task
This opens a new connection and closes it once the query is done
"""
return asyncio.ensure_future(self.asql_coroutine(query, **kwargs))
async def asql_coroutine(
self, query, params=None, **kwargs
) -> typing.Optional[typing.List[typing.Any]]:
async with self.acur(**kwargs) as cur:
await cur.execute(query, params=params)
try:
return await cur.fetchall()
except psycopg.ProgrammingError as e:
if "the last operation didn't produce a result" == str(e):
return None
raise
def psql(self, query, **kwargs):
"""Run an SQL query using psql instead of psycopg
This opens a new connection and closes it once the query is done
"""
conninfo = self.make_conninfo(**kwargs)
run(
["psql", "-X", f"{conninfo}", "-c", query],
shell=False,
silent=True,
)
def poll_query_until(self, query, params=None, expected=True, **kwargs):
"""Run query repeatedly until it returns the expected result"""
start = datetime.now()
result = None
while start + TIMEOUT_DEFAULT > datetime.now():
result = self.sql_value(
query, params=params, allow_empty_result=True, **kwargs
)
if result == expected:
return
time.sleep(0.1)
raise Exception(
f"Timeout reached while polling query, last result was: {result}"
)
@contextmanager
def transaction(self, **kwargs):
with self.cur(**kwargs) as cur:
with cur.connection.transaction():
yield cur
def sleep(self, duration=3, **kwargs):
"""Run pg_sleep"""
return self.sql(f"select pg_sleep({duration})", **kwargs)
def asleep(self, duration=3, times=1, sequentially=False, **kwargs):
"""Run pg_sleep asynchronously in a task.
times:
You can create a single task that opens multiple connections, which
run pg_sleep concurrently. The asynchronous task will only complete
once all these pg_sleep calls are finished.
sequentially:
Instead of running all pg_sleep calls spawned by providing
times > 1 concurrently, this will run them sequentially.
"""
return asyncio.ensure_future(
self.asleep_coroutine(
duration=duration, times=times, sequentially=sequentially, **kwargs
)
)
async def asleep_coroutine(self, duration=3, times=1, sequentially=False, **kwargs):
"""This is the coroutine that the asleep task runs internally"""
if not sequentially:
await asyncio.gather(
*[
self.asql(f"select pg_sleep({duration})", **kwargs)
for _ in range(times)
]
)
else:
for _ in range(times):
await self.asql(f"select pg_sleep({duration})", **kwargs)
def test(self, **kwargs):
"""Test if you can connect"""
return self.sql("select 1", **kwargs)
def atest(self, **kwargs):
"""Test if you can connect asynchronously"""
return self.asql("select 1", **kwargs)
def psql_test(self, **kwargs):
"""Test if you can connect with psql instead of psycopg"""
return self.psql("select 1", **kwargs)
def debug(self):
print("Connect manually to:\n ", repr(self.make_conninfo()))
print("Press Enter to continue running the test...")
input()
def psql_debug(self, **kwargs):
conninfo = self.make_conninfo(**kwargs)
run(
["psql", f"{conninfo}"],
shell=False,
silent=True,
)
class Postgres(QueryRunner):
"""A class that represents a Postgres instance on this machine
You can query it by using the interface provided by QueryRunner or use many
of the helper methods.
"""
def __init__(self, pgdata):
self.port_lock = PortLock()
# These values should almost never be changed after initialization
self.host = "127.0.0.1"
self.port = self.port_lock.port
# These values can be changed when needed
self.dbname = "postgres"
self.user = "postgres"
self.schema = None
self.pgdata = pgdata
self.log_path = self.pgdata / "pg.log"
# Used to track objects that we want to clean up at the end of a test
self.subscriptions = set()
self.publications = set()
self.logical_replication_slots = set()
self.schemas = set()
self.users = set()
def set_default_connection_options(self, options):
options.setdefault("host", self.host)
options.setdefault("port", self.port)
options.setdefault("dbname", self.dbname)
options.setdefault("user", self.user)
if self.schema is not None:
options.setdefault("options", f"-c search_path={self.schema}")
options.setdefault("connect_timeout", 3)
# needed for Ubuntu 18.04
options.setdefault("client_encoding", "UTF8")
def initdb(self):
run(
f"initdb -A trust --nosync --username postgres --pgdata {self.pgdata} --allow-group-access --encoding UTF8 --locale POSIX",
stdout=subprocess.DEVNULL,
)
with self.conf_path.open(mode="a") as pgconf:
# Allow connecting over unix sockets
pgconf.write("unix_socket_directories = '/tmp'\n")
# Useful logs for debugging issues
pgconf.write("log_replication_commands = on\n")
# The following two are also useful for debugging, but quite noisy.
# So better to enable them manually by uncommenting.
# pgconf.write("log_connections = on\n")
# pgconf.write("log_disconnections = on\n")
# Enable citus
pgconf.write("shared_preload_libraries = 'citus'\n")
# Allow CREATE SUBSCRIPTION to work
pgconf.write("wal_level = 'logical'\n")
# Faster logical replication status update so tests with logical replication
# run faster
pgconf.write("wal_receiver_status_interval = 1\n")
# Faster logical replication apply worker launch so tests with logical
# replication run faster. This is used in ApplyLauncherMain in
# src/backend/replication/logical/launcher.c.
pgconf.write("wal_retrieve_retry_interval = '250ms'\n")
# Make sure there's enough logical replication resources for most
# of our tests
pgconf.write("max_logical_replication_workers = 50\n")
pgconf.write("max_wal_senders = 50\n")
pgconf.write("max_worker_processes = 50\n")
pgconf.write("max_replication_slots = 50\n")
# We need to make the log go to stderr so that the tests can
# check what is being logged. This should be the default, but
# some packagings change the default configuration.
pgconf.write("log_destination = stderr\n")
# We don't need the logs anywhere else than stderr
pgconf.write("logging_collector = off\n")
# This makes tests run faster and we don't care about crash safety
# of our test data.
pgconf.write("fsync = false\n")
# conservative settings to ensure we can run multiple postmasters:
pgconf.write("shared_buffers = 1MB\n")
# limit disk space consumption, too:
pgconf.write("max_wal_size = 128MB\n")
# don't restart after crashes to make it obvious that a crash
# happened
pgconf.write("restart_after_crash = off\n")
os.truncate(self.hba_path, 0)
self.ssl_access("all", "trust")
self.nossl_access("all", "trust")
self.commit_hba()
def init_with_citus(self):
self.initdb()
self.start()
self.sql("CREATE EXTENSION citus")
# Manually turn on ssl, so that we can safely truncate
# postgresql.auto.conf later. We can only do this after creating the
# citus extension because that creates the self signed certificates.
with self.conf_path.open(mode="a") as pgconf:
pgconf.write("ssl = on\n")
def pgctl(self, command, **kwargs):
run(f"pg_ctl -w --pgdata {self.pgdata} {command}", **kwargs)
def apgctl(self, command, **kwargs):
return asyncio.create_subprocess_shell(
f"pg_ctl -w --pgdata {self.pgdata} {command}", **kwargs
)
def start(self):
try:
self.pgctl(f'-o "-p {self.port}" -l {self.log_path} start')
except Exception:
print(f"\n\nPG_LOG: {self.pgdata}\n")
with self.log_path.open() as f:
print(f.read())
raise
def stop(self, mode="fast"):
self.pgctl(f"-m {mode} stop", check=False)
def cleanup(self):
self.stop()
self.port_lock.release()
def restart(self):
self.stop()
self.start()
def reload(self):
self.pgctl("reload")
# Sadly UNIX signals are asynchronous, so we sleep a bit and hope that
# Postgres actually processed the SIGHUP signal after the sleep.
time.sleep(0.1)
async def arestart(self):
process = await self.apgctl("-m fast restart")
await process.communicate()
def nossl_access(self, dbname, auth_type):
"""Prepends a local non-SSL access to the HBA file"""
with self.hba_path.open() as pghba:
old_contents = pghba.read()
with self.hba_path.open(mode="w") as pghba:
pghba.write(f"local {dbname} all {auth_type}\n")
pghba.write(f"hostnossl {dbname} all 127.0.0.1/32 {auth_type}\n")
pghba.write(f"hostnossl {dbname} all ::1/128 {auth_type}\n")
pghba.write(old_contents)
def ssl_access(self, dbname, auth_type):
"""Prepends a local SSL access rule to the HBA file"""
with self.hba_path.open() as pghba:
old_contents = pghba.read()
with self.hba_path.open(mode="w") as pghba:
pghba.write(f"hostssl {dbname} all 127.0.0.1/32 {auth_type}\n")
pghba.write(f"hostssl {dbname} all ::1/128 {auth_type}\n")
pghba.write(old_contents)
@property
def hba_path(self):
return self.pgdata / "pg_hba.conf"
@property
def conf_path(self):
return self.pgdata / "postgresql.conf"
def commit_hba(self):
"""Mark the current HBA contents as non-resetable by reset_hba"""
with self.hba_path.open() as pghba:
old_contents = pghba.read()
with self.hba_path.open(mode="w") as pghba:
pghba.write("# committed-rules\n")
pghba.write(old_contents)
def reset_hba(self):
"""Remove any HBA rules that were added after the last call to commit_hba"""
with self.hba_path.open() as f:
hba_contents = f.read()
committed = hba_contents[hba_contents.find("# committed-rules\n") :]
with self.hba_path.open("w") as f:
f.write(committed)
def prepare_reset(self):
"""Prepares all changes to reset Postgres settings and objects
To actually apply the prepared changes a restart might still be needed.
"""
self.reset_hba()
os.truncate(self.pgdata / "postgresql.auto.conf", 0)
def reset(self):
"""Resets any changes to Postgres settings from previous tests"""
self.prepare_reset()
self.restart()
async def delayed_start(self, delay=1):
"""Start Postgres after a delay
NOTE: The sleep is asynchronous, but while waiting for Postgres to
start the pg_ctl start command will block the event loop. This is
currently acceptable for our usage of this method in the existing
tests and this way it was easiest to implement. However, it seems
totally reasonable to change this behaviour in the future if necessary.
"""
await asyncio.sleep(delay)
self.start()
def configure(self, *configs):
"""Configure specific Postgres settings using ALTER SYSTEM SET
NOTE: after configuring a call to reload or restart is needed for the
settings to become effective.
"""
for config in configs:
self.sql(f"alter system set {config}")
def log_handle(self):
"""Returns the opened logfile at the current end of the log
By later calling read on this file you can read the contents that were
written from this moment on.
IMPORTANT: This handle should be closed once it's not needed anymore
"""
f = self.log_path.open()
f.seek(0, os.SEEK_END)
return f
@contextmanager
def log_contains(self, re_string, times=None):
"""Checks if during this with block the log matches re_string
re_string:
The regex to search for.
times:
If None, any number of matches is accepted. If a number, only that
specific number of matches is accepted.
"""
with self.log_handle() as f:
yield
content = f.read()
if times is None:
assert re.search(re_string, content)
else:
match_count = len(re.findall(re_string, content))
assert match_count == times
def create_user(self, name, args: typing.Optional[psycopg.sql.Composable] = None):
self.users.add(name)
if args is None:
args = sql.SQL("")
self.sql(sql.SQL("CREATE USER {} {}").format(sql.Identifier(name), args))
def create_schema(self, name):
self.schemas.add(name)
self.sql(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(name)))
def create_publication(self, name: str, args: psycopg.sql.Composable):
self.publications.add(name)
self.sql(sql.SQL("CREATE PUBLICATION {} {}").format(sql.Identifier(name), args))
def create_logical_replication_slot(
self, name, plugin, temporary=False, twophase=False
):
self.logical_replication_slots.add(name)
self.sql(
"SELECT pg_catalog.pg_create_logical_replication_slot(%s,%s,%s,%s)",
(name, plugin, temporary, twophase),
)
def create_subscription(self, name: str, args: psycopg.sql.Composable):
self.subscriptions.add(name)
self.sql(
sql.SQL("CREATE SUBSCRIPTION {} {}").format(sql.Identifier(name), args)
)
def cleanup_users(self):
for user in self.users:
self.sql(sql.SQL("DROP USER IF EXISTS {}").format(sql.Identifier(user)))
def cleanup_schemas(self):
for schema in self.schemas:
self.sql(
sql.SQL("DROP SCHEMA IF EXISTS {} CASCADE").format(
sql.Identifier(schema)
)
)
def cleanup_publications(self):
for publication in self.publications:
self.sql(
sql.SQL("DROP PUBLICATION IF EXISTS {}").format(
sql.Identifier(publication)
)
)
def cleanup_logical_replication_slots(self):
for slot in self.logical_replication_slots:
self.sql(
"SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = %s",
(slot,),
)
def cleanup_subscriptions(self):
for subscription in self.subscriptions:
try:
self.sql(
sql.SQL("ALTER SUBSCRIPTION {} DISABLE").format(
sql.Identifier(subscription)
)
)
except psycopg.errors.UndefinedObject:
# Subscription didn't exist already
continue
self.sql(
sql.SQL("ALTER SUBSCRIPTION {} SET (slot_name = NONE)").format(
sql.Identifier(subscription)
)
)
self.sql(
sql.SQL("DROP SUBSCRIPTION {}").format(sql.Identifier(subscription))
)
def lsn(self, mode):
"""Returns the lsn for the given mode"""
queries = {
"insert": "SELECT pg_current_wal_insert_lsn()",
"flush": "SELECT pg_current_wal_flush_lsn()",
"write": "SELECT pg_current_wal_lsn()",
"receive": "SELECT pg_last_wal_receive_lsn()",
"replay": "SELECT pg_last_wal_replay_lsn()",
}
return self.sql_value(queries[mode])
def wait_for_catchup(self, subscription_name, mode="replay", target_lsn=None):
"""Waits until the subscription has caught up"""
if target_lsn is None:
target_lsn = self.lsn("write")
# Before release 12 walreceiver just set the application name to
# "walreceiver"
self.poll_query_until(
sql.SQL(
"""
SELECT {} <= {} AND state = 'streaming'
FROM pg_catalog.pg_stat_replication
WHERE application_name IN ({}, 'walreceiver')
"""
).format(target_lsn, sql.Identifier(f"{mode}_lsn"), subscription_name)
)
@contextmanager
def _enable_firewall(self):
"""Enables the firewall for the platform that you are running
Normally this should not be called directly, and instead drop_traffic
or reject_traffic should be used.
"""
fw_token = None
if BSD:
if MACOS:
command_stderr = sudo(
"pfctl -E", stderr=subprocess.PIPE, text=True
).stderr
match = re.search(r"^Token : (\d+)", command_stderr, flags=re.MULTILINE)
assert match is not None
fw_token = match.group(1)
sudo(
'bash -c "'
f"echo 'anchor \\\"port_{self.port}\\\"'"
f' | pfctl -a citus_test -f -"'
)
try:
yield
finally:
if MACOS:
sudo(f"pfctl -X {fw_token}")
@contextmanager
def drop_traffic(self):
"""Drops all TCP packets to this query runner"""
with self._enable_firewall():
if LINUX:
sudo(
"iptables --append OUTPUT "
"--protocol tcp "
f"--destination {self.host} "
f"--destination-port {self.port} "
"--jump DROP "
)
elif BSD:
sudo(
"bash -c '"
f'echo "block drop out proto tcp from any to {self.host} port {self.port}"'
f"| pfctl -a citus_test/port_{self.port} -f -'"
)
else:
raise Exception("This OS cannot run this test")
try:
yield
finally:
if LINUX:
sudo(
"iptables --delete OUTPUT "
"--protocol tcp "
f"--destination {self.host} "
f"--destination-port {self.port} "
"--jump DROP "
)
elif BSD:
sudo(f"pfctl -a citus_test/port_{self.port} -F all")
@contextmanager
def reject_traffic(self):
"""Rejects all traffic to this query runner with a TCP RST message"""
with self._enable_firewall():
if LINUX:
sudo(
"iptables --append OUTPUT "
"--protocol tcp "
f"--destination {self.host} "
f"--destination-port {self.port} "
"--jump REJECT "
"--reject-with tcp-reset"
)
elif BSD:
sudo(
"bash -c '"
f'echo "block return-rst out out proto tcp from any to {self.host} port {self.port}"'
f"| pfctl -a citus_test/port_{self.port} -f -'"
)
else:
raise Exception("This OS cannot run this test")
try:
yield
finally:
if LINUX:
sudo(
"iptables --delete OUTPUT "
"--protocol tcp "
f"--destination {self.host} "
f"--destination-port {self.port} "
"--jump REJECT "
"--reject-with tcp-reset"
)
elif BSD:
sudo(f"pfctl -a citus_test/port_{self.port} -F all")
class CitusCluster(QueryRunner):
"""A class that represents a Citus cluster on this machine
The nodes in the cluster can be accessed directly using the coordinator,
workers, and nodes properties.
If it doesn't matter which of the nodes in the cluster is used to run a
query, then you can use the methods provided by QueryRunner directly on the
cluster. In that case a random node will be chosen to run your query.
"""
def __init__(self, basedir: Path, worker_count: int):
self.coordinator = Postgres(basedir / "coordinator")
self.workers = [Postgres(basedir / f"worker{i}") for i in range(worker_count)]
self.nodes = [self.coordinator] + self.workers
self._schema = None
self.failed_reset = False
parallel_run(Postgres.init_with_citus, self.nodes)
with self.coordinator.cur() as cur:
cur.execute(
"SELECT pg_catalog.citus_set_coordinator_host(%s, %s)",
(self.coordinator.host, self.coordinator.port),
)
for worker in self.workers:
cur.execute(
"SELECT pg_catalog.citus_add_node(%s, %s)",
(worker.host, worker.port),
)
def set_default_connection_options(self, options):
random.choice(self.nodes).set_default_connection_options(options)
@property
def schema(self):
return self._schema
@schema.setter
def schema(self, value):
self._schema = value
for node in self.nodes:
node.schema = value
def reset(self):
"""Resets any changes to Postgres settings from previous tests"""
parallel_run(Postgres.prepare_reset, self.nodes)
parallel_run(Postgres.restart, self.nodes)
def cleanup(self):
parallel_run(Postgres.cleanup, self.nodes)
def debug(self):
"""Print information to stdout to help with debugging your cluster"""
print("The nodes in this cluster and their connection strings are:")
for node in self.nodes:
print(f"{node.pgdata}:\n ", repr(node.make_conninfo()))
print("Press Enter to continue running the test...")
input()

View File: src/test/regress/citus_tests/run_test.py

@@ -9,7 +9,6 @@ import re
import shutil
import sys
from collections import OrderedDict
from glob import glob
from typing import Optional
import common
@@ -26,6 +25,27 @@ def schedule_line_is_upgrade_after(test_schedule_line: str) -> bool:
)
def run_python_test(test_file_name, repeat):
"""Runs the test using pytest
This function never returns as it uses os.execlp to replace the current
process with a new pytest process.
"""
test_path = regress_dir / "citus_tests" / "test" / f"{test_file_name}.py"
if not test_path.exists():
raise Exception("Test could not be found in any schedule")
os.execlp(
"pytest",
"pytest",
"--numprocesses",
"auto",
"--count",
str(repeat),
str(test_path),
)
if __name__ == "__main__":
args = argparse.ArgumentParser()
args.add_argument(
@@ -64,7 +84,9 @@ if __name__ == "__main__":
args = vars(args.parse_args())
regress_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
regress_dir = pathlib.Path(
os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
)
test_file_path = args["path"]
test_file_name = args["test_name"]
use_base_schedule = args["use_base_schedule"]
@@ -138,17 +160,20 @@
test_file_extension = pathlib.Path(test_file_path).suffix
test_file_name = pathlib.Path(test_file_path).stem
if test_file_extension not in ".spec.sql":
if test_file_extension not in (".spec", ".sql", ".py"):
print(
"ERROR: Unrecognized test extension. Valid extensions are: .sql and .spec"
"ERROR: Unrecognized test extension. Valid extensions are: .sql, .spec, and .py"
)
sys.exit(1)
test_schedule = ""
dependencies = []
if test_file_name.startswith("test_"):
run_python_test(test_file_name, args["repeat"])
# find related schedule
for schedule_file_path in sorted(glob(os.path.join(regress_dir, "*_schedule"))):
for schedule_file_path in sorted(regress_dir.glob("*_schedule")):
for schedule_line in open(schedule_file_path, "r"):
if re.search(r"\b" + test_file_name + r"\b", schedule_line):
test_schedule = pathlib.Path(schedule_file_path).stem

View File: src/test/regress/citus_tests/test/README.md

@@ -0,0 +1,171 @@
# Pytest based tests
## Usage
Run all tests in parallel:
```bash
pytest -n auto
```
Run all tests sequentially:
```bash
pytest
```
Run a specific test:
```bash
pytest test/test_columnar.py::test_recovery
```
Run a specific test file in parallel:
```bash
pytest -n auto test/test_columnar.py
```
Run any test that contains a certain string in the name:
```bash
pytest -k recovery
```
Run tests without pytest capturing stdout/stderr. This can be useful to see the
logs of a passing test:
```bash
pytest -s test/test_columnar.py::test_recovery
```
## General info
Our other tests work by comparing the output of a sequence of SQL commands
executed by `psql` to an expected output. If there's a difference between the
expected and actual output, then the test fails. This works fine for many
cases, but certain types of tests are hard to write this way and a lot of care
usually has to be taken to make sure the output is completely identical in
every run.
The tests in this directory use a different approach and use
[`pytest`][pytest-docs] to run tests that are written in the Python programming
language. This idea is similar to TAP tests that are part of Postgres, with the
important difference that those are written in Perl.
In the sections below you can find most of what you'll need to know about
`pytest` to run and write such tests, but if you want more detailed info some
useful references are:
- [A blog with pytest tips and tricks][pytest-tips]
- [The official pytest docs][pytest-docs]
[pytest-docs]: https://docs.pytest.org/en/stable/
[pytest-tips]: https://pythontest.com/pytest-tips-tricks/
## Adding a new test
Tests are automatically discovered by `pytest` using a simple but effective
heuristic. In this directory (`src/test/regress/citus_tests/test`) it finds
all of the files that are named `test_{some name}.py`. Those files
are then searched for function names starting with the `test_` prefix. All those
functions are considered tests by `pytest`.
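For example, a minimal test file could look like this (the file and function
names are made up; `coord` is one of the fixtures described in the next
section):
```python
# src/test/regress/citus_tests/test/test_example.py
def test_simple_query(coord):
    # coord is a single-node Citus coordinator provided by a fixture
    assert coord.sql_value("SELECT 1") == 1
```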
### Fixtures aka Dependency Injection aka Teardown/Cleanup
An important part of tests is their dependencies. The most important
dependency for us is usually a running Citus cluster. These dependencies are
provided by what `pytest` calls [fixtures]. Fixtures are functions that
`yield` a value. Anything before the `yield` is done during setup and anything
after the `yield` is done during teardown of the test (or whole session). All
our fixtures are defined in `conftest.py`.
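To illustrate the pattern, a hypothetical fixture could look like this
(`example_schema` is not one of our real fixtures):
```python
import pytest


@pytest.fixture
def example_schema(coord):
    # setup: everything before the yield runs before the test
    coord.sql("CREATE SCHEMA example")
    yield "example"
    # teardown: everything after the yield runs once the test is done
    coord.sql("DROP SCHEMA example CASCADE")
```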
Using a fixture in a test is very easy, but looks like a lot of magic. All you
have to do is make sure your test function has an argument with the same name as
the name of the fixture. For example:
```python
def test_some_query(cluster):
cluster.coordinator.sql("SELECT 1")
```
If you need a cluster of a specific size you can use the `cluster_factory`
fixture:
```python
def test_with_100_workers(cluster_factory):
cluster = cluster_factory(100)
```
If you want more details on how fixtures work, a few useful pages of the pytest
docs are:
- [About fixtures][fixtures]
- [How to use fixtures][fixtures-how-to]
- [Fixtures reference][fixtures-reference]
[fixtures]: https://docs.pytest.org/en/stable/explanation/fixtures.html
[fixtures-how-to]: https://docs.pytest.org/en/stable/how-to/fixtures.html
[fixtures-reference]: https://docs.pytest.org/en/stable/reference/fixtures.html
## Connecting to a test postgres
Sometimes a test fails in an unexpected way, and the easiest way to find out
why is to connect to Postgres interactively at a certain point in the test.
### Using `psql_debug`
The easiest way is to use the `psql_debug()` method of your `Cluster` or
`Postgres` instance.
```python
def test_something(cluster):
# working stuff
cluster.coordinator.psql_debug()
# unexpectedly failing test
```
Then run this test with stdout/stderr capturing disabled (`-s`) and it will show
you an interactive `psql` prompt right at that point in the test:
```bash
$ pytest -s test/test_your_thing.py::test_something
...
psql (15.2)
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, compression: off)
Type "help" for help.
127.0.0.1 postgres@postgres:10201-20016=
> select 1;
```
### Debug manually
Sometimes you need to connect to more than one node though. For that you can
use the `debug` method of a `Cluster` instead.
```python
def test_something(cluster):
# working stuff
cluster.debug()
# unexpectedly failing test
```
Then run this test with stdout/stderr capturing disabled (`-s`) and it will show
you the connection string for each of the nodes in the cluster:
```bash
$ PG_FORCE_PORTS=true pytest -s test/test_your_thing.py::test_something
...
The nodes in this cluster and their connection strings are:
/tmp/pytest-of-jelte/pytest-752/cluster2-0/coordinator:
"host=127.0.0.1 port=10202 dbname=postgres user=postgres options='-c search_path=test_recovery' connect_timeout=3 client_encoding=UTF8"
/tmp/pytest-of-jelte/pytest-752/cluster2-0/worker0:
"host=127.0.0.1 port=10203 dbname=postgres user=postgres options='-c search_path=test_recovery' connect_timeout=3 client_encoding=UTF8"
/tmp/pytest-of-jelte/pytest-752/cluster2-0/worker1:
"host=127.0.0.1 port=10204 dbname=postgres user=postgres options='-c search_path=test_recovery' connect_timeout=3 client_encoding=UTF8"
Press Enter to continue running the test...
```
Then in another terminal you can manually connect to as many of them as you want.
Using `PG_FORCE_PORTS` is recommended here, to make sure that the ports will
stay the same across runs of the tests. That way you can reuse the connection
strings that you got from a previous run, if you need to debug again.

View File: src/test/regress/citus_tests/test/conftest.py

@@ -0,0 +1,95 @@
import pytest
from common import CitusCluster, Postgres, cleanup_test_leftovers, parallel_run
@pytest.fixture(scope="session")
def cluster_factory_session(tmp_path_factory):
"""The session level pytest fixture that creates and caches citus clusters
IMPORTANT: This should not be used directly, but only indirectly through
the cluster_factory fixture.
"""
clusters = {}
def _make_or_get_cluster(worker_count: int):
if worker_count not in clusters:
clusters[worker_count] = CitusCluster(
tmp_path_factory.mktemp(f"cluster{worker_count}-"), worker_count
)
return clusters[worker_count]
yield _make_or_get_cluster
parallel_run(CitusCluster.cleanup, clusters.values())
@pytest.fixture
def cluster_factory(cluster_factory_session, request):
"""The pytest fixture that creates and caches citus clusters
When the function provided by the factory is called, it returns a cluster
with the given worker count. This cluster is cached across tests, so that
future invocations with the same worker count don't need to create a
cluster again, but can reuse the previously created one.
To try and make sure that tests don't depend on each other this tries very
hard to clean up anything that is created during the test.
It also prints the Postgres logs that were produced during the test to
stdout. Normally these will be hidden, but when a test fails pytest will
show all stdout output produced during the test. Thus showing the Postgres
logs in that case makes it easier to debug.
"""
log_handles = []
clusters = []
nodes = []
def _make_or_get_cluster(worker_count: int):
nonlocal log_handles
nonlocal nodes
cluster = cluster_factory_session(worker_count)
if cluster.failed_reset:
cluster.reset()
cluster.failed_reset = False
clusters.append(cluster)
log_handles += [(node, node.log_handle()) for node in cluster.nodes]
nodes += cluster.nodes
# Create a dedicated schema for the test and use it by default
cluster.coordinator.create_schema(request.node.originalname)
cluster.schema = request.node.originalname
return cluster
yield _make_or_get_cluster
try:
# We clean up test leftovers on all nodes together, instead of per
# cluster. The reason for this is that some subscriptions/publication
# pairs might be between different clusters. And by cleaning them up
# all together, the ordering of the DROPs is easy to make correct.
cleanup_test_leftovers(nodes)
parallel_run(Postgres.prepare_reset, nodes)
parallel_run(Postgres.restart, nodes)
except Exception:
for cluster in clusters:
cluster.failed_reset = True
raise
finally:
for node, log in log_handles:
print(f"\n\nPG_LOG: {node.pgdata}\n")
print(log.read())
log.close()
@pytest.fixture(name="coord")
def coordinator(cluster_factory):
"""Sets up a clean single-node Citus cluster for this test"""
yield cluster_factory(0).coordinator
@pytest.fixture
def cluster(cluster_factory):
"""Sets up a clean 2-worker Citus cluste for this test"""
yield cluster_factory(2)

View File: src/test/regress/citus_tests/test/test_columnar.py

@@ -0,0 +1,117 @@
import psycopg
import pytest
def test_freezing(coord):
coord.configure("vacuum_freeze_min_age = 50000", "vacuum_freeze_table_age = 50000")
coord.restart()
# create a columnar table and insert simple data
coord.sql("CREATE TABLE test_row(i int)")
coord.sql("INSERT INTO test_row VALUES (1) ")
coord.sql(
"CREATE TABLE test_columnar_freeze(i int) USING columnar WITH(autovacuum_enabled=false)"
)
coord.sql("INSERT INTO test_columnar_freeze VALUES (1)")
for _ in range(0, 7):
with coord.cur() as cur:
for _ in range(0, 10_000):
cur.execute("UPDATE test_row SET i = i + 1")
frozen_age = coord.sql_value(
"""
select age(relfrozenxid)
from pg_class where relname='test_columnar_freeze';
"""
)
assert frozen_age > 70_000, "columnar table was frozen"
coord.sql("VACUUM FREEZE test_columnar_freeze")
frozen_age = coord.sql_value(
"""
select age(relfrozenxid)
from pg_class where relname='test_columnar_freeze';
"""
)
assert frozen_age < 70_000, "columnar table was not frozen"
def test_recovery(coord):
# create columnar table and insert simple data to verify the data survives a crash
coord.sql("CREATE TABLE t1 (a int, b text) USING columnar")
coord.sql(
"INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1002) AS a"
)
# simulate crash
coord.stop("immediate")
coord.start()
row_count = coord.sql_value("SELECT count(*) FROM t1")
assert row_count == 1002, "columnar didn't correctly recover data from before the crash"
# truncate the table to verify the truncation survives a crash
coord.sql("TRUNCATE t1")
# simulate crash
coord.stop("immediate")
coord.start()
row_count = coord.sql_value("SELECT count(*) FROM t1")
assert row_count == 0, "columnar didn't recover the truncate correctly"
# test crashing while having an open transaction
with pytest.raises(
psycopg.OperationalError,
match="server closed the connection unexpectedly|consuming input failed: EOF detected",
):
with coord.transaction() as cur:
cur.execute(
"INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1003) AS a"
)
# simulate crash
coord.stop("immediate")
coord.start()
row_count = coord.sql_value("SELECT count(*) FROM t1")
assert row_count == 0, "columnar didn't recover uncommited transaction"
# test crashing while having a prepared transaction
with pytest.raises(
psycopg.OperationalError,
match="server closed the connection unexpectedly|consuming input failed: EOF detected",
):
with coord.transaction() as cur:
cur.execute(
"INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1004) AS a"
)
cur.execute("PREPARE TRANSACTION 'prepared_xact_crash'")
# simulate crash
coord.stop("immediate")
coord.start()
row_count = coord.sql_value("SELECT count(*) FROM t1")
assert row_count == 0, "columnar didn't recover uncommitted prepared transaction"
coord.sql("COMMIT PREPARED 'prepared_xact_crash'")
row_count = coord.sql_value("SELECT count(*) FROM t1")
assert row_count == 1004, "columnar didn't recover committed transaction"
# test crash recovery with copied data
with coord.cur() as cur:
with cur.copy("COPY t1 FROM STDIN") as copy:
copy.write_row((1, "a"))
copy.write_row((2, "b"))
copy.write_row((3, "c"))
# simulate crash
coord.stop("immediate")
coord.start()
row_count = coord.sql_value("SELECT count(*) FROM t1")
assert row_count == 1007, "columnar didn't recover after copy"