diff --git a/src/test/regress/expected/insert_select_connection_leak.out b/src/test/regress/expected/insert_select_connection_leak.out new file mode 100644 index 000000000..a971c608e --- /dev/null +++ b/src/test/regress/expected/insert_select_connection_leak.out @@ -0,0 +1,103 @@ +CREATE SCHEMA insert_select_connection_leak; +SET search_path TO 'insert_select_connection_leak'; +SET citus.next_shard_id TO 4213581; +SET citus.shard_count TO 64; +CREATE OR REPLACE FUNCTION +worker_connection_count(nodeport int) +RETURNS int AS $$ + SELECT result::int - 1 FROM + run_command_on_workers($Q$select count(*) from pg_stat_activity where backend_type = 'client backend';$Q$) + WHERE nodeport = nodeport +$$ LANGUAGE SQL; +CREATE TABLE source_table(a int, b int); +SELECT create_distributed_table('source_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE target_table(a numeric, b int not null); +SELECT create_distributed_table('target_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source_table SELECT i, 2 * i FROM generate_series(1, 10000) i; +EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) + Task Count: 64 + Tasks Shown: One of 64 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_table_4213581 source_table +(8 rows) + +SELECT worker_connection_count(:worker_1_port) AS pre_xact_worker_1_connections, + worker_connection_count(:worker_2_port) AS pre_xact_worker_2_connections \gset +BEGIN; +INSERT INTO target_table SELECT * FROM source_table; +SELECT worker_connection_count(:worker_1_port) AS worker_1_connections, + worker_connection_count(:worker_2_port) AS worker_2_connections \gset +INSERT INTO target_table SELECT * FROM source_table; +INSERT INTO target_table SELECT * FROM source_table; +INSERT INTO target_table SELECT * FROM source_table; +INSERT INTO target_table SELECT * FROM source_table; +SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections, + worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections; + leaked_worker_1_connections | leaked_worker_2_connections +--------------------------------------------------------------------- + 0 | 0 +(1 row) + +END; +SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, + worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; + leaked_worker_1_connections | leaked_worker_2_connections +--------------------------------------------------------------------- + 0 | 0 +(1 row) + +-- ROLLBACK +BEGIN; +INSERT INTO target_table SELECT * FROM source_table; +INSERT INTO target_table SELECT * FROM source_table; +ROLLBACK; +SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, + worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; + leaked_worker_1_connections | leaked_worker_2_connections +--------------------------------------------------------------------- + 0 | 0 +(1 row) + +\set VERBOSITY TERSE +-- Error on constraint failure +BEGIN; +INSERT INTO target_table SELECT * FROM source_table; +SELECT worker_connection_count(:worker_1_port) AS worker_1_connections, + worker_connection_count(:worker_2_port) AS worker_2_connections \gset +SAVEPOINT s1; +INSERT INTO target_table SELECT a, CASE WHEN a < 5000 THEN b ELSE null END FROM source_table; +ERROR: null value in column "b" violates not-null constraint +ROLLBACK TO SAVEPOINT s1; +SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections, + worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections; + leaked_worker_1_connections | leaked_worker_2_connections +--------------------------------------------------------------------- + 0 | 0 +(1 row) + +END; +SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, + worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; + leaked_worker_1_connections | leaked_worker_2_connections +--------------------------------------------------------------------- + 0 | 0 +(1 row) + +SET client_min_messages TO WARNING; +DROP SCHEMA insert_select_connection_leak CASCADE; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index aa5eb2927..1b50e21c5 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -42,10 +42,14 @@ test: multi_read_from_secondaries test: multi_create_table test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select -test: multi_insert_select_window multi_shard_update_delete window_functions dml_recursive recursive_dml_with_different_planners_executors insert_select_repartition +test: multi_shard_update_delete recursive_dml_with_different_planners_executors +test: insert_select_repartition window_functions dml_recursive multi_insert_select_window test: multi_insert_select_conflict test: multi_row_insert +# following should not run in parallel because it relies on connection counts to workers +test: insert_select_connection_leak + # --------- # at the end of the regression tests regaring recursively planned modifications # ensure that we don't leak any intermediate results diff --git a/src/test/regress/sql/insert_select_connection_leak.sql b/src/test/regress/sql/insert_select_connection_leak.sql new file mode 100644 index 000000000..167acd9e5 --- /dev/null +++ b/src/test/regress/sql/insert_select_connection_leak.sql @@ -0,0 +1,70 @@ +CREATE SCHEMA insert_select_connection_leak; +SET search_path TO 'insert_select_connection_leak'; + +SET citus.next_shard_id TO 4213581; +SET citus.shard_count TO 64; + +CREATE OR REPLACE FUNCTION +worker_connection_count(nodeport int) +RETURNS int AS $$ + SELECT result::int - 1 FROM + run_command_on_workers($Q$select count(*) from pg_stat_activity where backend_type = 'client backend';$Q$) + WHERE nodeport = nodeport +$$ LANGUAGE SQL; + +CREATE TABLE source_table(a int, b int); +SELECT create_distributed_table('source_table', 'a'); + +CREATE TABLE target_table(a numeric, b int not null); +SELECT create_distributed_table('target_table', 'a'); + +INSERT INTO source_table SELECT i, 2 * i FROM generate_series(1, 10000) i; + +EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table; + +SELECT worker_connection_count(:worker_1_port) AS pre_xact_worker_1_connections, + worker_connection_count(:worker_2_port) AS pre_xact_worker_2_connections \gset + +BEGIN; +INSERT INTO target_table SELECT * FROM source_table; +SELECT worker_connection_count(:worker_1_port) AS worker_1_connections, + worker_connection_count(:worker_2_port) AS worker_2_connections \gset +INSERT INTO target_table SELECT * FROM source_table; +INSERT INTO target_table SELECT * FROM source_table; +INSERT INTO target_table SELECT * FROM source_table; +INSERT INTO target_table SELECT * FROM source_table; +SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections, + worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections; +END; + +SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, + worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; + +-- ROLLBACK +BEGIN; +INSERT INTO target_table SELECT * FROM source_table; +INSERT INTO target_table SELECT * FROM source_table; +ROLLBACK; + +SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, + worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; + +\set VERBOSITY TERSE + +-- Error on constraint failure +BEGIN; +INSERT INTO target_table SELECT * FROM source_table; +SELECT worker_connection_count(:worker_1_port) AS worker_1_connections, + worker_connection_count(:worker_2_port) AS worker_2_connections \gset +SAVEPOINT s1; +INSERT INTO target_table SELECT a, CASE WHEN a < 5000 THEN b ELSE null END FROM source_table; +ROLLBACK TO SAVEPOINT s1; +SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections, + worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections; +END; + +SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections, + worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections; + +SET client_min_messages TO WARNING; +DROP SCHEMA insert_select_connection_leak CASCADE;