From eaa7d2badadfd67965d281d82b57babf82225561 Mon Sep 17 00:00:00 2001
From: SaitTalhaNisanci
Date: Mon, 17 May 2021 13:22:35 +0300
Subject: [PATCH] Do not block the maintenance daemon (#4972)

It was possible to block the maintenance daemon by taking a SHARE ROW
EXCLUSIVE lock on pg_dist_placement. Until that lock was released, the
maintenance daemon stayed blocked. The maintenance daemon should never be
blocked, so we now try to acquire the pg_dist_placement lock without
waiting; if we cannot get it, we skip dropping the old placements.
---
 .../distributed/operations/shard_cleaner.c    | 63 +++++++++++++++----
 .../distributed/test/shard_rebalancer.c       | 13 ++++
 src/backend/distributed/utils/maintenanced.c  |  4 +-
 src/include/distributed/shard_cleaner.h       |  4 +-
 .../isolation_rebalancer_deferred_drop.out    | 20 ++++++
 .../isolation_rebalancer_deferred_drop.spec   | 17 +++++
 6 files changed, 104 insertions(+), 17 deletions(-)

diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c
index a10fe1373..ab7fdfaf9 100644
--- a/src/backend/distributed/operations/shard_cleaner.c
+++ b/src/backend/distributed/operations/shard_cleaner.c
@@ -24,6 +24,7 @@
 PG_FUNCTION_INFO_V1(master_defer_delete_shards);
 
 static bool TryDropShard(GroupShardPlacement *placement);
+static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode);
 
 /*
  * master_defer_delete_shards implements a user-facing UDF to deleter orphaned shards that
@@ -43,8 +44,8 @@ master_defer_delete_shards(PG_FUNCTION_ARGS)
 	CheckCitusVersion(ERROR);
 	EnsureCoordinator();
 
-	bool waitForCleanupLock = true;
-	int droppedShardCount = DropMarkedShards(waitForCleanupLock);
+	bool waitForLocks = true;
+	int droppedShardCount = DropMarkedShards(waitForLocks);
 
 	PG_RETURN_INT32(droppedShardCount);
 }
@@ -58,13 +59,13 @@ master_defer_delete_shards(PG_FUNCTION_ARGS)
  * returns the number of dropped shards.
  */
 int
-TryDropMarkedShards(bool waitForCleanupLock)
+TryDropMarkedShards(bool waitForLocks)
 {
 	int droppedShardCount = 0;
 	MemoryContext savedContext = CurrentMemoryContext;
 	PG_TRY();
 	{
-		droppedShardCount = DropMarkedShards(waitForCleanupLock);
+		droppedShardCount = DropMarkedShards(waitForLocks);
 	}
 	PG_CATCH();
 	{
@@ -91,32 +92,45 @@ TryDropMarkedShards(bool waitForCleanupLock)
  * will be removed at a later time when there are no locks held anymore on
  * those placements.
  *
+ * If waitForLocks is false, then if we cannot take a lock on pg_dist_placement
+ * we continue without waiting.
+ *
  * Before doing any of this it will take an exclusive PlacementCleanup lock.
  * This is to ensure that this function is not being run concurrently.
  * Otherwise really bad race conditions are possible, such as removing all
- * placements of a shard. waitForCleanupLock indicates if this function should
- * wait for this lock or error out.
+ * placements of a shard. waitForLocks indicates if this function should
+ * wait for this lock or not.
  *
  */
 int
-DropMarkedShards(bool waitForCleanupLock)
+DropMarkedShards(bool waitForLocks)
 {
 	int removedShardCount = 0;
 	ListCell *shardPlacementCell = NULL;
 
+	/*
+	 * We should try to take the highest lock that we take
+	 * later in this function for pg_dist_placement. We take RowExclusiveLock
+	 * in DeleteShardPlacementRow.
+	 */
+	LOCKMODE lockmode = RowExclusiveLock;
+
 	if (!IsCoordinator())
 	{
 		return 0;
 	}
 
-	if (waitForCleanupLock)
+	if (waitForLocks)
 	{
 		LockPlacementCleanup();
 	}
-	else if (!TryLockPlacementCleanup())
+	else
 	{
-		ereport(WARNING, (errmsg("could not acquire lock to cleanup placements")));
-		return 0;
+		Oid distPlacementId = DistPlacementRelationId();
+		if (!TryLockRelationAndPlacementCleanup(distPlacementId, lockmode))
+		{
+			return 0;
+		}
 	}
 
 	int failedShardDropCount = 0;
@@ -153,10 +167,33 @@ DropMarkedShards(bool waitForCleanupLock)
 }
 
 
+/*
+ * TryLockRelationAndPlacementCleanup tries to lock the given relation
+ * and the placement cleanup. If it cannot, it returns false.
+ *
+ */
+static bool
+TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode)
+{
+	if (!ConditionalLockRelationOid(relationId, lockmode))
+	{
+		ereport(DEBUG1, (errmsg(
+							"could not acquire shard lock to cleanup placements")));
+		return false;
+	}
+
+	if (!TryLockPlacementCleanup())
+	{
+		ereport(DEBUG1, (errmsg("could not acquire lock to cleanup placements")));
+		return false;
+	}
+	return true;
+}
+
+
 /*
  * TryDropShard tries to drop the given shard placement and returns
- * true on success. On failure, this method swallows errors and emits them
- * as WARNINGs.
+ * true on success.
  */
 static bool
 TryDropShard(GroupShardPlacement *placement)
diff --git a/src/backend/distributed/test/shard_rebalancer.c b/src/backend/distributed/test/shard_rebalancer.c
index 4bd137c8c..d43ffe446 100644
--- a/src/backend/distributed/test/shard_rebalancer.c
+++ b/src/backend/distributed/test/shard_rebalancer.c
@@ -21,6 +21,7 @@
 #include "distributed/connection_management.h"
 #include "distributed/listutils.h"
 #include "distributed/multi_physical_planner.h"
+#include "distributed/shard_cleaner.h"
 #include "distributed/shard_rebalancer.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -50,6 +51,7 @@ static ShardCost GetShardCost(uint64 shardId, void *context);
 PG_FUNCTION_INFO_V1(shard_placement_rebalance_array);
 PG_FUNCTION_INFO_V1(shard_placement_replication_array);
 PG_FUNCTION_INFO_V1(worker_node_responsive);
+PG_FUNCTION_INFO_V1(run_try_drop_marked_shards);
 
 typedef struct ShardPlacementTestInfo
 {
@@ -71,6 +73,17 @@ typedef struct RebalancePlanContext
 	List *shardPlacementTestInfoList;
 } RebalancePlacementContext;
 
+/*
+ * run_try_drop_marked_shards is a wrapper to run TryDropMarkedShards.
+ */
+Datum
+run_try_drop_marked_shards(PG_FUNCTION_ARGS)
+{
+	bool waitForLocks = false;
+	TryDropMarkedShards(waitForLocks);
+	PG_RETURN_VOID();
+}
+
 
 /*
  * shard_placement_rebalance_array returns a list of operations which can make a
diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c
index f595d06bf..af52ddf9f 100644
--- a/src/backend/distributed/utils/maintenanced.c
+++ b/src/backend/distributed/utils/maintenanced.c
@@ -644,8 +644,8 @@ CitusMaintenanceDaemonMain(Datum main_arg)
 			 */
 			lastShardCleanTime = GetCurrentTimestamp();
 
-			bool waitForCleanupLock = false;
-			numberOfDroppedShards = TryDropMarkedShards(waitForCleanupLock);
+			bool waitForLocks = false;
+			numberOfDroppedShards = TryDropMarkedShards(waitForLocks);
 		}
 
 		CommitTransactionCommand();
diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h
index caad5a615..b48d89522 100644
--- a/src/include/distributed/shard_cleaner.h
+++ b/src/include/distributed/shard_cleaner.h
@@ -17,7 +17,7 @@ extern bool DeferShardDeleteOnMove;
 extern double DesiredPercentFreeAfterMove;
 extern bool CheckAvailableSpaceBeforeMove;
 
-extern int TryDropMarkedShards(bool waitForCleanupLock);
-extern int DropMarkedShards(bool waitForCleanupLock);
+extern int TryDropMarkedShards(bool waitForLocks);
+extern int DropMarkedShards(bool waitForLocks);
 
 #endif /*CITUS_SHARD_CLEANER_H */
diff --git a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out
index 779e70252..36918f3a6 100644
--- a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out
+++ b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out
@@ -18,6 +18,7 @@ master_defer_delete_shards
 1
 
 step s2-drop-marked-shards:
+    SET client_min_messages to DEBUG1;
     SELECT public.master_defer_delete_shards();
 
 step s1-commit:
@@ -40,6 +41,7 @@ master_move_shard_placement
 
 
 step s2-drop-marked-shards:
+    SET client_min_messages to DEBUG1;
     SELECT public.master_defer_delete_shards();
 
 master_defer_delete_shards
@@ -99,3 +101,21 @@ step s2-stop-connection:
 stop_session_level_connection_to_node
 
 
+
+starting permutation: s1-begin s1-lock-pg-dist-placement s2-drop-old-shards s1-commit
+step s1-begin:
+    BEGIN;
+
+step s1-lock-pg-dist-placement:
+    LOCK TABLE pg_dist_placement IN SHARE ROW EXCLUSIVE MODE;
+
+s2: DEBUG: could not acquire shard lock to cleanup placements
+step s2-drop-old-shards:
+    SELECT run_try_drop_marked_shards();
+
+run_try_drop_marked_shards
+
+
+step s1-commit:
+    COMMIT;
+
diff --git a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec
index 6e91e97be..554068822 100644
--- a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec
+++ b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec
@@ -3,6 +3,11 @@
 
 setup
 {
+	CREATE OR REPLACE FUNCTION run_try_drop_marked_shards()
+	RETURNS VOID
+	AS 'citus'
+	LANGUAGE C STRICT VOLATILE;
+
 	CREATE OR REPLACE FUNCTION start_session_level_connection_to_node(text, integer)
 	RETURNS void
 	LANGUAGE C STRICT VOLATILE
@@ -65,6 +70,10 @@ step "s1-drop-marked-shards"
 	SELECT public.master_defer_delete_shards();
 }
 
+step "s1-lock-pg-dist-placement" {
+	LOCK TABLE pg_dist_placement IN SHARE ROW EXCLUSIVE MODE;
+}
+
 step "s1-commit"
 {
 	COMMIT;
@@ -72,6 +81,10 @@ step "s1-commit"
 
 session "s2"
 
+step "s2-drop-old-shards" {
+	SELECT run_try_drop_marked_shards();
+}
+
 step "s2-start-session-level-connection"
 {
 	SELECT start_session_level_connection_to_node('localhost', 57637);
@@ -90,9 +103,13 @@ step "s2-lock-table-on-worker"
 
 step "s2-drop-marked-shards"
 {
+	SET client_min_messages to DEBUG1;
 	SELECT public.master_defer_delete_shards();
 }
 
+
 permutation "s1-begin" "s1-move-placement" "s1-drop-marked-shards" "s2-drop-marked-shards" "s1-commit"
 permutation "s1-begin" "s1-move-placement" "s2-drop-marked-shards" "s1-drop-marked-shards" "s1-commit"
 permutation "s1-begin" "s1-move-placement" "s2-start-session-level-connection" "s2-lock-table-on-worker" "s1-drop-marked-shards" "s1-commit" "s2-stop-connection"
+// make sure we skip the drop, instead of blocking, if we cannot get the lock on pg_dist_placement
+permutation "s1-begin" "s1-lock-pg-dist-placement" "s2-drop-old-shards" "s1-commit"
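
For reference, the locking behavior introduced by this patch boils down to a take-the-lock-or-skip pattern. The sketch below restates that pattern outside the diff for easier reading; it is illustrative only, not code from the patch: the helper name TryAcquireCleanupLocks and the exact include paths for the Citus headers are assumptions, while ConditionalLockRelationOid, DistPlacementRelationId, and TryLockPlacementCleanup are the functions the change relies on.

/*
 * Illustrative sketch of the non-blocking lock acquisition used by the
 * maintenance daemon path (waitForLocks = false). TryAcquireCleanupLocks
 * is a hypothetical name; the Citus include paths are assumptions.
 */
#include "postgres.h"

#include "storage/lmgr.h"                /* ConditionalLockRelationOid; pulls in RowExclusiveLock */
#include "distributed/metadata_cache.h"  /* DistPlacementRelationId */
#include "distributed/resource_lock.h"   /* TryLockPlacementCleanup */

static bool
TryAcquireCleanupLocks(void)
{
	/*
	 * Ask for the strongest lock the cleanup path will later need on
	 * pg_dist_placement (RowExclusiveLock, taken when placement rows are
	 * deleted), but do not wait for it. If another backend holds a
	 * conflicting lock, such as SHARE ROW EXCLUSIVE, give up immediately
	 * instead of blocking the maintenance daemon.
	 */
	if (!ConditionalLockRelationOid(DistPlacementRelationId(), RowExclusiveLock))
	{
		ereport(DEBUG1, (errmsg("could not acquire shard lock to cleanup placements")));
		return false;
	}

	/* the advisory placement-cleanup lock is likewise taken without waiting */
	if (!TryLockPlacementCleanup())
	{
		ereport(DEBUG1, (errmsg("could not acquire lock to cleanup placements")));
		return false;
	}

	return true;
}

When this helper returns false the caller simply returns 0 dropped shards and retries on the next maintenance cycle, which is why the new isolation test expects a DEBUG message and a successful (empty) result rather than an error.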