Convert columnar tap tests to pytest

add-pytap
Jelte Fennema 2023-02-14 18:13:56 +01:00
parent 273911ac7f
commit bc44509a76
16 changed files with 783 additions and 440 deletions

View File

@ -860,10 +860,6 @@ workflows:
pg_major: 13
image_tag: '<< pipeline.parameters.pg13_version >>'
requires: [build-13]
- tap-test-citus:
<<: *tap-test-citus-13
name: 'test-13_tap-columnar-freezing'
suite: columnar_freezing
- tap-test-citus: &tap-test-citus-14
name: 'test-14_tap-recovery'
@ -871,10 +867,6 @@ workflows:
pg_major: 14
image_tag: '<< pipeline.parameters.pg14_version >>'
requires: [build-14]
- tap-test-citus:
<<: *tap-test-citus-14
name: 'test-14_tap-columnar-freezing'
suite: columnar_freezing
- tap-test-citus: &tap-test-citus-15
name: 'test-15_tap-recovery'
@ -882,10 +874,6 @@ workflows:
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
requires: [build-15]
- tap-test-citus:
<<: *tap-test-citus-15
name: 'test-15_tap-columnar-freezing'
suite: columnar_freezing
- test-arbitrary-configs:
name: 'test-13_check-arbitrary-configs'
@ -937,7 +925,6 @@ workflows:
- test-13_check-columnar
- test-13_check-columnar-isolation
- test-13_tap-recovery
- test-13_tap-columnar-freezing
- test-13_check-failure
- test-13_check-enterprise
- test-13_check-enterprise-isolation
@ -957,7 +944,6 @@ workflows:
- test-14_check-columnar
- test-14_check-columnar-isolation
- test-14_tap-recovery
- test-14_tap-columnar-freezing
- test-14_check-failure
- test-14_check-enterprise
- test-14_check-enterprise-isolation
@ -977,7 +963,6 @@ workflows:
- test-15_check-columnar
- test-15_check-columnar-isolation
- test-15_tap-recovery
- test-15_tap-columnar-freezing
- test-15_check-failure
- test-15_check-enterprise
- test-15_check-enterprise-isolation

View File

@ -1,56 +0,0 @@
#-------------------------------------------------------------------------
#
# Makefile for src/test/columnar_freezing
#
# Test that columnar freezing works.
#
#-------------------------------------------------------------------------
subdir = src/test/columnar_freezing
top_builddir = ../../..
include $(top_builddir)/Makefile.global
# In PG15, Perl test modules have been moved to a new namespace
# new() and get_new_node() methods have been unified to 1 method: new()
# Relevant PG commits 201a76183e2056c2217129e12d68c25ec9c559c8
# b3b4d8e68ae83f432f43f035c7eb481ef93e1583
pg_version = $(shell $(PG_CONFIG) --version 2>/dev/null)
pg_whole_version = $(shell echo "$(pg_version)"| sed -e 's/^PostgreSQL \([0-9]*\)\(\.[0-9]*\)\{0,1\}\(.*\)/\1\2/')
pg_major_version = $(shell echo "$(pg_whole_version)"| sed -e 's/^\([0-9]\{2\}\)\(.*\)/\1/')
# for now, we only have a single test file
# due to the above explanation, we ended up separating the test paths for
# different versions. If you need to add new test files, be careful to add both versions
ifeq ($(pg_major_version),13)
test_path = t_pg13_pg14/*.pl
else ifeq ($(pg_major_version),14)
test_path = t_pg13_pg14/*.pl
else
test_path = t/*.pl
endif
# copied from pgxs/Makefile.global to use postgres' abs build dir for pg_regress
ifeq ($(enable_tap_tests),yes)
define citus_prove_installcheck
rm -rf '$(CURDIR)'/tmp_check
$(MKDIR_P) '$(CURDIR)'/tmp_check
cd $(srcdir) && \
TESTDIR='$(CURDIR)' \
PATH="$(bindir):$$PATH" \
PGPORT='6$(DEF_PGPORT)' \
top_builddir='$(CURDIR)/$(top_builddir)' \
PG_REGRESS='$(pgxsdir)/src/test/regress/pg_regress' \
TEMP_CONFIG='$(CURDIR)'/postgresql.conf \
$(PROVE) $(PG_PROVE_FLAGS) $(PROVE_FLAGS) $(if $(PROVE_TESTS),$(PROVE_TESTS),$(test_path))
endef
else
citus_prove_installcheck = @echo "TAP tests not enabled when postgres was compiled"
endif
installcheck:
$(citus_prove_installcheck)
clean distclean maintainer-clean:
rm -rf tmp_check

View File

@ -1,7 +0,0 @@
shared_preload_libraries=citus
shared_preload_libraries='citus'
vacuum_freeze_min_age = 50000
vacuum_freeze_table_age = 50000
synchronous_commit = off
fsync = off

View File

@ -1,52 +0,0 @@
# Minimal test verifying columnar table freezing
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More tests => 2;
# Initialize single node
my $node_one = PostgreSQL::Test::Cluster->new('node_one');
$node_one->init();
$node_one->start;
# initialize the citus extension
$node_one->safe_psql('postgres', "CREATE EXTENSION citus;");
# create columnar table and insert simple data to verify the data survives a crash
$node_one->safe_psql('postgres', "
CREATE TABLE test_row(i int);
INSERT INTO test_row VALUES (1);
CREATE TABLE test_columnar_freeze(i int) USING columnar WITH(autovacuum_enabled=false);
INSERT INTO test_columnar_freeze VALUES (1);
");
my $ten_thousand_updates = "";
foreach (1..10000) {
$ten_thousand_updates .= "UPDATE test_row SET i = i + 1;\n";
}
# 70K updates
foreach (1..7) {
$node_one->safe_psql('postgres', $ten_thousand_updates);
}
my $result = $node_one->safe_psql('postgres', "
select age(relfrozenxid) < 70000 as was_frozen
from pg_class where relname='test_columnar_freeze';
");
print "node one count: $result\n";
is($result, qq(f), 'columnar table was not frozen');
$node_one->safe_psql('postgres', 'VACUUM FREEZE test_columnar_freeze;');
$result = $node_one->safe_psql('postgres', "
select age(relfrozenxid) < 70000 as was_frozen
from pg_class where relname='test_columnar_freeze';
");
print "node one count: $result\n";
is($result, qq(t), 'columnar table was frozen');
$node_one->stop('fast');

View File

@ -1,52 +0,0 @@
# Minimal test verifying columnar table freezing
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 2;
# Initialize single node
my $node_one = get_new_node('node_one');
$node_one->init();
$node_one->start;
# initialize the citus extension
$node_one->safe_psql('postgres', "CREATE EXTENSION citus;");
# create columnar table and insert simple data to verify the data survives a crash
$node_one->safe_psql('postgres', "
CREATE TABLE test_row(i int);
INSERT INTO test_row VALUES (1);
CREATE TABLE test_columnar_freeze(i int) USING columnar WITH(autovacuum_enabled=false);
INSERT INTO test_columnar_freeze VALUES (1);
");
my $ten_thousand_updates = "";
foreach (1..10000) {
$ten_thousand_updates .= "UPDATE test_row SET i = i + 1;\n";
}
# 70K updates
foreach (1..7) {
$node_one->safe_psql('postgres', $ten_thousand_updates);
}
my $result = $node_one->safe_psql('postgres', "
select age(relfrozenxid) < 70000 as was_frozen
from pg_class where relname='test_columnar_freeze';
");
print "node one count: $result\n";
is($result, qq(f), 'columnar table was not frozen');
$node_one->safe_psql('postgres', 'VACUUM FREEZE test_columnar_freeze;');
$result = $node_one->safe_psql('postgres', "
select age(relfrozenxid) < 70000 as was_frozen
from pg_class where relname='test_columnar_freeze';
");
print "node one count: $result\n";
is($result, qq(t), 'columnar table was frozen');
$node_one->stop('fast');

View File

@ -1,2 +0,0 @@
# Generated by test suite
/tmp_check/

View File

@ -1,58 +0,0 @@
#-------------------------------------------------------------------------
#
# Makefile for src/test/recovery
#
# Loosely based on the makefile found in postgres' src/test/recovery.
# We need to define our own invocation of prove to pass the correct path
# to pg_regress and include citus in the shared preload libraries.
#
#-------------------------------------------------------------------------
subdir = src/test/recovery
top_builddir = ../../..
include $(top_builddir)/Makefile.global
# In PG15, Perl test modules have been moved to a new namespace
# new() and get_new_node() methods have been unified to 1 method: new()
# Relevant PG commits 201a76183e2056c2217129e12d68c25ec9c559c8
# b3b4d8e68ae83f432f43f035c7eb481ef93e1583
pg_version = $(shell $(PG_CONFIG) --version 2>/dev/null)
pg_whole_version = $(shell echo "$(pg_version)"| sed -e 's/^PostgreSQL \([0-9]*\)\(\.[0-9]*\)\{0,1\}\(.*\)/\1\2/')
pg_major_version = $(shell echo "$(pg_whole_version)"| sed -e 's/^\([0-9]\{2\}\)\(.*\)/\1/')
# for now, we only have a single test file
# due to the above explanation, we ended up separating the test paths for
# different versions. If you need to add new test files, be careful to add both versions
ifeq ($(pg_major_version),13)
test_path = t_pg13_pg14/*.pl
else ifeq ($(pg_major_version),14)
test_path = t_pg13_pg14/*.pl
else
test_path = t/*.pl
endif
# copied from pgxs/Makefile.global to use postgres' abs build dir for pg_regress
ifeq ($(enable_tap_tests),yes)
define citus_prove_installcheck
rm -rf '$(CURDIR)'/tmp_check
$(MKDIR_P) '$(CURDIR)'/tmp_check
cd $(srcdir) && \
TESTDIR='$(CURDIR)' \
PATH="$(bindir):$$PATH" \
PGPORT='6$(DEF_PGPORT)' \
top_builddir='$(CURDIR)/$(top_builddir)' \
PG_REGRESS='$(pgxsdir)/src/test/regress/pg_regress' \
TEMP_CONFIG='$(CURDIR)'/postgresql.conf \
$(PROVE) $(PG_PROVE_FLAGS) $(PROVE_FLAGS) $(if $(PROVE_TESTS),$(PROVE_TESTS),$(test_path))
endef
else
citus_prove_installcheck = @echo "TAP tests not enabled when postgres was compiled"
endif
installcheck:
$(citus_prove_installcheck)
clean distclean maintainer-clean:
rm -rf tmp_check

View File

@ -1 +0,0 @@
shared_preload_libraries=citus

View File

@ -1,98 +0,0 @@
# Minimal test verifying columnar crash recovery
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More tests => 6;
# Initialize single node
my $node_one = PostgreSQL::Test::Cluster->new('node_one');
$node_one->init();
$node_one->start;
# initialize the citus extension
$node_one->safe_psql('postgres', "CREATE EXTENSION citus;");
# create columnar table and insert simple data to verify the data survives a crash
$node_one->safe_psql('postgres', "
BEGIN;
CREATE TABLE t1 (a int, b text) USING columnar;
INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1002) AS a;
COMMIT;
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
my $result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(1002), 'columnar recovered data from before crash');
# truncate the table to verify the truncation survives a crash
$node_one->safe_psql('postgres', "
TRUNCATE t1;
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(0), 'columnar recovered truncation');
# test crashing while having an open transaction
$node_one->safe_psql('postgres', "
BEGIN;
INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1003) AS a;
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(0), 'columnar crash during uncommitted transaction');
# test crashing while having a prepared transaction
$node_one->safe_psql('postgres', "
BEGIN;
INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1004) AS a;
PREPARE TRANSACTION 'prepared_xact_crash';
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(0), 'columnar crash during prepared transaction (before commit)');
$node_one->safe_psql('postgres', "
COMMIT PREPARED 'prepared_xact_crash';
");
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(1004), 'columnar crash during prepared transaction (after commit)');
# test crash recovery with copied data
$node_one->safe_psql('postgres', "
\\copy t1 FROM stdin delimiter ','
1,a
2,b
3,c
\\.
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(1007), 'columnar crash after copy command');

View File

@ -1,98 +0,0 @@
# Minimal test verifying columnar crash recovery
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 6;
# Initialize single node
my $node_one = get_new_node('node_one');
$node_one->init();
$node_one->start;
# initialize the citus extension
$node_one->safe_psql('postgres', "CREATE EXTENSION citus;");
# create columnar table and insert simple data to verify the data survives a crash
$node_one->safe_psql('postgres', "
BEGIN;
CREATE TABLE t1 (a int, b text) USING columnar;
INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1002) AS a;
COMMIT;
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
my $result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(1002), 'columnar recovered data from before crash');
# truncate the table to verify the truncation survives a crash
$node_one->safe_psql('postgres', "
TRUNCATE t1;
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(0), 'columnar recovered truncation');
# test crashing while having an open transaction
$node_one->safe_psql('postgres', "
BEGIN;
INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1003) AS a;
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(0), 'columnar crash during uncommitted transaction');
# test crashing while having a prepared transaction
$node_one->safe_psql('postgres', "
BEGIN;
INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1004) AS a;
PREPARE TRANSACTION 'prepared_xact_crash';
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(0), 'columnar crash during prepared transaction (before commit)');
$node_one->safe_psql('postgres', "
COMMIT PREPARED 'prepared_xact_crash';
");
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(1004), 'columnar crash during prepared transaction (after commit)');
# test crash recovery with copied data
$node_one->safe_psql('postgres', "
\\copy t1 FROM stdin delimiter ','
1,a
2,b
3,c
\\.
");
# simulate crash
$node_one->stop('immediate');
$node_one->start;
$result = $node_one->safe_psql('postgres', "SELECT count(*) FROM t1;");
print "node one count: $result\n";
is($result, qq(1007), 'columnar crash after copy command');

View File

@ -8,6 +8,12 @@ mitmproxy = {editable = true, ref = "fix/tcp-flow-kill", git = "https://github.c
construct = "==2.9.45"
docopt = "==0.6.2"
cryptography = "==3.4.8"
pytest = "*"
psycopg = "*"
filelock = "*"
pytest-asyncio = "*"
pytest-timeout = "*"
pytest-xdist = "*"
[dev-packages]
black = "*"

View File

@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "635b4c111e3bca87373fcdf308febf0a816dde15b14f6bf078f2b456630e5ef1"
"sha256": "32d824bcbf219cee2dc590c861bf7857235694d8875275ac3a06fc89982f63ff"
},
"pipfile-spec": 6,
"requires": {
@ -24,6 +24,14 @@
"markers": "python_version >= '3.6'",
"version": "==3.4.1"
},
"attrs": {
"hashes": [
"sha256:29e95c7f6778868dbd49170f98f8818f78f3dc5e0e37c0b1f474e3561b240836",
"sha256:c9227bfc2f01993c03f68db37d1d15c9690188323c067c641f1a35ca58185f99"
],
"markers": "python_version >= '3.6'",
"version": "==22.2.0"
},
"blinker": {
"hashes": [
"sha256:471aee25f3992bd325afa3772f1063dbdbbca947a041b8b89466dc00d606f8b6"
@ -241,6 +249,30 @@
"index": "pypi",
"version": "==0.6.2"
},
"exceptiongroup": {
"hashes": [
"sha256:327cbda3da756e2de031a3107b81ab7b3770a602c4d16ca618298c526f4bec1e",
"sha256:bcb67d800a4497e1b404c2dd44fca47d3b7a5e5433dbab67f96c1a685cdfdf23"
],
"markers": "python_version < '3.11'",
"version": "==1.1.0"
},
"execnet": {
"hashes": [
"sha256:8f694f3ba9cc92cab508b152dcfe322153975c29bda272e2fd7f3f00f36e47c5",
"sha256:a295f7cc774947aac58dde7fdc85f4aa00c42adf5d8f5468fc630c1acf30a142"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'",
"version": "==1.9.0"
},
"filelock": {
"hashes": [
"sha256:7b319f24340b51f55a2bf7a12ac0755a9b03e718311dac567a0f4f7fabd2f5de",
"sha256:f58d535af89bb9ad5cd4df046f741f8553a418c01a7856bf0d173bbc9f6bd16d"
],
"index": "pypi",
"version": "==3.9.0"
},
"flask": {
"hashes": [
"sha256:59da8a3170004800a2837844bfa84d49b022550616070f7cb1a659682b2e7c9f",
@ -281,6 +313,14 @@
"markers": "python_full_version >= '3.6.1'",
"version": "==6.0.1"
},
"iniconfig": {
"hashes": [
"sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3",
"sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"
],
"markers": "python_version >= '3.7'",
"version": "==2.0.0"
},
"itsdangerous": {
"hashes": [
"sha256:2c2349112351b88699d8d4b6b075022c0808887cb7ad10069318a8b0bc88db44",
@ -431,6 +471,14 @@
],
"version": "==1.0.4"
},
"packaging": {
"hashes": [
"sha256:714ac14496c3e68c99c29b00845f7a2b85f3bb6f1078fd9f72fd20f0570002b2",
"sha256:b6ad297f8907de0fa2fe1ccbd26fdaf387f5f47c7275fedf8cce89f99446cf97"
],
"markers": "python_version >= '3.7'",
"version": "==23.0"
},
"passlib": {
"hashes": [
"sha256:aa6bca462b8d8bda89c70b382f0c298a20b5560af6cbfa2dce410c0a2fb669f1",
@ -438,6 +486,14 @@
],
"version": "==1.7.4"
},
"pluggy": {
"hashes": [
"sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159",
"sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"
],
"markers": "python_version >= '3.6'",
"version": "==1.0.0"
},
"protobuf": {
"hashes": [
"sha256:0c44e01f74109decea196b5b313b08edb5316df77313995594a6981e95674259",
@ -465,6 +521,14 @@
"markers": "python_version >= '3.5'",
"version": "==3.18.3"
},
"psycopg": {
"hashes": [
"sha256:59b4a71536b146925513c0234dfd1dc42b81e65d56ce5335dff4813434dbc113",
"sha256:b1500c42063abaa01d30b056f0b300826b8dd8d586900586029a294ce74af327"
],
"index": "pypi",
"version": "==3.1.8"
},
"publicsuffix2": {
"hashes": [
"sha256:00f8cc31aa8d0d5592a5ced19cccba7de428ebca985db26ac852d920ddd6fe7b",
@ -520,6 +584,38 @@
],
"version": "==1.8.2"
},
"pytest": {
"hashes": [
"sha256:c7c6ca206e93355074ae32f7403e8ea12163b1163c976fee7d4d84027c162be5",
"sha256:d45e0952f3727241918b8fd0f376f5ff6b301cc0777c6f9a556935c92d8a7d42"
],
"index": "pypi",
"version": "==7.2.1"
},
"pytest-asyncio": {
"hashes": [
"sha256:83cbf01169ce3e8eb71c6c278ccb0574d1a7a3bb8eaaf5e50e0ad342afb33b36",
"sha256:f129998b209d04fcc65c96fc85c11e5316738358909a8399e93be553d7656442"
],
"index": "pypi",
"version": "==0.20.3"
},
"pytest-timeout": {
"hashes": [
"sha256:c07ca07404c612f8abbe22294b23c368e2e5104b521c1790195561f37e1ac3d9",
"sha256:f6f50101443ce70ad325ceb4473c4255e9d74e3c7cd0ef827309dfa4c0d975c6"
],
"index": "pypi",
"version": "==2.1.0"
},
"pytest-xdist": {
"hashes": [
"sha256:336098e3bbd8193276867cc87db8b22903c3927665dff9d1ac8684c02f597b68",
"sha256:fa10f95a2564cd91652f2d132725183c3b590d9fdcdec09d3677386ecf4c1ce9"
],
"index": "pypi",
"version": "==3.2.0"
},
"ruamel.yaml": {
"hashes": [
"sha256:1a771fc92d3823682b7f0893ad56cb5a5c87c48e62b5399d6f42c8759a583b33",
@ -543,6 +639,14 @@
],
"version": "==2.4.0"
},
"tomli": {
"hashes": [
"sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc",
"sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"
],
"markers": "python_version < '3.11'",
"version": "==2.0.1"
},
"tornado": {
"hashes": [
"sha256:1d54d13ab8414ed44de07efecb97d4ef7c39f7438cf5e976ccd356bebb1b5fca",
@ -560,6 +664,14 @@
"markers": "python_version >= '3.7'",
"version": "==6.2"
},
"typing-extensions": {
"hashes": [
"sha256:1511434bb92bf8dd198c12b1cc812e800d4181cfcb867674e0f8279cc93087aa",
"sha256:16fa4864408f655d35ec496218b85f79b3437c829e93320c7c9215ccfd92489e"
],
"markers": "python_version >= '3.7'",
"version": "==4.4.0"
},
"urwid": {
"hashes": [
"sha256:588bee9c1cb208d0906a9f73c613d2bd32c3ed3702012f51efe318a3f2127eae"

View File

View File

@ -0,0 +1,43 @@
import pytest
import os
import filelock
from .utils import Postgres
@pytest.fixture(scope="session")
def coordinator_session(tmp_path_factory):
"""Starts a new Postgres db that is shared for tests in this process"""
pg = Postgres(tmp_path_factory.getbasetemp() / "coordinator" / "pgdata")
pg.initdb()
os.truncate(pg.hba_path, 0)
pg.ssl_access("all", "trust")
pg.nossl_access("all", "trust")
pg.commit_hba()
pg.start()
pg.sql("CREATE EXTENSION citus")
yield pg
pg.cleanup()
@pytest.fixture(name="coord")
def coordinator(coordinator_session):
"""Sets up a clean Postgres for this test, using the session Postgres
    It also prints the Postgres logs that were created during the test. This can
be useful for debugging a failure.
"""
pg = coordinator_session
# Resets any changes to Postgres settings from previous tests
pg.reset_hba()
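    # configure() applies settings with ALTER SYSTEM, which writes them to
    # postgresql.auto.conf, so truncating that file undoes them.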
os.truncate(pg.pgdata / "postgresql.auto.conf", 0)
pg.reload()
with pg.log_path.open() as f:
f.seek(0, os.SEEK_END)
yield pg
print("\n\nPG_LOG\n")
print(f.read())

View File

@ -0,0 +1,109 @@
import pytest
import psycopg
def test_freezing(coord):
coord.configure("vacuum_freeze_min_age = 50000")
coord.configure("vacuum_freeze_table_age = 50000")
coord.restart()
# create columnar table and insert simple data to verify the data survives
# a crash
coord.sql("CREATE TABLE test_row(i int)")
coord.sql("INSERT INTO test_row VALUES (1) ")
coord.sql(
"CREATE TABLE test_columnar_freeze(i int) USING columnar WITH(autovacuum_enabled=false)"
)
coord.sql("INSERT INTO test_columnar_freeze VALUES (1)")
for _ in range(0, 7):
with coord.cur() as cur:
for _ in range(0, 10_000):
cur.execute("UPDATE test_row SET i = i + 1")
frozen_age = coord.sql_value(
"""
select age(relfrozenxid)
from pg_class where relname='test_columnar_freeze';
"""
)
assert frozen_age > 70000, "columnar table was frozen"
coord.sql("VACUUM FREEZE test_columnar_freeze")
frozen_age = coord.sql_value(
"""
select age(relfrozenxid)
from pg_class where relname='test_columnar_freeze';
"""
)
assert frozen_age < 70000, "columnar table was not frozen"
def test_recovery(coord):
coord.sql("CREATE TABLE t1 (a int, b text) USING columnar")
coord.sql(
"INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1002) AS a"
)
# simulate crash
coord.stop("immediate")
coord.start()
row_count = coord.sql_value("SELECT count(*) FROM t1")
    assert row_count == 1002, "columnar didn't recover data from before the crash"
coord.sql("TRUNCATE t1")
# simulate crash
coord.stop("immediate")
coord.start()
row_count = coord.sql_value("SELECT count(*) FROM t1")
assert row_count == 0, "columnar didn't recover the truncate correctly"
with pytest.raises(
psycopg.OperationalError, match="server closed the connection unexpectedly"
):
with coord.transaction() as cur:
cur.execute(
"INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1003) AS a"
)
# simulate crash
coord.stop("immediate")
coord.start()
row_count = coord.sql_value("SELECT count(*) FROM t1")
assert row_count == 0, "columnar didn't recover uncommited transaction"
with pytest.raises(
psycopg.OperationalError, match="server closed the connection unexpectedly"
):
with coord.transaction() as cur:
cur.execute(
"INSERT INTO t1 SELECT a, 'hello world ' || a FROM generate_series(1,1004) AS a"
)
cur.execute("PREPARE TRANSACTION 'prepared_xact_crash'")
# simulate crash
coord.stop("immediate")
coord.start()
row_count = coord.sql_value("SELECT count(*) FROM t1")
assert row_count == 0, "columnar didn't recover uncommitted prepared transaction"
coord.sql("COMMIT PREPARED 'prepared_xact_crash'")
row_count = coord.sql_value("SELECT count(*) FROM t1")
assert row_count == 1004, "columnar didn't recover committed transaction"
with coord.cur() as cur:
with cur.copy("COPY t1 FROM STDIN") as copy:
copy.write_row((1, 'a'))
copy.write_row((2, 'b'))
copy.write_row((3, 'c'))
# simulate crash
coord.stop("immediate")
coord.start()
row_count = coord.sql_value("SELECT count(*) FROM t1")
assert row_count == 1007, "columnar didn't recover after copy"

View File

@ -0,0 +1,512 @@
from pathlib import Path
import subprocess
from contextlib import contextmanager, closing
from contextlib import asynccontextmanager
import psycopg
import os
import re
import sys
import time
import asyncio
import socket
from tempfile import gettempdir
import filelock
import typing
import platform
ENABLE_VALGRIND = bool(os.environ.get("ENABLE_VALGRIND"))
USE_SUDO = bool(os.environ.get("USE_SUDO"))
LINUX = False
MACOS = False
FREEBSD = False
OPENBSD = False
if platform.system() == "Linux":
LINUX = True
elif platform.system() == "Darwin":
MACOS = True
elif platform.system() == "FreeBSD":
FREEBSD = True
elif platform.system() == "OpenBSD":
OPENBSD = True
BSD = MACOS or FREEBSD or OPENBSD
def eprint(*args, **kwargs):
"""eprint prints to stderr"""
print(*args, file=sys.stderr, **kwargs)
def run(command, *args, check=True, shell=True, silent=False, **kwargs):
"""run runs the given command and prints it to stderr"""
if not silent:
eprint(f"+ {command} ")
if silent:
kwargs.setdefault("stdout", subprocess.DEVNULL)
return subprocess.run(command, *args, check=check, shell=shell, **kwargs)
def sudo(command, *args, shell=True, **kwargs):
"""
A version of run that prefixes the command with sudo when the process is
not already run as root
"""
effective_user_id = os.geteuid()
if effective_user_id == 0:
return run(command, *args, shell=shell, **kwargs)
if shell:
return run(f"sudo {command}", *args, shell=shell, **kwargs)
else:
return run(["sudo", *command])
def get_pg_major_version():
full_version_string = run(
"initdb --version", stdout=subprocess.PIPE, encoding="utf-8", silent=True
).stdout
major_version_string = re.search("[0-9]+", full_version_string)
assert major_version_string is not None
return int(major_version_string.group(0))
PG_MAJOR_VERSION = get_pg_major_version()
# this is out of ephemeral port range for many systems hence
# there is a lower chance that it will conflict with "in-use" ports
PORT_LOWER_BOUND = 10200
# ephemeral port start on many Linux systems
PORT_UPPER_BOUND = 32768
next_port = PORT_LOWER_BOUND
class PortLock:
def __init__(self):
global next_port
while True:
next_port += 1
if next_port >= PORT_UPPER_BOUND:
next_port = PORT_LOWER_BOUND
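            # Claim the candidate port across concurrent test processes with a
            # file lock, then double-check that it is free by binding to it.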
self.lock = filelock.FileLock(Path(gettempdir()) / f"port-{next_port}.lock")
try:
self.lock.acquire(timeout=0)
except filelock.Timeout:
continue
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
try:
s.bind(("127.0.0.1", next_port))
self.port = next_port
break
except Exception:
continue
def release(self):
self.lock.release()
class QueryRunner:
def __init__(self, host, port):
self.host = host
self.port = port
self.default_db = "postgres"
self.default_user = "postgres"
def set_default_connection_options(self, options):
options.setdefault("dbname", self.default_db)
options.setdefault("user", self.default_user)
if ENABLE_VALGRIND:
            # If valgrind is enabled, Postgres is significantly slower to
# respond to connection requests, so we wait a little longer.
options.setdefault("connect_timeout", 20)
else:
options.setdefault("connect_timeout", 3)
# needed for Ubuntu 18.04
options.setdefault("client_encoding", "UTF8")
def conn(self, *, autocommit=True, **kwargs):
"""Open a psycopg connection to this server"""
self.set_default_connection_options(kwargs)
return psycopg.connect(
autocommit=autocommit,
host=self.host,
port=self.port,
**kwargs,
)
def aconn(self, *, autocommit=True, **kwargs):
"""Open an asynchronous psycopg connection to this server"""
self.set_default_connection_options(kwargs)
return psycopg.AsyncConnection.connect(
autocommit=autocommit,
host=self.host,
port=self.port,
**kwargs,
)
@contextmanager
def cur(self, autocommit=True, **kwargs):
"""Open an psycopg cursor to this server
The connection and the cursors automatically close once you leave the
"with" block
"""
with self.conn(
autocommit=autocommit,
**kwargs,
) as conn:
with conn.cursor() as cur:
yield cur
@asynccontextmanager
async def acur(self, **kwargs):
"""Open an asynchronous psycopg cursor to this server
The connection and the cursors automatically close once you leave the
"async with" block
"""
async with await self.aconn(**kwargs) as conn:
async with conn.cursor() as cur:
yield cur
def sql(self, query, params=None, **kwargs):
"""Run an SQL query
This opens a new connection and closes it once the query is done
"""
with self.cur(**kwargs) as cur:
cur.execute(query, params=params)
def sql_value(self, query, params=None, **kwargs):
"""Run an SQL query that returns a single cell and return this value
This opens a new connection and closes it once the query is done
"""
with self.cur(**kwargs) as cur:
cur.execute(query, params=params)
result = cur.fetchall()
assert len(result) == 1
assert len(result[0]) == 1
value = result[0][0]
return value
def asql(self, query, **kwargs):
"""Run an SQL query in asynchronous task
This opens a new connection and closes it once the query is done
"""
return asyncio.ensure_future(self.asql_coroutine(query, **kwargs))
async def asql_coroutine(
self, query, params=None, **kwargs
) -> typing.Optional[typing.List[typing.Any]]:
async with self.acur(**kwargs) as cur:
await cur.execute(query, params=params)
try:
return await cur.fetchall()
except psycopg.ProgrammingError as e:
if "the last operation didn't produce a result" == str(e):
return None
raise
def psql(self, query, **kwargs):
"""Run an SQL query using psql instead of psycopg
This opens a new connection and closes it once the query is done
"""
self.set_default_connection_options(kwargs)
connect_options = " ".join([f"{k}={v}" for k, v in kwargs.items()])
run(
["psql", f"port={self.port} {connect_options}", "-c", query],
shell=False,
silent=True,
)
@contextmanager
def transaction(self, **kwargs):
with self.cur(**kwargs) as cur:
with cur.connection.transaction():
yield cur
def sleep(self, duration=3, **kwargs):
"""Run pg_sleep"""
return self.sql(f"select pg_sleep({duration})", **kwargs)
def asleep(self, duration=3, times=1, sequentially=False, **kwargs):
"""Run pg_sleep asynchronously in a task.
times:
You can create a single task that opens multiple connections, which
run pg_sleep concurrently. The asynchronous task will only complete
once all these pg_sleep calls are finished.
sequentially:
Instead of running all pg_sleep calls spawned by providing
times > 1 concurrently, this will run them sequentially.
"""
return asyncio.ensure_future(
self.asleep_coroutine(
duration=duration, times=times, sequentially=sequentially, **kwargs
)
)
async def asleep_coroutine(self, duration=3, times=1, sequentially=False, **kwargs):
"""This is the coroutine that the asleep task runs internally"""
if not sequentially:
await asyncio.gather(
*[
self.asql(f"select pg_sleep({duration})", **kwargs)
for _ in range(times)
]
)
else:
for _ in range(times):
await self.asql(f"select pg_sleep({duration})", **kwargs)
def test(self, **kwargs):
"""Test if you can connect"""
return self.sql("select 1", **kwargs)
def atest(self, **kwargs):
"""Test if you can connect asynchronously"""
return self.asql("select 1", **kwargs)
def psql_test(self, **kwargs):
"""Test if you can connect with psql instead of psycopg"""
return self.psql("select 1", **kwargs)
@contextmanager
def enable_firewall(self):
"""Enables the firewall for the platform that you are running
Normally this should not be called directly, and instead drop_traffic
or reject_traffic should be used.
"""
fw_token = None
if BSD:
if MACOS:
command_stderr = sudo(
f"pfctl -E", stderr=subprocess.PIPE, text=True
).stderr
match = re.search(r"^Token : (\d+)", command_stderr, flags=re.MULTILINE)
assert match is not None
fw_token = match.group(1)
sudo(
'bash -c "'
f"echo 'anchor \\\"port_{self.port}\\\"'"
f' | pfctl -a pgbouncer_test -f -"'
)
try:
yield
finally:
if MACOS:
sudo(f"pfctl -X {fw_token}")
@contextmanager
def drop_traffic(self):
"""Drops all TCP packets to this query runner"""
with self.enable_firewall():
if LINUX:
sudo(
"iptables --append OUTPUT "
"--protocol tcp "
f"--destination {self.host} "
f"--destination-port {self.port} "
"--jump DROP "
)
elif BSD:
sudo(
"bash -c '"
f'echo "block drop out proto tcp from any to {self.host} port {self.port}"'
f"| pfctl -a pgbouncer_test/port_{self.port} -f -'"
)
else:
raise Exception("This OS cannot run this test")
try:
yield
finally:
if LINUX:
sudo(
"iptables --delete OUTPUT "
"--protocol tcp "
f"--destination {self.host} "
f"--destination-port {self.port} "
"--jump DROP "
)
elif BSD:
sudo(f"pfctl -a pgbouncer_test/port_{self.port} -F all")
@contextmanager
def reject_traffic(self):
"""Rejects all traffic to this query runner with a TCP RST message"""
with self.enable_firewall():
if LINUX:
sudo(
"iptables --append OUTPUT "
"--protocol tcp "
f"--destination {self.host} "
f"--destination-port {self.port} "
"--jump REJECT "
"--reject-with tcp-reset"
)
elif BSD:
sudo(
"bash -c '"
f'echo "block return-rst out out proto tcp from any to {self.host} port {self.port}"'
f"| pfctl -a pgbouncer_test/port_{self.port} -f -'"
)
else:
raise Exception("This OS cannot run this test")
try:
yield
finally:
if LINUX:
sudo(
"iptables --delete OUTPUT "
"--protocol tcp "
f"--destination {self.host} "
f"--destination-port {self.port} "
"--jump REJECT "
"--reject-with tcp-reset"
)
elif BSD:
sudo(f"pfctl -a pgbouncer_test/port_{self.port} -F all")
class Postgres(QueryRunner):
def __init__(self, pgdata):
self.port_lock = PortLock()
super().__init__("127.0.0.1", self.port_lock.port)
self.pgdata = pgdata
self.log_path = self.pgdata / "pg.log"
self.connections = {}
self.cursors = {}
def initdb(self):
run(
f"initdb -A trust --nosync --username postgres --pgdata {self.pgdata}",
stdout=subprocess.DEVNULL,
)
with self.conf_path.open(mode="a") as pgconf:
pgconf.write("unix_socket_directories = '/tmp'\n")
pgconf.write("log_connections = on\n")
pgconf.write("log_disconnections = on\n")
pgconf.write("logging_collector = off\n")
pgconf.write("shared_preload_libraries = 'citus'\n")
# We need to make the log go to stderr so that the tests can
# check what is being logged. This should be the default, but
# some packagings change the default configuration.
pgconf.write("log_destination = stderr\n")
# This makes tests run faster and we don't care about crash safety
# of our test data.
pgconf.write("fsync = false\n")
def pgctl(self, command, **kwargs):
run(f"pg_ctl -w --pgdata {self.pgdata} {command}", **kwargs)
def apgctl(self, command, **kwargs):
return asyncio.create_subprocess_shell(
f"pg_ctl -w --pgdata {self.pgdata} {command}", **kwargs
)
def start(self):
try:
self.pgctl(f'-o "-p {self.port}" -l {self.log_path} start')
except Exception:
print("\n\nPG_LOG\n")
with self.log_path.open() as f:
print(f.read())
raise
def stop(self, mode="fast"):
self.pgctl(f"-m {mode} stop", check=False)
def cleanup(self):
self.stop()
self.port_lock.release()
def restart(self):
self.stop()
self.start()
def reload(self):
self.pgctl("reload")
time.sleep(1)
async def arestart(self):
process = await self.apgctl("-m fast restart")
await process.communicate()
def nossl_access(self, dbname, auth_type):
"""Prepends a local non-SSL access to the HBA file"""
with self.hba_path.open() as pghba:
old_contents = pghba.read()
with self.hba_path.open(mode="w") as pghba:
pghba.write(f"local {dbname} all {auth_type}\n")
pghba.write(f"hostnossl {dbname} all 127.0.0.1/32 {auth_type}\n")
pghba.write(f"hostnossl {dbname} all ::1/128 {auth_type}\n")
pghba.write(old_contents)
def ssl_access(self, dbname, auth_type):
"""Prepends a local SSL access rule to the HBA file"""
with self.hba_path.open() as pghba:
old_contents = pghba.read()
with self.hba_path.open(mode="w") as pghba:
pghba.write(f"hostssl {dbname} all 127.0.0.1/32 {auth_type}\n")
pghba.write(f"hostssl {dbname} all ::1/128 {auth_type}\n")
pghba.write(old_contents)
@property
def hba_path(self):
return self.pgdata / "pg_hba.conf"
@property
def conf_path(self):
return self.pgdata / "postgresql.conf"
def commit_hba(self):
"""Mark the current HBA contents as non-resetable by reset_hba"""
with self.hba_path.open() as pghba:
old_contents = pghba.read()
with self.hba_path.open(mode="w") as pghba:
pghba.write("# committed-rules\n")
pghba.write(old_contents)
def reset_hba(self):
"""Remove any HBA rules that were added after the last call to commit_hba"""
with self.hba_path.open() as f:
hba_contents = f.read()
committed = hba_contents[hba_contents.find("# committed-rules\n") :]
with self.hba_path.open("w") as f:
f.write(committed)
async def delayed_start(self, delay=1):
"""Start Postgres after a delay
        NOTE: The sleep is asynchronous, but while waiting for Postgres to
        start, the pg_ctl start command will block the event loop. This is
        currently acceptable for how the existing tests use this method, and
        it was the easiest behaviour to implement. However, it seems totally
        reasonable to change it in the future if necessary.
"""
await asyncio.sleep(delay)
self.start()
def configure(self, config):
"""Configure specific Postgres settings using ALTER SYSTEM SET
        NOTE: after configuring, a call to reload or restart is needed for the
        settings to become effective.
"""
self.sql(f"alter system set {config}")