mirror of https://github.com/citusdata/citus.git
Not block maintenance daemon (#4972)
It was possible to block the maintenance daemon by taking a SHARE ROW EXCLUSIVE lock on pg_dist_placement; until that lock was released, the daemon stayed blocked. The maintenance daemon should never be blocked, so we now try to take the pg_dist_placement lock without waiting. If we cannot get it, we skip dropping the old placements for that round.
pull/4984/head
parent b649dffabd
commit eaa7d2bada
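The heart of the fix is PostgreSQL's conditional lock API: LockRelationOid queues behind any conflicting holder (for example a session holding SHARE ROW EXCLUSIVE on pg_dist_placement), while ConditionalLockRelationOid returns false immediately when the lock is unavailable. A minimal sketch of the pattern follows; the helper name AcquireRelationLockSketch is hypothetical and only illustrates the approach, whereas the lmgr calls are real PostgreSQL APIs.

#include "postgres.h"

#include "storage/lmgr.h"      /* LockRelationOid, ConditionalLockRelationOid */
#include "storage/lockdefs.h"  /* LOCKMODE, RowExclusiveLock */

/*
 * Sketch only (not the patch itself): take a relation lock either by
 * waiting, or opportunistically. Returns whether the lock was acquired;
 * on false the caller simply skips its cleanup round.
 */
static bool
AcquireRelationLockSketch(Oid relationId, LOCKMODE lockmode, bool waitForLocks)
{
    if (waitForLocks)
    {
        LockRelationOid(relationId, lockmode);
        return true;
    }

    if (!ConditionalLockRelationOid(relationId, lockmode))
    {
        ereport(DEBUG1, (errmsg("relation %u is locked, skipping cleanup",
                                relationId)));
        return false;
    }

    return true;
}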
--- a/src/backend/distributed/operations/shard_cleaner.c
+++ b/src/backend/distributed/operations/shard_cleaner.c
@@ -24,6 +24,7 @@
 PG_FUNCTION_INFO_V1(master_defer_delete_shards);
 
 static bool TryDropShard(GroupShardPlacement *placement);
+static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode);
 
 /*
  * master_defer_delete_shards implements a user-facing UDF to delete orphaned shards that
@@ -43,8 +44,8 @@ master_defer_delete_shards(PG_FUNCTION_ARGS)
 	CheckCitusVersion(ERROR);
 	EnsureCoordinator();
 
-	bool waitForCleanupLock = true;
-	int droppedShardCount = DropMarkedShards(waitForCleanupLock);
+	bool waitForLocks = true;
+	int droppedShardCount = DropMarkedShards(waitForLocks);
 
 	PG_RETURN_INT32(droppedShardCount);
 }
@@ -58,13 +59,13 @@ master_defer_delete_shards(PG_FUNCTION_ARGS)
  * returns the number of dropped shards.
  */
 int
-TryDropMarkedShards(bool waitForCleanupLock)
+TryDropMarkedShards(bool waitForLocks)
 {
 	int droppedShardCount = 0;
 	MemoryContext savedContext = CurrentMemoryContext;
 	PG_TRY();
 	{
-		droppedShardCount = DropMarkedShards(waitForCleanupLock);
+		droppedShardCount = DropMarkedShards(waitForLocks);
 	}
 	PG_CATCH();
 	{
@@ -91,32 +92,45 @@ TryDropMarkedShards(bool waitForCleanupLock)
  * will be removed at a later time when there are no locks held anymore on
  * those placements.
  *
+ * If waitForLocks is false, then if we cannot take a lock on pg_dist_placement
+ * we continue without waiting.
+ *
  * Before doing any of this it will take an exclusive PlacementCleanup lock.
  * This is to ensure that this function is not being run concurrently.
  * Otherwise really bad race conditions are possible, such as removing all
- * placements of a shard. waitForCleanupLock indicates if this function should
- * wait for this lock or error out.
+ * placements of a shard. waitForLocks indicates if this function should
+ * wait for this lock or not.
  *
  */
 int
-DropMarkedShards(bool waitForCleanupLock)
+DropMarkedShards(bool waitForLocks)
 {
 	int removedShardCount = 0;
 	ListCell *shardPlacementCell = NULL;
 
+	/*
+	 * We should try to take the highest lock that we take
+	 * later in this function for pg_dist_placement. We take RowExclusiveLock
+	 * in DeleteShardPlacementRow.
+	 */
+	LOCKMODE lockmode = RowExclusiveLock;
+
 	if (!IsCoordinator())
 	{
 		return 0;
 	}
 
-	if (waitForCleanupLock)
+	if (waitForLocks)
 	{
 		LockPlacementCleanup();
 	}
-	else if (!TryLockPlacementCleanup())
+	else
 	{
-		ereport(WARNING, (errmsg("could not acquire lock to cleanup placements")));
-		return 0;
+		Oid distPlacementId = DistPlacementRelationId();
+		if (!TryLockRelationAndPlacementCleanup(distPlacementId, lockmode))
+		{
+			return 0;
+		}
 	}
 
 	int failedShardDropCount = 0;
@@ -153,10 +167,33 @@ DropMarkedShards(bool waitForCleanupLock)
 }
 
 
+/*
+ * TryLockRelationAndPlacementCleanup tries to lock the given relation
+ * and the placement cleanup. If it cannot, it returns false.
+ *
+ */
+static bool
+TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode)
+{
+	if (!ConditionalLockRelationOid(relationId, lockmode))
+	{
+		ereport(DEBUG1, (errmsg(
+							 "could not acquire shard lock to cleanup placements")));
+		return false;
+	}
+
+	if (!TryLockPlacementCleanup())
+	{
+		ereport(DEBUG1, (errmsg("could not acquire lock to cleanup placements")));
+		return false;
+	}
+	return true;
+}
+
+
 /*
  * TryDropShard tries to drop the given shard placement and returns
- * true on success. On failure, this method swallows errors and emits them
- * as WARNINGs.
+ * true on success.
  */
 static bool
 TryDropShard(GroupShardPlacement *placement)
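A note on the TryDropMarkedShards wrapper above: the PG_CATCH body is elided from the hunk, but the usual shape of such an error-swallowing wrapper is sketched below. RunTaskSwallowingErrors is a hypothetical name used only for illustration; CopyErrorData, FlushErrorState, and FreeErrorData are the real elog APIs for this pattern.

#include "postgres.h"

#include "utils/memutils.h"  /* MemoryContext, CurrentMemoryContext */

/*
 * Sketch only: run a callback and downgrade any ERROR it raises to a
 * WARNING, so that a long-lived caller such as a background daemon
 * keeps running.
 */
static void
RunTaskSwallowingErrors(void (*task) (void))
{
    MemoryContext savedContext = CurrentMemoryContext;

    PG_TRY();
    {
        task();
    }
    PG_CATCH();
    {
        /* the error path switched memory contexts; restore ours first */
        MemoryContextSwitchTo(savedContext);

        ErrorData *edata = CopyErrorData();
        FlushErrorState();

        ereport(WARNING, (errmsg("task failed: %s", edata->message)));
        FreeErrorData(edata);
    }
    PG_END_TRY();
}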
--- a/src/backend/distributed/test/shard_rebalancer.c
+++ b/src/backend/distributed/test/shard_rebalancer.c
@@ -21,6 +21,7 @@
 #include "distributed/connection_management.h"
 #include "distributed/listutils.h"
 #include "distributed/multi_physical_planner.h"
+#include "distributed/shard_cleaner.h"
 #include "distributed/shard_rebalancer.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -50,6 +51,7 @@ static ShardCost GetShardCost(uint64 shardId, void *context);
 PG_FUNCTION_INFO_V1(shard_placement_rebalance_array);
 PG_FUNCTION_INFO_V1(shard_placement_replication_array);
 PG_FUNCTION_INFO_V1(worker_node_responsive);
+PG_FUNCTION_INFO_V1(run_try_drop_marked_shards);
 
 typedef struct ShardPlacementTestInfo
 {
@@ -71,6 +73,17 @@ typedef struct RebalancePlanContext
 	List *shardPlacementTestInfoList;
 } RebalancePlacementContext;
 
+/*
+ * run_try_drop_marked_shards is a wrapper to run TryDropMarkedShards.
+ */
+Datum
+run_try_drop_marked_shards(PG_FUNCTION_ARGS)
+{
+	bool waitForLocks = false;
+	TryDropMarkedShards(waitForLocks);
+	PG_RETURN_VOID();
+}
+
 
 /*
  * shard_placement_rebalance_array returns a list of operations which can make a
--- a/src/backend/distributed/utils/maintenanced.c
+++ b/src/backend/distributed/utils/maintenanced.c
@@ -644,8 +644,8 @@ CitusMaintenanceDaemonMain(Datum main_arg)
 			 */
 			lastShardCleanTime = GetCurrentTimestamp();
 
-			bool waitForCleanupLock = false;
-			numberOfDroppedShards = TryDropMarkedShards(waitForCleanupLock);
+			bool waitForLocks = false;
+			numberOfDroppedShards = TryDropMarkedShards(waitForLocks);
 		}
 
 		CommitTransactionCommand();
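In the daemon loop above, the cleanup is rate-limited via lastShardCleanTime and is now non-blocking via waitForLocks = false. A hedged sketch of how such a periodic, skip-if-busy task is typically wired up: MaybeDropMarkedShards and the hardcoded interval are illustrative only (Citus drives the real interval from a GUC), while GetCurrentTimestamp and TimestampDifferenceExceeds are real PostgreSQL APIs.

#include "postgres.h"

#include "distributed/shard_cleaner.h"  /* TryDropMarkedShards */
#include "utils/timestamp.h"            /* GetCurrentTimestamp, TimestampDifferenceExceeds */

/* illustrative interval in milliseconds; the real value comes from a GUC */
static int shardCleanIntervalMs = 60000;
static TimestampTz lastShardCleanTime = 0;

/*
 * Sketch only: run the cleanup at most once per interval. Because
 * waitForLocks is false, TryDropMarkedShards returns immediately when
 * pg_dist_placement is locked by another backend, so the daemon's main
 * loop is never stalled.
 */
static void
MaybeDropMarkedShards(void)
{
    if (TimestampDifferenceExceeds(lastShardCleanTime, GetCurrentTimestamp(),
                                   shardCleanIntervalMs))
    {
        lastShardCleanTime = GetCurrentTimestamp();

        bool waitForLocks = false;
        (void) TryDropMarkedShards(waitForLocks);
    }
}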
--- a/src/include/distributed/shard_cleaner.h
+++ b/src/include/distributed/shard_cleaner.h
@@ -17,7 +17,7 @@ extern bool DeferShardDeleteOnMove;
 extern double DesiredPercentFreeAfterMove;
 extern bool CheckAvailableSpaceBeforeMove;
 
-extern int TryDropMarkedShards(bool waitForCleanupLock);
-extern int DropMarkedShards(bool waitForCleanupLock);
+extern int TryDropMarkedShards(bool waitForLocks);
+extern int DropMarkedShards(bool waitForLocks);
 
 #endif /*CITUS_SHARD_CLEANER_H */
--- a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out
+++ b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out
@@ -18,6 +18,7 @@ master_defer_delete_shards
 
 1
 step s2-drop-marked-shards:
+    SET client_min_messages to DEBUG1;
     SELECT public.master_defer_delete_shards();
 <waiting ...>
 step s1-commit:
@@ -40,6 +41,7 @@ master_move_shard_placement
 
 
 step s2-drop-marked-shards:
+    SET client_min_messages to DEBUG1;
     SELECT public.master_defer_delete_shards();
 
 master_defer_delete_shards
@@ -99,3 +101,21 @@ step s2-stop-connection:
 stop_session_level_connection_to_node
 
 
+
+starting permutation: s1-begin s1-lock-pg-dist-placement s2-drop-old-shards s1-commit
+step s1-begin:
+    BEGIN;
+
+step s1-lock-pg-dist-placement:
+    LOCK TABLE pg_dist_placement IN SHARE ROW EXCLUSIVE MODE;
+
+s2: DEBUG: could not acquire shard lock to cleanup placements
+step s2-drop-old-shards:
+    SELECT run_try_drop_marked_shards();
+
+run_try_drop_marked_shards
+
+
+step s1-commit:
+    COMMIT;
--- a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec
+++ b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec
@@ -3,6 +3,11 @@
 setup
 {
+    CREATE OR REPLACE FUNCTION run_try_drop_marked_shards()
+    RETURNS VOID
+    AS 'citus'
+    LANGUAGE C STRICT VOLATILE;
+
     CREATE OR REPLACE FUNCTION start_session_level_connection_to_node(text, integer)
     RETURNS void
     LANGUAGE C STRICT VOLATILE
@@ -65,6 +70,10 @@ step "s1-drop-marked-shards"
     SELECT public.master_defer_delete_shards();
 }
 
+step "s1-lock-pg-dist-placement" {
+    LOCK TABLE pg_dist_placement IN SHARE ROW EXCLUSIVE MODE;
+}
+
 step "s1-commit"
 {
     COMMIT;
@@ -72,6 +81,10 @@ step "s1-commit"
 
 session "s2"
 
+step "s2-drop-old-shards" {
+    SELECT run_try_drop_marked_shards();
+}
+
 step "s2-start-session-level-connection"
 {
     SELECT start_session_level_connection_to_node('localhost', 57637);
@@ -90,9 +103,13 @@ step "s2-lock-table-on-worker"
 
 step "s2-drop-marked-shards"
 {
+    SET client_min_messages to DEBUG1;
     SELECT public.master_defer_delete_shards();
 }
 
 permutation "s1-begin" "s1-move-placement" "s1-drop-marked-shards" "s2-drop-marked-shards" "s1-commit"
 permutation "s1-begin" "s1-move-placement" "s2-drop-marked-shards" "s1-drop-marked-shards" "s1-commit"
 permutation "s1-begin" "s1-move-placement" "s2-start-session-level-connection" "s2-lock-table-on-worker" "s1-drop-marked-shards" "s1-commit" "s2-stop-connection"
+// make sure we skip the cleanup, rather than block, if we cannot get the lock on pg_dist_placement
+permutation "s1-begin" "s1-lock-pg-dist-placement" "s2-drop-old-shards" "s1-commit"