diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 59e87f74a..c3dfa791e 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -1063,12 +1063,29 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo { if (targetPlacement->shardState == SHARD_STATE_TO_DELETE) { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg( - "shard " INT64_FORMAT " already exists in the target node", - shardId), - errdetail( - "The existing shard is marked for deletion, but could not be deleted because there are still active queries on it"))); + /* + * Trigger deletion of orphaned shards and hope that this removes + * the shard. + */ + DropOrphanedShardsInSeparateTransaction(); + shardPlacementList = ShardPlacementList(shardId); + targetPlacement = SearchShardPlacementInList(shardPlacementList, + targetNodeName, + targetNodePort); + + /* + * If the shard still isn't removed after that, we error out. + */ + if (targetPlacement != NULL) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg( + "shard " INT64_FORMAT + " still exists on the target node as an orphaned shard", + shardId), + errdetail( + "The existing shard is orphaned, but could not be deleted because there are still active queries on it"))); + } } else { diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index ab7fdfaf9..afa206594 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -12,60 +12,109 @@ #include "postgres.h" +#include "access/xact.h" +#include "postmaster/postmaster.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/shard_cleaner.h" +#include "distributed/shard_rebalancer.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/worker_transaction.h" /* declarations for dynamic loading */ -PG_FUNCTION_INFO_V1(master_defer_delete_shards); +PG_FUNCTION_INFO_V1(citus_cleanup_orphaned_shards); +PG_FUNCTION_INFO_V1(isolation_cleanup_orphaned_shards); static bool TryDropShard(GroupShardPlacement *placement); static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode); + /* - * master_defer_delete_shards implements a user-facing UDF to deleter orphaned shards that - * are still haning around in the system. These shards are orphaned by previous actions - * that were not directly able to delete the placements eg. shard moving or dropping of a - * distributed table while one of the data nodes was not online. + * citus_cleanup_orphaned_shards implements a user-facing UDF to delete + * orphaned shards that are still hanging around in the system. These shards are + * orphaned by previous actions that were not directly able to delete the + * placements, e.g. shard moving or dropping of a distributed table while one of + * the data nodes was not online. * - * This function iterates through placements where shardstate is SHARD_STATE_TO_DELETE - * (shardstate = 4), drops the corresponding tables from the node and removes the - * placement information from the catalog. + * This function iterates through placements where shardstate is + * SHARD_STATE_TO_DELETE (shardstate = 4), drops the corresponding tables from + * the node and removes the placement information from the catalog.
* - * The function takes no arguments and runs cluster wide + * The function takes no arguments and runs cluster-wide. It cannot be run in a + * transaction, because that would hold the locks it takes for the duration of + * the transaction. While the locks are held, it is impossible for the + * background daemon to clean up orphaned shards. */ Datum -master_defer_delete_shards(PG_FUNCTION_ARGS) +citus_cleanup_orphaned_shards(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + PreventInTransactionBlock(true, "citus_cleanup_orphaned_shards"); + + bool waitForLocks = true; + int droppedShardCount = DropOrphanedShards(waitForLocks); + if (droppedShardCount > 0) + { + ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount))); + } + + PG_RETURN_VOID(); +} + + +/* + * isolation_cleanup_orphaned_shards implements a test UDF that's the same as + * citus_cleanup_orphaned_shards. The only difference is that this command can + * be run in transactions, which is what the isolation tests require. + */ +Datum +isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); EnsureCoordinator(); bool waitForLocks = true; - int droppedShardCount = DropMarkedShards(waitForLocks); + int droppedShardCount = DropOrphanedShards(waitForLocks); + if (droppedShardCount > 0) + { + ereport(NOTICE, (errmsg("cleaned up %d orphaned shards", droppedShardCount))); + } - PG_RETURN_INT32(droppedShardCount); + PG_RETURN_VOID(); } /* - * TryDropMarkedShards is a wrapper around DropMarkedShards that catches + * DropOrphanedShardsInSeparateTransaction cleans up orphaned shards by + * connecting to localhost. This is done so that the locks that + * DropOrphanedShards takes are only held for a short time. + */ +void +DropOrphanedShardsInSeparateTransaction(void) +{ + ExecuteCriticalCommandInSeparateTransaction("CALL citus_cleanup_orphaned_shards()"); +} + + +/* + * TryDropOrphanedShards is a wrapper around DropOrphanedShards that catches * any errors to make it safe to use in the maintenance daemon. * * If dropping any of the shards failed this function returns -1, otherwise it * returns the number of dropped shards. */ int -TryDropMarkedShards(bool waitForLocks) +TryDropOrphanedShards(bool waitForLocks) { int droppedShardCount = 0; MemoryContext savedContext = CurrentMemoryContext; PG_TRY(); { - droppedShardCount = DropMarkedShards(waitForLocks); + droppedShardCount = DropOrphanedShards(waitForLocks); } PG_CATCH(); { @@ -84,7 +133,7 @@ TryDropMarkedShards(bool waitForLocks) /* - * DropMarkedShards removes shards that were marked SHARD_STATE_TO_DELETE before. + * DropOrphanedShards removes shards that were marked SHARD_STATE_TO_DELETE before. * * It does so by trying to take an exclusive lock on the shard and its * colocated placements before removing.
If the lock cannot be obtained it @@ -103,7 +152,7 @@ TryDropMarkedShards(bool waitForLocks) * */ int -DropMarkedShards(bool waitForLocks) +DropOrphanedShards(bool waitForLocks) { int removedShardCount = 0; ListCell *shardPlacementCell = NULL; @@ -159,7 +208,7 @@ DropMarkedShards(bool waitForLocks) if (failedShardDropCount > 0) { - ereport(WARNING, (errmsg("Failed to drop %d old shards out of %d", + ereport(WARNING, (errmsg("Failed to drop %d orphaned shards out of %d", failedShardDropCount, list_length(shardPlacementList)))); } diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 92f37ec05..0049d1f0e 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -43,6 +43,7 @@ #include "distributed/repair_shards.h" #include "distributed/resource_lock.h" #include "distributed/shard_rebalancer.h" +#include "distributed/shard_cleaner.h" #include "distributed/tuplestore.h" #include "distributed/worker_protocol.h" #include "funcapi.h" @@ -700,6 +701,8 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, "unsupported"))); } + DropOrphanedShardsInSeparateTransaction(); + foreach(placementUpdateCell, placementUpdateList) { PlacementUpdateEvent *placementUpdate = lfirst(placementUpdateCell); @@ -910,17 +913,15 @@ citus_drain_node(PG_FUNCTION_ARGS) }; char *nodeName = text_to_cstring(nodeNameText); - int connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, - PostPortNumber); /* * This is done in a separate session. This way it's not undone if the * draining fails midway through. */ - ExecuteCriticalRemoteCommand(connection, psprintf( - "SELECT master_set_node_property(%s, %i, 'shouldhaveshards', false)", - quote_literal_cstr(nodeName), nodePort)); + ExecuteCriticalCommandInSeparateTransaction(psprintf( + "SELECT master_set_node_property(%s, %i, 'shouldhaveshards', false)", + quote_literal_cstr(nodeName), + nodePort)); RebalanceTableShards(&options, shardTransferModeOid); @@ -1692,20 +1693,32 @@ UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent, REBALANCE_PROGRESS_MOVING); ConflictShardPlacementUpdateOnlyWithIsolationTesting(shardId); - int connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, - PostPortNumber); /* * In case of failure, we throw an error such that rebalance_table_shards * fails early. */ - ExecuteCriticalRemoteCommand(connection, placementUpdateCommand->data); + ExecuteCriticalCommandInSeparateTransaction(placementUpdateCommand->data); UpdateColocatedShardPlacementProgress(shardId, sourceNode->workerName, sourceNode->workerPort, REBALANCE_PROGRESS_MOVED); +} + + +/* + * ExecuteCriticalCommandInSeparateTransaction runs a command in a separate + * transaction that is committed right away. This is useful for things that you + * don't want rolled back when the current transaction is rolled back.
+ */ +void +ExecuteCriticalCommandInSeparateTransaction(char *command) +{ + int connectionFlag = FORCE_NEW_CONNECTION; + MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, + PostPortNumber); + ExecuteCriticalRemoteCommand(connection, command); CloseConnection(connection); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 1f80eb45e..3bc921c55 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -645,7 +645,9 @@ RegisterCitusConfigVariables(void) DefineCustomBoolVariable( "citus.defer_drop_after_shard_move", - gettext_noop("When enabled a shard move will mark old shards for deletion"), + gettext_noop("When enabled, a shard move will mark the original shards " + "for deletion after a successful move, instead of deleting " + "them right away."), gettext_noop("The deletion of a shard can sometimes run into a conflict with a " "long running transactions on a the shard during the drop phase of " "the shard move. This causes some moves to be rolled back after " diff --git a/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql b/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql index 387db171e..ec5a122fa 100644 --- a/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql +++ b/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql @@ -47,3 +47,5 @@ WHERE repmodel = 'c' DROP TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger ON pg_catalog.pg_dist_rebalance_strategy; DROP FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check(); + +#include "udfs/citus_cleanup_orphaned_shards/10.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql b/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql index af5c3b55b..5946473f9 100644 --- a/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql +++ b/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql @@ -84,3 +84,4 @@ CREATE TRIGGER pg_dist_rebalance_strategy_enterprise_check_trigger BEFORE INSERT OR UPDATE OR DELETE OR TRUNCATE ON pg_dist_rebalance_strategy FOR EACH STATEMENT EXECUTE FUNCTION citus_internal.pg_dist_rebalance_strategy_enterprise_check(); +DROP PROCEDURE pg_catalog.citus_cleanup_orphaned_shards(); diff --git a/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/10.1-1.sql b/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/10.1-1.sql new file mode 100644 index 000000000..ed5723602 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/10.1-1.sql @@ -0,0 +1,5 @@ +CREATE OR REPLACE PROCEDURE pg_catalog.citus_cleanup_orphaned_shards() + LANGUAGE C + AS 'citus', $$citus_cleanup_orphaned_shards$$; +COMMENT ON PROCEDURE pg_catalog.citus_cleanup_orphaned_shards() + IS 'cleanup orphaned shards'; diff --git a/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/latest.sql b/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/latest.sql new file mode 100644 index 000000000..ed5723602 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_cleanup_orphaned_shards/latest.sql @@ -0,0 +1,5 @@ +CREATE OR REPLACE PROCEDURE pg_catalog.citus_cleanup_orphaned_shards() + LANGUAGE C + AS 'citus', $$citus_cleanup_orphaned_shards$$; +COMMENT ON PROCEDURE pg_catalog.citus_cleanup_orphaned_shards() + IS 'cleanup orphaned shards'; diff --git a/src/backend/distributed/test/shard_rebalancer.c b/src/backend/distributed/test/shard_rebalancer.c index
d43ffe446..4cccd851d 100644 --- a/src/backend/distributed/test/shard_rebalancer.c +++ b/src/backend/distributed/test/shard_rebalancer.c @@ -74,13 +74,13 @@ typedef struct RebalancePlanContext } RebalancePlacementContext; /* - * run_try_drop_marked_shards is a wrapper to run TryDropMarkedShards. + * run_try_drop_marked_shards is a wrapper to run TryDropOrphanedShards. */ Datum run_try_drop_marked_shards(PG_FUNCTION_ARGS) { bool waitForLocks = false; - TryDropMarkedShards(waitForLocks); + TryDropOrphanedShards(waitForLocks); PG_RETURN_VOID(); } diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 293698180..2295830d6 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -645,7 +645,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) lastShardCleanTime = GetCurrentTimestamp(); bool waitForLocks = false; - numberOfDroppedShards = TryDropMarkedShards(waitForLocks); + numberOfDroppedShards = TryDropOrphanedShards(waitForLocks); } CommitTransactionCommand(); diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index b48d89522..8a98254f9 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -17,7 +17,8 @@ extern bool DeferShardDeleteOnMove; extern double DesiredPercentFreeAfterMove; extern bool CheckAvailableSpaceBeforeMove; -extern int TryDropMarkedShards(bool waitForLocks); -extern int DropMarkedShards(bool waitForLocks); +extern int TryDropOrphanedShards(bool waitForLocks); +extern int DropOrphanedShards(bool waitForLocks); +extern void DropOrphanedShardsInSeparateTransaction(void); #endif /*CITUS_SHARD_CLEANER_H */ diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 096af4a58..de0684d68 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -190,6 +190,7 @@ extern List * RebalancePlacementUpdates(List *workerNodeList, List *shardPlaceme RebalancePlanFunctions *rebalancePlanFunctions); extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList, int shardReplicationFactor); +extern void ExecuteCriticalCommandInSeparateTransaction(char *command); #endif /* SHARD_REBALANCER_H */ diff --git a/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out b/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out index a144f48a5..bedfcc6f7 100644 --- a/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out +++ b/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out @@ -61,12 +61,8 @@ SELECT count(*) FROM referencing_table2; 101 (1 row) -SELECT 1 FROM public.master_defer_delete_shards(); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 2 orphaned shards SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; name | relid | refd_relid --------------------------------------------------------------------- @@ -108,12 +104,8 @@ SELECT count(*) FROM referencing_table2; 101 (1 row) -SELECT 1 FROM public.master_defer_delete_shards(); - ?column? 
---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 2 orphaned shards SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; name | relid | refd_relid --------------------------------------------------------------------- diff --git a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out index f11065c1b..199678ca0 100644 --- a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out +++ b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out @@ -10,23 +10,19 @@ step s1-move-placement: master_move_shard_placement +s1: NOTICE: cleaned up 1 orphaned shards step s1-drop-marked-shards: - SELECT public.master_defer_delete_shards(); + SET client_min_messages to NOTICE; + CALL isolation_cleanup_orphaned_shards(); -master_defer_delete_shards - -1 step s2-drop-marked-shards: SET client_min_messages to DEBUG1; - SELECT public.master_defer_delete_shards(); + CALL isolation_cleanup_orphaned_shards(); step s1-commit: COMMIT; step s2-drop-marked-shards: <... completed> -master_defer_delete_shards - -0 starting permutation: s1-begin s1-move-placement s2-drop-marked-shards s1-drop-marked-shards s1-commit step s1-begin: @@ -40,17 +36,13 @@ master_move_shard_placement step s2-drop-marked-shards: SET client_min_messages to DEBUG1; - SELECT public.master_defer_delete_shards(); + CALL isolation_cleanup_orphaned_shards(); -master_defer_delete_shards - -0 +s1: NOTICE: cleaned up 1 orphaned shards step s1-drop-marked-shards: - SELECT public.master_defer_delete_shards(); + SET client_min_messages to NOTICE; + CALL isolation_cleanup_orphaned_shards(); -master_defer_delete_shards - -1 step s1-commit: COMMIT; @@ -82,14 +74,63 @@ run_commands_on_session_level_connection_to_node step s1-drop-marked-shards: - SELECT public.master_defer_delete_shards(); + SET client_min_messages to NOTICE; + CALL isolation_cleanup_orphaned_shards(); s1: WARNING: canceling statement due to lock timeout -s1: WARNING: Failed to drop 1 old shards out of 1 step s1-drop-marked-shards: <... 
completed> -master_defer_delete_shards +s1: WARNING: Failed to drop 1 orphaned shards out of 1 +step s1-commit: + COMMIT; -0 +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + + +starting permutation: s1-begin s1-move-placement s2-start-session-level-connection s2-lock-table-on-worker s1-commit s1-begin s1-move-placement-back s1-commit s2-stop-connection +step s1-begin: + BEGIN; + +step s1-move-placement: + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); + +master_move_shard_placement + + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node + + +step s2-lock-table-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN;'); + SELECT run_commands_on_session_level_connection_to_node('LOCK TABLE t1_120000'); + +run_commands_on_session_level_connection_to_node + + +run_commands_on_session_level_connection_to_node + + +step s1-commit: + COMMIT; + +step s1-begin: + BEGIN; + +step s1-move-placement-back: + SET client_min_messages to NOTICE; + SHOW log_error_verbosity; + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57638, 'localhost', 57637); + +log_error_verbosity + +verbose +ERROR: shard xxxxx still exists on the target node as an orphaned shard step s1-commit: COMMIT; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 63179c7c5..04c15a84e 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -573,6 +573,7 @@ SELECT * FROM print_extension_changes(); function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint) | function get_rebalance_table_shards_plan(regclass,real,integer,bigint[],boolean,name) TABLE(table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer) | | function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) void + | function citus_cleanup_orphaned_shards() | function citus_local_disk_space_stats() record | function create_distributed_table(regclass,text,citus.distribution_type,text,integer) void | function get_rebalance_progress() TABLE(sessionid integer, table_name regclass, shardid bigint, shard_size bigint, sourcename text, sourceport integer, targetname text, targetport integer, progress bigint, source_shard_size bigint, target_shard_size bigint) @@ -580,7 +581,7 @@ SELECT * FROM print_extension_changes(); | function worker_partitioned_relation_size(regclass) bigint | function worker_partitioned_relation_total_size(regclass) bigint | function worker_partitioned_table_size(regclass) bigint -(14 rows) +(15 rows) DROP TABLE prev_objects, extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_test_helpers_superuser.out b/src/test/regress/expected/multi_test_helpers_superuser.out index cfc3cf02b..01676131c 100644 --- a/src/test/regress/expected/multi_test_helpers_superuser.out +++ b/src/test/regress/expected/multi_test_helpers_superuser.out @@ -1,9 +1,3 @@ -CREATE OR REPLACE FUNCTION master_defer_delete_shards() - RETURNS int - LANGUAGE C STRICT - AS 'citus', $$master_defer_delete_shards$$; -COMMENT ON FUNCTION 
master_defer_delete_shards() - IS 'remove orphaned shards'; CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) RETURNS void LANGUAGE C STRICT diff --git a/src/test/regress/expected/shard_move_deferred_delete.out b/src/test/regress/expected/shard_move_deferred_delete.out index c125e2c53..ed0d46500 100644 --- a/src/test/regress/expected/shard_move_deferred_delete.out +++ b/src/test/regress/expected/shard_move_deferred_delete.out @@ -50,13 +50,14 @@ $cmd$); (localhost,57638,t,1) (2 rows) +-- Make sure this cannot be run in a transaction +BEGIN; +CALL citus_cleanup_orphaned_shards(); +ERROR: citus_cleanup_orphaned_shards cannot run inside a transaction block +COMMIT; -- execute delayed removal -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 1 orphaned shards -- we expect the shard to be on only the second worker SELECT run_command_on_workers($cmd$ SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; @@ -133,10 +134,14 @@ $cmd$); (localhost,57638,t,1) (2 rows) --- we expect to get an error since the old placement is still there +-- master_move_shard_placement automatically cleans up orphaned shards if +-- needed. SELECT master_move_shard_placement(20000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port); -ERROR: shard xxxxx already exists in the target node -DETAIL: The existing shard is marked for deletion, but could not be deleted because there are still active queries on it + master_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + SELECT run_command_on_workers($cmd$ -- override the function for testing purpose create or replace function pg_catalog.citus_local_disk_space_stats(OUT available_disk_size bigint, OUT total_disk_size bigint) diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 23f100038..33852f037 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -31,24 +31,14 @@ SELECT rebalance_table_shards('dist_table_test'); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT rebalance_table_shards(); rebalance_table_shards --------------------------------------------------------------------- (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- test that calling rebalance_table_shards without specifying relation -- wouldn't move shard of the citus local table. 
CREATE TABLE citus_local_table(a int, b int); @@ -65,12 +55,7 @@ SELECT rebalance_table_shards(); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; tablename @@ -101,12 +86,7 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running SELECT master_drain_node('localhost', :master_port); ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); pg_reload_conf @@ -126,12 +106,7 @@ SELECT master_drain_node('localhost', :master_port); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; tablename @@ -181,7 +156,6 @@ SELECT pg_sleep(.1); -- wait to make sure the config has changed before running SET citus.shard_replication_factor TO 2; SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes'); -NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... 
ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); @@ -579,7 +553,7 @@ AS $$ pg_dist_shard_placement src USING (shardid), (SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass; - SELECT public.master_defer_delete_shards(); + CALL citus_cleanup_orphaned_shards(); $$; CALL create_unbalanced_shards('rebalance_test_table'); SET citus.shard_replication_factor TO 2; @@ -624,12 +598,7 @@ FROM ( WHERE logicalrelid = 'rebalance_test_table'::regclass ) T; ERROR: connection to the remote node foobar:57636 failed with the following error: could not translate host name "foobar" to address: -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); pg_reload_conf @@ -658,12 +627,7 @@ FROM ( (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -715,12 +679,7 @@ SELECT * FROM table_placements_per_node; 57638 | rebalance_test_table | 5 (2 rows) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, max_shard_moves := 1, shard_transfer_mode:='block_writes'); @@ -729,12 +688,7 @@ SELECT rebalance_table_shards('rebalance_test_table', (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -749,12 +703,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_tran (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -769,12 +718,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 0); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -790,12 +734,7 @@ SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, shard_tran (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL 
citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -969,12 +908,7 @@ SELECT COUNT(*) FROM imbalanced_table; -- Try force_logical SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical'); ERROR: the force_logical transfer mode is currently unsupported -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Test rebalance operation SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes'); rebalance_table_shards @@ -982,12 +916,7 @@ SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_m (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Confirm rebalance -- Shard counts in each node after rebalance SELECT * FROM public.table_placements_per_node; @@ -1024,12 +953,7 @@ FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; ERROR: Moving shards to a non-existing node is not supported HINT: Add the target node via SELECT citus_add_node('localhost', 10000); -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Try to move shards to a node where shards are not allowed SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); master_set_node_property @@ -1073,12 +997,7 @@ WHERE nodeport = :worker_2_port; (2 rows) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 2 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT create_distributed_table('colocated_rebalance_test2', 'id'); create_distributed_table --------------------------------------------------------------------- @@ -1106,12 +1025,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Confirm that nothing changed SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count @@ -1153,19 +1067,64 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- Check that we can call this function without a crash SELECT * FROM get_rebalance_progress(); sessionid | table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport | progress | source_shard_size | target_shard_size --------------------------------------------------------------------- (0 rows) --- Confirm that the nodes are now there +-- Confirm that the shards are now there +SELECT * FROM public.table_placements_per_node; + nodeport | logicalrelid | count +--------------------------------------------------------------------- + 57637 | colocated_rebalance_test 
| 2 + 57638 | colocated_rebalance_test | 2 + 57637 | colocated_rebalance_test2 | 2 + 57638 | colocated_rebalance_test2 | 2 +(4 rows) + +CALL citus_cleanup_orphaned_shards(); +select * from pg_dist_placement; + placementid | shardid | shardstate | shardlength | groupid +--------------------------------------------------------------------- + 135 | 123023 | 1 | 0 | 14 + 138 | 123024 | 1 | 0 | 14 + 141 | 123027 | 1 | 0 | 14 + 142 | 123028 | 1 | 0 | 14 + 143 | 123021 | 1 | 0 | 16 + 144 | 123025 | 1 | 0 | 16 + 145 | 123022 | 1 | 0 | 16 + 146 | 123026 | 1 | 0 | 16 +(8 rows) + +-- Move all shards to worker1 again +SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes') +FROM pg_dist_shard NATURAL JOIN pg_dist_placement NATURAL JOIN pg_dist_node +WHERE nodeport = :worker_2_port AND logicalrelid = 'colocated_rebalance_test'::regclass; + master_move_shard_placement +--------------------------------------------------------------------- + + +(2 rows) + +-- Confirm that the shards are now all on worker1 +SELECT * FROM public.table_placements_per_node; + nodeport | logicalrelid | count +--------------------------------------------------------------------- + 57637 | colocated_rebalance_test | 4 + 57637 | colocated_rebalance_test2 | 4 +(2 rows) + +-- Explicitly don't run citus_cleanup_orphaned_shards; rebalance_table_shards +-- should do that automatically. +SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes'); + rebalance_table_shards +--------------------------------------------------------------------- + +(1 row) + +-- Confirm that the shards are now moved SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1216,12 +1175,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1244,12 +1198,7 @@ SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold : (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 2 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1271,12 +1220,7 @@ SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1293,12 +1237,7 @@ SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold : (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 2 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT *
FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1335,12 +1274,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1362,12 +1296,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1404,12 +1333,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1431,12 +1355,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1462,12 +1381,7 @@ SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port; shouldhaveshards --------------------------------------------------------------------- @@ -1495,12 +1409,7 @@ SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'blo (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 6 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1607,12 +1516,7 @@ SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes') (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1627,12 +1531,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... 
(1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 1 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 1 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1648,12 +1548,7 @@ DETAIL: Using threshold of 0.01 (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1759,12 +1654,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 4 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1859,12 +1750,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 3 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 3 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1888,12 +1775,7 @@ SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes') (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1943,12 +1825,8 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... 
(1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 4 -(1 row) - +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 4 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1973,20 +1851,10 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_ ERROR: could not find rebalance strategy with name non_existing SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing'); ERROR: could not find rebalance strategy with name non_existing -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing'); ERROR: could not find rebalance strategy with name non_existing -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT citus_set_default_rebalance_strategy('non_existing'); ERROR: strategy with specified name does not exist UPDATE pg_dist_rebalance_strategy SET default_strategy=false; @@ -1994,20 +1862,10 @@ SELECT * FROM get_rebalance_table_shards_plan('tab'); ERROR: no rebalance_strategy was provided, but there is also no default strategy set SELECT * FROM rebalance_table_shards('tab'); ERROR: no rebalance_strategy was provided, but there is also no default strategy set -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT * FROM master_drain_node('localhost', :worker_2_port); ERROR: no rebalance_strategy was provided, but there is also no default strategy set -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count'; CREATE OR REPLACE FUNCTION shard_cost_no_arguments() RETURNS real AS $$ SELECT 1.0::real $$ LANGUAGE sql; @@ -2283,12 +2141,7 @@ SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='bloc (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 3 -(1 row) - +CALL citus_cleanup_orphaned_shards(); SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass; count --------------------------------------------------------------------- @@ -2350,12 +2203,7 @@ SELECT rebalance_table_shards(); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 2 -(1 row) - +CALL citus_cleanup_orphaned_shards(); DROP TABLE t1, r1, r2; -- verify there are no distributed tables before we perform the following tests. Preceding -- test suites should clean up their distributed tables. 
@@ -2404,12 +2252,7 @@ SELECT rebalance_table_shards(); (1 row) -SELECT public.master_defer_delete_shards(); - master_defer_delete_shards ---------------------------------------------------------------------- - 0 -(1 row) - +CALL citus_cleanup_orphaned_shards(); -- verify the reference table is on all nodes after the rebalance SELECT count(*) FROM pg_dist_shard diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 064e40e89..2b7d7468d 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -38,6 +38,7 @@ ORDER BY 1; function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) function citus_add_secondary_node(text,integer,text,integer,name) function citus_blocking_pids(integer) + function citus_cleanup_orphaned_shards() function citus_conninfo_cache_invalidate() function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode) function citus_create_restore_point(text) @@ -245,5 +246,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(229 rows) +(230 rows) diff --git a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec index ee225f13a..bf8a10eb7 100644 --- a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec +++ b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec @@ -26,12 +26,11 @@ setup SELECT citus_internal.replace_isolation_tester_func(); SELECT citus_internal.refresh_isolation_tester_prepared_statement(); -CREATE OR REPLACE FUNCTION master_defer_delete_shards() - RETURNS int - LANGUAGE C STRICT - AS 'citus', $$master_defer_delete_shards$$; -COMMENT ON FUNCTION master_defer_delete_shards() - IS 'remove orphaned shards'; +CREATE OR REPLACE PROCEDURE isolation_cleanup_orphaned_shards() + LANGUAGE C + AS 'citus', $$isolation_cleanup_orphaned_shards$$; +COMMENT ON PROCEDURE isolation_cleanup_orphaned_shards() + IS 'cleanup orphaned shards'; SET citus.next_shard_id to 120000; SET citus.shard_count TO 8; @@ -63,6 +62,13 @@ step "s1-move-placement" SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); } +step "s1-move-placement-back" +{ + SET client_min_messages to NOTICE; + SHOW log_error_verbosity; + SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57638, 'localhost', 57637); +} + step "s1-move-placement-without-deferred" { SET citus.defer_drop_after_shard_move TO OFF; SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638); @@ -71,7 +77,8 @@ step "s1-move-placement-without-deferred" { step "s1-drop-marked-shards" { - SELECT public.master_defer_delete_shards(); + SET client_min_messages to NOTICE; + CALL isolation_cleanup_orphaned_shards(); } step "s1-lock-pg-dist-placement" { @@ -116,7 +123,7 @@ step "s2-select" { step "s2-drop-marked-shards" { SET client_min_messages to DEBUG1; - SELECT public.master_defer_delete_shards(); + CALL isolation_cleanup_orphaned_shards(); } step "s2-commit" { @@ -127,6 +134,8 @@ step "s2-commit" { permutation "s1-begin" "s1-move-placement" "s1-drop-marked-shards" "s2-drop-marked-shards" "s1-commit" permutation "s1-begin" "s1-move-placement" "s2-drop-marked-shards" "s1-drop-marked-shards" "s1-commit" permutation "s1-begin" "s1-move-placement" 
"s2-start-session-level-connection" "s2-lock-table-on-worker" "s1-drop-marked-shards" "s1-commit" "s2-stop-connection" +// make sure we give a clear error when we try to replace an orphaned shard that is still in use +permutation "s1-begin" "s1-move-placement" "s2-start-session-level-connection" "s2-lock-table-on-worker" "s1-commit" "s1-begin" "s1-move-placement-back" "s1-commit" "s2-stop-connection" // make sure we error if we cannot get the lock on pg_dist_placement permutation "s1-begin" "s1-lock-pg-dist-placement" "s2-drop-old-shards" "s1-commit" permutation "s1-begin" "s2-begin" "s2-select" "s1-move-placement-without-deferred" "s2-commit" "s1-commit" diff --git a/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql b/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql index 6df09f061..546eeb89e 100644 --- a/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql +++ b/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql @@ -45,14 +45,14 @@ SELECT master_move_shard_placement(15000009, 'localhost', :worker_1_port, 'local SELECT count(*) FROM referencing_table2; -SELECT 1 FROM public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; SELECT master_move_shard_placement(15000009, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes'); SELECT count(*) FROM referencing_table2; -SELECT 1 FROM public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; -- create a function to show the diff --git a/src/test/regress/sql/multi_test_helpers_superuser.sql b/src/test/regress/sql/multi_test_helpers_superuser.sql index 4026c2f00..0bd360b12 100644 --- a/src/test/regress/sql/multi_test_helpers_superuser.sql +++ b/src/test/regress/sql/multi_test_helpers_superuser.sql @@ -1,10 +1,3 @@ -CREATE OR REPLACE FUNCTION master_defer_delete_shards() - RETURNS int - LANGUAGE C STRICT - AS 'citus', $$master_defer_delete_shards$$; -COMMENT ON FUNCTION master_defer_delete_shards() - IS 'remove orphaned shards'; - CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) RETURNS void LANGUAGE C STRICT diff --git a/src/test/regress/sql/shard_move_deferred_delete.sql b/src/test/regress/sql/shard_move_deferred_delete.sql index ac82b0680..a052590d5 100644 --- a/src/test/regress/sql/shard_move_deferred_delete.sql +++ b/src/test/regress/sql/shard_move_deferred_delete.sql @@ -31,8 +31,13 @@ SELECT run_command_on_workers($cmd$ SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; $cmd$); +-- Make sure this cannot be run in a transaction +BEGIN; +CALL citus_cleanup_orphaned_shards(); +COMMIT; + -- execute delayed removal -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- we expect the shard to be on only the second worker SELECT run_command_on_workers($cmd$ @@ -70,7 +75,8 @@ SELECT run_command_on_workers($cmd$ SELECT count(*) FROM pg_class WHERE relname = 't1_20000000'; $cmd$); --- we expect to get an error since the old placement is still there +-- master_move_shard_placement automatically cleans up orphaned shards if +-- needed. 
SELECT master_move_shard_placement(20000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port); diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql index ce0d0899c..0e41bac74 100644 --- a/src/test/regress/sql/shard_rebalancer.sql +++ b/src/test/regress/sql/shard_rebalancer.sql @@ -13,9 +13,9 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0); -- should just be noops even if we add the coordinator to the pg_dist_node SELECT rebalance_table_shards('dist_table_test'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT rebalance_table_shards(); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- test that calling rebalance_table_shards without specifying relation @@ -25,7 +25,7 @@ SELECT citus_add_local_table_to_metadata('citus_local_table'); INSERT INTO citus_local_table VALUES (1, 2); SELECT rebalance_table_shards(); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; @@ -38,14 +38,14 @@ SELECT pg_reload_conf(); SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC SELECT master_drain_node('localhost', :master_port); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC SELECT master_drain_node('localhost', :master_port); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); -- show that citus local table shard is still on the coordinator SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%'; @@ -396,7 +396,7 @@ AS $$ pg_dist_shard_placement src USING (shardid), (SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass; - SELECT public.master_defer_delete_shards(); + CALL citus_cleanup_orphaned_shards(); $$; CALL create_unbalanced_shards('rebalance_test_table'); @@ -428,7 +428,7 @@ FROM ( FROM pg_dist_shard WHERE logicalrelid = 'rebalance_test_table'::regclass ) T; -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); @@ -445,7 +445,7 @@ FROM ( FROM pg_dist_shard WHERE logicalrelid = 'rebalance_test_table'::regclass ) T; -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; @@ -480,26 +480,26 @@ SELECT rebalance_table_shards('rebalance_test_table', RESET ROLE; -- Confirm no moves took place at all during these errors SELECT * FROM table_placements_per_node; -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT rebalance_table_shards('rebalance_test_table', threshold := 0, max_shard_moves := 1, shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_placements_per_node; -- Check that threshold=1 doesn't move any shards SELECT rebalance_table_shards('rebalance_test_table', threshold := 1, shard_transfer_mode:='block_writes'); -SELECT public.master_defer_delete_shards(); +CALL 
diff --git a/src/test/regress/sql/shard_rebalancer.sql b/src/test/regress/sql/shard_rebalancer.sql
index ce0d0899c..0e41bac74 100644
--- a/src/test/regress/sql/shard_rebalancer.sql
+++ b/src/test/regress/sql/shard_rebalancer.sql
@@ -13,9 +13,9 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
 -- should just be noops even if we add the coordinator to the pg_dist_node
 SELECT rebalance_table_shards('dist_table_test');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT rebalance_table_shards();
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 -- test that calling rebalance_table_shards without specifying relation
@@ -25,7 +25,7 @@ SELECT citus_add_local_table_to_metadata('citus_local_table');
 INSERT INTO citus_local_table VALUES (1, 2);
 SELECT rebalance_table_shards();
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 -- show that citus local table shard is still on the coordinator
 SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%';
@@ -38,14 +38,14 @@ SELECT pg_reload_conf();
 SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC
 SELECT master_drain_node('localhost', :master_port);
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 ALTER SYSTEM RESET citus.local_hostname;
 SELECT pg_reload_conf();
 SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC
 SELECT master_drain_node('localhost', :master_port);
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 -- show that citus local table shard is still on the coordinator
 SELECT tablename FROM pg_catalog.pg_tables where tablename like 'citus_local_table_%';
@@ -396,7 +396,7 @@ AS $$
         pg_dist_shard_placement src USING (shardid),
         (SELECT nodename, nodeport FROM pg_dist_shard_placement ORDER BY nodeport DESC LIMIT 1) dst
         WHERE src.nodeport < dst.nodeport AND s.logicalrelid = rel::regclass;
-    SELECT public.master_defer_delete_shards();
+    CALL citus_cleanup_orphaned_shards();
 $$;
 CALL create_unbalanced_shards('rebalance_test_table');
@@ -428,7 +428,7 @@ FROM (
     FROM pg_dist_shard
     WHERE logicalrelid = 'rebalance_test_table'::regclass
 ) T;
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 ALTER SYSTEM RESET citus.local_hostname;
 SELECT pg_reload_conf();
@@ -445,7 +445,7 @@ FROM (
     FROM pg_dist_shard
     WHERE logicalrelid = 'rebalance_test_table'::regclass
 ) T;
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM table_placements_per_node;
@@ -480,26 +480,26 @@ SELECT rebalance_table_shards('rebalance_test_table',
 RESET ROLE;
 -- Confirm no moves took place at all during these errors
 SELECT * FROM table_placements_per_node;
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT rebalance_table_shards('rebalance_test_table',
     threshold := 0, max_shard_moves := 1,
     shard_transfer_mode:='block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM table_placements_per_node;
 -- Check that threshold=1 doesn't move any shards
 SELECT rebalance_table_shards('rebalance_test_table',
     threshold := 1, shard_transfer_mode:='block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM table_placements_per_node;
 -- Move the remaining shards using threshold=0
 SELECT rebalance_table_shards('rebalance_test_table',
     threshold := 0);
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM table_placements_per_node;
@@ -507,7 +507,7 @@ SELECT * FROM table_placements_per_node;
 -- any effects.
 SELECT rebalance_table_shards('rebalance_test_table',
     threshold := 0, shard_transfer_mode:='block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM table_placements_per_node;
@@ -602,11 +602,11 @@ SELECT COUNT(*) FROM imbalanced_table;
 -- Try force_logical
 SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='force_logical');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 -- Test rebalance operation
 SELECT rebalance_table_shards('imbalanced_table', threshold:=0, shard_transfer_mode:='block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 -- Confirm rebalance
 -- Shard counts in each node after rebalance
@@ -633,7 +633,7 @@ SELECT create_distributed_table('colocated_rebalance_test', 'id');
 SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', 10000, 'block_writes')
 FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 -- Try to move shards to a node where shards are not allowed
 SELECT * from master_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
@@ -660,7 +660,7 @@ UPDATE pg_dist_node SET noderole = 'primary' WHERE nodeport = :worker_1_port;
 SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
 FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT create_distributed_table('colocated_rebalance_test2', 'id');
@@ -671,7 +671,7 @@ SELECT * FROM public.table_placements_per_node;
 SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0, drain_only := true);
 -- Running with drain_only shouldn't do anything
 SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true);
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 -- Confirm that nothing changed
 SELECT * FROM public.table_placements_per_node;
@@ -684,11 +684,30 @@ SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', rebala
 SELECT * FROM get_rebalance_progress();
 -- Actually do the rebalance
 SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 -- Check that we can call this function without a crash
 SELECT * FROM get_rebalance_progress();
--- Confirm that the nodes are now there
+-- Confirm that the shards are now there
+SELECT * FROM public.table_placements_per_node;
+
+CALL citus_cleanup_orphaned_shards();
+select * from pg_dist_placement;
+
+
+-- Move all shards to worker1 again
+SELECT master_move_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes')
+FROM pg_dist_shard NATURAL JOIN pg_dist_placement NATURAL JOIN pg_dist_node
+WHERE nodeport = :worker_2_port AND logicalrelid = 'colocated_rebalance_test'::regclass;
+
+-- Confirm that the shards are now all on worker1
+SELECT * FROM public.table_placements_per_node;
+
+-- Explicitly don't run citus_cleanup_orphaned_shards, rebalance_table_shards
+-- should do that automatically.
+SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
+
+-- Confirm that the shards are now moved
 SELECT * FROM public.table_placements_per_node;
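The "Explicitly don't run" test added above relies on rebalance_table_shards cleaning up the placements it orphans on its own. One way to double-check that nothing is left behind is to count placements still marked with shard state 4, the SHARD_STATE_TO_DELETE value this patch works with; this is an illustrative query, not part of the test file:

-- count leftover orphaned placements; expected to be 0 once cleanup has run
-- (shardstate 4 is SHARD_STATE_TO_DELETE)
SELECT count(*) FROM pg_dist_placement WHERE shardstate = 4;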
@@ -702,22 +721,22 @@ SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaves
 SELECT * FROM get_rebalance_table_shards_plan('colocated_rebalance_test', threshold := 0);
 SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 SELECT * FROM get_rebalance_table_shards_plan('non_colocated_rebalance_test', threshold := 0);
 SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 -- Put shards back
 SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
 SELECT * FROM rebalance_table_shards('colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 SELECT * FROM rebalance_table_shards('non_colocated_rebalance_test', threshold := 0, shard_transfer_mode := 'block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 -- testing behaviour when setting shouldhaveshards to false and rebalancing all
@@ -725,13 +744,13 @@ SELECT * FROM public.table_placements_per_node;
 SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
 SELECT * FROM get_rebalance_table_shards_plan(threshold := 0, drain_only := true);
 SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes', drain_only := true);
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 -- Put shards back
 SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
 SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 -- testing behaviour when setting shouldhaveshards to false and rebalancing all
@@ -739,13 +758,13 @@ SELECT * FROM public.table_placements_per_node;
 SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
 SELECT * FROM get_rebalance_table_shards_plan(threshold := 0);
 SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 -- Put shards back
 SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
 SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 -- Make it a data node again
@@ -753,14 +772,14 @@ SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaves
 -- testing behaviour of master_drain_node
 SELECT * from master_drain_node('localhost', :worker_2_port, shard_transfer_mode := 'block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 select shouldhaveshards from pg_dist_node where nodeport = :worker_2_port;
 SELECT * FROM public.table_placements_per_node;
 -- Put shards back
 SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
 SELECT * FROM rebalance_table_shards(threshold := 0, shard_transfer_mode := 'block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
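The master_drain_node changes above show the pattern this patch enables for taking a node out of service: drain it, clean up the deferred placements immediately instead of waiting for the maintenance daemon, then remove the node. A sketch of that operator workflow, with an illustrative host and port rather than the test's psql variables:

-- illustrative drain-and-remove sequence; 'localhost', 9702 are placeholders
SELECT master_drain_node('localhost', 9702, shard_transfer_mode := 'block_writes');
CALL citus_cleanup_orphaned_shards();
SELECT master_remove_node('localhost', 9702);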
@@ -829,15 +848,15 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_d
 SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', threshold := 0);
 SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes', threshold := 0);
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 -- Check that sizes of colocated tables are added together for rebalances
@@ -888,7 +907,7 @@ SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_d
 -- supports improvement_threshold
 SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'by_disk_size', improvement_threshold := 0);
 SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'by_disk_size', shard_transfer_mode:='block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 ANALYZE tab, tab2;
@@ -945,13 +964,13 @@ SELECT citus_add_rebalance_strategy(
 SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'capacity_high_worker_2');
 SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'capacity_high_worker_2', shard_transfer_mode:='block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 SELECT citus_set_default_rebalance_strategy('capacity_high_worker_2');
 SELECT * FROM get_rebalance_table_shards_plan('tab');
 SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 CREATE FUNCTION only_worker_1(shardid bigint, nodeidarg int)
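The only_worker_1 function appears above only as hunk context, with its body outside the diff. Functions in this position serve as the shard_allowed_on_node_function of a rebalance strategy, returning true only for shard/node pairs the strategy permits. A hypothetical body with that shape (the port literal and implementation are assumptions, not the test's actual definition):

-- hypothetical shard_allowed_on_node_function: only allow shards on the
-- node listening on the assumed first-worker port
CREATE FUNCTION only_worker_1(shardid bigint, nodeidarg int)
    RETURNS boolean AS $$
    SELECT nodeport = 57637 FROM pg_dist_node WHERE nodeid = nodeidarg;
$$ LANGUAGE sql;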
@@ -972,7 +991,7 @@ SELECT citus_add_rebalance_strategy(
 SELECT citus_set_default_rebalance_strategy('only_worker_1');
 SELECT * FROM get_rebalance_table_shards_plan('tab');
 SELECT * FROM rebalance_table_shards('tab', shard_transfer_mode:='block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM public.table_placements_per_node;
 SELECT citus_set_default_rebalance_strategy('by_shard_count');
@@ -981,18 +1000,18 @@ SELECT * FROM get_rebalance_table_shards_plan('tab');
 -- Check all the error handling cases
 SELECT * FROM get_rebalance_table_shards_plan('tab', rebalance_strategy := 'non_existing');
 SELECT * FROM rebalance_table_shards('tab', rebalance_strategy := 'non_existing');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM master_drain_node('localhost', :worker_2_port, rebalance_strategy := 'non_existing');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT citus_set_default_rebalance_strategy('non_existing');
 UPDATE pg_dist_rebalance_strategy SET default_strategy=false;
 SELECT * FROM get_rebalance_table_shards_plan('tab');
 SELECT * FROM rebalance_table_shards('tab');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT * FROM master_drain_node('localhost', :worker_2_port);
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 UPDATE pg_dist_rebalance_strategy SET default_strategy=true WHERE name='by_shard_count';
 CREATE OR REPLACE FUNCTION shard_cost_no_arguments()
@@ -1222,7 +1241,7 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
 SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
 SELECT rebalance_table_shards('rebalance_test_table', shard_transfer_mode:='block_writes');
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'ref_table'::regclass;
@@ -1255,7 +1274,7 @@ INSERT INTO r2 VALUES (1,2), (3,4);
 SELECT 1 from master_add_node('localhost', :worker_2_port);
 SELECT rebalance_table_shards();
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 DROP TABLE t1, r1, r2;
@@ -1282,7 +1301,7 @@ WHERE logicalrelid = 'r1'::regclass;
 -- rebalance with _only_ a reference table, this should trigger the copy
 SELECT rebalance_table_shards();
-SELECT public.master_defer_delete_shards();
+CALL citus_cleanup_orphaned_shards();
 -- verify the reference table is on all nodes after the rebalance
 SELECT count(*)