diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index ab7fdfaf9..80b243d51 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -12,6 +12,7 @@ #include "postgres.h" +#include "access/xact.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" @@ -21,33 +22,66 @@ /* declarations for dynamic loading */ -PG_FUNCTION_INFO_V1(master_defer_delete_shards); +PG_FUNCTION_INFO_V1(citus_cleanup_orphaned_shards); +PG_FUNCTION_INFO_V1(isolation_cleanup_orphaned_shards); static bool TryDropShard(GroupShardPlacement *placement); static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode); + /* - * master_defer_delete_shards implements a user-facing UDF to deleter orphaned shards that - * are still haning around in the system. These shards are orphaned by previous actions - * that were not directly able to delete the placements eg. shard moving or dropping of a - * distributed table while one of the data nodes was not online. + * citus_cleanup_orphaned_shards implements a user-facing UDF to delete + * orphaned shards that are still hanging around in the system. These shards are + * orphaned by previous actions that were not directly able to delete the + * placements, e.g. shard moving or dropping of a distributed table while one of + * the data nodes was not online. * - * This function iterates through placements where shardstate is SHARD_STATE_TO_DELETE - * (shardstate = 4), drops the corresponding tables from the node and removes the - * placement information from the catalog. + * This function iterates through placements where shardstate is + * SHARD_STATE_TO_DELETE (shardstate = 4), drops the corresponding tables from + * the node and removes the placement information from the catalog. 
* - * The function takes no arguments and runs cluster wide + * The function takes no arguments and runs cluster wide. It cannot be run in a + * transaction, because holding the locks it takes for a long time is not good. + * While the locks are held, it is impossible for the background daemon to + * clean up orphaned shards. */ Datum -master_defer_delete_shards(PG_FUNCTION_ARGS) +citus_cleanup_orphaned_shards(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + PreventInTransactionBlock(true, "citus_cleanup_orphaned_shards"); + + bool waitForLocks = true; + int droppedShardCount = DropMarkedShards(waitForLocks); + if (droppedShardCount > 0) + { + ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount))); + } + + PG_RETURN_VOID(); +} + + +/* + * isolation_cleanup_orphaned_shards implements a test UDF that's the same as + * citus_cleanup_orphaned_shards. The only difference is that this command can + * be run in transactions, which is needed to test it from isolation tests. + */ +Datum +isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); EnsureCoordinator(); bool waitForLocks = true; int droppedShardCount = DropMarkedShards(waitForLocks); + if (droppedShardCount > 0) + { + ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount))); + } - PG_RETURN_INT32(droppedShardCount); + PG_RETURN_VOID(); } diff --git a/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql b/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql index 387db171e..ec5a122fa 100644 --- a/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql +++ b/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql @@ -47,3 +47,5 @@ WHERE repmodel = 'c' DROP TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger ON pg_catalog.pg_dist_rebalance_strategy; DROP FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check(); + +#include "udfs/citus_cleanup_orphaned_shards/10.1-1.sql" diff --git 
a/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql b/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql index af5c3b55b..5946473f9 100644 --- a/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql +++ b/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql @@ -84,3 +84,4 @@ CREATE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger BEFORE INSERT OR UPDATE OR DELETE OR TRUNCATE ON pg_dist_rebalance_strategy FOR EACH STATEMENT EXECUTE FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check(); +DROP PROCEDURE pg_catalog.citus_cleanup_orphaned_shards(); diff --git a/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/10.1-1.sql b/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/10.1-1.sql new file mode 100644 index 000000000..ed5723602 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/10.1-1.sql @@ -0,0 +1,5 @@ +CREATE OR REPLACE PROCEDURE pg_catalog.citus_cleanup_orphaned_shards() + LANGUAGE C + AS 'citus', $$citus_cleanup_orphaned_shards$$; +COMMENT ON PROCEDURE pg_catalog.citus_cleanup_orphaned_shards() + IS 'cleanup orphaned shards'; diff --git a/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/latest.sql b/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/latest.sql new file mode 100644 index 000000000..ed5723602 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/latest.sql @@ -0,0 +1,5 @@ +CREATE OR REPLACE PROCEDURE pg_catalog.citus_cleanup_orphaned_shards() + LANGUAGE C + AS 'citus', $$citus_cleanup_orphaned_shards$$; +COMMENT ON PROCEDURE pg_catalog.citus_cleanup_orphaned_shards() + IS 'cleanup orphaned shards'; diff --git a/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out b/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out index a144f48a5..bedfcc6f7 100644 --- 
a/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out +++ b/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out @@ -61,12 +61,8 @@ SELECT count(*) FROM referencing_table2; 101 (1 row) -SELECT 1 FROM public.master_defer_delete_shards(); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 2 orphaned shards SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; name | relid | refd_relid --------------------------------------------------------------------- @@ -108,12 +104,8 @@ SELECT count(*) FROM referencing_table2; 101 (1 row) -SELECT 1 FROM public.master_defer_delete_shards(); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 2 orphaned shards SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; name | relid | refd_relid --------------------------------------------------------------------- diff --git a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out index f11065c1b..187d33632 100644 --- a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out +++ b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out @@ -10,23 +10,19 @@ step s1-move-placement: master_move_shard_placement +s1: NOTICE: cleaned up 1 orphaned shards step s1-drop-marked-shards: - SELECT public.master_defer_delete_shards(); + SET client_min_messages to NOTICE; + CALL isolation_cleanup_orphaned_shards(); -master_defer_delete_shards - -1 step s2-drop-marked-shards: SET client_min_messages to DEBUG1; - SELECT 
public.master_defer_delete_shards(); + CALL isolation_cleanup_orphaned_shards(); step s1-commit: COMMIT; step s2-drop-marked-shards: <... completed> -master_defer_delete_shards - -0 starting permutation: s1-begin s1-move-placement s2-drop-marked-shards s1-drop-marked-shards s1-commit step s1-begin: @@ -40,17 +36,13 @@ master_move_shard_placement step s2-drop-marked-shards: SET client_min_messages to DEBUG1; - SELECT public.master_defer_delete_shards(); + CALL isolation_cleanup_orphaned_shards(); -master_defer_delete_shards - -0 +s1: NOTICE: cleaned up 1 orphaned shards step s1-drop-marked-shards: - SELECT public.master_defer_delete_shards(); + SET client_min_messages to NOTICE; + CALL isolation_cleanup_orphaned_shards(); -master_defer_delete_shards - -1 step s1-commit: COMMIT; @@ -82,14 +74,12 @@ run_commands_on_session_level_connection_to_node step s1-drop-marked-shards: - SELECT public.master_defer_delete_shards(); + SET client_min_messages to NOTICE; + CALL isolation_cleanup_orphaned_shards(); s1: WARNING: canceling statement due to lock timeout -s1: WARNING: Failed to drop 1 old shards out of 1 step s1-drop-marked-shards: <... 
completed> -master_defer_delete_shards - -0 +s1: WARNING: Failed to drop 1 old shards out of 1 step s1-commit: COMMIT; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 63179c7c5..04c15a84e 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -573,6 +573,7 @@ SELECT * FROM print_extension_changes(); function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint) | function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name) TABLE(table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer) | | function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) void + | function citus_cleanup_orphaned_shards() | function citus_local_disk_space_stats() record | function create_distributed_table(regclass,text,citus.distribution_type,text,integer) void | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint) @@ -580,7 +581,7 @@ SELECT * FROM print_extension_changes(); | function worker_partitioned_relation_size(regclass) bigint | function worker_partitioned_relation_total_size(regclass) bigint | function worker_partitioned_table_size(regclass) bigint -(14 rows) +(15 rows) DROP TABLE prev_objects, extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_test_helpers_superuser.out b/src/test/regress/expected/multi_test_helpers_superuser.out index cfc3cf02b..01676131c 100644 --- a/src/test/regress/expected/multi_test_helpers_superuser.out +++ 
b/src/test/regress/expected/multi_test_helpers_superuser.out @@ -1,9 +1,3 @@ -CREATE OR REPLACE FUNCTION master_defer_delete_shards() - RETURNS int - LANGUAGE C STRICT - AS 'citus', $$master_defer_delete_shards$$; -COMMENT ON FUNCTION master_defer_delete_shards() - IS 'remove orphaned shards'; CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) RETURNS void LANGUAGE C STRICT diff --git a/src/test/regress/expected/shard_move_deferred_delete.out b/src/test/regress/expected/shard_move_deferred_delete.out index c125e2c53..ae3fbfa00 100644 --- a/src/test/regress/expected/shard_move_deferred_delete.out +++ b/src/test/regress/expected/shard_move_deferred_delete.out @@ -50,13 +50,14 @@ $cmd$); (localhost,57638,t,1) (2 rows) +-- Make sure this cannot be run in a transaction +BEGIN; +CALL citus_cleanup_orphaned_shards(); +ERROR: citus_cleanup_orphaned_shards cannot run inside a transaction block +COMMIT; -- execute delayed removal -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 1 orphaned shards -- we expect the shard to be on only the second worker SELECT run_command_on_workers($cmd$ SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 23f100038..037107848 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -31,24 +31,14 @@ SELECT rebalance_table_shards('dist_table_test'); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT rebalance_table_shards(); rebalance_table_shards 
--------------------------------------------------------------------- (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- test that calling rebalance_table_shards without specifying relation -- wouldn't move shard of the citus local table. CREATE TABLE citus_local_table(a int, b int); @@ -65,12 +55,7 @@ SELECT rebalance_table_shards(); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; tablename @@ -101,12 +86,7 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running SELECT master_drain_node('localhost', :master_port); ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); pg_reload_conf @@ -126,12 +106,7 @@ SELECT master_drain_node('localhost', :master_port); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; tablename @@ -579,7 +554,7 @@ AS $$ pg_dist_shard_placement src USING (shardid), (SELECT nodename, nodeport FROM 
pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass; - SELECT public.master_defer_delete_shards(); + CALL citus_cleanup_orphaned_shards(); $$; CALL create_unbalanced_shards('rebalance_test_table'); SET citus.shard_replication_factor TO 2; @@ -624,12 +599,7 @@ FROM ( WHERE logicalrelid = 'rebalance_test_table'::regclass ) T; ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); pg_reload_conf @@ -658,12 +628,7 @@ FROM ( (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -715,12 +680,7 @@ SELECT * FROM table_placements_per_node; 57638 | rebalance_test_table | 5 (2 rows) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, max_shard_moves := 1, shard_transfer_mode:='block_writes'); @@ -729,12 +689,7 @@ SELECT rebalance_table_shards('rebalance_test_table', (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count 
--------------------------------------------------------------------- @@ -749,12 +704,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_tran (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -769,12 +719,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 0); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -790,12 +735,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_tran (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -969,12 +909,7 @@ SELECT COUNT(*) FROM imbalanced_table; -- Try force_logical SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical'); ERROR: the force_logical transfer mode is currently unsupported -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Test rebalance operation SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes'); 
rebalance_table_shards @@ -982,12 +917,7 @@ SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_m (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Confirm rebalance -- Shard counts in each node after rebalance SELECT * FROM public.table_placements_per_node; @@ -1024,12 +954,7 @@ FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; ERROR: Moving shards to a non-existing node is not supported HINT: Add the target node via SELECT citus_add_node('localhost', 10000); -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Try to move shards to a node where shards are not allowed SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); master_set_node_property @@ -1073,12 +998,7 @@ WHERE nodeport = :worker_2_port; (2 rows) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 2 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT create_distributed_table('colocated_rebalance_test2', 'id'); create_distributed_table --------------------------------------------------------------------- @@ -1106,12 +1026,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Confirm that nothing changed SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count @@ -1153,12 +1068,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, 
(1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size @@ -1216,12 +1126,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1244,12 +1149,7 @@ SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold : (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 2 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1271,12 +1171,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1293,12 +1188,7 @@ SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold : (1 row) -SELECT public.master_defer_delete_shards(); - 
master_defer_delete_shards ---------------------------------------------------------------------- - 2 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1335,12 +1225,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1362,12 +1247,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1404,12 +1284,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1431,12 +1306,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); 
SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1462,12 +1332,7 @@ SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port; shouldhaveshards --------------------------------------------------------------------- @@ -1495,12 +1360,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1607,12 +1467,7 @@ SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes') (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1627,12 +1482,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... 
(1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 1 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1648,12 +1499,7 @@ DETAIL: Using threshold of 0.01 (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1759,12 +1605,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 4 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1859,12 +1701,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... 
(1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 3 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 3 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1888,12 +1726,7 @@ SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes') (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1943,12 +1776,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 4 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1973,20 +1802,10 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_ ERROR: could not find rebalance strategy with name non_existing SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing'); ERROR: could not find rebalance strategy with name non_existing -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing'); ERROR: could not find rebalance 
strategy with name non_existing -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT citus_set_default_rebalance_strategy('non_existing'); ERROR: strategy with specified name does not exist UPDATE pg_dist_rebalance_strategy SET default_strategy=false; @@ -1994,20 +1813,10 @@ SELECT * FROM get_rebalance_table_shards_plan('tab'); ERROR: no rebalance_strategy was provided, but there is also no default strategy set SELECT * FROM rebalance_table_shards('tab'); ERROR: no rebalance_strategy was provided, but there is also no default strategy set -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM master_drain_node('localhost', :worker_2_port); ERROR: no rebalance_strategy was provided, but there is also no default strategy set -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count'; CREATE OR REPLACE FUNCTION shard_cost_no_arguments() RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql; @@ -2283,12 +2092,7 @@ SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='bloc (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 3 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; count --------------------------------------------------------------------- @@ -2350,12 +2154,7 @@ SELECT rebalance_table_shards(); (1 
row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 2 -(1 row) - +CALL citus_cleanup_orphaned_shards(); DROP TABLE t1, r1, r2; -- verify there are no distributed tables before we perform the following tests. Preceding -- test suites should clean up their distributed tables. @@ -2404,12 +2203,7 @@ SELECT rebalance_table_shards(); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- verify the reference table is on all nodes after the rebalance SELECT count(*) FROM pg_dist_shard diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 064e40e89..2b7d7468d 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -38,6 +38,7 @@ ORDER BY 1; function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) function citus_add_secondary_node(text,integer,text,integer,name) function citus_blocking_pids(integer) + function citus_cleanup_orphaned_shards() function citus_conninfo_cache_invalidate() function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode) function citus_create_restore_point(text) @@ -245,5 +246,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(229 rows) +(230 rows) diff --git a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec index ee225f13a..150776aba 100644 --- a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec +++ b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec @@ -26,12 +26,11 @@ setup SELECT 
citus_internal.replace_isolation_tester_func(); SELECT citus_internal.refresh_isolation_tester_prepared_statement(); -CREATE OR REPLACE FUNCTION master_defer_delete_shards() - RETURNS int - LANGUAGE C STRICT - AS 'citus', $$master_defer_delete_shards$$; -COMMENT ON FUNCTION master_defer_delete_shards() - IS 'remove orphaned shards'; +CREATE OR REPLACE PROCEDURE isolation_cleanup_orphaned_shards() + LANGUAGE C + AS 'citus', $$isolation_cleanup_orphaned_shards$$; +COMMENT ON PROCEDURE isolation_cleanup_orphaned_shards() + IS 'cleanup orphaned shards'; SET citus.next_shard_id to 120000; SET citus.shard_count TO 8; @@ -71,7 +70,8 @@ step "s1-move-placement-without-deferred" { step "s1-drop-marked-shards" { - SELECT public.master_defer_delete_shards(); + SET client_min_messages to NOTICE; + CALL isolation_cleanup_orphaned_shards(); } step "s1-lock-pg-dist-placement" { @@ -116,7 +116,7 @@ step "s2-select" { step "s2-drop-marked-shards" { SET client_min_messages to DEBUG1; - SELECT public.master_defer_delete_shards(); + CALL isolation_cleanup_orphaned_shards(); } step "s2-commit" { diff --git a/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql b/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql index 6df09f061..546eeb89e 100644 --- a/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql +++ b/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql @@ -45,14 +45,14 @@ SELECT master_move_shard_placement(15000009, 'localhost', :worker_1_port, 'local SELECT count(*) FROM referencing_table2; -SELECT 1 FROM public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; SELECT master_move_shard_placement(15000009, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); SELECT count(*) FROM referencing_table2; -SELECT 1 FROM 
public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; -- create a function to show the diff --git a/src/test/regress/sql/multi_test_helpers_superuser.sql b/src/test/regress/sql/multi_test_helpers_superuser.sql index 4026c2f00..0bd360b12 100644 --- a/src/test/regress/sql/multi_test_helpers_superuser.sql +++ b/src/test/regress/sql/multi_test_helpers_superuser.sql @@ -1,10 +1,3 @@ -CREATE OR REPLACE FUNCTION master_defer_delete_shards() - RETURNS int - LANGUAGE C STRICT - AS 'citus', $$master_defer_delete_shards$$; -COMMENT ON FUNCTION master_defer_delete_shards() - IS 'remove orphaned shards'; - CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) RETURNS void LANGUAGE C STRICT diff --git a/src/test/regress/sql/shard_move_deferred_delete.sql b/src/test/regress/sql/shard_move_deferred_delete.sql index ac82b0680..e33bc3f82 100644 --- a/src/test/regress/sql/shard_move_deferred_delete.sql +++ b/src/test/regress/sql/shard_move_deferred_delete.sql @@ -31,8 +31,13 @@ SELECT run_command_on_workers($cmd$ SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; $cmd$); +-- Make sure this cannot be run in a transaction +BEGIN; +CALL citus_cleanup_orphaned_shards(); +COMMIT; + -- execute delayed removal -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- we expect the shard to be on only the second worker SELECT run_command_on_workers($cmd$ diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index ce0d0899c..636fa2341 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -13,9 +13,9 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0); -- should just be noops even if we add the coordinator to the pg_dist_node 
SELECT rebalance_table_shards('dist_table_test'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT rebalance_table_shards(); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- test that calling rebalance_table_shards without specifying relation @@ -25,7 +25,7 @@ SELECT citus_add_local_table_to_metadata('citus_local_table'); INSERT INTO citus_local_table VALUES (1, 2); SELECT rebalance_table_shards(); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; @@ -38,14 +38,14 @@ SELECT pg_reload_conf(); SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC SELECT master_drain_node('localhost', :master_port); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC SELECT master_drain_node('localhost', :master_port); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; @@ -396,7 +396,7 @@ AS $$ pg_dist_shard_placement src USING (shardid), (SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass; - SELECT public.master_defer_delete_shards(); + CALL citus_cleanup_orphaned_shards(); $$; CALL create_unbalanced_shards('rebalance_test_table'); @@ -428,7 +428,7 @@ FROM ( FROM pg_dist_shard WHERE logicalrelid = 'rebalance_test_table'::regclass ) T; -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); 
ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); @@ -445,7 +445,7 @@ FROM ( FROM pg_dist_shard WHERE logicalrelid = 'rebalance_test_table'::regclass ) T; -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; @@ -480,26 +480,26 @@ SELECT rebalance_table_shards('rebalance_test_table', RESET ROLE; -- Confirm no moves took place at all during these errors SELECT * FROM table_placements_per_node; -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, max_shard_moves := 1, shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; -- Check that threshold=1 doesn't move any shards SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; -- Move the remaining shards using threshold=0 SELECT rebalance_table_shards('rebalance_test_table', threshold := 0); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; @@ -507,7 +507,7 @@ SELECT * FROM table_placements_per_node; -- any effects. 
SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; @@ -602,11 +602,11 @@ SELECT COUNT(*) FROM imbalanced_table; -- Try force_logical SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- Test rebalance operation SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- Confirm rebalance -- Shard counts in each node after rebalance @@ -633,7 +633,7 @@ SELECT create_distributed_table('colocated_rebalance_test', 'id'); SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', 10000, 'block_writes') FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- Try to move shards to a node where shards are not allowed SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); @@ -660,7 +660,7 @@ UPDATE pg_dist_node SET noderole = 'primary' WHERE nodeport = :worker_1_port; SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT create_distributed_table('colocated_rebalance_test2', 'id'); @@ -671,7 +671,7 @@ SELECT * FROM public.table_placements_per_node; SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0, drain_only := true); -- Running with drain_only shouldn't do anything SELECT * FROM 
rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- Confirm that nothing changed SELECT * FROM public.table_placements_per_node; @@ -684,7 +684,7 @@ SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', rebala SELECT * FROM get_rebalance_progress(); -- Actually do the rebalance SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); @@ -702,22 +702,22 @@ SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaves SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0); SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; SELECT * FROM get_rebalance_table_shards_plan('non_colocated_rebalance_test', threshold := 0); SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', 
threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; -- testing behaviour when setting shouldhaveshards to false and rebalancing all @@ -725,13 +725,13 @@ SELECT * FROM public.table_placements_per_node; SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); SELECT * FROM get_rebalance_table_shards_plan(threshold := 0, drain_only := true); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; -- testing behaviour when setting shouldhaveshards to false and rebalancing all @@ -739,13 +739,13 @@ SELECT * FROM public.table_placements_per_node; SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); SELECT * FROM get_rebalance_table_shards_plan(threshold := 0); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; -- Make it a data node 
again @@ -753,14 +753,14 @@ SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaves -- testing behaviour of master_drain_node SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port; SELECT * FROM public.table_placements_per_node; -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; @@ -829,15 +829,15 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_d SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', threshold := 0); SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes', threshold := 0); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; -- Check that sizes of colocated tables are added together for rebalances @@ -888,7 +888,7 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_d -- supports improvement_threshold SELECT * FROM 
get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', improvement_threshold := 0); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; ANALYZE tab, tab2; @@ -945,13 +945,13 @@ SELECT citus_add_rebalance_strategy( SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_2'); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_2', shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; SELECT citus_set_default_rebalance_strategy('capacity_high_worker_2'); SELECT * FROM get_rebalance_table_shards_plan('tab'); SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; CREATE FUNCTION only_worker_1(shardid bigint, nodeidarg int) @@ -972,7 +972,7 @@ SELECT citus_add_rebalance_strategy( SELECT citus_set_default_rebalance_strategy('only_worker_1'); SELECT * FROM get_rebalance_table_shards_plan('tab'); SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; SELECT citus_set_default_rebalance_strategy('by_shard_count'); @@ -981,18 +981,18 @@ SELECT * FROM get_rebalance_table_shards_plan('tab'); -- Check all the error handling cases SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_existing'); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing'); -SELECT public.master_defer_delete_shards(); +CALL 
citus_cleanup_orphaned_shards(); SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT citus_set_default_rebalance_strategy('non_existing'); UPDATE pg_dist_rebalance_strategy SET default_strategy=false; SELECT * FROM get_rebalance_table_shards_plan('tab'); SELECT * FROM rebalance_table_shards('tab'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM master_drain_node('localhost', :worker_2_port); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count'; CREATE OR REPLACE FUNCTION shard_cost_no_arguments() @@ -1222,7 +1222,7 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0); SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; @@ -1255,7 +1255,7 @@ INSERT INTO r2 VALUES (1,2), (3,4); SELECT 1 from master_add_node('localhost', :worker_2_port); SELECT rebalance_table_shards(); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); DROP TABLE t1, r1, r2; @@ -1282,7 +1282,7 @@ WHERE logicalrelid = 'r1'::regclass; -- rebalance with _only_ a reference table, this should trigger the copy SELECT rebalance_table_shards(); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- verify the reference table is on all nodes after the rebalance SELECT count(*)