Wait for cleanup function (#6549)

Adding a testing function `wait_for_resource_cleanup` which waits until
all records in `pg_dist_cleanup` are cleaned up. The motivation is to
prevent flakiness in our tests, since the `NOTICE: cleaned up X orphaned
resources` message is not consistent in many cases. This PR replaces
`citus_cleanup_orphaned_resources` calls with
`wait_for_resource_cleanup` calls.
pull/6554/head
Ahmet Gedemenli 2022-12-08 13:19:25 +03:00 committed by GitHub
parent cbb33167f9
commit 190307e8d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 341 additions and 150 deletions

View File

@ -274,8 +274,12 @@ SELECT pg_reload_conf();
-- END: Split a shard along its co-located shards -- END: Split a shard along its co-located shards
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 11 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- BEGIN: Validate Shard Info and Data -- BEGIN: Validate Shard Info and Data
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
@ -525,9 +529,12 @@ NOTICE: cleaned up 11 orphaned resources
-- END: Split a partition table directly -- END: Split a partition table directly
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL pg_catalog.citus_cleanup_orphaned_resources(); wait_for_resource_cleanup
RESET client_min_messages; ---------------------------------------------------------------------
(1 row)
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- BEGIN: Validate Shard Info and Data -- BEGIN: Validate Shard Info and Data
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport

View File

@ -48,9 +48,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
(1 row) (1 row)
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL pg_catalog.citus_cleanup_orphaned_resources(); wait_for_resource_cleanup
RESET client_min_messages; ---------------------------------------------------------------------
(1 row)
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
\c - - - :worker_1_port \c - - - :worker_1_port
SET search_path TO "citus_split_test_schema"; SET search_path TO "citus_split_test_schema";

View File

@ -237,9 +237,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
(1 row) (1 row)
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL pg_catalog.citus_cleanup_orphaned_resources(); wait_for_resource_cleanup
RESET client_min_messages; ---------------------------------------------------------------------
(1 row)
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- Perform 3 way split -- Perform 3 way split
SELECT pg_catalog.citus_split_shard_by_split_points( SELECT pg_catalog.citus_split_shard_by_split_points(
@ -254,9 +257,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
-- END : Split two shards : One with move and One without move. -- END : Split two shards : One with move and One without move.
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL pg_catalog.citus_cleanup_orphaned_resources(); wait_for_resource_cleanup
RESET client_min_messages; ---------------------------------------------------------------------
(1 row)
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- BEGIN : Move a shard post split. -- BEGIN : Move a shard post split.
SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes'); SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes');
@ -423,8 +429,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
(1 row) (1 row)
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 3 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
SET search_path TO "citus_split_test_schema"; SET search_path TO "citus_split_test_schema";
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
@ -476,9 +486,12 @@ ERROR: cannot use logical replication to transfer shards of the relation table_
DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY. DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY.
HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'. HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'.
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL pg_catalog.citus_cleanup_orphaned_resources(); wait_for_resource_cleanup
RESET client_min_messages; ---------------------------------------------------------------------
(1 row)
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard FROM pg_dist_shard AS shard
@ -526,9 +539,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
(1 row) (1 row)
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL pg_catalog.citus_cleanup_orphaned_resources(); wait_for_resource_cleanup
RESET client_min_messages; ---------------------------------------------------------------------
(1 row)
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard FROM pg_dist_shard AS shard

View File

@ -233,8 +233,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
(1 row) (1 row)
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 3 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- Perform 3 way split -- Perform 3 way split
SELECT pg_catalog.citus_split_shard_by_split_points( SELECT pg_catalog.citus_split_shard_by_split_points(
@ -249,8 +253,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
-- END : Split two shards : One with move and One without move. -- END : Split two shards : One with move and One without move.
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 3 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- BEGIN : Move a shard post split. -- BEGIN : Move a shard post split.
SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes'); SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes');
@ -417,8 +425,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
(1 row) (1 row)
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 3 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
SET search_path TO "citus_split_test_schema"; SET search_path TO "citus_split_test_schema";
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport

View File

@ -274,8 +274,12 @@ SELECT pg_reload_conf();
-- END: Split a shard along its co-located shards -- END: Split a shard along its co-located shards
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 11 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- BEGIN: Validate Shard Info and Data -- BEGIN: Validate Shard Info and Data
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
@ -525,8 +529,12 @@ NOTICE: cleaned up 11 orphaned resources
-- END: Split a partition table directly -- END: Split a partition table directly
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 11 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- BEGIN: Validate Shard Info and Data -- BEGIN: Validate Shard Info and Data
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport

View File

@ -67,8 +67,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 1 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").cancel(' || :pid || ')'); SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").cancel(' || :pid || ')');
mitmproxy mitmproxy
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -101,8 +105,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 4 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
master_move_shard_placement master_move_shard_placement
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -138,8 +138,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 2 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- cancel on dropping subscription -- cancel on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").cancel(' || :pid || ')'); SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").cancel(' || :pid || ')');
mitmproxy mitmproxy
@ -156,8 +160,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 4 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- try again -- try again
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
master_move_shard_placement master_move_shard_placement
@ -225,8 +233,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 2 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards(); CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 1 orphaned shards NOTICE: cleaned up 1 orphaned shards
-- failure on setting lock_timeout (right before dropping subscriptions & replication slots) -- failure on setting lock_timeout (right before dropping subscriptions & replication slots)
@ -273,8 +285,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 4 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_shards(); CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 1 orphaned shards NOTICE: cleaned up 1 orphaned shards
-- cancellation on disabling subscription (right before dropping it) -- cancellation on disabling subscription (right before dropping it)
@ -293,8 +309,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 4 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- disable maintenance daemon cleanup, to prevent the flaky test -- disable maintenance daemon cleanup, to prevent the flaky test
ALTER SYSTEM SET citus.defer_shard_delete_interval TO -1; ALTER SYSTEM SET citus.defer_shard_delete_interval TO -1;
SELECT pg_reload_conf(); SELECT pg_reload_conf();
@ -344,9 +364,12 @@ SELECT pg_reload_conf();
-- cleanup leftovers -- cleanup leftovers
-- then, verify we don't see any error for already dropped subscription -- then, verify we don't see any error for already dropped subscription
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL citus_cleanup_orphaned_resources(); wait_for_resource_cleanup
RESET client_min_messages; ---------------------------------------------------------------------
(1 row)
-- cancellation on dropping subscription -- cancellation on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')'); SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')');
mitmproxy mitmproxy
@ -393,8 +416,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 3 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- lets create few more indexes and fail with both -- lets create few more indexes and fail with both
-- parallel mode and sequential mode -- parallel mode and sequential mode
CREATE INDEX index_failure_2 ON t(id); CREATE INDEX index_failure_2 ON t(id);

View File

@ -15,9 +15,12 @@ SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SELECT pg_backend_pid() as pid \gset SELECT pg_backend_pid() as pid \gset
-- cleanup any leftovers from previous tests so we get consistent output -- cleanup any leftovers from previous tests so we get consistent output
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL pg_catalog.citus_cleanup_orphaned_resources(); wait_for_resource_cleanup
RESET client_min_messages; ---------------------------------------------------------------------
(1 row)
-- Disable defer shard delete to stop auto cleanup. -- Disable defer shard delete to stop auto cleanup.
ALTER SYSTEM SET citus.defer_shard_delete_interval TO -1; ALTER SYSTEM SET citus.defer_shard_delete_interval TO -1;
SELECT pg_reload_conf(); SELECT pg_reload_conf();
@ -98,8 +101,12 @@ CONTEXT: while executing command on localhost:xxxxx
(0 rows) (0 rows)
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 3 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
SELECT operation_id, object_type, object_name, node_group_id, policy_type SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
operation_id | object_type | object_name | node_group_id | policy_type operation_id | object_type | object_name | node_group_id | policy_type
@ -204,8 +211,12 @@ ERROR: Failed to run worker_split_shard_replication_setup UDF. It should succes
(0 rows) (0 rows)
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 4 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
SELECT operation_id, object_type, object_name, node_group_id, policy_type SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
operation_id | object_type | object_name | node_group_id | policy_type operation_id | object_type | object_name | node_group_id | policy_type
@ -316,8 +327,12 @@ CONTEXT: while executing command on localhost:xxxxx
(0 rows) (0 rows)
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 5 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
SELECT operation_id, object_type, object_name, node_group_id, policy_type SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
operation_id | object_type | object_name | node_group_id | policy_type operation_id | object_type | object_name | node_group_id | policy_type
@ -434,8 +449,12 @@ CONTEXT: while executing command on localhost:xxxxx
(1 row) (1 row)
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 8 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
SELECT operation_id, object_type, object_name, node_group_id, policy_type SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
operation_id | object_type | object_name | node_group_id | policy_type operation_id | object_type | object_name | node_group_id | policy_type
@ -552,8 +571,12 @@ CONTEXT: while executing command on localhost:xxxxx
(1 row) (1 row)
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 8 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
SELECT operation_id, object_type, object_name, node_group_id, policy_type SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
operation_id | object_type | object_name | node_group_id | policy_type operation_id | object_type | object_name | node_group_id | policy_type
@ -675,8 +698,12 @@ CONTEXT: while executing command on localhost:xxxxx
(1 row) (1 row)
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 8 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
SELECT operation_id, object_type, object_name, node_group_id, policy_type SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
operation_id | object_type | object_name | node_group_id | policy_type operation_id | object_type | object_name | node_group_id | policy_type

View File

@ -19,7 +19,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
-- cleanup leftovers if any -- cleanup leftovers if any
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
CREATE TABLE table_1 (id int PRIMARY KEY); CREATE TABLE table_1 (id int PRIMARY KEY);
CREATE TABLE table_2 (ref_id int REFERENCES table_1(id) UNIQUE, data int); CREATE TABLE table_2 (ref_id int REFERENCES table_1(id) UNIQUE, data int);
SELECT create_distributed_table('table_1', 'id'); SELECT create_distributed_table('table_1', 'id');
@ -273,7 +278,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- cancellation on dropping subscription -- cancellation on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").cancel(' || :pid || ')'); SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").cancel(' || :pid || ')');
mitmproxy mitmproxy
@ -290,7 +300,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- failure on dropping publication -- failure on dropping publication
SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").killall()'); SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").killall()');
mitmproxy mitmproxy
@ -308,7 +323,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- cancellation on dropping publication -- cancellation on dropping publication
SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").cancel(' || :pid || ')'); SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").cancel(' || :pid || ')');
mitmproxy mitmproxy
@ -325,7 +345,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- failure on dropping replication slot -- failure on dropping replication slot
SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").killall()'); SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").killall()');
mitmproxy mitmproxy
@ -343,7 +368,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- cancellation on dropping replication slot -- cancellation on dropping replication slot
SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").cancel(' || :pid || ')'); SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").cancel(' || :pid || ')');
mitmproxy mitmproxy
@ -360,7 +390,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- failure on foreign key creation -- failure on foreign key creation
SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").kill()'); SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").kill()');
mitmproxy mitmproxy

View File

@ -449,8 +449,12 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid >= 1230000 ORDER BY nodeport
ERROR: insert or update on table "lineitem_streaming_1230044" violates foreign key constraint "test_constraint_1230044" ERROR: insert or update on table "lineitem_streaming_1230044" violates foreign key constraint "test_constraint_1230044"
DETAIL: Key (l_orderkey)=(128) is not present in table "orders_streaming_1230046". DETAIL: Key (l_orderkey)=(128) is not present in table "orders_streaming_1230046".
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 2 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- connect to the worker node with metadata -- connect to the worker node with metadata
\c - mx_isolation_role_ent - :worker_1_port \c - mx_isolation_role_ent - :worker_1_port
SET search_path to "Tenant Isolation"; SET search_path to "Tenant Isolation";
@ -716,7 +720,12 @@ SELECT * FROM pg_dist_shard
(24 rows) (24 rows)
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- test failure scenarios with triggers on workers -- test failure scenarios with triggers on workers
\c - postgres - :worker_1_port \c - postgres - :worker_1_port
SET search_path to "Tenant Isolation"; SET search_path to "Tenant Isolation";
@ -1017,8 +1026,12 @@ SELECT count(*) FROM test_colocated_table_2;
(1 row) (1 row)
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 3 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
\c - postgres - :worker_1_port \c - postgres - :worker_1_port
-- show the foreign keys of the main table & its colocated shard on other tables -- show the foreign keys of the main table & its colocated shard on other tables
SELECT tbl.relname, fk."Constraint", fk."Definition" SELECT tbl.relname, fk."Constraint", fk."Definition"

View File

@ -260,8 +260,12 @@ SELECT isolate_tenant_to_new_shard('lineitem_streaming', 100, 'CASCADE', shard_t
ERROR: table lineitem_streaming has already been isolated for the given value ERROR: table lineitem_streaming has already been isolated for the given value
SELECT isolate_tenant_to_new_shard('orders_streaming', 101, 'CASCADE', shard_transfer_mode => 'force_logical'); SELECT isolate_tenant_to_new_shard('orders_streaming', 101, 'CASCADE', shard_transfer_mode => 'force_logical');
ERROR: table orders_streaming has already been isolated for the given value ERROR: table orders_streaming has already been isolated for the given value
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 2 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- test corner cases: hash(-1995148554) = -2147483648 and hash(-1686493264) = 2147483647 -- test corner cases: hash(-1995148554) = -2147483648 and hash(-1686493264) = 2147483647
SELECT isolate_tenant_to_new_shard('lineitem_streaming', -1995148554, 'CASCADE', shard_transfer_mode => 'force_logical'); SELECT isolate_tenant_to_new_shard('lineitem_streaming', -1995148554, 'CASCADE', shard_transfer_mode => 'force_logical');
isolate_tenant_to_new_shard isolate_tenant_to_new_shard
@ -275,8 +279,12 @@ SELECT isolate_tenant_to_new_shard('orders_streaming', -1686493264, 'CASCADE', s
1230047 1230047
(1 row) (1 row)
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 2 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1995148554; SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1995148554;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -453,7 +461,12 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid >= 1230000 ORDER BY nodeport
ERROR: insert or update on table "lineitem_streaming_1230044" violates foreign key constraint "test_constraint_1230044" ERROR: insert or update on table "lineitem_streaming_1230044" violates foreign key constraint "test_constraint_1230044"
DETAIL: Key (l_orderkey)=(128) is not present in table "orders_streaming_1230046". DETAIL: Key (l_orderkey)=(128) is not present in table "orders_streaming_1230046".
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- connect to the worker node with metadata -- connect to the worker node with metadata
\c - mx_isolation_role_ent - :worker_1_port \c - mx_isolation_role_ent - :worker_1_port
SET search_path to "Tenant Isolation"; SET search_path to "Tenant Isolation";
@ -690,8 +703,12 @@ SELECT * FROM text_column;
hello | {} hello | {}
(1 row) (1 row)
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 1 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- test with invalid shard placements -- test with invalid shard placements
\c - postgres - :master_port \c - postgres - :master_port
SET search_path to "Tenant Isolation"; SET search_path to "Tenant Isolation";
@ -747,7 +764,12 @@ SELECT * FROM pg_dist_shard
(24 rows) (24 rows)
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- test failure scenarios with triggers on workers -- test failure scenarios with triggers on workers
\c - postgres - :worker_1_port \c - postgres - :worker_1_port
SET search_path to "Tenant Isolation"; SET search_path to "Tenant Isolation";
@ -1056,8 +1078,12 @@ SELECT count(*) FROM test_colocated_table_2;
(1 row) (1 row)
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
NOTICE: cleaned up 3 orphaned resources wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
\c - postgres - :worker_1_port \c - postgres - :worker_1_port
-- show the foreign keys of the main table & its colocated shard on other tables -- show the foreign keys of the main table & its colocated shard on other tables
SELECT tbl.relname, fk."Constraint", fk."Definition" SELECT tbl.relname, fk."Constraint", fk."Definition"

View File

@ -154,3 +154,16 @@ BEGIN
END LOOP; END LOOP;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- Introduce a function that waits until all cleanup records are deleted, for testing purposes
CREATE OR REPLACE FUNCTION wait_for_resource_cleanup() RETURNS void
SET client_min_messages TO WARNING
AS $$
DECLARE
record_count integer;
BEGIN
EXECUTE 'SELECT COUNT(*) FROM pg_catalog.pg_dist_cleanup' INTO record_count;
WHILE record_count != 0 LOOP
CALL pg_catalog.citus_cleanup_orphaned_resources();
EXECUTE 'SELECT COUNT(*) FROM pg_catalog.pg_dist_cleanup' INTO record_count;
END LOOP;
END$$ LANGUAGE plpgsql;

View File

@ -173,7 +173,7 @@ SELECT pg_reload_conf();
-- END: Split a shard along its co-located shards -- END: Split a shard along its co-located shards
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- BEGIN: Validate Shard Info and Data -- BEGIN: Validate Shard Info and Data
@ -244,9 +244,7 @@ CALL pg_catalog.citus_cleanup_orphaned_resources();
-- END: Split a partition table directly -- END: Split a partition table directly
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- BEGIN: Validate Shard Info and Data -- BEGIN: Validate Shard Info and Data

View File

@ -47,9 +47,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
'force_logical'); 'force_logical');
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
\c - - - :worker_1_port \c - - - :worker_1_port

View File

@ -149,9 +149,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
'force_logical'); 'force_logical');
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- Perform 3 way split -- Perform 3 way split
@ -163,9 +161,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
-- END : Split two shards : One with move and One without move. -- END : Split two shards : One with move and One without move.
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- BEGIN : Move a shard post split. -- BEGIN : Move a shard post split.
@ -241,7 +237,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
'force_logical'); 'force_logical');
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
SET search_path TO "citus_split_test_schema"; SET search_path TO "citus_split_test_schema";
@ -267,9 +263,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
ARRAY[:worker_1_node, :worker_2_node]); ARRAY[:worker_1_node, :worker_2_node]);
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
@ -294,9 +288,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
'auto'); 'auto');
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport

View File

@ -144,7 +144,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
'block_writes'); 'block_writes');
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- Perform 3 way split -- Perform 3 way split
@ -156,7 +156,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
-- END : Split two shards : One with move and One without move. -- END : Split two shards : One with move and One without move.
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- BEGIN : Move a shard post split. -- BEGIN : Move a shard post split.
@ -232,7 +232,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
'block_writes'); 'block_writes');
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
SET search_path TO "citus_split_test_schema"; SET search_path TO "citus_split_test_schema";

View File

@ -173,7 +173,7 @@ SELECT pg_reload_conf();
-- END: Split a shard along its co-located shards -- END: Split a shard along its co-located shards
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- BEGIN: Validate Shard Info and Data -- BEGIN: Validate Shard Info and Data
@ -244,7 +244,7 @@ CALL pg_catalog.citus_cleanup_orphaned_resources();
-- END: Split a partition table directly -- END: Split a partition table directly
-- BEGIN: Perform deferred cleanup. -- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- END: Perform deferred cleanup. -- END: Perform deferred cleanup.
-- BEGIN: Validate Shard Info and Data -- BEGIN: Validate Shard Info and Data

View File

@ -44,7 +44,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
-- cleanup leftovers -- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").cancel(' || :pid || ')'); SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").cancel(' || :pid || ')');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
@ -56,7 +56,7 @@ SELECT count(*) FROM t;
-- Verify that shard can be moved after a temporary failure -- Verify that shard can be moved after a temporary failure
-- cleanup leftovers, as it can cause flakiness in the following test files -- cleanup leftovers, as it can cause flakiness in the following test files
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
SELECT * FROM shards_in_workers; SELECT * FROM shards_in_workers;
SELECT count(*) FROM t; SELECT count(*) FROM t;

View File

@ -71,7 +71,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
-- cleanup leftovers -- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- cancel on dropping subscription -- cancel on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").cancel(' || :pid || ')'); SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").cancel(' || :pid || ')');
@ -79,7 +79,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
-- cleanup leftovers -- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- try again -- try again
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
@ -102,7 +102,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'loca
-- cleanup leftovers -- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
CALL citus_cleanup_orphaned_shards(); CALL citus_cleanup_orphaned_shards();
-- failure on setting lock_timeout (right before dropping subscriptions & replication slots) -- failure on setting lock_timeout (right before dropping subscriptions & replication slots)
@ -114,7 +114,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'loca
-- cleanup leftovers -- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
CALL citus_cleanup_orphaned_shards(); CALL citus_cleanup_orphaned_shards();
-- cancellation on disabling subscription (right before dropping it) -- cancellation on disabling subscription (right before dropping it)
@ -123,7 +123,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
-- cleanup leftovers -- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- disable maintenance daemon cleanup, to prevent the flaky test -- disable maintenance daemon cleanup, to prevent the flaky test
ALTER SYSTEM SET citus.defer_shard_delete_interval TO -1; ALTER SYSTEM SET citus.defer_shard_delete_interval TO -1;
@ -147,9 +147,7 @@ SELECT pg_reload_conf();
-- cleanup leftovers -- cleanup leftovers
-- then, verify we don't see any error for already dropped subscription -- then, verify we don't see any error for already dropped subscription
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- cancellation on dropping subscription -- cancellation on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')'); SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')');
@ -169,7 +167,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
-- cleanup leftovers -- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- lets create few more indexes and fail with both -- lets create few more indexes and fail with both
-- parallel mode and sequential mode -- parallel mode and sequential mode

View File

@ -17,9 +17,7 @@ SET citus.shard_replication_factor TO 1;
SELECT pg_backend_pid() as pid \gset SELECT pg_backend_pid() as pid \gset
-- cleanup any leftovers from previous tests so we get consistent output -- cleanup any leftovers from previous tests so we get consistent output
SET client_min_messages TO WARNING; SELECT public.wait_for_resource_cleanup();
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- Disable defer shard delete to stop auto cleanup. -- Disable defer shard delete to stop auto cleanup.
ALTER SYSTEM SET citus.defer_shard_delete_interval TO -1; ALTER SYSTEM SET citus.defer_shard_delete_interval TO -1;
@ -58,7 +56,7 @@ SELECT create_distributed_table('table_to_split', 'id');
SELECT subname FROM pg_subscription; SELECT subname FROM pg_subscription;
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
SELECT operation_id, object_type, object_name, node_group_id, policy_type SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
@ -109,7 +107,7 @@ SELECT create_distributed_table('table_to_split', 'id');
SELECT subname FROM pg_subscription; SELECT subname FROM pg_subscription;
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
SELECT operation_id, object_type, object_name, node_group_id, policy_type SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
@ -155,7 +153,7 @@ SELECT create_distributed_table('table_to_split', 'id');
SELECT subname FROM pg_subscription; SELECT subname FROM pg_subscription;
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
SELECT operation_id, object_type, object_name, node_group_id, policy_type SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
@ -201,7 +199,7 @@ SELECT create_distributed_table('table_to_split', 'id');
SELECT subname FROM pg_subscription; SELECT subname FROM pg_subscription;
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
SELECT operation_id, object_type, object_name, node_group_id, policy_type SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
@ -247,7 +245,7 @@ SELECT create_distributed_table('table_to_split', 'id');
SELECT subname FROM pg_subscription; SELECT subname FROM pg_subscription;
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
SELECT operation_id, object_type, object_name, node_group_id, policy_type SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
@ -295,7 +293,7 @@ SELECT create_distributed_table('table_to_split', 'id');
SELECT subname FROM pg_subscription; SELECT subname FROM pg_subscription;
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
SELECT operation_id, object_type, object_name, node_group_id, policy_type SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;

View File

@ -17,7 +17,7 @@ SELECT pg_backend_pid() as pid \gset
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
-- cleanup leftovers if any -- cleanup leftovers if any
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
CREATE TABLE table_1 (id int PRIMARY KEY); CREATE TABLE table_1 (id int PRIMARY KEY);
CREATE TABLE table_2 (ref_id int REFERENCES table_1(id) UNIQUE, data int); CREATE TABLE table_2 (ref_id int REFERENCES table_1(id) UNIQUE, data int);
@ -134,7 +134,7 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode
-- cleanup leftovers -- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- cancellation on dropping subscription -- cancellation on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").cancel(' || :pid || ')'); SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").cancel(' || :pid || ')');
@ -142,7 +142,7 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode
-- cleanup leftovers -- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- failure on dropping publication -- failure on dropping publication
SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").killall()'); SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").killall()');
@ -150,7 +150,7 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode
-- cleanup leftovers -- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- cancellation on dropping publication -- cancellation on dropping publication
SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").cancel(' || :pid || ')'); SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").cancel(' || :pid || ')');
@ -158,7 +158,7 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode
-- cleanup leftovers -- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- failure on dropping replication slot -- failure on dropping replication slot
SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").killall()'); SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").killall()');
@ -166,7 +166,7 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode
-- cleanup leftovers -- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- cancellation on dropping replication slot -- cancellation on dropping replication slot
SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").cancel(' || :pid || ')'); SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").cancel(' || :pid || ')');
@ -174,7 +174,7 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode
-- cleanup leftovers -- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- failure on foreign key creation -- failure on foreign key creation
SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").kill()'); SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").kill()');

View File

@ -9,7 +9,6 @@ SELECT nextval('pg_catalog.pg_dist_placement_placementid_seq') AS last_placement
\gset \gset
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100000; ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100000;
CREATE SCHEMA "Tenant Isolation"; CREATE SCHEMA "Tenant Isolation";
SET search_path to "Tenant Isolation"; SET search_path to "Tenant Isolation";
@ -225,7 +224,7 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid >= 1230000 ORDER BY nodeport
\. \.
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- connect to the worker node with metadata -- connect to the worker node with metadata
\c - mx_isolation_role_ent - :worker_1_port \c - mx_isolation_role_ent - :worker_1_port
@ -345,7 +344,7 @@ SELECT * FROM pg_dist_shard
ORDER BY shardminvalue::BIGINT, logicalrelid; ORDER BY shardminvalue::BIGINT, logicalrelid;
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- test failure scenarios with triggers on workers -- test failure scenarios with triggers on workers
\c - postgres - :worker_1_port \c - postgres - :worker_1_port
@ -523,7 +522,7 @@ SELECT isolate_tenant_to_new_shard('test_colocated_table_2', 1, 'CASCADE', shard
SELECT count(*) FROM test_colocated_table_2; SELECT count(*) FROM test_colocated_table_2;
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
\c - postgres - :worker_1_port \c - postgres - :worker_1_port

View File

@ -175,13 +175,13 @@ SELECT isolate_tenant_to_new_shard('orders_streaming', 103, 'CASCADE', shard_tra
SELECT isolate_tenant_to_new_shard('lineitem_streaming', 100, 'CASCADE', shard_transfer_mode => 'force_logical'); SELECT isolate_tenant_to_new_shard('lineitem_streaming', 100, 'CASCADE', shard_transfer_mode => 'force_logical');
SELECT isolate_tenant_to_new_shard('orders_streaming', 101, 'CASCADE', shard_transfer_mode => 'force_logical'); SELECT isolate_tenant_to_new_shard('orders_streaming', 101, 'CASCADE', shard_transfer_mode => 'force_logical');
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- test corner cases: hash(-1995148554) = -2147483648 and hash(-1686493264) = 2147483647 -- test corner cases: hash(-1995148554) = -2147483648 and hash(-1686493264) = 2147483647
SELECT isolate_tenant_to_new_shard('lineitem_streaming', -1995148554, 'CASCADE', shard_transfer_mode => 'force_logical'); SELECT isolate_tenant_to_new_shard('lineitem_streaming', -1995148554, 'CASCADE', shard_transfer_mode => 'force_logical');
SELECT isolate_tenant_to_new_shard('orders_streaming', -1686493264, 'CASCADE', shard_transfer_mode => 'force_logical'); SELECT isolate_tenant_to_new_shard('orders_streaming', -1686493264, 'CASCADE', shard_transfer_mode => 'force_logical');
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1995148554; SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1995148554;
SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1686493264; SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1686493264;
@ -229,7 +229,7 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid >= 1230000 ORDER BY nodeport
\. \.
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- connect to the worker node with metadata -- connect to the worker node with metadata
\c - mx_isolation_role_ent - :worker_1_port \c - mx_isolation_role_ent - :worker_1_port
@ -329,7 +329,7 @@ INSERT INTO text_column VALUES ('hello','{}');
SELECT create_distributed_table('text_column','tenant_id'); SELECT create_distributed_table('text_column','tenant_id');
SELECT isolate_tenant_to_new_shard('text_column', 'hello', shard_transfer_mode => 'force_logical'); SELECT isolate_tenant_to_new_shard('text_column', 'hello', shard_transfer_mode => 'force_logical');
SELECT * FROM text_column; SELECT * FROM text_column;
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- test with invalid shard placements -- test with invalid shard placements
\c - postgres - :master_port \c - postgres - :master_port
@ -358,7 +358,7 @@ SELECT * FROM pg_dist_shard
ORDER BY shardminvalue::BIGINT, logicalrelid; ORDER BY shardminvalue::BIGINT, logicalrelid;
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
-- test failure scenarios with triggers on workers -- test failure scenarios with triggers on workers
\c - postgres - :worker_1_port \c - postgres - :worker_1_port
@ -526,7 +526,7 @@ SELECT isolate_tenant_to_new_shard('test_colocated_table_2', 1, 'CASCADE', shard
SELECT count(*) FROM test_colocated_table_2; SELECT count(*) FROM test_colocated_table_2;
\c - postgres - :master_port \c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT public.wait_for_resource_cleanup();
\c - postgres - :worker_1_port \c - postgres - :worker_1_port

View File

@ -167,3 +167,16 @@ BEGIN
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
-- Introduce a function that waits until all cleanup records are deleted, for testing purposes
CREATE OR REPLACE FUNCTION wait_for_resource_cleanup() RETURNS void
SET client_min_messages TO WARNING
AS $$
DECLARE
record_count integer;
BEGIN
EXECUTE 'SELECT COUNT(*) FROM pg_catalog.pg_dist_cleanup' INTO record_count;
WHILE record_count != 0 LOOP
CALL pg_catalog.citus_cleanup_orphaned_resources();
EXECUTE 'SELECT COUNT(*) FROM pg_catalog.pg_dist_cleanup' INTO record_count;
END LOOP;
END$$ LANGUAGE plpgsql;