From a8a5f34dc92afb3f71bb1766d04e4d35d1ccf6cc Mon Sep 17 00:00:00 2001
From: Muhammad Usama
Date: Thu, 15 May 2025 14:31:22 +0500
Subject: [PATCH] Allow parallel shard moves using logical replication
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The following changes are part of this commit:

- Switch from ShareUpdateExclusiveLock to ShareLock when locking colocated
  relations during a shard move. This still blocks concurrent DDL/TRUNCATE
  on the tables, but because ShareLock, unlike ShareUpdateExclusiveLock,
  does not conflict with itself, shard moves that lock the same colocated
  relations can now run in parallel.

- Drop the leftover replication lock that previously serialized shard moves
  performed via logical replication. This lock was only needed when we used
  to drop and recreate the subscriptions/publications before each move.
  Since Citus now removes those objects later as part of the “unused
  distributed objects” cleanup, shard moves via logical replication can
  safely run in parallel without additional locking.
---
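A note for reviewers, kept below the "---" so it stays out of the commit
message: ShareUpdateExclusiveLock conflicts with itself, so two moves that
lock the same colocated relations used to serialize on this call, while
ShareLock is not self-conflicting yet still conflicts with the lock levels
DDL and TRUNCATE acquire (AccessExclusiveLock, and ShareUpdateExclusiveLock
for the lighter ALTER TABLE forms). A condensed view of the helper as
patched, assembled from the hunks below; the inline comment is mine, not
part of the patch:

static void
LockColocatedRelationsForMove(List *colocatedTableList)
{
	Oid colocatedTableId = InvalidOid;
	foreach_declared_oid(colocatedTableId, colocatedTableList)
	{
		/*
		 * ShareLock does not conflict with itself, so any number of
		 * concurrent shard moves can hold it on the same relation.
		 * It does conflict with AccessExclusiveLock (TRUNCATE, most
		 * DDL) and with ShareUpdateExclusiveLock, so those commands
		 * still block until the move finishes.
		 */
		LockRelationOid(colocatedTableId, ShareLock);
	}
}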
 .../distributed/operations/shard_transfer.c  | 11 +++++-----
 .../replication/multi_logical_replication.c  | 22 +------------------
 src/include/distributed/resource_lock.h      | 12 +---------
 3 files changed, 7 insertions(+), 38 deletions(-)

diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c
index bcc1d0b19..57c0f418b 100644
--- a/src/backend/distributed/operations/shard_transfer.c
+++ b/src/backend/distributed/operations/shard_transfer.c
@@ -439,10 +439,9 @@ TransferShards(int64 shardId, char *sourceNodeName,
 	if (transferType == SHARD_TRANSFER_MOVE)
 	{
 		/*
-		 * Block concurrent DDL / TRUNCATE commands on the relation. Similarly,
-		 * block concurrent citus_move_shard_placement() on any shard of
-		 * the same relation. This is OK for now since we're executing shard
-		 * moves sequentially anyway.
+		 * Block concurrent DDL / TRUNCATE commands on the relation, while
+		 * allowing concurrent citus_move_shard_placement() on the shards of
+		 * the same relation.
 		 */
 		LockColocatedRelationsForMove(colocatedTableList);
 	}
@@ -752,7 +751,7 @@ IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetN
 
 /*
  * LockColocatedRelationsForMove takes a list of relations, locks all of them
- * using ShareUpdateExclusiveLock
+ * using ShareLock
  */
 static void
 LockColocatedRelationsForMove(List *colocatedTableList)
@@ -760,7 +759,7 @@ LockColocatedRelationsForMove(List *colocatedTableList)
 	Oid colocatedTableId = InvalidOid;
 	foreach_declared_oid(colocatedTableId, colocatedTableList)
 	{
-		LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
+		LockRelationOid(colocatedTableId, ShareLock);
 	}
 }
 
diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c
index 7189216d0..6ab73bdd3 100644
--- a/src/backend/distributed/replication/multi_logical_replication.c
+++ b/src/backend/distributed/replication/multi_logical_replication.c
@@ -132,7 +132,6 @@
 static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command);
 static void WaitForMiliseconds(long timeout);
 static XLogRecPtr GetSubscriptionPosition(
 	GroupedLogicalRepTargets *groupedLogicalRepTargets);
-static void AcquireLogicalReplicationLock(void);
 static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode,
 												 List *shardIntervals);
@@ -156,7 +155,6 @@ void
 LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort,
 						 char *targetNodeName, int targetNodePort)
 {
-	AcquireLogicalReplicationLock();
 	char *superUser = CitusExtensionOwnerName();
 	char *databaseName = get_database_name(MyDatabaseId);
 	int connectionFlags = FORCE_NEW_CONNECTION;
@@ -268,6 +266,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
 	 */
 	CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash);
 	CloseConnection(sourceConnection);
+
 }
 
 
@@ -497,25 +496,6 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList)
 }
 
 
-/*
- * AcquireLogicalReplicationLock tries to acquire a lock for logical
- * replication. We need this lock, because at the start of logical replication
- * we clean up old subscriptions and publications. Because of this cleanup it's
- * not safe to run multiple logical replication based shard moves at the same
- * time. If multiple logical replication moves would run at the same time, the
- * second move might clean up subscriptions and publications that are in use by
- * another move.
- */
-static void
-AcquireLogicalReplicationLock(void)
-{
-	LOCKTAG tag;
-	SET_LOCKTAG_LOGICAL_REPLICATION(tag);
-
-	LockAcquire(&tag, ExclusiveLock, false, false);
-}
-
-
 /*
  * PrepareReplicationSubscriptionList returns list of shards to be logically
  * replicated from given shard list. This is needed because Postgres does not
diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h
index a34ab7e89..04537a8de 100644
--- a/src/include/distributed/resource_lock.h
+++ b/src/include/distributed/resource_lock.h
@@ -44,7 +44,7 @@ typedef enum AdvisoryLocktagClass
 	ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
 	ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
 	ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID = 10,
-	ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12,
+	ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12, /* Not used anymore */
 	ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13,
 	ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK = 14,
 	ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION = 15
@@ -125,16 +125,6 @@ typedef enum CitusOperations
 						 (uint32) operationId, \
 						 ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID)
 
-/* reuse advisory lock, but with different, unused field 4 (12)
- * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
- * are local to each database */
-#define SET_LOCKTAG_LOGICAL_REPLICATION(tag) \
-	SET_LOCKTAG_ADVISORY(tag, \
-						 MyDatabaseId, \
-						 (uint32) 0, \
-						 (uint32) 0, \
-						 ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION)
-
 /* reuse advisory lock, but with different, unused field 4 (14)
  * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
  * are local to each database */
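One further note: the enum value 12 is kept (now marked unused) rather than
removed, presumably so the later advisory-lock classes keep their numbering
and the value is never recycled while a backend running an older Citus
version could still hold a lock with that tag. If anyone needs to check for
such a straggler during an upgrade, the old tag can be rebuilt with
PostgreSQL's SET_LOCKTAG_ADVISORY, exactly as the removed macro did. A
hypothetical, non-waiting probe (the helper name is mine, not part of this
patch or of Citus):

#include "postgres.h"
#include "miscadmin.h"					/* MyDatabaseId */
#include "storage/lock.h"				/* LOCKTAG, SET_LOCKTAG_ADVISORY */
#include "distributed/resource_lock.h"	/* ADV_LOCKTAG_CLASS_* */

/*
 * Returns true if no backend currently holds the old logical-replication
 * advisory lock. Purely illustrative; not part of this patch.
 */
static bool
OldLogicalReplicationLockIsFree(void)
{
	LOCKTAG tag;

	/* the same tag the removed SET_LOCKTAG_LOGICAL_REPLICATION built */
	SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, (uint32) 0, (uint32) 0,
						 ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION);

	/* dontWait = true: fail immediately instead of queueing for the lock */
	if (LockAcquire(&tag, ExclusiveLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
	{
		return false;	/* another backend still holds it */
	}

	LockRelease(&tag, ExclusiveLock, false);
	return true;
}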