Allows parallel shard moves using logical replication

Following changes are part of the commit
- Switch from ShareUpdateExclusiveLock to ShareLock when locking
  colocated relations during a shard move.
  This blocks concurrent DDL/TRUNCATE on the tables while still
  allowing parallel shard moves for other colocated relations.

- Drop the leftover replication lock that previously serialized shard
  moves performed via logical replication.
  This lock was only needed when we used to drop and recreate the
  subscriptions/publications before each move. Since Citus now removes
  those objects later as part of the “unused distributed objects” cleanup,
  shard moves via logical replication can safely run in parallel without
  additional locking.
pull/7983/head
Muhammad Usama 2025-05-15 14:31:22 +05:00
parent 49e56001fd
commit a8a5f34dc9
3 changed files with 7 additions and 38 deletions

View File

@ -439,10 +439,9 @@ TransferShards(int64 shardId, char *sourceNodeName,
if (transferType == SHARD_TRANSFER_MOVE)
{
/*
* Block concurrent DDL / TRUNCATE commands on the relation. Similarly,
* block concurrent citus_move_shard_placement() on any shard of
* the same relation. This is OK for now since we're executing shard
* moves sequentially anyway.
* Block concurrent DDL / TRUNCATE commands on the relation. while,
* allow concurrent citus_move_shard_placement() on the shards of
* the same relation.
*/
LockColocatedRelationsForMove(colocatedTableList);
}
@ -752,7 +751,7 @@ IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetN
/*
* LockColocatedRelationsForMove takes a list of relations, locks all of them
* using ShareUpdateExclusiveLock
* using ShareLock
*/
static void
LockColocatedRelationsForMove(List *colocatedTableList)
@ -760,7 +759,7 @@ LockColocatedRelationsForMove(List *colocatedTableList)
Oid colocatedTableId = InvalidOid;
foreach_declared_oid(colocatedTableId, colocatedTableList)
{
LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
LockRelationOid(colocatedTableId, ShareLock);
}
}

View File

@ -132,7 +132,6 @@ static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command);
static void WaitForMiliseconds(long timeout);
static XLogRecPtr GetSubscriptionPosition(
GroupedLogicalRepTargets *groupedLogicalRepTargets);
static void AcquireLogicalReplicationLock(void);
static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode,
List *shardIntervals);
@ -156,7 +155,6 @@ void
LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort,
char *targetNodeName, int targetNodePort)
{
AcquireLogicalReplicationLock();
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
int connectionFlags = FORCE_NEW_CONNECTION;
@ -268,6 +266,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
*/
CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash);
CloseConnection(sourceConnection);
}
@ -497,25 +496,6 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList)
}
/*
* AcquireLogicalReplicationLock tries to acquire a lock for logical
* replication. We need this lock, because at the start of logical replication
* we clean up old subscriptions and publications. Because of this cleanup it's
* not safe to run multiple logical replication based shard moves at the same
* time. If multiple logical replication moves would run at the same time, the
* second move might clean up subscriptions and publications that are in use by
* another move.
*/
static void
AcquireLogicalReplicationLock(void)
{
LOCKTAG tag;
SET_LOCKTAG_LOGICAL_REPLICATION(tag);
LockAcquire(&tag, ExclusiveLock, false, false);
}
/*
* PrepareReplicationSubscriptionList returns list of shards to be logically
* replicated from given shard list. This is needed because Postgres does not

View File

@ -44,7 +44,7 @@ typedef enum AdvisoryLocktagClass
ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID = 10,
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12,
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12, /* Not used anymore */
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13,
ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK = 14,
ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION = 15
@ -125,16 +125,6 @@ typedef enum CitusOperations
(uint32) operationId, \
ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID)
/* reuse advisory lock, but with different, unused field 4 (12)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks
* are local to each database */
#define SET_LOCKTAG_LOGICAL_REPLICATION(tag) \
SET_LOCKTAG_ADVISORY(tag, \
MyDatabaseId, \
(uint32) 0, \
(uint32) 0, \
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION)
/* reuse advisory lock, but with different, unused field 4 (14)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks
* are local to each database */