diff --git a/src/test/regress/expected/citus_non_blocking_split_columnar.out b/src/test/regress/expected/citus_non_blocking_split_columnar.out
index d1ce4d6a7..2d20fbc8a 100644
--- a/src/test/regress/expected/citus_non_blocking_split_columnar.out
+++ b/src/test/regress/expected/citus_non_blocking_split_columnar.out
@@ -274,8 +274,12 @@ SELECT pg_reload_conf();
 -- END: Split a shard along its co-located shards
 -- BEGIN: Perform deferred cleanup.
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 11 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- END: Perform deferred cleanup.
 -- BEGIN: Validate Shard Info and Data
 SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
@@ -525,9 +529,12 @@ NOTICE: cleaned up 11 orphaned resources
 -- END: Split a partition table directly
 -- BEGIN: Perform deferred cleanup.
-SET client_min_messages TO WARNING;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- END: Perform deferred cleanup.
 -- BEGIN: Validate Shard Info and Data
 SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
diff --git a/src/test/regress/expected/citus_non_blocking_split_shard_cleanup.out b/src/test/regress/expected/citus_non_blocking_split_shard_cleanup.out
index d7a1bc47d..e2685c2d7 100644
--- a/src/test/regress/expected/citus_non_blocking_split_shard_cleanup.out
+++ b/src/test/regress/expected/citus_non_blocking_split_shard_cleanup.out
@@ -48,9 +48,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 (1 row)

 -- BEGIN: Perform deferred cleanup.
-SET client_min_messages TO WARNING;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- END: Perform deferred cleanup.
 \c - - - :worker_1_port
 SET search_path TO "citus_split_test_schema";
diff --git a/src/test/regress/expected/citus_non_blocking_split_shards.out b/src/test/regress/expected/citus_non_blocking_split_shards.out
index f08f9c428..af65563c2 100644
--- a/src/test/regress/expected/citus_non_blocking_split_shards.out
+++ b/src/test/regress/expected/citus_non_blocking_split_shards.out
@@ -237,9 +237,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 (1 row)

 -- BEGIN: Perform deferred cleanup.
-SET client_min_messages TO WARNING;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- END: Perform deferred cleanup.
 -- Perform 3 way split
 SELECT pg_catalog.citus_split_shard_by_split_points(
@@ -254,9 +257,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 (1 row)

 -- END : Split two shards : One with move and One without move.
 -- BEGIN: Perform deferred cleanup.
-SET client_min_messages TO WARNING;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- END: Perform deferred cleanup.
 -- BEGIN : Move a shard post split.
 SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes');
@@ -423,8 +429,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 (1 row)

 -- BEGIN: Perform deferred cleanup.
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 3 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- END: Perform deferred cleanup.
 SET search_path TO "citus_split_test_schema";
 SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
@@ -476,9 +486,12 @@ ERROR: cannot use logical replication to transfer shards of the relation table_
 DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY.
 HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'.
 -- BEGIN: Perform deferred cleanup.
-SET client_min_messages TO WARNING;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- END: Perform deferred cleanup.
 SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
 FROM pg_dist_shard AS shard
@@ -526,9 +539,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 (1 row)

 -- BEGIN: Perform deferred cleanup.
-SET client_min_messages TO WARNING;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- END: Perform deferred cleanup.
 SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
 FROM pg_dist_shard AS shard
diff --git a/src/test/regress/expected/citus_split_shard_by_split_points.out b/src/test/regress/expected/citus_split_shard_by_split_points.out
index 5e4b24190..7570267ba 100644
--- a/src/test/regress/expected/citus_split_shard_by_split_points.out
+++ b/src/test/regress/expected/citus_split_shard_by_split_points.out
@@ -233,8 +233,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 (1 row)

 -- BEGIN: Perform deferred cleanup.
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 3 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- END: Perform deferred cleanup.
 -- Perform 3 way split
 SELECT pg_catalog.citus_split_shard_by_split_points(
@@ -249,8 +253,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 (1 row)

 -- END : Split two shards : One with move and One without move.
 -- BEGIN: Perform deferred cleanup.
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 3 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- END: Perform deferred cleanup.
 -- BEGIN : Move a shard post split.
 SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes');
@@ -417,8 +425,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 (1 row)

 -- BEGIN: Perform deferred cleanup.
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 3 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- END: Perform deferred cleanup.
 SET search_path TO "citus_split_test_schema";
 SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
diff --git a/src/test/regress/expected/citus_split_shard_columnar_partitioned.out b/src/test/regress/expected/citus_split_shard_columnar_partitioned.out
index 09fb12fd9..97162e387 100644
--- a/src/test/regress/expected/citus_split_shard_columnar_partitioned.out
+++ b/src/test/regress/expected/citus_split_shard_columnar_partitioned.out
@@ -274,8 +274,12 @@ SELECT pg_reload_conf();
 -- END: Split a shard along its co-located shards
 -- BEGIN: Perform deferred cleanup.
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 11 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- END: Perform deferred cleanup.
 -- BEGIN: Validate Shard Info and Data
 SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
@@ -525,8 +529,12 @@ NOTICE: cleaned up 11 orphaned resources
 -- END: Split a partition table directly
 -- BEGIN: Perform deferred cleanup.
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 11 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- END: Perform deferred cleanup.
 -- BEGIN: Validate Shard Info and Data
 SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
diff --git a/src/test/regress/expected/failure_on_create_subscription.out b/src/test/regress/expected/failure_on_create_subscription.out
index ffc327e26..60c3277af 100644
--- a/src/test/regress/expected/failure_on_create_subscription.out
+++ b/src/test/regress/expected/failure_on_create_subscription.out
@@ -67,8 +67,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

-CALL citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 1 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").cancel(' || :pid || ')');
 mitmproxy
---------------------------------------------------------------------
@@ -101,8 +105,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

-CALL citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 4 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
 master_move_shard_placement
---------------------------------------------------------------------
diff --git a/src/test/regress/expected/failure_online_move_shard_placement.out b/src/test/regress/expected/failure_online_move_shard_placement.out
index 7ca9b67c6..43cab7ae3 100644
--- a/src/test/regress/expected/failure_online_move_shard_placement.out
+++ b/src/test/regress/expected/failure_online_move_shard_placement.out
@@ -138,8 +138,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

-CALL citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 2 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- cancel on dropping subscription
 SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").cancel(' || :pid || ')');
 mitmproxy
---------------------------------------------------------------------
@@ -156,8 +160,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

-CALL citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 4 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- try again
 SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
 master_move_shard_placement
---------------------------------------------------------------------
@@ -225,8 +233,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

-CALL citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 2 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 CALL citus_cleanup_orphaned_shards();
 NOTICE: cleaned up 1 orphaned shards
 -- failure on setting lock_timeout (right before dropping subscriptions & replication slots)
@@ -273,8 +285,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

-CALL citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 4 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 CALL citus_cleanup_orphaned_shards();
 NOTICE: cleaned up 1 orphaned shards
 -- cancellation on disabling subscription (right before dropping it)
@@ -293,8 +309,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

-CALL citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 4 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- disable maintenance daemon cleanup, to prevent the flaky test
 ALTER SYSTEM SET citus.defer_shard_delete_interval TO -1;
 SELECT pg_reload_conf();
@@ -344,9 +364,12 @@ SELECT pg_reload_conf();
 -- cleanup leftovers
 -- then, verify we don't see any error for already dropped subscription
-SET client_min_messages TO WARNING;
-CALL citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- cancellation on dropping subscription
 SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')');
 mitmproxy
---------------------------------------------------------------------
@@ -393,8 +416,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

-CALL citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 3 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- lets create few more indexes and fail with both
 -- parallel mode and sequential mode
 CREATE INDEX index_failure_2 ON t(id);
diff --git a/src/test/regress/expected/failure_split_cleanup.out b/src/test/regress/expected/failure_split_cleanup.out
index 9e8bb17d2..e502af0d1 100644
--- a/src/test/regress/expected/failure_split_cleanup.out
+++ b/src/test/regress/expected/failure_split_cleanup.out
@@ -15,9 +15,12 @@ SET citus.shard_count TO 2;
 SET citus.shard_replication_factor TO 1;
 SELECT pg_backend_pid() as pid \gset
 -- cleanup any leftovers from previous tests so we get consistent output
-SET client_min_messages TO WARNING;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- Disable defer shard delete to stop auto cleanup.
 ALTER SYSTEM SET citus.defer_shard_delete_interval TO -1;
 SELECT pg_reload_conf();
@@ -98,8 +101,12 @@ CONTEXT: while executing command on localhost:xxxxx
 (0 rows)

 \c - postgres - :master_port
-    CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 3 orphaned resources
+    SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
     SELECT operation_id, object_type, object_name, node_group_id, policy_type
     FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
@@ -204,8 +211,12 @@ ERROR: Failed to run worker_split_shard_replication_setup UDF. It should succes
 (0 rows)

 \c - postgres - :master_port
-    CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 4 orphaned resources
+    SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
     SELECT operation_id, object_type, object_name, node_group_id, policy_type
     FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
@@ -316,8 +327,12 @@ CONTEXT: while executing command on localhost:xxxxx
 (0 rows)

 \c - postgres - :master_port
-    CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 5 orphaned resources
+    SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
     SELECT operation_id, object_type, object_name, node_group_id, policy_type
     FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
@@ -434,8 +449,12 @@ CONTEXT: while executing command on localhost:xxxxx
 (1 row)

 \c - postgres - :master_port
-    CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 8 orphaned resources
+    SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
     SELECT operation_id, object_type, object_name, node_group_id, policy_type
     FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
@@ -552,8 +571,12 @@ CONTEXT: while executing command on localhost:xxxxx
 (1 row)

 \c - postgres - :master_port
-    CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 8 orphaned resources
+    SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
     SELECT operation_id, object_type, object_name, node_group_id, policy_type
     FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
@@ -675,8 +698,12 @@ CONTEXT: while executing command on localhost:xxxxx
 (1 row)

 \c - postgres - :master_port
-    CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 8 orphaned resources
+    SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
     SELECT operation_id, object_type, object_name, node_group_id, policy_type
     FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
diff --git a/src/test/regress/expected/failure_tenant_isolation_nonblocking.out b/src/test/regress/expected/failure_tenant_isolation_nonblocking.out
index a5f2cf5fd..e40842e2a 100644
--- a/src/test/regress/expected/failure_tenant_isolation_nonblocking.out
+++ b/src/test/regress/expected/failure_tenant_isolation_nonblocking.out
@@ -19,7 +19,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

 -- cleanup leftovers if any
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 CREATE TABLE table_1 (id int PRIMARY KEY);
 CREATE TABLE table_2 (ref_id int REFERENCES table_1(id) UNIQUE, data int);
 SELECT create_distributed_table('table_1', 'id');
@@ -273,7 +278,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- cancellation on dropping subscription
 SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").cancel(' || :pid || ')');
 mitmproxy
---------------------------------------------------------------------
@@ -290,7 +300,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- failure on dropping publication
 SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").killall()');
 mitmproxy
---------------------------------------------------------------------
@@ -308,7 +323,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- cancellation on dropping publication
 SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").cancel(' || :pid || ')');
 mitmproxy
---------------------------------------------------------------------
@@ -325,7 +345,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- failure on dropping replication slot
 SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").killall()');
 mitmproxy
---------------------------------------------------------------------
@@ -343,7 +368,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- cancellation on dropping replication slot
 SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").cancel(' || :pid || ')');
 mitmproxy
---------------------------------------------------------------------
@@ -360,7 +390,12 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- failure on foreign key creation
 SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").kill()');
 mitmproxy
diff --git a/src/test/regress/expected/multi_tenant_isolation.out b/src/test/regress/expected/multi_tenant_isolation.out
index 7277926be..102ea72b7 100644
--- a/src/test/regress/expected/multi_tenant_isolation.out
+++ b/src/test/regress/expected/multi_tenant_isolation.out
@@ -449,8 +449,12 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid >= 1230000 ORDER BY nodeport
 ERROR: insert or update on table "lineitem_streaming_1230044" violates foreign key constraint "test_constraint_1230044"
 DETAIL: Key (l_orderkey)=(128) is not present in table "orders_streaming_1230046".
 \c - postgres - :master_port
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 2 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- connect to the worker node with metadata
 \c - mx_isolation_role_ent - :worker_1_port
 SET search_path to "Tenant Isolation";
@@ -716,7 +720,12 @@ SELECT * FROM pg_dist_shard
 (24 rows)

 \c - postgres - :master_port
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- test failure scenarios with triggers on workers
 \c - postgres - :worker_1_port
 SET search_path to "Tenant Isolation";
@@ -1017,8 +1026,12 @@ SELECT count(*) FROM test_colocated_table_2;
 (1 row)

 \c - postgres - :master_port
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 3 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 \c - postgres - :worker_1_port
 -- show the foreign keys of the main table & its colocated shard on other tables
 SELECT tbl.relname, fk."Constraint", fk."Definition"
diff --git a/src/test/regress/expected/multi_tenant_isolation_nonblocking.out b/src/test/regress/expected/multi_tenant_isolation_nonblocking.out
index b2c8d62af..12661f70e 100644
--- a/src/test/regress/expected/multi_tenant_isolation_nonblocking.out
+++ b/src/test/regress/expected/multi_tenant_isolation_nonblocking.out
@@ -260,8 +260,12 @@ SELECT isolate_tenant_to_new_shard('lineitem_streaming', 100, 'CASCADE', shard_t
 ERROR: table lineitem_streaming has already been isolated for the given value
 SELECT isolate_tenant_to_new_shard('orders_streaming', 101, 'CASCADE', shard_transfer_mode => 'force_logical');
 ERROR: table orders_streaming has already been isolated for the given value
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 2 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- test corner cases: hash(-1995148554) = -2147483648 and hash(-1686493264) = 2147483647
 SELECT isolate_tenant_to_new_shard('lineitem_streaming', -1995148554, 'CASCADE', shard_transfer_mode => 'force_logical');
 isolate_tenant_to_new_shard
@@ -275,8 +279,12 @@ SELECT isolate_tenant_to_new_shard('orders_streaming', -1686493264, 'CASCADE', s
 1230047
 (1 row)

-CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 2 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1995148554;
 count
---------------------------------------------------------------------
@@ -453,7 +461,12 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid >= 1230000 ORDER BY nodeport
 ERROR: insert or update on table "lineitem_streaming_1230044" violates foreign key constraint "test_constraint_1230044"
 DETAIL: Key (l_orderkey)=(128) is not present in table "orders_streaming_1230046".
 \c - postgres - :master_port
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- connect to the worker node with metadata
 \c - mx_isolation_role_ent - :worker_1_port
 SET search_path to "Tenant Isolation";
@@ -690,8 +703,12 @@ SELECT * FROM text_column;
 hello | {}
 (1 row)

-CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 1 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- test with invalid shard placements
 \c - postgres - :master_port
 SET search_path to "Tenant Isolation";
@@ -747,7 +764,12 @@ SELECT * FROM pg_dist_shard
 (24 rows)

 \c - postgres - :master_port
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 -- test failure scenarios with triggers on workers
 \c - postgres - :worker_1_port
 SET search_path to "Tenant Isolation";
@@ -1056,8 +1078,12 @@ SELECT count(*) FROM test_colocated_table_2;
 (1 row)

 \c - postgres - :master_port
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 3 orphaned resources
+SELECT public.wait_for_resource_cleanup();
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 \c - postgres - :worker_1_port
 -- show the foreign keys of the main table & its colocated shard on other tables
 SELECT tbl.relname, fk."Constraint", fk."Definition"
diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out
index 640a8d9ee..f997e40d2 100644
--- a/src/test/regress/expected/multi_test_helpers.out
+++ b/src/test/regress/expected/multi_test_helpers.out
@@ -154,3 +154,16 @@ BEGIN
 END LOOP;
 END;
 $$ LANGUAGE plpgsql;
+-- Introduce a function that waits until all cleanup records are deleted, for testing purposes
+CREATE OR REPLACE FUNCTION wait_for_resource_cleanup() RETURNS void
+SET client_min_messages TO WARNING
+AS $$
+DECLARE
+record_count integer;
+BEGIN
+  EXECUTE 'SELECT COUNT(*) FROM pg_catalog.pg_dist_cleanup' INTO record_count;
+  WHILE record_count != 0 LOOP
+    CALL pg_catalog.citus_cleanup_orphaned_resources();
+    EXECUTE 'SELECT COUNT(*) FROM pg_catalog.pg_dist_cleanup' INTO record_count;
+  END LOOP;
+END$$ LANGUAGE plpgsql;
diff --git a/src/test/regress/sql/citus_non_blocking_split_columnar.sql b/src/test/regress/sql/citus_non_blocking_split_columnar.sql
index 21639f19a..ead6d3f37 100644
--- a/src/test/regress/sql/citus_non_blocking_split_columnar.sql
+++ b/src/test/regress/sql/citus_non_blocking_split_columnar.sql
@@ -173,7 +173,7 @@ SELECT pg_reload_conf();
 -- END: Split a shard along its co-located shards
 -- BEGIN: Perform deferred cleanup.
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- END: Perform deferred cleanup.
 -- BEGIN: Validate Shard Info and Data
@@ -244,9 +244,7 @@ CALL pg_catalog.citus_cleanup_orphaned_resources();
 -- END: Split a partition table directly
 -- BEGIN: Perform deferred cleanup.
-SET client_min_messages TO WARNING;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
 -- END: Perform deferred cleanup.
 -- BEGIN: Validate Shard Info and Data
diff --git a/src/test/regress/sql/citus_non_blocking_split_shard_cleanup.sql b/src/test/regress/sql/citus_non_blocking_split_shard_cleanup.sql
index dab69dfbc..ba3f95215 100644
--- a/src/test/regress/sql/citus_non_blocking_split_shard_cleanup.sql
+++ b/src/test/regress/sql/citus_non_blocking_split_shard_cleanup.sql
@@ -47,9 +47,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 'force_logical');
 -- BEGIN: Perform deferred cleanup.
-SET client_min_messages TO WARNING;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
 -- END: Perform deferred cleanup.
 \c - - - :worker_1_port
diff --git a/src/test/regress/sql/citus_non_blocking_split_shards.sql b/src/test/regress/sql/citus_non_blocking_split_shards.sql
index 2109a6902..05f2e7dfc 100644
--- a/src/test/regress/sql/citus_non_blocking_split_shards.sql
+++ b/src/test/regress/sql/citus_non_blocking_split_shards.sql
@@ -149,9 +149,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 'force_logical');
 -- BEGIN: Perform deferred cleanup.
-SET client_min_messages TO WARNING;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
 -- END: Perform deferred cleanup.
 -- Perform 3 way split
@@ -163,9 +161,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 -- END : Split two shards : One with move and One without move.
 -- BEGIN: Perform deferred cleanup.
-SET client_min_messages TO WARNING;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
 -- END: Perform deferred cleanup.
 -- BEGIN : Move a shard post split.
@@ -241,7 +237,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 'force_logical');
 -- BEGIN: Perform deferred cleanup.
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- END: Perform deferred cleanup.
 SET search_path TO "citus_split_test_schema";
@@ -267,9 +263,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 ARRAY[:worker_1_node, :worker_2_node]);
 -- BEGIN: Perform deferred cleanup.
-SET client_min_messages TO WARNING;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
 -- END: Perform deferred cleanup.
 SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
@@ -294,9 +288,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 'auto');
 -- BEGIN: Perform deferred cleanup.
-SET client_min_messages TO WARNING;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
 -- END: Perform deferred cleanup.
 SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
diff --git a/src/test/regress/sql/citus_split_shard_by_split_points.sql b/src/test/regress/sql/citus_split_shard_by_split_points.sql
index 24bf42951..c48dc22a4 100644
--- a/src/test/regress/sql/citus_split_shard_by_split_points.sql
+++ b/src/test/regress/sql/citus_split_shard_by_split_points.sql
@@ -144,7 +144,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 'block_writes');
 -- BEGIN: Perform deferred cleanup.
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- END: Perform deferred cleanup.
 -- Perform 3 way split
@@ -156,7 +156,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 -- END : Split two shards : One with move and One without move.
 -- BEGIN: Perform deferred cleanup.
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- END: Perform deferred cleanup.
 -- BEGIN : Move a shard post split.
@@ -232,7 +232,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 'block_writes');
 -- BEGIN: Perform deferred cleanup.
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- END: Perform deferred cleanup.
 SET search_path TO "citus_split_test_schema";
diff --git a/src/test/regress/sql/citus_split_shard_columnar_partitioned.sql b/src/test/regress/sql/citus_split_shard_columnar_partitioned.sql
index e7578879f..f1b3a3d13 100644
--- a/src/test/regress/sql/citus_split_shard_columnar_partitioned.sql
+++ b/src/test/regress/sql/citus_split_shard_columnar_partitioned.sql
@@ -173,7 +173,7 @@ SELECT pg_reload_conf();
 -- END: Split a shard along its co-located shards
 -- BEGIN: Perform deferred cleanup.
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- END: Perform deferred cleanup.
 -- BEGIN: Validate Shard Info and Data
@@ -244,7 +244,7 @@ CALL pg_catalog.citus_cleanup_orphaned_resources();
 -- END: Split a partition table directly
 -- BEGIN: Perform deferred cleanup.
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- END: Perform deferred cleanup.
 -- BEGIN: Validate Shard Info and Data
diff --git a/src/test/regress/sql/failure_on_create_subscription.sql b/src/test/regress/sql/failure_on_create_subscription.sql
index a8c199775..3a0ae3b5e 100644
--- a/src/test/regress/sql/failure_on_create_subscription.sql
+++ b/src/test/regress/sql/failure_on_create_subscription.sql
@@ -44,7 +44,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
 -- cleanup leftovers
 SELECT citus.mitmproxy('conn.allow()');
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").cancel(' || :pid || ')');
 SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
@@ -56,7 +56,7 @@ SELECT count(*) FROM t;
 -- Verify that shard can be moved after a temporary failure
 -- cleanup leftovers, as it can cause flakiness in the following test files
 SELECT citus.mitmproxy('conn.allow()');
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
 SELECT * FROM shards_in_workers;
 SELECT count(*) FROM t;
diff --git a/src/test/regress/sql/failure_online_move_shard_placement.sql b/src/test/regress/sql/failure_online_move_shard_placement.sql
index dea2dac00..e5754b1c4 100644
--- a/src/test/regress/sql/failure_online_move_shard_placement.sql
+++ b/src/test/regress/sql/failure_online_move_shard_placement.sql
@@ -71,7 +71,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
 -- cleanup leftovers
 SELECT citus.mitmproxy('conn.allow()');
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- cancel on dropping subscription
 SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").cancel(' || :pid || ')');
@@ -79,7 +79,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
 -- cleanup leftovers
 SELECT citus.mitmproxy('conn.allow()');
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- try again
 SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
@@ -102,7 +102,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'loca
 -- cleanup leftovers
 SELECT citus.mitmproxy('conn.allow()');
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 CALL citus_cleanup_orphaned_shards();
 -- failure on setting lock_timeout (right before dropping subscriptions & replication slots)
@@ -114,7 +114,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'loca
 -- cleanup leftovers
 SELECT citus.mitmproxy('conn.allow()');
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 CALL citus_cleanup_orphaned_shards();
 -- cancellation on disabling subscription (right before dropping it)
@@ -123,7 +123,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
 -- cleanup leftovers
 SELECT citus.mitmproxy('conn.allow()');
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- disable maintenance daemon cleanup, to prevent the flaky test
 ALTER SYSTEM SET citus.defer_shard_delete_interval TO -1;
@@ -147,9 +147,7 @@ SELECT pg_reload_conf();
 -- cleanup leftovers
 -- then, verify we don't see any error for already dropped subscription
-SET client_min_messages TO WARNING;
-CALL citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
 -- cancellation on dropping subscription
 SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')');
@@ -169,7 +167,7 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
 -- cleanup leftovers
 SELECT citus.mitmproxy('conn.allow()');
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- lets create few more indexes and fail with both
 -- parallel mode and sequential mode
diff --git a/src/test/regress/sql/failure_split_cleanup.sql b/src/test/regress/sql/failure_split_cleanup.sql
index 597a5f6bd..1b85d3d17 100644
--- a/src/test/regress/sql/failure_split_cleanup.sql
+++ b/src/test/regress/sql/failure_split_cleanup.sql
@@ -17,9 +17,7 @@ SET citus.shard_replication_factor TO 1;
 SELECT pg_backend_pid() as pid \gset
 -- cleanup any leftovers from previous tests so we get consistent output
-SET client_min_messages TO WARNING;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
-RESET client_min_messages;
+SELECT public.wait_for_resource_cleanup();
 -- Disable defer shard delete to stop auto cleanup.
 ALTER SYSTEM SET citus.defer_shard_delete_interval TO -1;
@@ -58,7 +56,7 @@ SELECT create_distributed_table('table_to_split', 'id');
 SELECT subname FROM pg_subscription;
 \c - postgres - :master_port
-    CALL pg_catalog.citus_cleanup_orphaned_resources();
+    SELECT public.wait_for_resource_cleanup();
     SELECT operation_id, object_type, object_name, node_group_id, policy_type
     FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
@@ -109,7 +107,7 @@ SELECT create_distributed_table('table_to_split', 'id');
 SELECT subname FROM pg_subscription;
 \c - postgres - :master_port
-    CALL pg_catalog.citus_cleanup_orphaned_resources();
+    SELECT public.wait_for_resource_cleanup();
     SELECT operation_id, object_type, object_name, node_group_id, policy_type
     FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
@@ -155,7 +153,7 @@ SELECT create_distributed_table('table_to_split', 'id');
 SELECT subname FROM pg_subscription;
 \c - postgres - :master_port
-    CALL pg_catalog.citus_cleanup_orphaned_resources();
+    SELECT public.wait_for_resource_cleanup();
     SELECT operation_id, object_type, object_name, node_group_id, policy_type
     FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
@@ -201,7 +199,7 @@ SELECT create_distributed_table('table_to_split', 'id');
 SELECT subname FROM pg_subscription;
 \c - postgres - :master_port
-    CALL pg_catalog.citus_cleanup_orphaned_resources();
+    SELECT public.wait_for_resource_cleanup();
     SELECT operation_id, object_type, object_name, node_group_id, policy_type
     FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
@@ -247,7 +245,7 @@ SELECT create_distributed_table('table_to_split', 'id');
 SELECT subname FROM pg_subscription;
 \c - postgres - :master_port
-    CALL pg_catalog.citus_cleanup_orphaned_resources();
+    SELECT public.wait_for_resource_cleanup();
     SELECT operation_id, object_type, object_name, node_group_id, policy_type
     FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
@@ -295,7 +293,7 @@ SELECT create_distributed_table('table_to_split', 'id');
 SELECT subname FROM pg_subscription;
 \c - postgres - :master_port
-    CALL pg_catalog.citus_cleanup_orphaned_resources();
+    SELECT public.wait_for_resource_cleanup();
     SELECT operation_id, object_type, object_name, node_group_id, policy_type
     FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
diff --git a/src/test/regress/sql/failure_tenant_isolation_nonblocking.sql b/src/test/regress/sql/failure_tenant_isolation_nonblocking.sql
index 50cad7162..1eaf74d56 100644
--- a/src/test/regress/sql/failure_tenant_isolation_nonblocking.sql
+++ b/src/test/regress/sql/failure_tenant_isolation_nonblocking.sql
@@ -17,7 +17,7 @@ SELECT pg_backend_pid() as pid \gset
 SELECT citus.mitmproxy('conn.allow()');
 -- cleanup leftovers if any
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 CREATE TABLE table_1 (id int PRIMARY KEY);
 CREATE TABLE table_2 (ref_id int REFERENCES table_1(id) UNIQUE, data int);
@@ -134,7 +134,7 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode
 -- cleanup leftovers
 SELECT citus.mitmproxy('conn.allow()');
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- cancellation on dropping subscription
 SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").cancel(' || :pid || ')');
@@ -142,7 +142,7 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode
 -- cleanup leftovers
 SELECT citus.mitmproxy('conn.allow()');
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- failure on dropping publication
 SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").killall()');
@@ -150,7 +150,7 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode
 -- cleanup leftovers
 SELECT citus.mitmproxy('conn.allow()');
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- cancellation on dropping publication
 SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").cancel(' || :pid || ')');
@@ -158,7 +158,7 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode
 -- cleanup leftovers
 SELECT citus.mitmproxy('conn.allow()');
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- failure on dropping replication slot
 SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").killall()');
@@ -166,7 +166,7 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode
 -- cleanup leftovers
 SELECT citus.mitmproxy('conn.allow()');
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- cancellation on dropping replication slot
 SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").cancel(' || :pid || ')');
@@ -174,7 +174,7 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode
 -- cleanup leftovers
 SELECT citus.mitmproxy('conn.allow()');
-CALL citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- failure on foreign key creation
 SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").kill()');
diff --git a/src/test/regress/sql/multi_tenant_isolation.sql b/src/test/regress/sql/multi_tenant_isolation.sql
index ad4376a5b..99bec5648 100644
--- a/src/test/regress/sql/multi_tenant_isolation.sql
+++ b/src/test/regress/sql/multi_tenant_isolation.sql
@@ -9,7 +9,6 @@ SELECT nextval('pg_catalog.pg_dist_placement_placementid_seq') AS last_placement
 \gset
 ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100000;
-
 CREATE SCHEMA "Tenant Isolation";
 SET search_path to "Tenant Isolation";
@@ -225,7 +224,7 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid >= 1230000 ORDER BY nodeport
 \.
 \c - postgres - :master_port
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- connect to the worker node with metadata
 \c - mx_isolation_role_ent - :worker_1_port
@@ -345,7 +344,7 @@ SELECT * FROM pg_dist_shard
 ORDER BY shardminvalue::BIGINT, logicalrelid;
 \c - postgres - :master_port
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- test failure scenarios with triggers on workers
 \c - postgres - :worker_1_port
@@ -523,7 +522,7 @@ SELECT isolate_tenant_to_new_shard('test_colocated_table_2', 1, 'CASCADE', shard
 SELECT count(*) FROM test_colocated_table_2;
 \c - postgres - :master_port
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 \c - postgres - :worker_1_port
diff --git a/src/test/regress/sql/multi_tenant_isolation_nonblocking.sql b/src/test/regress/sql/multi_tenant_isolation_nonblocking.sql
index f5f2997da..da90f19fc 100644
--- a/src/test/regress/sql/multi_tenant_isolation_nonblocking.sql
+++ b/src/test/regress/sql/multi_tenant_isolation_nonblocking.sql
@@ -175,13 +175,13 @@ SELECT isolate_tenant_to_new_shard('orders_streaming', 103, 'CASCADE', shard_tra
 SELECT isolate_tenant_to_new_shard('lineitem_streaming', 100, 'CASCADE', shard_transfer_mode => 'force_logical');
 SELECT isolate_tenant_to_new_shard('orders_streaming', 101, 'CASCADE', shard_transfer_mode => 'force_logical');
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- test corner cases: hash(-1995148554) = -2147483648 and hash(-1686493264) = 2147483647
 SELECT isolate_tenant_to_new_shard('lineitem_streaming', -1995148554, 'CASCADE', shard_transfer_mode => 'force_logical');
 SELECT isolate_tenant_to_new_shard('orders_streaming', -1686493264, 'CASCADE', shard_transfer_mode => 'force_logical');
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1995148554;
 SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1686493264;
@@ -229,7 +229,7 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid >= 1230000 ORDER BY nodeport
 \.
 \c - postgres - :master_port
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- connect to the worker node with metadata
 \c - mx_isolation_role_ent - :worker_1_port
@@ -329,7 +329,7 @@ INSERT INTO text_column VALUES ('hello','{}');
 SELECT create_distributed_table('text_column','tenant_id');
 SELECT isolate_tenant_to_new_shard('text_column', 'hello', shard_transfer_mode => 'force_logical');
 SELECT * FROM text_column;
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- test with invalid shard placements
 \c - postgres - :master_port
@@ -358,7 +358,7 @@ SELECT * FROM pg_dist_shard
 ORDER BY shardminvalue::BIGINT, logicalrelid;
 \c - postgres - :master_port
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 -- test failure scenarios with triggers on workers
 \c - postgres - :worker_1_port
@@ -526,7 +526,7 @@ SELECT isolate_tenant_to_new_shard('test_colocated_table_2', 1, 'CASCADE', shard
 SELECT count(*) FROM test_colocated_table_2;
 \c - postgres - :master_port
-CALL pg_catalog.citus_cleanup_orphaned_resources();
+SELECT public.wait_for_resource_cleanup();
 \c - postgres - :worker_1_port
diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql
index 51cb2b129..dd8067adc 100644
--- a/src/test/regress/sql/multi_test_helpers.sql
+++ b/src/test/regress/sql/multi_test_helpers.sql
@@ -167,3 +167,16 @@ BEGIN
 END;
 $$ LANGUAGE plpgsql;

+-- Introduce a function that waits until all cleanup records are deleted, for testing purposes
+CREATE OR REPLACE FUNCTION wait_for_resource_cleanup() RETURNS void
+SET client_min_messages TO WARNING
+AS $$
+DECLARE
+record_count integer;
+BEGIN
+  EXECUTE 'SELECT COUNT(*) FROM pg_catalog.pg_dist_cleanup' INTO record_count;
+  WHILE record_count != 0 LOOP
+    CALL pg_catalog.citus_cleanup_orphaned_resources();
+    EXECUTE 'SELECT COUNT(*) FROM pg_catalog.pg_dist_cleanup' INTO record_count;
+  END LOOP;
+END$$ LANGUAGE plpgsql;
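
Note on the helper introduced in multi_test_helpers.sql: wait_for_resource_cleanup() re-runs citus_cleanup_orphaned_resources() in a tight loop until pg_dist_cleanup is empty, so a record that can never be cleaned up would hang the calling test indefinitely. A bounded variant is sketched below for illustration only; it is not part of this patch, and the function name, the 100-attempt budget, and the pg_sleep(0.1) backoff are assumptions rather than Citus API.

-- Hypothetical hardening sketch (not in this patch): the same polling idea as
-- wait_for_resource_cleanup(), but with a retry budget and a short sleep so a
-- stuck cleanup record fails loudly instead of spinning forever.
CREATE OR REPLACE FUNCTION wait_for_resource_cleanup_with_timeout() RETURNS void
SET client_min_messages TO WARNING
AS $$
DECLARE
  record_count integer;
  attempts integer := 0;
BEGIN
  LOOP
    -- trigger deferred cleanup, then re-check the queue
    CALL pg_catalog.citus_cleanup_orphaned_resources();
    EXECUTE 'SELECT COUNT(*) FROM pg_catalog.pg_dist_cleanup' INTO record_count;
    EXIT WHEN record_count = 0;
    attempts := attempts + 1;
    IF attempts >= 100 THEN
      RAISE EXCEPTION 'still % record(s) in pg_dist_cleanup after % attempts',
        record_count, attempts;
    END IF;
    PERFORM pg_sleep(0.1);  -- back off instead of busy-waiting
  END LOOP;
END$$ LANGUAGE plpgsql;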