Fix data-race with concurrent calls of DropMarkedShards (#4909)

* Fix problems with concurrent calls of DropMarkedShards

When trying to enable `citus.defer_drop_after_shard_move` by default, it
turned out that DropMarkedShards was not safe to call concurrently. This
could cause especially big problems when shards were also being moved at
the same time: during tests it was possible to trigger a state where a
shard that had been moved was no longer available on any of the nodes
after the move.

Currently DropMarkedShards is only called in production by the
maintenance daemon. Since that is a single process, triggering such a
race is currently impossible in production settings. In future changes
we will want to call DropMarkedShards from other places too, though. To
make that safe, DropMarkedShards now takes an exclusive PlacementCleanup
advisory lock before doing any work; callers that must not block (such as
the maintenance daemon) try to take the lock and skip the cleanup round
with a warning if it is already held.

* Add some isolation tests

Co-authored-by: Jelte Fennema <github-tech@jeltef.nl>
SaitTalhaNisanci 2021-04-21 10:59:48 +03:00 committed by GitHub
parent 33c620f232
commit 93c2dcf3d2
8 changed files with 195 additions and 8 deletions

@@ -16,6 +16,7 @@
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/shard_cleaner.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_transaction.h"
@@ -23,7 +24,7 @@
PG_FUNCTION_INFO_V1(master_defer_delete_shards);
static int DropMarkedShards(void);
static int DropMarkedShards(bool waitForCleanupLock);
/*
@@ -44,7 +45,8 @@ master_defer_delete_shards(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
EnsureCoordinator();
int droppedShardCount = DropMarkedShards();
bool waitForCleanupLock = true;
int droppedShardCount = DropMarkedShards(waitForCleanupLock);
PG_RETURN_INT32(droppedShardCount);
}
@@ -55,14 +57,14 @@ master_defer_delete_shards(PG_FUNCTION_ARGS)
* any errors to make it safe to use in the maintenance daemon.
*/
int
TryDropMarkedShards(void)
TryDropMarkedShards(bool waitForCleanupLock)
{
int droppedShardCount = 0;
MemoryContext savedContext = CurrentMemoryContext;
PG_TRY();
{
droppedShardCount = DropMarkedShards();
droppedShardCount = DropMarkedShards(waitForCleanupLock);
}
PG_CATCH();
{
@@ -88,9 +90,15 @@ TryDropMarkedShards(void)
* group and continues with others. The group that has been skipped will be
* removed at a later time when there are no locks held anymore on those
* placements.
*
* Before doing any of this it will take an exclusive PlacementCleanup lock.
* This is to ensure that this function is not being run concurrently.
* Otherwise really bad race conditions are possible, such as removing all
* placements of a shard. waitForCleanupLock indicates whether this function
* should wait for this lock or return with a warning instead.
*/
static int
DropMarkedShards(void)
DropMarkedShards(bool waitForCleanupLock)
{
int removedShardCount = 0;
ListCell *shardPlacementCell = NULL;
@@ -100,6 +108,16 @@ DropMarkedShards(void)
return removedShardCount;
}
if (waitForCleanupLock)
{
LockPlacementCleanup();
}
else if (!TryLockPlacementCleanup())
{
ereport(WARNING, (errmsg("could not acquire lock to cleanup placements")));
return 0;
}
List *shardPlacementList = AllShardPlacementsWithShardPlacementState(
SHARD_STATE_TO_DELETE);
foreach(shardPlacementCell, shardPlacementList)
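
To make the intent of the new flag concrete, here is an illustrative sketch
(not part of the patch; the variable names are made up for the example) of how
two concurrent callers behave after this change:

/* Backend A, the master_defer_delete_shards() UDF path: block on the
 * PlacementCleanup lock until it is free, then drop the marked placements. */
int droppedByUdf = DropMarkedShards(/* waitForCleanupLock */ true);

/* Backend B, the maintenance daemon path: do not block. If the lock is
 * already held, emit a WARNING, return 0, and let the next cleanup round
 * try again. */
int droppedByDaemon = TryDropMarkedShards(/* waitForCleanupLock */ false);

Because the lock is exclusive, only one of the two backends can be inside the
placement-dropping loop at any time, which closes the race described in the
commit message. The daemon deliberately passes false so that a long-running
cleanup in another backend cannot stall the maintenance loop.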

@@ -644,7 +644,8 @@ CitusMaintenanceDaemonMain(Datum main_arg)
*/
lastShardCleanTime = GetCurrentTimestamp();
numberOfDroppedShards = TryDropMarkedShards();
bool waitForCleanupLock = false;
numberOfDroppedShards = TryDropMarkedShards(waitForCleanupLock);
}
CommitTransactionCommand();

@@ -387,6 +387,37 @@ SetLocktagForShardDistributionMetadata(int64 shardId, LOCKTAG *tag)
}
/*
* LockPlacementCleanup takes an exclusive lock to ensure that only one process
* can cleanup placements at the same time.
*/
void
LockPlacementCleanup(void)
{
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = false;
SET_LOCKTAG_PLACEMENT_CLEANUP(tag);
(void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
}
/*
* TryLockPlacementCleanup takes an exclusive lock to ensure that only one
* process can cleanup placements at the same time.
*/
bool
TryLockPlacementCleanup(void)
{
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = true;
SET_LOCKTAG_PLACEMENT_CLEANUP(tag);
bool lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
return lockAcquired;
}
/*
* LockReferencedReferenceShardDistributionMetadata acquires shard distribution
* metadata locks with the given lock mode on the reference tables which has a
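
One subtlety in TryLockPlacementCleanup above: LockAcquire returns a
LockAcquireResult, not a bool. The implicit conversion works because
LOCKACQUIRE_NOT_AVAIL is the zero value of that enum in PostgreSQL's lock.h.
A more explicit sketch of the same check (assuming that enum layout) would be:

LOCKTAG tag;
SET_LOCKTAG_PLACEMENT_CLEANUP(tag);

/* With dontWait = true, LockAcquire returns LOCKACQUIRE_NOT_AVAIL instead of
 * sleeping when another backend already holds the lock. */
LockAcquireResult result = LockAcquire(&tag, ExclusiveLock,
                                       /* sessionLock */ false,
                                       /* dontWait */ true);
bool lockAcquired = (result != LOCKACQUIRE_NOT_AVAIL);

Since sessionLock is false in both functions, the lock is transaction-scoped
and is released automatically at commit or abort; that is why session s2 in
the isolation test below unblocks as soon as s1 commits.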

@@ -39,9 +39,10 @@ typedef enum AdvisoryLocktagClass
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7,
ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP = 10,
/* Columnar lock types */
ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION = 10
ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION = 11
} AdvisoryLocktagClass;
/* CitusOperations has constants for citus operations */
@@ -108,8 +109,21 @@ typedef enum CitusOperations
0, \
ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION)
/* reuse advisory lock, but with a different, unused field 4 value (10).
 * The database is also hardcoded to MyDatabaseId, to ensure the locks
 * are local to each database */
#define SET_LOCKTAG_PLACEMENT_CLEANUP(tag) \
SET_LOCKTAG_ADVISORY(tag, \
MyDatabaseId, \
(uint32) 0, \
(uint32) 0, \
ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP)
/* Lock shard/relation metadata for safe modifications */
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
extern void LockPlacementCleanup(void);
extern bool TryLockPlacementCleanup(void);
extern void LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList);
extern void BlockWritesToShardList(List *shardList);
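
For reference, assuming PostgreSQL's stock SET_LOCKTAG_ADVISORY definition
from storage/lock.h, the new SET_LOCKTAG_PLACEMENT_CLEANUP(tag) macro fills
the lock tag roughly like this (a sketch, not patch code):

tag.locktag_field1 = MyDatabaseId;  /* scopes the lock to the current database */
tag.locktag_field2 = 0;
tag.locktag_field3 = 0;
tag.locktag_field4 = ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP;  /* 10 */
tag.locktag_type = LOCKTAG_ADVISORY;
tag.locktag_lockmethodid = USER_LOCKMETHOD;

Using MyDatabaseId in field1 means two Citus databases in the same cluster
can run placement cleanup concurrently without blocking each other, while the
otherwise unused field 4 class value keeps the tag from colliding with
user-level advisory locks or the other Citus lock tags listed above.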

@@ -15,6 +15,6 @@
extern int DeferShardDeleteInterval;
extern bool DeferShardDeleteOnMove;
extern int TryDropMarkedShards(void);
extern int TryDropMarkedShards(bool waitForCleanupLock);
#endif /*CITUS_SHARD_CLEANER_H */

@@ -0,0 +1,56 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s1-move-placement s1-drop-marked-shards s2-drop-marked-shards s1-commit
step s1-begin:
BEGIN;
step s1-move-placement:
SET citus.defer_drop_after_shard_move TO ON;
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
master_move_shard_placement
step s1-drop-marked-shards:
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
1
step s2-drop-marked-shards:
SELECT public.master_defer_delete_shards();
<waiting ...>
step s1-commit:
COMMIT;
step s2-drop-marked-shards: <... completed>
master_defer_delete_shards
0
starting permutation: s1-begin s1-move-placement s2-drop-marked-shards s1-drop-marked-shards s1-commit
step s1-begin:
BEGIN;
step s1-move-placement:
SET citus.defer_drop_after_shard_move TO ON;
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
master_move_shard_placement
step s2-drop-marked-shards:
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
0
step s1-drop-marked-shards:
SELECT public.master_defer_delete_shards();
master_defer_delete_shards
1
step s1-commit:
COMMIT;

@@ -72,6 +72,7 @@ test: isolation_blocking_move_multi_shard_commands
test: isolation_blocking_move_single_shard_commands_on_mx
test: isolation_blocking_move_multi_shard_commands_on_mx
test: isolation_shard_rebalancer
test: isolation_rebalancer_deferred_drop
# MX tests
test: isolation_reference_on_mx

@@ -0,0 +1,66 @@
// we use 15 as the partition key value throughout the test
// so setting the corresponding shard here is useful
setup
{
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
CREATE OR REPLACE FUNCTION master_defer_delete_shards()
RETURNS int
LANGUAGE C STRICT
AS 'citus', $$master_defer_delete_shards$$;
COMMENT ON FUNCTION master_defer_delete_shards()
IS 'remove orphaned shards';
SET citus.shard_count TO 8;
SET citus.shard_replication_factor TO 1;
SET citus.defer_drop_after_shard_move TO ON;
CREATE TABLE t1 (x int PRIMARY KEY, y int);
SELECT create_distributed_table('t1', 'x');
SELECT get_shard_id_for_distribution_column('t1', 15) INTO selected_shard;
}
teardown
{
SELECT citus_internal.restore_isolation_tester_func();
DROP TABLE selected_shard;
DROP TABLE t1;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-move-placement"
{
SET citus.defer_drop_after_shard_move TO ON;
SELECT master_move_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
}
step "s1-drop-marked-shards"
{
SELECT public.master_defer_delete_shards();
}
step "s1-commit"
{
COMMIT;
}
session "s2"
step "s2-drop-marked-shards"
{
SELECT public.master_defer_delete_shards();
}
permutation "s1-begin" "s1-move-placement" "s1-drop-marked-shards" "s2-drop-marked-shards" "s1-commit"
permutation "s1-begin" "s1-move-placement" "s2-drop-marked-shards" "s1-drop-marked-shards" "s1-commit"