mirror of https://github.com/citusdata/citus.git
Merge pull request #3448 from citusdata/insert_select_leak
Add insert/select connection leak testspull/3449/head
commit
b0f9f94a52
|
@ -0,0 +1,103 @@
|
||||||
|
CREATE SCHEMA insert_select_connection_leak;
|
||||||
|
SET search_path TO 'insert_select_connection_leak';
|
||||||
|
SET citus.next_shard_id TO 4213581;
|
||||||
|
SET citus.shard_count TO 64;
|
||||||
|
CREATE OR REPLACE FUNCTION
|
||||||
|
worker_connection_count(nodeport int)
|
||||||
|
RETURNS int AS $$
|
||||||
|
SELECT result::int - 1 FROM
|
||||||
|
run_command_on_workers($Q$select count(*) from pg_stat_activity where backend_type = 'client backend';$Q$)
|
||||||
|
WHERE nodeport = nodeport
|
||||||
|
$$ LANGUAGE SQL;
|
||||||
|
CREATE TABLE source_table(a int, b int);
|
||||||
|
SELECT create_distributed_table('source_table', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE target_table(a numeric, b int not null);
|
||||||
|
SELECT create_distributed_table('target_table', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO source_table SELECT i, 2 * i FROM generate_series(1, 10000) i;
|
||||||
|
EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
|
INSERT/SELECT method: repartition
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Task Count: 64
|
||||||
|
Tasks Shown: One of 64
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_table_4213581 source_table
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
SELECT worker_connection_count(:worker_1_port) AS pre_xact_worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) AS pre_xact_worker_2_connections \gset
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
SELECT worker_connection_count(:worker_1_port) AS worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) AS worker_2_connections \gset
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections;
|
||||||
|
leaked_worker_1_connections | leaked_worker_2_connections
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
END;
|
||||||
|
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
|
||||||
|
leaked_worker_1_connections | leaked_worker_2_connections
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- ROLLBACK
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
ROLLBACK;
|
||||||
|
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
|
||||||
|
leaked_worker_1_connections | leaked_worker_2_connections
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\set VERBOSITY TERSE
|
||||||
|
-- Error on constraint failure
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
SELECT worker_connection_count(:worker_1_port) AS worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) AS worker_2_connections \gset
|
||||||
|
SAVEPOINT s1;
|
||||||
|
INSERT INTO target_table SELECT a, CASE WHEN a < 5000 THEN b ELSE null END FROM source_table;
|
||||||
|
ERROR: null value in column "b" violates not-null constraint
|
||||||
|
ROLLBACK TO SAVEPOINT s1;
|
||||||
|
SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections;
|
||||||
|
leaked_worker_1_connections | leaked_worker_2_connections
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
END;
|
||||||
|
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
|
||||||
|
leaked_worker_1_connections | leaked_worker_2_connections
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0 | 0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP SCHEMA insert_select_connection_leak CASCADE;
|
|
@ -42,10 +42,14 @@ test: multi_read_from_secondaries
|
||||||
test: multi_create_table
|
test: multi_create_table
|
||||||
test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table
|
test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table
|
||||||
test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select
|
test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select
|
||||||
test: multi_insert_select_window multi_shard_update_delete window_functions dml_recursive recursive_dml_with_different_planners_executors insert_select_repartition
|
test: multi_shard_update_delete recursive_dml_with_different_planners_executors
|
||||||
|
test: insert_select_repartition window_functions dml_recursive multi_insert_select_window
|
||||||
test: multi_insert_select_conflict
|
test: multi_insert_select_conflict
|
||||||
test: multi_row_insert
|
test: multi_row_insert
|
||||||
|
|
||||||
|
# following should not run in parallel because it relies on connection counts to workers
|
||||||
|
test: insert_select_connection_leak
|
||||||
|
|
||||||
# ---------
|
# ---------
|
||||||
# at the end of the regression tests regaring recursively planned modifications
|
# at the end of the regression tests regaring recursively planned modifications
|
||||||
# ensure that we don't leak any intermediate results
|
# ensure that we don't leak any intermediate results
|
||||||
|
|
|
@ -0,0 +1,70 @@
|
||||||
|
CREATE SCHEMA insert_select_connection_leak;
|
||||||
|
SET search_path TO 'insert_select_connection_leak';
|
||||||
|
|
||||||
|
SET citus.next_shard_id TO 4213581;
|
||||||
|
SET citus.shard_count TO 64;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION
|
||||||
|
worker_connection_count(nodeport int)
|
||||||
|
RETURNS int AS $$
|
||||||
|
SELECT result::int - 1 FROM
|
||||||
|
run_command_on_workers($Q$select count(*) from pg_stat_activity where backend_type = 'client backend';$Q$)
|
||||||
|
WHERE nodeport = nodeport
|
||||||
|
$$ LANGUAGE SQL;
|
||||||
|
|
||||||
|
CREATE TABLE source_table(a int, b int);
|
||||||
|
SELECT create_distributed_table('source_table', 'a');
|
||||||
|
|
||||||
|
CREATE TABLE target_table(a numeric, b int not null);
|
||||||
|
SELECT create_distributed_table('target_table', 'a');
|
||||||
|
|
||||||
|
INSERT INTO source_table SELECT i, 2 * i FROM generate_series(1, 10000) i;
|
||||||
|
|
||||||
|
EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
|
||||||
|
SELECT worker_connection_count(:worker_1_port) AS pre_xact_worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) AS pre_xact_worker_2_connections \gset
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
SELECT worker_connection_count(:worker_1_port) AS worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) AS worker_2_connections \gset
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections;
|
||||||
|
END;
|
||||||
|
|
||||||
|
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
|
||||||
|
|
||||||
|
-- ROLLBACK
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
|
||||||
|
|
||||||
|
\set VERBOSITY TERSE
|
||||||
|
|
||||||
|
-- Error on constraint failure
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO target_table SELECT * FROM source_table;
|
||||||
|
SELECT worker_connection_count(:worker_1_port) AS worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) AS worker_2_connections \gset
|
||||||
|
SAVEPOINT s1;
|
||||||
|
INSERT INTO target_table SELECT a, CASE WHEN a < 5000 THEN b ELSE null END FROM source_table;
|
||||||
|
ROLLBACK TO SAVEPOINT s1;
|
||||||
|
SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections;
|
||||||
|
END;
|
||||||
|
|
||||||
|
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
|
||||||
|
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
|
||||||
|
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP SCHEMA insert_select_connection_leak CASCADE;
|
Loading…
Reference in New Issue