diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c
index f8e8d851b..59e068459 100644
--- a/src/backend/distributed/operations/shard_cleaner.c
+++ b/src/backend/distributed/operations/shard_cleaner.c
@@ -16,6 +16,7 @@
 
 #include "distributed/coordinator_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/shard_cleaner.h"
+#include "distributed/resource_lock.h"
 #include "distributed/worker_transaction.h"
 
@@ -23,7 +24,7 @@
 
 PG_FUNCTION_INFO_V1(master_defer_delete_shards);
 
-static int DropMarkedShards(void);
+static int DropMarkedShards(bool waitForCleanupLock);
 
 
 /*
@@ -44,7 +45,8 @@ master_defer_delete_shards(PG_FUNCTION_ARGS)
 	CheckCitusVersion(ERROR);
 	EnsureCoordinator();
 
-	int droppedShardCount = DropMarkedShards();
+	bool waitForCleanupLock = true;
+	int droppedShardCount = DropMarkedShards(waitForCleanupLock);
 
 	PG_RETURN_INT32(droppedShardCount);
 }
@@ -55,14 +57,14 @@
  * any errors to make it safe to use in the maintenance daemon.
  */
 int
-TryDropMarkedShards(void)
+TryDropMarkedShards(bool waitForCleanupLock)
 {
 	int droppedShardCount = 0;
 	MemoryContext savedContext = CurrentMemoryContext;
 
 	PG_TRY();
 	{
-		droppedShardCount = DropMarkedShards();
+		droppedShardCount = DropMarkedShards(waitForCleanupLock);
 	}
 	PG_CATCH();
 	{
@@ -88,9 +90,15 @@
  * group and continues with others. The group that has been skipped will be
 * removed at a later time when there are no locks held anymore on those
 * placements.
+ *
+ * Before doing any of this, the function takes an exclusive PlacementCleanup
+ * lock to ensure it is not run concurrently. Otherwise serious race
+ * conditions are possible, such as removing all placements of a shard.
+ * waitForCleanupLock indicates whether the function should wait for this
+ * lock or return with a warning instead.
 */
 static int
-DropMarkedShards(void)
+DropMarkedShards(bool waitForCleanupLock)
 {
 	int removedShardCount = 0;
 	ListCell *shardPlacementCell = NULL;
@@ -100,6 +108,16 @@ DropMarkedShards(void)
 		return removedShardCount;
 	}
 
+	if (waitForCleanupLock)
+	{
+		LockPlacementCleanup();
+	}
+	else if (!TryLockPlacementCleanup())
+	{
+		ereport(WARNING, (errmsg("could not acquire lock to clean up placements")));
+		return 0;
+	}
+
 	List *shardPlacementList = AllShardPlacementsWithShardPlacementState(
 		SHARD_STATE_TO_DELETE);
 	foreach(shardPlacementCell, shardPlacementList)
diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c
index 60da47aaf..f595d06bf 100644
--- a/src/backend/distributed/utils/maintenanced.c
+++ b/src/backend/distributed/utils/maintenanced.c
@@ -644,7 +644,8 @@ CitusMaintenanceDaemonMain(Datum main_arg)
 			 */
 			lastShardCleanTime = GetCurrentTimestamp();
 
-			numberOfDroppedShards = TryDropMarkedShards();
+			bool waitForCleanupLock = false;
+			numberOfDroppedShards = TryDropMarkedShards(waitForCleanupLock);
 		}
 
 		CommitTransactionCommand();
diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c
index ba5a41824..602157339 100644
--- a/src/backend/distributed/utils/resource_lock.c
+++ b/src/backend/distributed/utils/resource_lock.c
@@ -387,6 +387,37 @@ SetLocktagForShardDistributionMetadata(int64 shardId, LOCKTAG *tag)
 }
 
 
+/*
+ * LockPlacementCleanup takes an exclusive lock to ensure that only one
+ * process can clean up placements at a time.
+ */
+void
+LockPlacementCleanup(void)
+{
+	LOCKTAG tag;
+	const bool sessionLock = false;
+	const bool dontWait = false;
+	SET_LOCKTAG_PLACEMENT_CLEANUP(tag);
+	(void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
+}
+
+
+/*
+ * TryLockPlacementCleanup tries to take the same exclusive lock, but returns
+ * false instead of blocking when the lock cannot be acquired immediately.
+ */
+bool
+TryLockPlacementCleanup(void)
+{
+	LOCKTAG tag;
+	const bool sessionLock = false;
+	const bool dontWait = true;
+	SET_LOCKTAG_PLACEMENT_CLEANUP(tag);
+	bool lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
+	return lockAcquired;
+}
+
+
 /*
  * LockReferencedReferenceShardDistributionMetadata acquires shard distribution
  * metadata locks with the given lock mode on the reference tables which has a
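Note, not part of the patch: TryLockPlacementCleanup assigns the result of LockAcquire directly to a bool. That works because LockAcquire returns a LockAcquireResult enum whose only failure value when dontWait is true, LOCKACQUIRE_NOT_AVAIL, is 0. A minimal sketch of the same pattern collapsed into a single helper (the name AcquirePlacementCleanupLock is hypothetical):

/*
 * Sketch only (not part of the patch): the two functions above as one
 * helper. Requires "storage/lock.h" and "distributed/resource_lock.h".
 */
static bool
AcquirePlacementCleanupLock(bool waitForLock)
{
	LOCKTAG tag;
	const bool sessionLock = false;

	SET_LOCKTAG_PLACEMENT_CLEANUP(tag);

	/*
	 * With dontWait = true, LockAcquire returns LOCKACQUIRE_NOT_AVAIL
	 * (value 0) instead of sleeping when the lock is already held, which
	 * is also why TryLockPlacementCleanup can treat the result as a bool.
	 */
	LockAcquireResult result =
		LockAcquire(&tag, ExclusiveLock, sessionLock, !waitForLock);

	return result != LOCKACQUIRE_NOT_AVAIL;
}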
diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h
index 213854f8d..5db9403b4 100644
--- a/src/include/distributed/resource_lock.h
+++ b/src/include/distributed/resource_lock.h
@@ -39,9 +39,10 @@ typedef enum AdvisoryLocktagClass
 	ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7,
 	ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
 	ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
+	ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP = 10,
 
 	/* Columnar lock types */
-	ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION = 10
+	ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION = 11
 } AdvisoryLocktagClass;
 
 /* CitusOperations has constants for citus operations */
@@ -108,8 +109,21 @@ typedef enum CitusOperations
 			 0, \
 			 ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION)
 
+/* Reuse the advisory lock locktag, but with a different, previously unused
+ * field 4 value (10). The database is hardcoded to MyDatabaseId, to ensure
+ * the locks are local to each database. */
+#define SET_LOCKTAG_PLACEMENT_CLEANUP(tag) \
+	SET_LOCKTAG_ADVISORY(tag, \
+						 MyDatabaseId, \
+						 (uint32) 0, \
+						 (uint32) 0, \
+						 ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP)
+
+
 /* Lock shard/relation metadata for safe modifications */
 extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
+extern void LockPlacementCleanup(void);
+extern bool TryLockPlacementCleanup(void);
 extern void LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList);
 extern void BlockWritesToShardList(List *shardList);
 
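For reference, and not part of the patch: PostgreSQL's storage/lock.h defines SET_LOCKTAG_ADVISORY roughly as below. Field 1 carries the database OID, which is why hardcoding MyDatabaseId above keeps the cleanup lock local to each database, and field 4 is the slot that the AdvisoryLocktagClass values disambiguate:

/* Paraphrased from PostgreSQL's storage/lock.h: */
#define SET_LOCKTAG_ADVISORY(locktag, id1, id2, id3, id4) \
	((locktag).locktag_field1 = (id1), \
	 (locktag).locktag_field2 = (id2), \
	 (locktag).locktag_field3 = (id3), \
	 (locktag).locktag_field4 = (id4), \
	 (locktag).locktag_type = LOCKTAG_ADVISORY, \
	 (locktag).locktag_lockmethodid = USER_LOCKMETHOD)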
diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h
index caa739d7e..4caf87c76 100644
--- a/src/include/distributed/shard_cleaner.h
+++ b/src/include/distributed/shard_cleaner.h
@@ -15,6 +15,6 @@
 extern int DeferShardDeleteInterval;
 extern bool DeferShardDeleteOnMove;
 
-extern int TryDropMarkedShards(void);
+extern int TryDropMarkedShards(bool waitForCleanupLock);
 
 #endif /*CITUS_SHARD_CLEANER_H */
diff --git a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out
new file mode 100644
index 000000000..fca41ec08
--- /dev/null
+++ b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out
@@ -0,0 +1,56 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1-begin s1-move-placement s1-drop-marked-shards s2-drop-marked-shards s1-commit
+step s1-begin:
+	BEGIN;
+
+step s1-move-placement:
+	SET citus.defer_drop_after_shard_move TO ON;
+	SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
+
+master_move_shard_placement
+
+
+step s1-drop-marked-shards:
+	SELECT public.master_defer_delete_shards();
+
+master_defer_delete_shards
+
+1
+step s2-drop-marked-shards:
+	SELECT public.master_defer_delete_shards();
+ <waiting ...>
+step s1-commit:
+	COMMIT;
+
+step s2-drop-marked-shards: <... completed>
+master_defer_delete_shards
+
+0
+
+starting permutation: s1-begin s1-move-placement s2-drop-marked-shards s1-drop-marked-shards s1-commit
+step s1-begin:
+	BEGIN;
+
+step s1-move-placement:
+	SET citus.defer_drop_after_shard_move TO ON;
+	SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
+
+master_move_shard_placement
+
+
+step s2-drop-marked-shards:
+	SELECT public.master_defer_delete_shards();
+
+master_defer_delete_shards
+
+0
+step s1-drop-marked-shards:
+	SELECT public.master_defer_delete_shards();
+
+master_defer_delete_shards
+
+1
+step s1-commit:
+	COMMIT;
+
diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule
index fb2c60a5e..127bafd01 100644
--- a/src/test/regress/isolation_schedule
+++ b/src/test/regress/isolation_schedule
@@ -72,6 +72,7 @@ test: isolation_blocking_move_multi_shard_commands
 test: isolation_blocking_move_single_shard_commands_on_mx
 test: isolation_blocking_move_multi_shard_commands_on_mx
 test: isolation_shard_rebalancer
+test: isolation_rebalancer_deferred_drop
 
 # MX tests
 test: isolation_reference_on_mx
diff --git a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec
new file mode 100644
index 000000000..4ccb33583
--- /dev/null
+++ b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec
@@ -0,0 +1,66 @@
+// We use 15 as the partition key value throughout the test,
+// so we capture the id of the corresponding shard here.
+setup
+{
+	SELECT citus_internal.replace_isolation_tester_func();
+	SELECT citus_internal.refresh_isolation_tester_prepared_statement();
+
+	CREATE OR REPLACE FUNCTION master_defer_delete_shards()
+		RETURNS int
+		LANGUAGE C STRICT
+		AS 'citus', $$master_defer_delete_shards$$;
+	COMMENT ON FUNCTION master_defer_delete_shards()
+		IS 'remove orphaned shards';
+
+	SET citus.shard_count TO 8;
+	SET citus.shard_replication_factor TO 1;
+	SET citus.defer_drop_after_shard_move TO ON;
+	CREATE TABLE t1 (x int PRIMARY KEY, y int);
+	SELECT create_distributed_table('t1', 'x');
+
+	SELECT get_shard_id_for_distribution_column('t1', 15) INTO selected_shard;
+}
+
+teardown
+{
+	SELECT citus_internal.restore_isolation_tester_func();
+
+	DROP TABLE selected_shard;
+	DROP TABLE t1;
+}
+
+
+session "s1"
+
+step "s1-begin"
+{
+	BEGIN;
+}
+
+step "s1-move-placement"
+{
+	SET citus.defer_drop_after_shard_move TO ON;
+	SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
+}
+
+step "s1-drop-marked-shards"
+{
+	SELECT public.master_defer_delete_shards();
+}
+
+step "s1-commit"
+{
+	COMMIT;
+}
+
+session "s2"
+
+step "s2-drop-marked-shards"
+{
+	SELECT public.master_defer_delete_shards();
+}
+
+permutation "s1-begin" "s1-move-placement" "s1-drop-marked-shards" "s2-drop-marked-shards" "s1-commit"
+permutation "s1-begin" "s1-move-placement" "s2-drop-marked-shards" "s1-drop-marked-shards" "s1-commit"
+
+
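When debugging, the new lock can be observed through pg_locks: for advisory locks, locktag fields 1 through 4 surface as the database, classid, objid, and objsubid columns, so a held placement-cleanup lock shows up with objsubid = 10 (the ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP value added above). An illustrative query:

SELECT locktype, database, classid, objid, objsubid, mode, granted
FROM pg_locks
WHERE locktype = 'advisory' AND objsubid = 10;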