From 7015049ea5d545087bbd6807d24733afd2e87c69 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Wed, 2 Jun 2021 12:57:44 +0200 Subject: [PATCH 1/5] Add citus_cleanup_orphaned_shards UDF Sometimes the background daemon doesn't cleanup orphaned shards quickly enough. It's useful to have a UDF to trigger this removal when needed. We already had a UDF like this but it was only used during testing. This exposes that UDF to users. As a safety measure it cannot be run in a transaction, because that would cause the background daemon to stop cleaning up shards while this transaction is running. --- .../distributed/operations/shard_cleaner.c | 56 +++- .../distributed/sql/citus--10.0-3--10.1-1.sql | 2 + .../sql/downgrades/citus--10.1-1--10.0-3.sql | 1 + .../citus_cleanup_orphaned_shards/10.1-1.sql | 5 + .../citus_cleanup_orphaned_shards/latest.sql | 5 + ...reign_key_to_reference_shard_rebalance.out | 16 +- .../isolation_rebalancer_deferred_drop.out | 32 +- src/test/regress/expected/multi_extension.out | 3 +- .../expected/multi_test_helpers_superuser.out | 6 - .../expected/shard_move_deferred_delete.out | 13 +- .../regress/expected/shard_rebalancer.out | 300 +++--------------- .../expected/upgrade_list_citus_objects.out | 3 +- .../isolation_rebalancer_deferred_drop.spec | 16 +- ...reign_key_to_reference_shard_rebalance.sql | 4 +- .../sql/multi_test_helpers_superuser.sql | 7 - .../sql/shard_move_deferred_delete.sql | 7 +- src/test/regress/sql/shard_rebalancer.sql | 86 ++--- 17 files changed, 190 insertions(+), 372 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/10.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/latest.sql diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index ab7fdfaf9..80b243d51 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -12,6 +12,7 @@ #include "postgres.h" +#include "access/xact.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" @@ -21,33 +22,66 @@ /* declarations for dynamic loading */ -PG_FUNCTION_INFO_V1(master_defer_delete_shards); +PG_FUNCTION_INFO_V1(citus_cleanup_orphaned_shards); +PG_FUNCTION_INFO_V1(isolation_cleanup_orphaned_shards); static bool TryDropShard(GroupShardPlacement *placement); static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode); + /* - * master_defer_delete_shards implements a user-facing UDF to deleter orphaned shards that - * are still haning around in the system. These shards are orphaned by previous actions - * that were not directly able to delete the placements eg. shard moving or dropping of a - * distributed table while one of the data nodes was not online. + * citus_cleanup_orphaned_shards implements a user-facing UDF to delete + * orphaned shards that are still haning around in the system. These shards are + * orphaned by previous actions that were not directly able to delete the + * placements eg. shard moving or dropping of a distributed table while one of + * the data nodes was not online. * - * This function iterates through placements where shardstate is SHARD_STATE_TO_DELETE - * (shardstate = 4), drops the corresponding tables from the node and removes the - * placement information from the catalog. + * This function iterates through placements where shardstate is + * SHARD_STATE_TO_DELETE (shardstate = 4), drops the corresponding tables from + * the node and removes the placement information from the catalog. * - * The function takes no arguments and runs cluster wide + * The function takes no arguments and runs cluster wide. It cannot be run in a + * transaction, because holding the locks it takes for a long time is not good. + * While the locks are held, it is impossible for the background daemon to + * cleanup orphaned shards. */ Datum -master_defer_delete_shards(PG_FUNCTION_ARGS) +citus_cleanup_orphaned_shards(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + PreventInTransactionBlock(true, "citus_cleanup_orphaned_shards"); + + bool waitForLocks = true; + int droppedShardCount = DropMarkedShards(waitForLocks); + if (droppedShardCount > 0) + { + ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount))); + } + + PG_RETURN_VOID(); +} + + +/* + * isolation_cleanup_orphaned_shards implements a test UDF that's the same as + * citus_cleanup_orphaned_shards. The only difference is that this command can + * be run in transactions, this is to test + */ +Datum +isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); EnsureCoordinator(); bool waitForLocks = true; int droppedShardCount = DropMarkedShards(waitForLocks); + if (droppedShardCount > 0) + { + ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount))); + } - PG_RETURN_INT32(droppedShardCount); + PG_RETURN_VOID(); } diff --git a/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql b/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql index 387db171e..ec5a122fa 100644 --- a/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql +++ b/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql @@ -47,3 +47,5 @@ WHERE repmodel = 'c' DROP TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger ON pg_catalog.pg_dist_rebalance_strategy; DROP FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check(); + +#include "udfs/citus_cleanup_orphaned_shards/10.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql b/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql index af5c3b55b..5946473f9 100644 --- a/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql +++ b/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql @@ -84,3 +84,4 @@ CREATE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger BEFORE INSERT OR UPDATE OR DELETE OR TRUNCATE ON pg_dist_rebalance_strategy FOR EACH STATEMENT EXECUTE FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check(); +DROP PROCEDURE pg_catalog.citus_cleanup_orphaned_shards(); diff --git a/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/10.1-1.sql b/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/10.1-1.sql new file mode 100644 index 000000000..ed5723602 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/10.1-1.sql @@ -0,0 +1,5 @@ +CREATE OR REPLACE PROCEDURE pg_catalog.citus_cleanup_orphaned_shards() + LANGUAGE C + AS 'citus', $$citus_cleanup_orphaned_shards$$; +COMMENT ON PROCEDURE pg_catalog.citus_cleanup_orphaned_shards() + IS 'cleanup orphaned shards'; diff --git a/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/latest.sql b/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/latest.sql new file mode 100644 index 000000000..ed5723602 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/latest.sql @@ -0,0 +1,5 @@ +CREATE OR REPLACE PROCEDURE pg_catalog.citus_cleanup_orphaned_shards() + LANGUAGE C + AS 'citus', $$citus_cleanup_orphaned_shards$$; +COMMENT ON PROCEDURE pg_catalog.citus_cleanup_orphaned_shards() + IS 'cleanup orphaned shards'; diff --git a/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out b/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out index a144f48a5..bedfcc6f7 100644 --- a/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out +++ b/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out @@ -61,12 +61,8 @@ SELECT count(*) FROM referencing_table2; 101 (1 row) -SELECT 1 FROM public.master_defer_delete_shards(); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 2 orphaned shards SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; name | relid | refd_relid --------------------------------------------------------------------- @@ -108,12 +104,8 @@ SELECT count(*) FROM referencing_table2; 101 (1 row) -SELECT 1 FROM public.master_defer_delete_shards(); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 2 orphaned shards SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; name | relid | refd_relid --------------------------------------------------------------------- diff --git a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out index f11065c1b..187d33632 100644 --- a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out +++ b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out @@ -10,23 +10,19 @@ step s1-move-placement: master_move_shard_placement +s1: NOTICE: cleaned up 1 orphaned shards step s1-drop-marked-shards: - SELECT public.master_defer_delete_shards(); + SET client_min_messages to NOTICE; + CALL isolation_cleanup_orphaned_shards(); -master_defer_delete_shards - -1 step s2-drop-marked-shards: SET client_min_messages to DEBUG1; - SELECT public.master_defer_delete_shards(); + CALL isolation_cleanup_orphaned_shards(); step s1-commit: COMMIT; step s2-drop-marked-shards: <... completed> -master_defer_delete_shards - -0 starting permutation: s1-begin s1-move-placement s2-drop-marked-shards s1-drop-marked-shards s1-commit step s1-begin: @@ -40,17 +36,13 @@ master_move_shard_placement step s2-drop-marked-shards: SET client_min_messages to DEBUG1; - SELECT public.master_defer_delete_shards(); + CALL isolation_cleanup_orphaned_shards(); -master_defer_delete_shards - -0 +s1: NOTICE: cleaned up 1 orphaned shards step s1-drop-marked-shards: - SELECT public.master_defer_delete_shards(); + SET client_min_messages to NOTICE; + CALL isolation_cleanup_orphaned_shards(); -master_defer_delete_shards - -1 step s1-commit: COMMIT; @@ -82,14 +74,12 @@ run_commands_on_session_level_connection_to_node step s1-drop-marked-shards: - SELECT public.master_defer_delete_shards(); + SET client_min_messages to NOTICE; + CALL isolation_cleanup_orphaned_shards(); s1: WARNING: canceling statement due to lock timeout -s1: WARNING: Failed to drop 1 old shards out of 1 step s1-drop-marked-shards: <... completed> -master_defer_delete_shards - -0 +s1: WARNING: Failed to drop 1 old shards out of 1 step s1-commit: COMMIT; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 63179c7c5..04c15a84e 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -573,6 +573,7 @@ SELECT * FROM print_extension_changes(); function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint) | function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name) TABLE(table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer) | | function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) void + | function citus_cleanup_orphaned_shards() | function citus_local_disk_space_stats() record | function create_distributed_table(regclass,text,citus.distribution_type,text,integer) void | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint) @@ -580,7 +581,7 @@ SELECT * FROM print_extension_changes(); | function worker_partitioned_relation_size(regclass) bigint | function worker_partitioned_relation_total_size(regclass) bigint | function worker_partitioned_table_size(regclass) bigint -(14 rows) +(15 rows) DROP TABLE prev_objects, extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_test_helpers_superuser.out b/src/test/regress/expected/multi_test_helpers_superuser.out index cfc3cf02b..01676131c 100644 --- a/src/test/regress/expected/multi_test_helpers_superuser.out +++ b/src/test/regress/expected/multi_test_helpers_superuser.out @@ -1,9 +1,3 @@ -CREATE OR REPLACE FUNCTION master_defer_delete_shards() - RETURNS int - LANGUAGE C STRICT - AS 'citus', $$master_defer_delete_shards$$; -COMMENT ON FUNCTION master_defer_delete_shards() - IS 'remove orphaned shards'; CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) RETURNS void LANGUAGE C STRICT diff --git a/src/test/regress/expected/shard_move_deferred_delete.out b/src/test/regress/expected/shard_move_deferred_delete.out index c125e2c53..ae3fbfa00 100644 --- a/src/test/regress/expected/shard_move_deferred_delete.out +++ b/src/test/regress/expected/shard_move_deferred_delete.out @@ -50,13 +50,14 @@ $cmd$); (localhost,57638,t,1) (2 rows) +-- Make sure this cannot be run in a transaction +BEGIN; +CALL citus_cleanup_orphaned_shards(); +ERROR: citus_cleanup_orphaned_shards cannot run inside a transaction block +COMMIT; -- execute delayed removal -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 1 orphaned shards -- we expect the shard to be on only the second worker SELECT run_command_on_workers($cmd$ SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 23f100038..037107848 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -31,24 +31,14 @@ SELECT rebalance_table_shards('dist_table_test'); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT rebalance_table_shards(); rebalance_table_shards --------------------------------------------------------------------- (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- test that calling rebalance_table_shards without specifying relation -- wouldn't move shard of the citus local table. CREATE TABLE citus_local_table(a int, b int); @@ -65,12 +55,7 @@ SELECT rebalance_table_shards(); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; tablename @@ -101,12 +86,7 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running SELECT master_drain_node('localhost', :master_port); ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); pg_reload_conf @@ -126,12 +106,7 @@ SELECT master_drain_node('localhost', :master_port); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; tablename @@ -579,7 +554,7 @@ AS $$ pg_dist_shard_placement src USING (shardid), (SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass; - SELECT public.master_defer_delete_shards(); + CALL citus_cleanup_orphaned_shards(); $$; CALL create_unbalanced_shards('rebalance_test_table'); SET citus.shard_replication_factor TO 2; @@ -624,12 +599,7 @@ FROM ( WHERE logicalrelid = 'rebalance_test_table'::regclass ) T; ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); pg_reload_conf @@ -658,12 +628,7 @@ FROM ( (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -715,12 +680,7 @@ SELECT * FROM table_placements_per_node; 57638 | rebalance_test_table | 5 (2 rows) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, max_shard_moves := 1, shard_transfer_mode:='block_writes'); @@ -729,12 +689,7 @@ SELECT rebalance_table_shards('rebalance_test_table', (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -749,12 +704,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_tran (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -769,12 +719,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 0); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -790,12 +735,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_tran (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -969,12 +909,7 @@ SELECT COUNT(*) FROM imbalanced_table; -- Try force_logical SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical'); ERROR: the force_logical transfer mode is currently unsupported -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Test rebalance operation SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes'); rebalance_table_shards @@ -982,12 +917,7 @@ SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_m (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Confirm rebalance -- Shard counts in each node after rebalance SELECT * FROM public.table_placements_per_node; @@ -1024,12 +954,7 @@ FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; ERROR: Moving shards to a non-existing node is not supported HINT: Add the target node via SELECT citus_add_node('localhost', 10000); -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Try to move shards to a node where shards are not allowed SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); master_set_node_property @@ -1073,12 +998,7 @@ WHERE nodeport = :worker_2_port; (2 rows) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 2 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT create_distributed_table('colocated_rebalance_test2', 'id'); create_distributed_table --------------------------------------------------------------------- @@ -1106,12 +1026,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Confirm that nothing changed SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count @@ -1153,12 +1068,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size @@ -1216,12 +1126,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1244,12 +1149,7 @@ SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold : (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 2 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1271,12 +1171,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1293,12 +1188,7 @@ SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold : (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 2 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1335,12 +1225,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1362,12 +1247,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1404,12 +1284,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1431,12 +1306,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1462,12 +1332,7 @@ SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port; shouldhaveshards --------------------------------------------------------------------- @@ -1495,12 +1360,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1607,12 +1467,7 @@ SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes') (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1627,12 +1482,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 1 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1648,12 +1499,7 @@ DETAIL: Using threshold of 0.01 (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1759,12 +1605,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 4 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1859,12 +1701,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 3 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 3 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1888,12 +1726,7 @@ SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes') (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1943,12 +1776,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 4 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1973,20 +1802,10 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_ ERROR: could not find rebalance strategy with name non_existing SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing'); ERROR: could not find rebalance strategy with name non_existing -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing'); ERROR: could not find rebalance strategy with name non_existing -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT citus_set_default_rebalance_strategy('non_existing'); ERROR: strategy with specified name does not exist UPDATE pg_dist_rebalance_strategy SET default_strategy=false; @@ -1994,20 +1813,10 @@ SELECT * FROM get_rebalance_table_shards_plan('tab'); ERROR: no rebalance_strategy was provided, but there is also no default strategy set SELECT * FROM rebalance_table_shards('tab'); ERROR: no rebalance_strategy was provided, but there is also no default strategy set -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM master_drain_node('localhost', :worker_2_port); ERROR: no rebalance_strategy was provided, but there is also no default strategy set -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count'; CREATE OR REPLACE FUNCTION shard_cost_no_arguments() RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql; @@ -2283,12 +2092,7 @@ SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='bloc (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 3 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; count --------------------------------------------------------------------- @@ -2350,12 +2154,7 @@ SELECT rebalance_table_shards(); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 2 -(1 row) - +CALL citus_cleanup_orphaned_shards(); DROP TABLE t1, r1, r2; -- verify there are no distributed tables before we perform the following tests. Preceding -- test suites should clean up their distributed tables. @@ -2404,12 +2203,7 @@ SELECT rebalance_table_shards(); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- verify the reference table is on all nodes after the rebalance SELECT count(*) FROM pg_dist_shard diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 064e40e89..2b7d7468d 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -38,6 +38,7 @@ ORDER BY 1; function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) function citus_add_secondary_node(text,integer,text,integer,name) function citus_blocking_pids(integer) + function citus_cleanup_orphaned_shards() function citus_conninfo_cache_invalidate() function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode) function citus_create_restore_point(text) @@ -245,5 +246,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(229 rows) +(230 rows) diff --git a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec index ee225f13a..150776aba 100644 --- a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec +++ b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec @@ -26,12 +26,11 @@ setup SELECT citus_internal.replace_isolation_tester_func(); SELECT citus_internal.refresh_isolation_tester_prepared_statement(); -CREATE OR REPLACE FUNCTION master_defer_delete_shards() - RETURNS int - LANGUAGE C STRICT - AS 'citus', $$master_defer_delete_shards$$; -COMMENT ON FUNCTION master_defer_delete_shards() - IS 'remove orphaned shards'; +CREATE OR REPLACE PROCEDURE isolation_cleanup_orphaned_shards() + LANGUAGE C + AS 'citus', $$isolation_cleanup_orphaned_shards$$; +COMMENT ON PROCEDURE isolation_cleanup_orphaned_shards() + IS 'cleanup orphaned shards'; SET citus.next_shard_id to 120000; SET citus.shard_count TO 8; @@ -71,7 +70,8 @@ step "s1-move-placement-without-deferred" { step "s1-drop-marked-shards" { - SELECT public.master_defer_delete_shards(); + SET client_min_messages to NOTICE; + CALL isolation_cleanup_orphaned_shards(); } step "s1-lock-pg-dist-placement" { @@ -116,7 +116,7 @@ step "s2-select" { step "s2-drop-marked-shards" { SET client_min_messages to DEBUG1; - SELECT public.master_defer_delete_shards(); + CALL isolation_cleanup_orphaned_shards(); } step "s2-commit" { diff --git a/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql b/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql index 6df09f061..546eeb89e 100644 --- a/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql +++ b/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql @@ -45,14 +45,14 @@ SELECT master_move_shard_placement(15000009, 'localhost', :worker_1_port, 'local SELECT count(*) FROM referencing_table2; -SELECT 1 FROM public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; SELECT master_move_shard_placement(15000009, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); SELECT count(*) FROM referencing_table2; -SELECT 1 FROM public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; -- create a function to show the diff --git a/src/test/regress/sql/multi_test_helpers_superuser.sql b/src/test/regress/sql/multi_test_helpers_superuser.sql index 4026c2f00..0bd360b12 100644 --- a/src/test/regress/sql/multi_test_helpers_superuser.sql +++ b/src/test/regress/sql/multi_test_helpers_superuser.sql @@ -1,10 +1,3 @@ -CREATE OR REPLACE FUNCTION master_defer_delete_shards() - RETURNS int - LANGUAGE C STRICT - AS 'citus', $$master_defer_delete_shards$$; -COMMENT ON FUNCTION master_defer_delete_shards() - IS 'remove orphaned shards'; - CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) RETURNS void LANGUAGE C STRICT diff --git a/src/test/regress/sql/shard_move_deferred_delete.sql b/src/test/regress/sql/shard_move_deferred_delete.sql index ac82b0680..e33bc3f82 100644 --- a/src/test/regress/sql/shard_move_deferred_delete.sql +++ b/src/test/regress/sql/shard_move_deferred_delete.sql @@ -31,8 +31,13 @@ SELECT run_command_on_workers($cmd$ SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; $cmd$); +-- Make sure this cannot be run in a transaction +BEGIN; +CALL citus_cleanup_orphaned_shards(); +COMMIT; + -- execute delayed removal -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- we expect the shard to be on only the second worker SELECT run_command_on_workers($cmd$ diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index ce0d0899c..636fa2341 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -13,9 +13,9 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0); -- should just be noops even if we add the coordinator to the pg_dist_node SELECT rebalance_table_shards('dist_table_test'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT rebalance_table_shards(); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- test that calling rebalance_table_shards without specifying relation @@ -25,7 +25,7 @@ SELECT citus_add_local_table_to_metadata('citus_local_table'); INSERT INTO citus_local_table VALUES (1, 2); SELECT rebalance_table_shards(); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; @@ -38,14 +38,14 @@ SELECT pg_reload_conf(); SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC SELECT master_drain_node('localhost', :master_port); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC SELECT master_drain_node('localhost', :master_port); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; @@ -396,7 +396,7 @@ AS $$ pg_dist_shard_placement src USING (shardid), (SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass; - SELECT public.master_defer_delete_shards(); + CALL citus_cleanup_orphaned_shards(); $$; CALL create_unbalanced_shards('rebalance_test_table'); @@ -428,7 +428,7 @@ FROM ( FROM pg_dist_shard WHERE logicalrelid = 'rebalance_test_table'::regclass ) T; -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); @@ -445,7 +445,7 @@ FROM ( FROM pg_dist_shard WHERE logicalrelid = 'rebalance_test_table'::regclass ) T; -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; @@ -480,26 +480,26 @@ SELECT rebalance_table_shards('rebalance_test_table', RESET ROLE; -- Confirm no moves took place at all during these errors SELECT * FROM table_placements_per_node; -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, max_shard_moves := 1, shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; -- Check that threshold=1 doesn't move any shards SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; -- Move the remaining shards using threshold=0 SELECT rebalance_table_shards('rebalance_test_table', threshold := 0); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; @@ -507,7 +507,7 @@ SELECT * FROM table_placements_per_node; -- any effects. SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; @@ -602,11 +602,11 @@ SELECT COUNT(*) FROM imbalanced_table; -- Try force_logical SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- Test rebalance operation SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- Confirm rebalance -- Shard counts in each node after rebalance @@ -633,7 +633,7 @@ SELECT create_distributed_table('colocated_rebalance_test', 'id'); SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', 10000, 'block_writes') FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- Try to move shards to a node where shards are not allowed SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); @@ -660,7 +660,7 @@ UPDATE pg_dist_node SET noderole = 'primary' WHERE nodeport = :worker_1_port; SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT create_distributed_table('colocated_rebalance_test2', 'id'); @@ -671,7 +671,7 @@ SELECT * FROM public.table_placements_per_node; SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0, drain_only := true); -- Running with drain_only shouldn't do anything SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- Confirm that nothing changed SELECT * FROM public.table_placements_per_node; @@ -684,7 +684,7 @@ SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', rebala SELECT * FROM get_rebalance_progress(); -- Actually do the rebalance SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); @@ -702,22 +702,22 @@ SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaves SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0); SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; SELECT * FROM get_rebalance_table_shards_plan('non_colocated_rebalance_test', threshold := 0); SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; -- testing behaviour when setting shouldhaveshards to false and rebalancing all @@ -725,13 +725,13 @@ SELECT * FROM public.table_placements_per_node; SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); SELECT * FROM get_rebalance_table_shards_plan(threshold := 0, drain_only := true); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; -- testing behaviour when setting shouldhaveshards to false and rebalancing all @@ -739,13 +739,13 @@ SELECT * FROM public.table_placements_per_node; SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); SELECT * FROM get_rebalance_table_shards_plan(threshold := 0); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; -- Make it a data node again @@ -753,14 +753,14 @@ SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaves -- testing behaviour of master_drain_node SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port; SELECT * FROM public.table_placements_per_node; -- Put shards back SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; @@ -829,15 +829,15 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_d SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', threshold := 0); SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes', threshold := 0); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; -- Check that sizes of colocated tables are added together for rebalances @@ -888,7 +888,7 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_d -- supports improvement_threshold SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', improvement_threshold := 0); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; ANALYZE tab, tab2; @@ -945,13 +945,13 @@ SELECT citus_add_rebalance_strategy( SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_2'); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_2', shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; SELECT citus_set_default_rebalance_strategy('capacity_high_worker_2'); SELECT * FROM get_rebalance_table_shards_plan('tab'); SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; CREATE FUNCTION only_worker_1(shardid bigint, nodeidarg int) @@ -972,7 +972,7 @@ SELECT citus_add_rebalance_strategy( SELECT citus_set_default_rebalance_strategy('only_worker_1'); SELECT * FROM get_rebalance_table_shards_plan('tab'); SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; SELECT citus_set_default_rebalance_strategy('by_shard_count'); @@ -981,18 +981,18 @@ SELECT * FROM get_rebalance_table_shards_plan('tab'); -- Check all the error handling cases SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_existing'); SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT citus_set_default_rebalance_strategy('non_existing'); UPDATE pg_dist_rebalance_strategy SET default_strategy=false; SELECT * FROM get_rebalance_table_shards_plan('tab'); SELECT * FROM rebalance_table_shards('tab'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM master_drain_node('localhost', :worker_2_port); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count'; CREATE OR REPLACE FUNCTION shard_cost_no_arguments() @@ -1222,7 +1222,7 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0); SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; @@ -1255,7 +1255,7 @@ INSERT INTO r2 VALUES (1,2), (3,4); SELECT 1 from master_add_node('localhost', :worker_2_port); SELECT rebalance_table_shards(); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); DROP TABLE t1, r1, r2; @@ -1282,7 +1282,7 @@ WHERE logicalrelid = 'r1'::regclass; -- rebalance with _only_ a reference table, this should trigger the copy SELECT rebalance_table_shards(); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- verify the reference table is on all nodes after the rebalance SELECT count(*) From 280b9ae018ba5a87acaa3392a1c91379b7bc52c1 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Wed, 2 Jun 2021 16:48:45 +0200 Subject: [PATCH 2/5] Cleanup orphaned shards at the start of a rebalance In case the background daemon hasn't cleaned up shards yet, we do this manually at the start of a rebalance. --- .../distributed/operations/shard_cleaner.c | 18 +++++++ .../distributed/operations/shard_rebalancer.c | 3 ++ src/include/distributed/shard_cleaner.h | 1 + .../regress/expected/shard_rebalancer.out | 53 ++++++++++++++++++- src/test/regress/sql/shard_rebalancer.sql | 21 +++++++- 5 files changed, 93 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index 80b243d51..d01fd6eb3 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -13,10 +13,12 @@ #include "postgres.h" #include "access/xact.h" +#include "postmaster/postmaster.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/shard_cleaner.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/worker_transaction.h" @@ -85,6 +87,22 @@ isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS) } +/* + * DropMarkedShardsInDifferentTransaction cleans up orphaned shards by + * connecting to localhost. This is done, so that the locks that + * DropMarkedShards takes are only held for a short time. + */ +void +DropMarkedShardsInDifferentTransaction(void) +{ + int connectionFlag = FORCE_NEW_CONNECTION; + MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, + PostPortNumber); + ExecuteCriticalRemoteCommand(connection, "CALL citus_cleanup_orphaned_shards();"); + CloseConnection(connection); +} + + /* * TryDropMarkedShards is a wrapper around DropMarkedShards that catches * any errors to make it safe to use in the maintenance daemon. diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 92f37ec05..d9832f306 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -43,6 +43,7 @@ #include "distributed/repair_shards.h" #include "distributed/resource_lock.h" #include "distributed/shard_rebalancer.h" +#include "distributed/shard_cleaner.h" #include "distributed/tuplestore.h" #include "distributed/worker_protocol.h" #include "funcapi.h" @@ -700,6 +701,8 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, "unsupported"))); } + DropMarkedShardsInDifferentTransaction(); + foreach(placementUpdateCell, placementUpdateList) { PlacementUpdateEvent *placementUpdate = lfirst(placementUpdateCell); diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index b48d89522..103097397 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -19,5 +19,6 @@ extern bool CheckAvailableSpaceBeforeMove; extern int TryDropMarkedShards(bool waitForLocks); extern int DropMarkedShards(bool waitForLocks); +extern void DropMarkedShardsInDifferentTransaction(void); #endif /*CITUS_SHARD_CLEANER_H */ diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 037107848..33852f037 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -156,7 +156,6 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running SET citus.shard_replication_factor TO 2; SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes'); -NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); @@ -1075,7 +1074,57 @@ SELECT * FROM get_rebalance_progress(); --------------------------------------------------------------------- (0 rows) --- Confirm that the nodes are now there +-- Confirm that the shards are now there +SELECT * FROM public.table_placements_per_node; + nodeport | logicalrelid | count +--------------------------------------------------------------------- + 57637 | colocated_rebalance_test | 2 + 57638 | colocated_rebalance_test | 2 + 57637 | colocated_rebalance_test2 | 2 + 57638 | colocated_rebalance_test2 | 2 +(4 rows) + +CALL citus_cleanup_orphaned_shards(); +select * from pg_dist_placement; + placementid | shardid | shardstate | shardlength | groupid +--------------------------------------------------------------------- + 135 | 123023 | 1 | 0 | 14 + 138 | 123024 | 1 | 0 | 14 + 141 | 123027 | 1 | 0 | 14 + 142 | 123028 | 1 | 0 | 14 + 143 | 123021 | 1 | 0 | 16 + 144 | 123025 | 1 | 0 | 16 + 145 | 123022 | 1 | 0 | 16 + 146 | 123026 | 1 | 0 | 16 +(8 rows) + +-- Move all shards to worker1 again +SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') +FROM pg_dist_shard NATURAL JOIN pg_dist_placement NATURAL JOIN pg_dist_node +WHERE nodeport = :worker_2_port AND logicalrelid = 'colocated_rebalance_test'::regclass; + master_move_shard_placement +--------------------------------------------------------------------- + + +(2 rows) + +-- Confirm that the shards are now all on worker1 +SELECT * FROM public.table_placements_per_node; + nodeport | logicalrelid | count +--------------------------------------------------------------------- + 57637 | colocated_rebalance_test | 4 + 57637 | colocated_rebalance_test2 | 4 +(2 rows) + +-- Explicitly don't run citus_cleanup_orphaned_shards, rebalance_table_shards +-- should do that for automatically. +SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); + rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +-- Confirm that the shards are now moved SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index 636fa2341..0e41bac74 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -688,7 +688,26 @@ CALL citus_cleanup_orphaned_shards(); -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); --- Confirm that the nodes are now there +-- Confirm that the shards are now there +SELECT * FROM public.table_placements_per_node; + +CALL citus_cleanup_orphaned_shards(); +select * from pg_dist_placement; + + +-- Move all shards to worker1 again +SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') +FROM pg_dist_shard NATURAL JOIN pg_dist_placement NATURAL JOIN pg_dist_node +WHERE nodeport = :worker_2_port AND logicalrelid = 'colocated_rebalance_test'::regclass; + +-- Confirm that the shards are now all on worker1 +SELECT * FROM public.table_placements_per_node; + +-- Explicitly don't run citus_cleanup_orphaned_shards, rebalance_table_shards +-- should do that for automatically. +SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); + +-- Confirm that the shards are now moved SELECT * FROM public.table_placements_per_node; From 503c70b619b39f15d90cc196948828b414369e02 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Thu, 3 Jun 2021 11:38:27 +0200 Subject: [PATCH 3/5] Cleanup orphaned shards before moving when necessary A shard move would fail if there was an orphaned version of the shard on the target node. With this change before actually fail, we try to clean up orphaned shards to see if that fixes the issue. --- .../distributed/operations/repair_shards.c | 29 ++++++++--- .../isolation_rebalancer_deferred_drop.out | 51 +++++++++++++++++++ .../expected/shard_move_deferred_delete.out | 10 ++-- .../isolation_rebalancer_deferred_drop.spec | 9 ++++ .../sql/shard_move_deferred_delete.sql | 3 +- 5 files changed, 92 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 59e87f74a..7938a954f 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -1063,12 +1063,29 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo { if (targetPlacement->shardState == SHARD_STATE_TO_DELETE) { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg( - "shard " INT64_FORMAT " already exists in the target node", - shardId), - errdetail( - "The existing shard is marked for deletion, but could not be deleted because there are still active queries on it"))); + /* + * Trigger deletion of orphaned shards and hope that this removes + * the shard. + */ + DropMarkedShardsInDifferentTransaction(); + shardPlacementList = ShardPlacementList(shardId); + targetPlacement = SearchShardPlacementInList(shardPlacementList, + targetNodeName, + targetNodePort); + + /* + * If it still doesn't remove the shard, then we error. + */ + if (targetPlacement != NULL) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg( + "shard " INT64_FORMAT + " still exists on the target node as an orphaned shard", + shardId), + errdetail( + "The existing shard is orphaned, but could not be deleted because there are still active queries on it"))); + } } else { diff --git a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out index 187d33632..0a982936e 100644 --- a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out +++ b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out @@ -90,6 +90,57 @@ stop_session_level_connection_to_node +starting permutation: s1-begin s1-move-placement s2-start-session-level-connection s2-lock-table-on-worker s1-commit s1-begin s1-move-placement-back s1-commit s2-stop-connection +step s1-begin: + BEGIN; + +step s1-move-placement: + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + +master_move_shard_placement + + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node + + +step s2-lock-table-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN;'); + SELECT run_commands_on_session_level_connection_to_node('LOCK TABLE t1_120000'); + +run_commands_on_session_level_connection_to_node + + +run_commands_on_session_level_connection_to_node + + +step s1-commit: + COMMIT; + +step s1-begin: + BEGIN; + +step s1-move-placement-back: + SET client_min_messages to NOTICE; + SHOW log_error_verbosity; + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57638, 'localhost', 57637); + +log_error_verbosity + +verbose +ERROR: shard xxxxx still exists on the target node as an orphaned shard +step s1-commit: + COMMIT; + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + + starting permutation: s1-begin s1-lock-pg-dist-placement s2-drop-old-shards s1-commit step s1-begin: BEGIN; diff --git a/src/test/regress/expected/shard_move_deferred_delete.out b/src/test/regress/expected/shard_move_deferred_delete.out index ae3fbfa00..ed0d46500 100644 --- a/src/test/regress/expected/shard_move_deferred_delete.out +++ b/src/test/regress/expected/shard_move_deferred_delete.out @@ -134,10 +134,14 @@ $cmd$); (localhost,57638,t,1) (2 rows) --- we expect to get an error since the old placement is still there +-- master_move_shard_placement automatically cleans up orphaned shards if +-- needed. SELECT master_move_shard_placement(20000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port); -ERROR: shard xxxxx already exists in the target node -DETAIL: The existing shard is marked for deletion, but could not be deleted because there are still active queries on it + master_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + SELECT run_command_on_workers($cmd$ -- override the function for testing purpose create or replace function pg_catalog.citus_local_disk_space_stats(OUT available_disk_size bigint, OUT total_disk_size bigint) diff --git a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec index 150776aba..bf8a10eb7 100644 --- a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec +++ b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec @@ -62,6 +62,13 @@ step "s1-move-placement" SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); } +step "s1-move-placement-back" +{ + SET client_min_messages to NOTICE; + SHOW log_error_verbosity; + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57638, 'localhost', 57637); +} + step "s1-move-placement-without-deferred" { SET citus.defer_drop_after_shard_move TO OFF; SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); @@ -127,6 +134,8 @@ step "s2-commit" { permutation "s1-begin" "s1-move-placement" "s1-drop-marked-shards" "s2-drop-marked-shards" "s1-commit" permutation "s1-begin" "s1-move-placement" "s2-drop-marked-shards" "s1-drop-marked-shards" "s1-commit" permutation "s1-begin" "s1-move-placement" "s2-start-session-level-connection" "s2-lock-table-on-worker" "s1-drop-marked-shards" "s1-commit" "s2-stop-connection" +// make sure we give a clear error when we try to replace an orphaned shard that is still in use +permutation "s1-begin" "s1-move-placement" "s2-start-session-level-connection" "s2-lock-table-on-worker" "s1-commit" "s1-begin" "s1-move-placement-back" "s1-commit" "s2-stop-connection" // make sure we error if we cannot get the lock on pg_dist_placement permutation "s1-begin" "s1-lock-pg-dist-placement" "s2-drop-old-shards" "s1-commit" permutation "s1-begin" "s2-begin" "s2-select" "s1-move-placement-without-deferred" "s2-commit" "s1-commit" diff --git a/src/test/regress/sql/shard_move_deferred_delete.sql b/src/test/regress/sql/shard_move_deferred_delete.sql index e33bc3f82..a052590d5 100644 --- a/src/test/regress/sql/shard_move_deferred_delete.sql +++ b/src/test/regress/sql/shard_move_deferred_delete.sql @@ -75,7 +75,8 @@ SELECT run_command_on_workers($cmd$ SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; $cmd$); --- we expect to get an error since the old placement is still there +-- master_move_shard_placement automatically cleans up orphaned shards if +-- needed. SELECT master_move_shard_placement(20000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port); From 3f60e4f3944d2024dae62f970dee7e23c932e966 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Fri, 4 Jun 2021 11:16:12 +0200 Subject: [PATCH 4/5] Add ExecuteCriticalCommandInDifferentTransaction function We use this pattern multiple times throughout the codebase now. Seems like a good moment to abstract it away. --- .../distributed/operations/repair_shards.c | 2 +- .../distributed/operations/shard_cleaner.c | 11 +++---- .../distributed/operations/shard_rebalancer.c | 32 ++++++++++++------- src/include/distributed/shard_cleaner.h | 2 +- src/include/distributed/shard_rebalancer.h | 1 + 5 files changed, 28 insertions(+), 20 deletions(-) diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 7938a954f..121b0afdc 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -1067,7 +1067,7 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo * Trigger deletion of orphaned shards and hope that this removes * the shard. */ - DropMarkedShardsInDifferentTransaction(); + DropMarkedShardsInSeparateTransaction(); shardPlacementList = ShardPlacementList(shardId); targetPlacement = SearchShardPlacementInList(shardPlacementList, targetNodeName, diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index d01fd6eb3..bbd6a666c 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -18,6 +18,7 @@ #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/shard_cleaner.h" +#include "distributed/shard_rebalancer.h" #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/worker_transaction.h" @@ -88,18 +89,14 @@ isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS) /* - * DropMarkedShardsInDifferentTransaction cleans up orphaned shards by + * DropMarkedShardsInSeparateTransaction cleans up orphaned shards by * connecting to localhost. This is done, so that the locks that * DropMarkedShards takes are only held for a short time. */ void -DropMarkedShardsInDifferentTransaction(void) +DropMarkedShardsInSeparateTransaction(void) { - int connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, - PostPortNumber); - ExecuteCriticalRemoteCommand(connection, "CALL citus_cleanup_orphaned_shards();"); - CloseConnection(connection); + ExecuteCriticalCommandInSeparateTransaction("CALL citus_cleanup_orphaned_shards()"); } diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index d9832f306..93716e624 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -701,7 +701,7 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, "unsupported"))); } - DropMarkedShardsInDifferentTransaction(); + DropMarkedShardsInSeparateTransaction(); foreach(placementUpdateCell, placementUpdateList) { @@ -913,17 +913,15 @@ citus_drain_node(PG_FUNCTION_ARGS) }; char *nodeName = text_to_cstring(nodeNameText); - int connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, - PostPortNumber); /* * This is done in a separate session. This way it's not undone if the * draining fails midway through. */ - ExecuteCriticalRemoteCommand(connection, psprintf( - "SELECT master_set_node_property(%s, %i, 'shouldhaveshards', false)", - quote_literal_cstr(nodeName), nodePort)); + ExecuteCriticalCommandInSeparateTransaction(psprintf( + "SELECT master_set_node_property(%s, %i, 'shouldhaveshards', false)", + quote_literal_cstr(nodeName), + nodePort)); RebalanceTableShards(&options, shardTransferModeOid); @@ -1695,20 +1693,32 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, REBALANCE_PROGRESS_MOVING); ConflictShardPlacementUpdateOnlyWithIsolationTesting(shardId); - int connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, - PostPortNumber); /* * In case of failure, we throw an error such that rebalance_table_shards * fails early. */ - ExecuteCriticalRemoteCommand(connection, placementUpdateCommand->data); + ExecuteCriticalCommandInSeparateTransaction(placementUpdateCommand->data); UpdateColocatedShardPlacementProgress(shardId, sourceNode->workerName, sourceNode->workerPort, REBALANCE_PROGRESS_MOVED); +} + + +/* + * ExecuteCriticalCommandInSeparateTransaction runs a command in a separate + * transaction that is commited right away. This is useful for things that you + * don't want to rollback when the current transaction is rolled back. + */ +void +ExecuteCriticalCommandInSeparateTransaction(char *command) +{ + int connectionFlag = FORCE_NEW_CONNECTION; + MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, + PostPortNumber); + ExecuteCriticalRemoteCommand(connection, command); CloseConnection(connection); } diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index 103097397..83daf5cff 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -19,6 +19,6 @@ extern bool CheckAvailableSpaceBeforeMove; extern int TryDropMarkedShards(bool waitForLocks); extern int DropMarkedShards(bool waitForLocks); -extern void DropMarkedShardsInDifferentTransaction(void); +extern void DropMarkedShardsInSeparateTransaction(void); #endif /*CITUS_SHARD_CLEANER_H */ diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 096af4a58..de0684d68 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -190,6 +190,7 @@ extern List * RebalancePlacementUpdates(List *workerNodeList, List *shardPlaceme RebalancePlanFunctions *rebalancePlanFunctions); extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList, int shardReplicationFactor); +extern void ExecuteCriticalCommandInSeparateTransaction(char *command); #endif /* SHARD_REBALANCER_H */ From 1a83628195c310fdeb07370d47498fca20222b8a Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Fri, 4 Jun 2021 11:31:06 +0200 Subject: [PATCH 5/5] Use "orphaned shards" naming in more places We were not very consistent in how we named these shards. --- .../distributed/operations/repair_shards.c | 2 +- .../distributed/operations/shard_cleaner.c | 22 +++++++++---------- .../distributed/operations/shard_rebalancer.c | 2 +- src/backend/distributed/shared_library_init.c | 4 +++- .../distributed/test/shard_rebalancer.c | 4 ++-- src/backend/distributed/utils/maintenanced.c | 2 +- src/include/distributed/shard_cleaner.h | 6 ++--- .../isolation_rebalancer_deferred_drop.out | 2 +- 8 files changed, 23 insertions(+), 21 deletions(-) diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 121b0afdc..c3dfa791e 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -1067,7 +1067,7 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo * Trigger deletion of orphaned shards and hope that this removes * the shard. */ - DropMarkedShardsInSeparateTransaction(); + DropOrphanedShardsInSeparateTransaction(); shardPlacementList = ShardPlacementList(shardId); targetPlacement = SearchShardPlacementInList(shardPlacementList, targetNodeName, diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index bbd6a666c..afa206594 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -56,7 +56,7 @@ citus_cleanup_orphaned_shards(PG_FUNCTION_ARGS) PreventInTransactionBlock(true, "citus_cleanup_orphaned_shards"); bool waitForLocks = true; - int droppedShardCount = DropMarkedShards(waitForLocks); + int droppedShardCount = DropOrphanedShards(waitForLocks); if (droppedShardCount > 0) { ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount))); @@ -78,7 +78,7 @@ isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS) EnsureCoordinator(); bool waitForLocks = true; - int droppedShardCount = DropMarkedShards(waitForLocks); + int droppedShardCount = DropOrphanedShards(waitForLocks); if (droppedShardCount > 0) { ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount))); @@ -89,32 +89,32 @@ isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS) /* - * DropMarkedShardsInSeparateTransaction cleans up orphaned shards by + * DropOrphanedShardsInSeparateTransaction cleans up orphaned shards by * connecting to localhost. This is done, so that the locks that - * DropMarkedShards takes are only held for a short time. + * DropOrphanedShards takes are only held for a short time. */ void -DropMarkedShardsInSeparateTransaction(void) +DropOrphanedShardsInSeparateTransaction(void) { ExecuteCriticalCommandInSeparateTransaction("CALL citus_cleanup_orphaned_shards()"); } /* - * TryDropMarkedShards is a wrapper around DropMarkedShards that catches + * TryDropOrphanedShards is a wrapper around DropOrphanedShards that catches * any errors to make it safe to use in the maintenance daemon. * * If dropping any of the shards failed this function returns -1, otherwise it * returns the number of dropped shards. */ int -TryDropMarkedShards(bool waitForLocks) +TryDropOrphanedShards(bool waitForLocks) { int droppedShardCount = 0; MemoryContext savedContext = CurrentMemoryContext; PG_TRY(); { - droppedShardCount = DropMarkedShards(waitForLocks); + droppedShardCount = DropOrphanedShards(waitForLocks); } PG_CATCH(); { @@ -133,7 +133,7 @@ TryDropMarkedShards(bool waitForLocks) /* - * DropMarkedShards removes shards that were marked SHARD_STATE_TO_DELETE before. + * DropOrphanedShards removes shards that were marked SHARD_STATE_TO_DELETE before. * * It does so by trying to take an exclusive lock on the shard and its * colocated placements before removing. If the lock cannot be obtained it @@ -152,7 +152,7 @@ TryDropMarkedShards(bool waitForLocks) * */ int -DropMarkedShards(bool waitForLocks) +DropOrphanedShards(bool waitForLocks) { int removedShardCount = 0; ListCell *shardPlacementCell = NULL; @@ -208,7 +208,7 @@ DropMarkedShards(bool waitForLocks) if (failedShardDropCount > 0) { - ereport(WARNING, (errmsg("Failed to drop %d old shards out of %d", + ereport(WARNING, (errmsg("Failed to drop %d orphaned shards out of %d", failedShardDropCount, list_length(shardPlacementList)))); } diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 93716e624..0049d1f0e 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -701,7 +701,7 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, "unsupported"))); } - DropMarkedShardsInSeparateTransaction(); + DropOrphanedShardsInSeparateTransaction(); foreach(placementUpdateCell, placementUpdateList) { diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 1f80eb45e..3bc921c55 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -645,7 +645,9 @@ RegisterCitusConfigVariables(void) DefineCustomBoolVariable( "citus.defer_drop_after_shard_move", - gettext_noop("When enabled a shard move will mark old shards for deletion"), + gettext_noop("When enabled a shard move will mark the original shards " + "for deletion after a successful move, instead of deleting " + "them right away."), gettext_noop("The deletion of a shard can sometimes run into a conflict with a " "long running transactions on a the shard during the drop phase of " "the shard move. This causes some moves to be rolled back after " diff --git a/src/backend/distributed/test/shard_rebalancer.c b/src/backend/distributed/test/shard_rebalancer.c index d43ffe446..4cccd851d 100644 --- a/src/backend/distributed/test/shard_rebalancer.c +++ b/src/backend/distributed/test/shard_rebalancer.c @@ -74,13 +74,13 @@ typedef struct RebalancePlanContext } RebalancePlacementContext; /* - * run_try_drop_marked_shards is a wrapper to run TryDropMarkedShards. + * run_try_drop_marked_shards is a wrapper to run TryDropOrphanedShards. */ Datum run_try_drop_marked_shards(PG_FUNCTION_ARGS) { bool waitForLocks = false; - TryDropMarkedShards(waitForLocks); + TryDropOrphanedShards(waitForLocks); PG_RETURN_VOID(); } diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 293698180..2295830d6 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -645,7 +645,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) lastShardCleanTime = GetCurrentTimestamp(); bool waitForLocks = false; - numberOfDroppedShards = TryDropMarkedShards(waitForLocks); + numberOfDroppedShards = TryDropOrphanedShards(waitForLocks); } CommitTransactionCommand(); diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index 83daf5cff..8a98254f9 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -17,8 +17,8 @@ extern bool DeferShardDeleteOnMove; extern double DesiredPercentFreeAfterMove; extern bool CheckAvailableSpaceBeforeMove; -extern int TryDropMarkedShards(bool waitForLocks); -extern int DropMarkedShards(bool waitForLocks); -extern void DropMarkedShardsInSeparateTransaction(void); +extern int TryDropOrphanedShards(bool waitForLocks); +extern int DropOrphanedShards(bool waitForLocks); +extern void DropOrphanedShardsInSeparateTransaction(void); #endif /*CITUS_SHARD_CLEANER_H */ diff --git a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out index 0a982936e..199678ca0 100644 --- a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out +++ b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out @@ -79,7 +79,7 @@ step s1-drop-marked-shards: s1: WARNING: canceling statement due to lock timeout step s1-drop-marked-shards: <... completed> -s1: WARNING: Failed to drop 1 old shards out of 1 +s1: WARNING: Failed to drop 1 orphaned shards out of 1 step s1-commit: COMMIT;