From 49e56001fd92e37e1df22f56e85e8f820dd7fdd5 Mon Sep 17 00:00:00 2001 From: Muhammad Usama Date: Tue, 6 May 2025 12:31:53 +0500 Subject: [PATCH 1/3] Parallelize shard rebalancing to reduce rebalance time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit introduces the following changes: - Break out reference-table shard copies into independent background tasks - Introduce a new “single-shard” flag to the TransferShard APIs so they can either transfer all collected shards for a shard ID (existing behavior), or copy just the one shard specified by shardID - In citus_rebalance_start(), use this flag to spawn a separate background task for each reference-table shard, creating the table and loading its data without any constraints - Added a SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY flag so TransferShards() can defer foreign-key and other constraint creation - After all reference-table copies complete, schedule a final task that applies all deferred constraints in one batch - Introduce an advisory lock to serialize rebalance operations; downgrade previous colocation locks from ExclusiveLock to RowExclusiveLock so they don’t conflict with the rebalance - Remove intra-colocation-group dependencies so shards in the same group can move independently - Increase default citus.max_background_task_executors_per_node from 1 to 4 --- .../distributed/operations/shard_rebalancer.c | 107 ++--- .../distributed/operations/shard_transfer.c | 364 +++++++++++------- src/backend/distributed/shared_library_init.c | 2 +- .../distributed/sql/citus--13.0-1--13.1-1.sql | 1 + .../citus_copy_shard_placement/13.1-1.sql | 16 + .../distributed/utils/background_jobs.c | 2 +- .../distributed/utils/reference_table_utils.c | 294 +++++++++++++- .../distributed/reference_table_utils.h | 1 + src/include/distributed/resource_lock.h | 3 +- src/include/distributed/shard_transfer.h | 43 ++- 10 files changed, 641 insertions(+), 192 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_copy_shard_placement/13.1-1.sql diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 074f1bed0..fe51fafa4 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -220,7 +220,6 @@ typedef struct ShardMoveSourceNodeHashEntry */ typedef struct ShardMoveDependencies { - HTAB *colocationDependencies; HTAB *nodeDependencies; } ShardMoveDependencies; @@ -274,6 +273,7 @@ static int64 RebalanceTableShardsBackground(RebalanceOptions *options, Oid static void AcquireRebalanceColocationLock(Oid relationId, const char *operationName); static void ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, char *noticeOperation); +static void AcquireRebalanceOperationLock(const char *operationName); static float4 CalculateUtilization(float4 totalCost, float4 capacity); static Form_pg_dist_rebalance_strategy GetRebalanceStrategy(Name name); static void EnsureShardCostUDF(Oid functionOid); @@ -297,8 +297,9 @@ static void ErrorOnConcurrentRebalance(RebalanceOptions *); static List * GetSetCommandListForNewConnections(void); static int64 GetColocationId(PlacementUpdateEvent *move); static ShardMoveDependencies InitializeShardMoveDependencies(); -static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 - colocationId, +static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, + int64 
*refTablesDepTaskIds, + int refTablesDepTaskIdsCount, ShardMoveDependencies shardMoveDependencies, int *nDepends); static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, @@ -767,6 +768,31 @@ AcquireRebalanceColocationLock(Oid relationId, const char *operationName) } } +/* + * AcquireRebalanceOperationLock does not allow concurrent rebalance + * operations. + */ +static void +AcquireRebalanceOperationLock(const char *operationName) +{ + LOCKTAG tag; + const bool sessionLock = false; + const bool dontWait = true; + + SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_REBALANCE_OPERATION); + + LockAcquireResult lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock, + dontWait); + if (!lockAcquired) + { + ereport(ERROR, (errmsg("could not acquire the lock required for %s operation", + operationName), + errdetail("It means that either a concurrent shard move " + "or shard copy is happening."), + errhint("Make sure that the concurrent operation has " + "finished and re-run the command"))); + } +} /* * AcquirePlacementColocationLock tries to acquire a lock for @@ -1954,6 +1980,8 @@ ErrorOnConcurrentRebalance(RebalanceOptions *options) AcquireRebalanceColocationLock(relationId, options->operationName); } + AcquireRebalanceOperationLock(options->operationName); + int64 jobId = 0; if (HasNonTerminalJobOfType("rebalance", &jobId)) { @@ -1991,10 +2019,6 @@ static ShardMoveDependencies InitializeShardMoveDependencies() { ShardMoveDependencies shardMoveDependencies; - shardMoveDependencies.colocationDependencies = CreateSimpleHashWithNameAndSize(int64, - ShardMoveDependencyInfo, - "colocationDependencyHashMap", - 6); shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int32, ShardMoveSourceNodeHashEntry, "nodeDependencyHashMap", @@ -2008,23 +2032,14 @@ InitializeShardMoveDependencies() * the move must take a dependency on, given the shard move dependencies as input. */ static int64 * -GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, +GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 *refTablesDepTaskIds, + int refTablesDepTaskIdsCount, ShardMoveDependencies shardMoveDependencies, int *nDepends) { HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64, "shardMoveDependencyList", 0); bool found; - - /* Check if there exists a move in the same colocation group scheduled earlier. */ - ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( - shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, &found); - - if (found) - { - hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); - } - /* * Check if there exists moves scheduled earlier whose source node * overlaps with the current move's target node. 
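The new CITUS_REBALANCE_OPERATION advisory lock is acquired with dontWait = true, so a second rebalance attempt fails immediately instead of queueing behind the first one. A minimal sketch of the expected behavior, assuming the operation name reported for a background rebalance is "rebalance":

    -- session 1
    SELECT citus_rebalance_start();

    -- session 2, while the first rebalance is still running
    SELECT citus_rebalance_start();
    -- ERROR:  could not acquire the lock required for rebalance operation
    -- DETAIL: It means that either a concurrent shard move or shard copy is happening.
    -- HINT:   Make sure that the concurrent operation has finished and re-run the command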
@@ -2045,6 +2060,19 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, } } + /* + * shard copy can only start after finishing copy of reference table shards + * so each shard task will have a dependency on the task that indicates the + * copy complete of reference tables + */ + while (refTablesDepTaskIdsCount > 0) + { + int64 refTableTaskId = *refTablesDepTaskIds; + hash_search(dependsList, &refTableTaskId, HASH_ENTER, NULL); + refTablesDepTaskIds++; + refTablesDepTaskIdsCount--; + } + *nDepends = hash_get_num_entries(dependsList); int64 *dependsArray = NULL; @@ -2076,9 +2104,6 @@ static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId, ShardMoveDependencies shardMoveDependencies) { - ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( - shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL); - shardMoveDependencyInfo->taskId = taskId; bool found; ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search( @@ -2174,30 +2199,18 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo initStringInfo(&buf); List *referenceTableIdList = NIL; - int64 replicateRefTablesTaskId = 0; + int64 *refTablesDepTaskIds = NULL; + int refTablesDepTaskIdsCount = 0; if (HasNodesWithMissingReferenceTables(&referenceTableIdList)) { - if (shardTransferMode == TRANSFER_MODE_AUTOMATIC) - { - VerifyTablesHaveReplicaIdentity(referenceTableIdList); - } - - /* - * Reference tables need to be copied to (newly-added) nodes, this needs to be the - * first task before we can move any other table. - */ - appendStringInfo(&buf, - "SELECT pg_catalog.replicate_reference_tables(%s)", - quote_literal_cstr(shardTranferModeLabel)); - - int32 nodesInvolved[] = { 0 }; - - /* replicate_reference_tables permissions require superuser */ - Oid superUserId = CitusExtensionOwner(); - BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data, 0, - NULL, 0, nodesInvolved); - replicateRefTablesTaskId = task->taskid; + refTablesDepTaskIds = ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(jobId, TRANSFER_MODE_BLOCK_WRITES, &refTablesDepTaskIdsCount); + ereport(DEBUG2, + (errmsg("%d dependent copy reference table tasks for job %ld", + refTablesDepTaskIdsCount, jobId), + errdetail("Rebalance scheduled as background job"), + errhint("To monitor progress, run: SELECT * FROM " + "citus_rebalance_status();"))); } PlacementUpdateEvent *move = NULL; @@ -2219,17 +2232,11 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo int nDepends = 0; - int64 *dependsArray = GenerateTaskMoveDependencyList(move, colocationId, + int64 *dependsArray = GenerateTaskMoveDependencyList(move, refTablesDepTaskIds, + refTablesDepTaskIdsCount, shardMoveDependencies, &nDepends); - if (nDepends == 0 && replicateRefTablesTaskId > 0) - { - nDepends = 1; - dependsArray = palloc(nDepends * sizeof(int64)); - dependsArray[0] = replicateRefTablesTaskId; - } - int32 nodesInvolved[2] = { 0 }; nodesInvolved[0] = move->sourceNode->nodeId; nodesInvolved[1] = move->targetNode->nodeId; diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index b7d07b2cf..bcc1d0b19 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -107,16 +107,18 @@ static void ErrorIfSameNode(char *sourceNodeName, int sourceNodePort, static void CopyShardTables(List 
*shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, bool useLogicalReplication, - const char *operationName); + const char *operationName, uint32 optionFlags); static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort); + int32 targetNodePort, + uint32 optionFlags); static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, - char *targetNodeName, int32 targetNodePort); + char *targetNodeName, int32 targetNodePort, + uint32 optionFlags); static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort, const char *targetNodeName, int32 targetNodePort); @@ -174,6 +176,7 @@ PG_FUNCTION_INFO_V1(master_copy_shard_placement); PG_FUNCTION_INFO_V1(citus_move_shard_placement); PG_FUNCTION_INFO_V1(citus_move_shard_placement_with_nodeid); PG_FUNCTION_INFO_V1(master_move_shard_placement); +PG_FUNCTION_INFO_V1(citus_copy_one_shard_placement); double DesiredPercentFreeAfterMove = 10; bool CheckAvailableSpaceBeforeMove = true; @@ -203,7 +206,7 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } @@ -232,7 +235,7 @@ citus_copy_shard_placement_with_nodeid(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, targetNode->workerName, targetNode->workerPort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } @@ -267,13 +270,41 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } +/* + * master_copy_shard_placement is a wrapper function for old UDF name. + */ +Datum +citus_copy_one_shard_placement(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + + int64 shardId = PG_GETARG_INT64(0); + uint32 sourceNodeId = PG_GETARG_INT32(1); + uint32 targetNodeId = PG_GETARG_INT32(2); + uint32 flags = PG_GETARG_INT32(3); + Oid shardReplicationModeOid = PG_GETARG_OID(4); + + bool missingOk = false; + WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk); + WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk); + + char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); + + TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, + targetNode->workerName, targetNode->workerPort, + shardReplicationMode, SHARD_TRANSFER_COPY, flags); + + PG_RETURN_VOID(); +} + /* * citus_move_shard_placement moves given shard (and its co-located shards) from one * node to the other node. 
To accomplish this it entirely recreates the table structure @@ -315,7 +346,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_MOVE); + shardReplicationMode, SHARD_TRANSFER_MOVE, 0); PG_RETURN_VOID(); } @@ -343,12 +374,10 @@ citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS) char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, targetNode->workerName, - targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE); + targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE, 0); PG_RETURN_VOID(); } - - /* * TransferShards is the function for shard transfers. */ @@ -356,7 +385,7 @@ void TransferShards(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, char shardReplicationMode, - ShardTransferType transferType) + ShardTransferType transferType, uint32 optionFlags) { /* strings to be used in log messages */ const char *operationName = ShardTransferTypeNames[transferType]; @@ -385,10 +414,25 @@ TransferShards(int64 shardId, char *sourceNodeName, ErrorIfTargetNodeIsNotSafeForTransfer(targetNodeName, targetNodePort, transferType); - AcquirePlacementColocationLock(distributedTableId, ExclusiveLock, operationName); + AcquirePlacementColocationLock(distributedTableId, RowExclusiveLock, operationName); - List *colocatedTableList = ColocatedTableList(distributedTableId); - List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + List *colocatedTableList; + List *colocatedShardList; + + /* + * If SHARD_TRANSFER_SINGLE_SHARD_ONLY is set, we only transfer the shard + * specified by shardId. Otherwise, we transfer all colocated shards. + */ + if (optionFlags & SHARD_TRANSFER_SINGLE_SHARD_ONLY) + { + colocatedTableList = list_make1_oid(distributedTableId); + colocatedShardList = list_make1(shardInterval); + } + else + { + colocatedTableList = ColocatedTableList(distributedTableId); + colocatedShardList = ColocatedShardIntervalList(shardInterval); + } EnsureTableListOwner(colocatedTableList); @@ -412,19 +456,52 @@ TransferShards(int64 shardId, char *sourceNodeName, /* * We sort shardIntervalList so that lock operations will not cause any - * deadlocks. + * deadlocks. But we do not need to do that if the list contain only one + * shard. */ - colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + if (!(optionFlags & SHARD_TRANSFER_SINGLE_SHARD_ONLY)) + { + colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + } - if (TransferAlreadyCompleted(colocatedShardList, - sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort, - transferType)) + bool transferAlreadyCompleted = TransferAlreadyCompleted(colocatedShardList, + sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort, + transferType); + + /* + * If we just need to create the shard relationships,We don't need to do anything + * else other than calling CopyShardTables with SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY + * flag. 
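The new citus_copy_one_shard_placement() UDF (defined in the 13.1-1 SQL script below) exposes these flags directly. A usage sketch with placeholder shard and node ids, using the numeric flag values from ShardTransferOperationMode later in this patch (SHARD_TRANSFER_SINGLE_SHARD_ONLY = 1, SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS = 2, SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY = 4):

    -- copy one shard only, deferring foreign keys and partition attachment (flags = 1 | 2 = 3)
    SELECT pg_catalog.citus_copy_one_shard_placement(102008, 1, 2, 3, 'block_writes');

    -- once all parallel copies have finished, create the deferred relationships (flags = 4)
    SELECT pg_catalog.citus_copy_one_shard_placement(102008, 1, 2, 4, 'block_writes');

These are the same two-step commands that ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes() schedules as background tasks.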
+ */ + if (optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY) + { + if (!transferAlreadyCompleted) + { + /* + * if the transfer is not completed, and we are here just to create + * the relationships, we can return right away + */ + ereport(WARNING, (errmsg("shard is not present on node %s:%d", + targetNodeName, targetNodePort), + errdetail("%s may have not completed.", + operationNameCapitalized))); + return; + } + + CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName, + targetNodePort, (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL), + operationFunctionName, optionFlags); + /* We don't need to do anything else, just return */ + return; + } + + if (transferAlreadyCompleted) { /* if the transfer is already completed, we can return right away */ ereport(WARNING, (errmsg("shard is already present on node %s:%d", - targetNodeName, targetNodePort), - errdetail("%s may have already completed.", + targetNodeName, targetNodePort), + errdetail("%s may have already completed.", operationNameCapitalized))); return; } @@ -515,7 +592,7 @@ TransferShards(int64 shardId, char *sourceNodeName, } CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort, useLogicalReplication, operationFunctionName); + targetNodePort, useLogicalReplication, operationFunctionName, optionFlags); if (transferType == SHARD_TRANSFER_MOVE) { @@ -573,7 +650,6 @@ TransferShards(int64 shardId, char *sourceNodeName, FinalizeCurrentProgressMonitor(); } - /* * Insert deferred cleanup records. * The shards will be dropped by background cleaner later. @@ -1131,9 +1207,9 @@ BlockWritesToShardList(List *shardList) * asynchronous shard copy in case of cascading DML operations. */ LockReferencedReferenceShardDistributionMetadata(shard->shardId, - ExclusiveLock); + RowExclusiveLock); - LockShardDistributionMetadata(shard->shardId, ExclusiveLock); + LockShardDistributionMetadata(shard->shardId, RowExclusiveLock); } /* following code relies on the list to have at least one shard */ @@ -1156,7 +1232,7 @@ BlockWritesToShardList(List *shardList) * Even if users disable metadata sync, we cannot allow them not to * acquire the remote locks. Hence, we have !IsCoordinator() check. 
*/ - LockShardListMetadataOnWorkers(ExclusiveLock, shardList); + LockShardListMetadataOnWorkers(RowExclusiveLock, shardList); } } @@ -1333,7 +1409,7 @@ ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList) static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, bool useLogicalReplication, - const char *operationName) + const char *operationName, uint32 optionFlags) { if (list_length(shardIntervalList) < 1) { @@ -1347,12 +1423,12 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP { CopyShardTablesViaLogicalReplication(shardIntervalList, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort); + targetNodePort, optionFlags); } else { CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort); + targetNodeName, targetNodePort, optionFlags); } /* @@ -1369,7 +1445,7 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort) + int32 targetNodePort, uint32 optionFlags) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaLogicalReplication", @@ -1437,7 +1513,7 @@ CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList) static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort) + int32 targetNodePort, uint32 optionFlags) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaBlockWrites", @@ -1446,127 +1522,141 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); - - /* iterate through the colocated shards and copy each */ ShardInterval *shardInterval = NULL; - foreach_declared_ptr(shardInterval, shardIntervalList) - { - /* - * For each shard we first create the shard table in a separate - * transaction and then we copy the data and create the indexes in a - * second separate transaction. The reason we don't do both in a single - * transaction is so we can see the size of the new shard growing - * during the copy when we run get_rebalance_progress in another - * session. If we wouldn't split these two phases up, then the table - * wouldn't be visible in the session that get_rebalance_progress uses. - * So get_rebalance_progress would always report its size as 0. 
- */ - List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName, - sourceNodePort); - char *tableOwner = TableOwner(shardInterval->relationId); - - /* drop the shard we created on the target, in case of failure */ - InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, - ConstructQualifiedShardName(shardInterval), - GroupForNode(targetNodeName, - targetNodePort), - CLEANUP_ON_FAILURE); - - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); - } - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_COPYING_DATA); - - ConflictWithIsolationTestingBeforeCopy(); - CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); - ConflictWithIsolationTestingAfterCopy(); - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS); - - foreach_declared_ptr(shardInterval, shardIntervalList) - { - List *ddlCommandList = - PostLoadShardCreationCommandList(shardInterval, sourceNodeName, - sourceNodePort); - char *tableOwner = TableOwner(shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); - - MemoryContextReset(localContext); - } /* - * Once all shards are copied, we can recreate relationships between shards. - * Create DDL commands to Attach child tables to their parents in a partitioning hierarchy. + * If we’re only asked to create the relationships, the shards are already + * present and populated on the node. Skip the table‑setup and data‑loading + * steps and proceed straight to creating the relationships. */ - List *shardIntervalWithDDCommandsList = NIL; - foreach_declared_ptr(shardInterval, shardIntervalList) + if (!(optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY)) { - if (PartitionTable(shardInterval->relationId)) + /* iterate through the colocated shards and copy each */ + foreach_declared_ptr(shardInterval, shardIntervalList) { - char *attachPartitionCommand = - GenerateAttachShardPartitionCommand(shardInterval); + /* + * For each shard we first create the shard table in a separate + * transaction and then we copy the data and create the indexes in a + * second separate transaction. The reason we don't do both in a single + * transaction is so we can see the size of the new shard growing + * during the copy when we run get_rebalance_progress in another + * session. If we wouldn't split these two phases up, then the table + * wouldn't be visible in the session that get_rebalance_progress uses. + * So get_rebalance_progress would always report its size as 0. 
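This two-transaction split is what makes the copy observable: while the data load runs, the growing target shard can be watched from another session, for example with the function the comment above refers to:

    -- in a separate session, while the copy is in progress
    SELECT * FROM get_rebalance_progress();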
+ */ + List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName, + sourceNodePort); + char *tableOwner = TableOwner(shardInterval->relationId); - ShardCommandList *shardCommandList = CreateShardCommandList( - shardInterval, - list_make1(attachPartitionCommand)); - shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, - shardCommandList); + /* drop the shard we created on the target, in case of failure */ + InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, + ConstructQualifiedShardName(shardInterval), + GroupForNode(targetNodeName, + targetNodePort), + CLEANUP_ON_FAILURE); + + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); + } + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COPYING_DATA); + + ConflictWithIsolationTestingBeforeCopy(); + CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); + ConflictWithIsolationTestingAfterCopy(); + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS); + + foreach_declared_ptr(shardInterval, shardIntervalList) + { + List *ddlCommandList = + PostLoadShardCreationCommandList(shardInterval, sourceNodeName, + sourceNodePort); + char *tableOwner = TableOwner(shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); + + MemoryContextReset(localContext); } } - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS); - /* - * Iterate through the colocated shards and create DDL commamnds - * to create the foreign constraints. + * Skip creating shard relationships if the caller has requested that they + * not be created. */ - foreach_declared_ptr(shardInterval, shardIntervalList) + if (!(optionFlags & SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS)) { - List *shardForeignConstraintCommandList = NIL; - List *referenceTableForeignConstraintList = NIL; + /* + * Once all shards are copied, we can recreate relationships between shards. + * Create DDL commands to Attach child tables to their parents in a partitioning hierarchy. + */ + List *shardIntervalWithDDCommandsList = NIL; + foreach_declared_ptr(shardInterval, shardIntervalList) + { + if (PartitionTable(shardInterval->relationId)) + { + char *attachPartitionCommand = + GenerateAttachShardPartitionCommand(shardInterval); - CopyShardForeignConstraintCommandListGrouped(shardInterval, - &shardForeignConstraintCommandList, - &referenceTableForeignConstraintList); + ShardCommandList *shardCommandList = CreateShardCommandList( + shardInterval, + list_make1(attachPartitionCommand)); + shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, + shardCommandList); + } + } - ShardCommandList *shardCommandList = CreateShardCommandList( - shardInterval, - list_concat(shardForeignConstraintCommandList, - referenceTableForeignConstraintList)); - shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, - shardCommandList); + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS); + + /* + * Iterate through the colocated shards and create DDL commamnds + * to create the foreign constraints. 
+ */ + foreach_declared_ptr(shardInterval, shardIntervalList) + { + List *shardForeignConstraintCommandList = NIL; + List *referenceTableForeignConstraintList = NIL; + + CopyShardForeignConstraintCommandListGrouped(shardInterval, + &shardForeignConstraintCommandList, + &referenceTableForeignConstraintList); + + ShardCommandList *shardCommandList = CreateShardCommandList( + shardInterval, + list_concat(shardForeignConstraintCommandList, + referenceTableForeignConstraintList)); + shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, + shardCommandList); + } + + /* Now execute the Partitioning & Foreign constraints creation commads. */ + ShardCommandList *shardCommandList = NULL; + foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList) + { + char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, + shardCommandList->ddlCommandList); + } + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COMPLETING); } - - /* Now execute the Partitioning & Foreign constraints creation commads. */ - ShardCommandList *shardCommandList = NULL; - foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList) - { - char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, - shardCommandList->ddlCommandList); - } - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_COMPLETING); - MemoryContextReset(localContext); MemoryContextSwitchTo(oldContext); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 430eb8555..b19ed9231 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1925,7 +1925,7 @@ RegisterCitusConfigVariables(void) "for scheduled background tasks that involve a particular node"), NULL, &MaxBackgroundTaskExecutorsPerNode, - 1, 1, 128, + 4, 1, 128, PGC_SIGHUP, GUC_STANDARD, NULL, NULL, NULL); diff --git a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql index 4d0dc19a3..a5f2520cc 100644 --- a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql +++ b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql @@ -51,3 +51,4 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits; #include "udfs/citus_stat_counters/13.1-1.sql" #include "udfs/citus_stat_counters_reset/13.1-1.sql" #include "udfs/citus_nodes/13.1-1.sql" +#include "udfs/citus_copy_shard_placement/13.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_copy_shard_placement/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_copy_shard_placement/13.1-1.sql new file mode 100644 index 000000000..916269770 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_copy_shard_placement/13.1-1.sql @@ -0,0 +1,16 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_copy_one_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + flags integer, + transfer_mode citus.shard_transfer_mode default 'auto') +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_copy_one_shard_placement$$; +COMMENT ON FUNCTION pg_catalog.citus_copy_one_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, 
+ flags integer, + transfer_mode citus.shard_transfer_mode) +IS 'copy a single shard from the source node to the destination node'; diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 911880dc7..7a9d049b4 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -125,7 +125,7 @@ static volatile sig_atomic_t GotSighup = false; /* keeping track of parallel background tasks per node */ HTAB *ParallelTasksPerNode = NULL; -int MaxBackgroundTaskExecutorsPerNode = 1; +int MaxBackgroundTaskExecutorsPerNode = 4; PG_FUNCTION_INFO_V1(citus_job_cancel); PG_FUNCTION_INFO_V1(citus_job_wait); diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 8f0d89fc9..ed27dd20b 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -41,6 +41,8 @@ #include "distributed/transaction_management.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" +#include "distributed/shard_transfer.h" + /* local function forward declarations */ static List * WorkersWithoutReferenceTablePlacement(uint64 shardId, LOCKMODE lockMode); @@ -131,6 +133,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) * DROP TABLE and create_reference_table calls so that the list of reference tables we * operate on are stable. * + * * Since the changes to the reference table placements are made via loopback * connections we release the locks held at the end of this function. Due to Citus * only running transactions in READ COMMITTED mode we can be sure that other @@ -179,7 +182,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) if (list_length(newWorkersList) == 0) { /* - * All workers alreaddy have a copy of the reference tables, make sure that + * All workers already have a copy of the reference tables, make sure that * any locks obtained earlier are released. It will probably not matter, but * we release the locks in the reverse order we obtained them in. */ @@ -293,6 +296,295 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) } } +/* + * ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes is essentially a + * twin of EnsureReferenceTablesExistOnAllNodesExtended. The difference is instead of + * copying the missing tables on to the worker nodes this function creates the background tasks + * for each required copy operation and schedule it in the background job. + * Another difference is that instead of moving all the colocated shards sequencially + * this function creates a seperate background task for each shard, even when the shards + * are part of same colocated shard group. + * + * For transfering the shards in parallel the function creates a task for each shard + * move and than schedules another task that creates the shard relationships (if any) + * between shards and that task wait for the completion of all shard transfer tasks. + * + * The function returns an array of task ids that are created for creating the shard + * relationships, effectively completion of these tasks signals the completion of + * of reference table setup on the worker nodes. Any process that needs to wait for + * the completion of the reference table setup can wait for these tasks to complete. + * + * The transferMode is passed to this function gets ignored for now and it only uses + * block write mode. 
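Concretely, for every missing node the function schedules one task per reference-table shard with flags SHARD_TRANSFER_SINGLE_SHARD_ONLY | SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS (= 3), followed by one per-node task with SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY (= 4) that depends on them. A sketch of the generated commands, assuming two reference-table shards 102008 and 102009, source node id 1 and a newly added node id 3:

    -- per-shard copy tasks, eligible to run in parallel
    SELECT pg_catalog.citus_copy_one_shard_placement(102008, 1, 3, 3, 'block_writes');
    SELECT pg_catalog.citus_copy_one_shard_placement(102009, 1, 3, 3, 'block_writes');

    -- per-node relationships task, depends on the two copy tasks above
    SELECT pg_catalog.citus_copy_one_shard_placement(102009, 1, 3, 4, 'block_writes');

How many of the copy tasks actually run concurrently on a node is bounded by citus.max_background_task_executors_per_node, whose default this patch raises from 1 to 4; since it is PGC_SIGHUP it can be raised further with ALTER SYSTEM plus a configuration reload:

    ALTER SYSTEM SET citus.max_background_task_executors_per_node = 8;
    SELECT pg_reload_conf();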
+ */ +int64 * +ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char transferMode, int *nDependTasks) +{ + List *referenceTableIdList = NIL; + uint64 shardId = INVALID_SHARD_ID; + List *newWorkersList = NIL; + int64 *dependsTaskArray = NULL; + const char *referenceTableName = NULL; + int colocationId = GetReferenceTableColocationId(); + + *nDependTasks = 0; + if (colocationId == INVALID_COLOCATION_ID) + { + /* we have no reference table yet. */ + return 0; + } + + /* + * Most of the time this function should result in a conclusion where we do not need + * to copy any reference tables. To prevent excessive locking the majority of the time + * we run our precondition checks first with a lower lock. If, after checking with the + * lower lock, that we might need to copy reference tables we check with a more + * aggressive and self conflicting lock. It is important to be self conflicting in the + * second run to make sure that two concurrent calls to this routine will actually not + * run concurrently after the initial check. + * + * If after two iterations of precondition checks we still find the need for copying + * reference tables we exit the loop with all locks held. This will prevent concurrent + * DROP TABLE and create_reference_table calls so that the list of reference tables we + * operate on are stable. + * + * + * Since the changes to the reference table placements are made via loopback + * connections we release the locks held at the end of this function. Due to Citus + * only running transactions in READ COMMITTED mode we can be sure that other + * transactions correctly find the metadata entries. + */ + LOCKMODE lockmodes[] = { AccessShareLock, ExclusiveLock }; + for (int lockmodeIndex = 0; lockmodeIndex < lengthof(lockmodes); lockmodeIndex++) + { + LockColocationId(colocationId, lockmodes[lockmodeIndex]); + + referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE); + if (referenceTableIdList == NIL) + { + /* + * No reference tables exist, make sure that any locks obtained earlier are + * released. It will probably not matter, but we release the locks in the + * reverse order we obtained them in. + */ + for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } + return 0; + } + + Oid referenceTableId = linitial_oid(referenceTableIdList); + referenceTableName = get_rel_name(referenceTableId); + List *shardIntervalList = LoadShardIntervalList(referenceTableId); + if (list_length(shardIntervalList) == 0) + { + /* check for corrupt metadata */ + ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard", + referenceTableName))); + } + + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + shardId = shardInterval->shardId; + + /* + * We only take an access share lock, otherwise we'll hold up citus_add_node. + * In case of create_reference_table() where we don't want concurrent writes + * to pg_dist_node, we have already acquired ShareLock on pg_dist_node. + */ + newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, AccessShareLock); + if (list_length(newWorkersList) == 0) + { + /* + * All workers already have a copy of the reference tables, make sure that + * any locks obtained earlier are released. It will probably not matter, but + * we release the locks in the reverse order we obtained them in. 
+ */ + for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } + return 0; + } + } + + /* + * citus_copy_shard_placement triggers metadata sync-up, which tries to + * acquire a ShareLock on pg_dist_node. We do master_copy_shad_placement + * in a separate connection. If we have modified pg_dist_node in the + * current backend, this will cause a deadlock. + */ + if (TransactionModifiedNodeMetadata) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate reference tables in a transaction " + "that modified node metadata"))); + } + + /* + * Modifications to reference tables in current transaction are not visible + * to citus_copy_shard_placement, since it is done in a separate backend. + */ + if (AnyRelationsModifiedInTransaction(referenceTableIdList)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate reference tables in a transaction " + "that modified a reference table"))); + } + + bool missingOk = false; + ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk); + if (sourceShardPlacement == NULL) + { + /* check for corrupt metadata */ + ereport(ERROR, (errmsg("reference table shard " + UINT64_FORMAT + " does not have an active shard placement", + shardId))); + } + + WorkerNode *newWorkerNode = NULL; + BackgroundTask *task = NULL; + StringInfoData buf = { 0 }; + initStringInfo(&buf); + List *depTasksList = NIL; + const char *transferModeString = "block_writes"; /*For now we only support block writes*/ + + foreach_declared_ptr(newWorkerNode, newWorkersList) + { + ereport(DEBUG2, (errmsg("replicating reference table '%s' to %s:%d ...", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort))); + + Oid relationId = InvalidOid; + List *nodeTasksList = NIL; + + foreach_declared_oid(relationId, referenceTableIdList) + { + referenceTableName = get_rel_name(relationId); + List *shardIntervalList = LoadShardIntervalList(relationId); + /* Reference tables are supposed to have only one shard */ + if (list_length(shardIntervalList) != 1) + { + ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard", + referenceTableName))); + } + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + shardId = shardInterval->shardId; + + resetStringInfo(&buf); + /* + * In first step just create and load data in the shards but defer the creation + * of the shard relationships to the next step. + * The reason we want to defer the creation of the shard relationships is that + * we want to make sure that all the parallel shard copy task are finished + * before we create the relationships. Otherwise we might end up with + * a situation where the dependent-shard task is still running and trying to + * create the shard relationships will result in ERROR. + */ + uint32 shardTransferFlags = SHARD_TRANSFER_SINGLE_SHARD_ONLY | + SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS; + + appendStringInfo(&buf, + "SELECT pg_catalog.citus_copy_one_shard_placement(%ld,%u,%u,%u,%s)", + shardId, + sourceShardPlacement->nodeId, + newWorkerNode->nodeId, + shardTransferFlags, + quote_literal_cstr(transferModeString)); + + ereport(DEBUG2, + (errmsg("replicating reference table '%s' to %s:%d ... 
QUERY= %s", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort,buf.data))); + + int nDepends = 0; + int64 *dependsArray = NULL; + int32 nodesInvolved[2] = { 0 }; + nodesInvolved[0] = sourceShardPlacement->nodeId; + nodesInvolved[1] = newWorkerNode->nodeId; + + task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data, + nDepends, + dependsArray, 2, + nodesInvolved); + + nodeTasksList = lappend(nodeTasksList,task); + } + /* Create a task to create reference table relations on this node */ + if (list_length(nodeTasksList) > 0) + { + int nDepends = list_length(nodeTasksList); + int32 nodesInvolved[2] = { 0 }; + nodesInvolved[0] = sourceShardPlacement->nodeId; + nodesInvolved[1] = newWorkerNode->nodeId; + int64 *dependsArray = palloc(sizeof(int64) * list_length(nodeTasksList)); + int idx = 0; + foreach_declared_ptr(task, nodeTasksList) + { + dependsArray[idx++] = task->taskid; + } + resetStringInfo(&buf); + uint32 shardTransferFlags = SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY; + + appendStringInfo(&buf, + "SELECT pg_catalog.citus_copy_one_shard_placement(%ld,%u,%u,%u,%s)", + shardId, + sourceShardPlacement->nodeId, + newWorkerNode->nodeId, + shardTransferFlags, + quote_literal_cstr(transferModeString)); + + ereport(DEBUG2, + (errmsg("creating relations for reference table '%s' on %s:%d ... QUERY= %s", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort,buf.data))); + + task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data, + nDepends, + dependsArray, 2, + nodesInvolved); + + depTasksList = lappend(depTasksList, task); + + pfree(dependsArray); + list_free(nodeTasksList); + nodeTasksList = NIL; + } + } + + /* + * compute a dependent task list array to be used to indicate the completion of all + * reference table shards copy, so that we can start with distributed shard copy + */ + if (list_length(depTasksList) > 0) + { + *nDependTasks = list_length(depTasksList); + dependsTaskArray = palloc(sizeof(int64) * *nDependTasks); + int idx = 0; + foreach_declared_ptr(task, depTasksList) + { + dependsTaskArray[idx++] = task->taskid; + } + list_free(depTasksList); + } + + /* + * Since reference tables have been copied via a loopback connection we do not have to + * retain our locks. Since Citus only runs well in READ COMMITTED mode we can be sure + * that other transactions will find the reference tables copied. + * We have obtained and held multiple locks, here we unlock them all in the reverse + * order we have obtained them in. 
+ */ + for (int releaseLockmodeIndex = lengthof(lockmodes) - 1; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } + return dependsTaskArray; +} /* * HasNodesWithMissingReferenceTables checks if all reference tables are already copied to diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index cf5a6fd02..c40cf852b 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -21,6 +21,7 @@ extern void EnsureReferenceTablesExistOnAllNodes(void); extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode); +extern int64 *ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char transferMode, int *nDependTasks); extern bool HasNodesWithMissingReferenceTables(List **referenceTableList); extern uint32 CreateReferenceTableColocationId(void); extern uint32 GetReferenceTableColocationId(void); diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 7d67b173d..a34ab7e89 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -57,7 +57,8 @@ typedef enum CitusOperations CITUS_NONBLOCKING_SPLIT = 1, CITUS_CREATE_DISTRIBUTED_TABLE_CONCURRENTLY = 2, CITUS_CREATE_COLOCATION_DEFAULT = 3, - CITUS_BACKGROUND_TASK_MONITOR = 4 + CITUS_BACKGROUND_TASK_MONITOR = 4, + CITUS_REBALANCE_OPERATION = 5 } CitusOperations; /* reuse advisory lock, but with different, unused field 4 (4)*/ diff --git a/src/include/distributed/shard_transfer.h b/src/include/distributed/shard_transfer.h index c1621879b..27e1f1ff9 100644 --- a/src/include/distributed/shard_transfer.h +++ b/src/include/distributed/shard_transfer.h @@ -23,10 +23,51 @@ typedef enum SHARD_TRANSFER_COPY = 2 } ShardTransferType; +/* + * ShardTransferOperationMode is used to pass flags to the shard transfer + * function. The flags are used to control the behavior of the transfer + * function. + */ +typedef enum +{ + /* + * This flag instructs the transfer function to only transfer single shard + * rather than transfer all the colocated shards for the shard interval. + * Using this flag mean we might break the colocated shard + * relationship on the source node. So this is only usefull when setting up + * the new node and we are sure that the node would not be used until we have + * transfered all the shards. + * The reason we need this flag is that we want to be able to transfer + * colocated shards in parallel and for now it is only used for the reference + * table shards. + * Finally if you are using this flag, you should also use consider defering + * the creation of the relationships on the source node until all colocated + * shards are transfered (see: SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS). + */ + SHARD_TRANSFER_SINGLE_SHARD_ONLY = 1 << 0, + /* With this flag the shard transfer function does not create any constrainsts + * or foreign relations defined on the shard, This can be used to defer the + * creation of the relationships until all the shards are transfered. + * This is usefull when we are transfering colocated shards in parallel and + * we want to avoid the creation of the relationships on the source node + * until all the shards are transfered. 
+ */ + SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS = 1 << 1, + + /* This flag is used to indicate that the shard transfer function should + * only create the relationships on the target node and not transfer any data. + * This is can be used to create the relationships that were defered + * during the transfering of shards. + */ + SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY = 1 << 2 +} ShardTransferOperationMode; + + extern void TransferShards(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, - char shardReplicationMode, ShardTransferType transferType); + char shardReplicationMode, ShardTransferType transferType, + uint32 optionFlags); extern uint64 ShardListSizeInBytes(List *colocatedShardList, char *workerNodeName, uint32 workerNodePort); extern void ErrorIfMoveUnsupportedTableType(Oid relationId); From a8a5f34dc92afb3f71bb1766d04e4d35d1ccf6cc Mon Sep 17 00:00:00 2001 From: Muhammad Usama Date: Thu, 15 May 2025 14:31:22 +0500 Subject: [PATCH 2/3] Allows parallel shard moves using logical replication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Following changes are part of the commit - Switch from ShareUpdateExclusiveLock to ShareLock when locking colocated relations during a shard move. This blocks concurrent DDL/TRUNCATE on the tables while still allowing parallel shard moves for other colocated relations. - Drop the leftover replication lock that previously serialized shard moves performed via logical replication. This lock was only needed when we used to drop and recreate the subscriptions/publications before each move. Since Citus now removes those objects later as part of the “unused distributed objects” cleanup, shard moves via logical replication can safely run in parallel without additional locking. --- .../distributed/operations/shard_transfer.c | 11 +++++----- .../replication/multi_logical_replication.c | 22 +------------------ src/include/distributed/resource_lock.h | 12 +--------- 3 files changed, 7 insertions(+), 38 deletions(-) diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index bcc1d0b19..57c0f418b 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -439,10 +439,9 @@ TransferShards(int64 shardId, char *sourceNodeName, if (transferType == SHARD_TRANSFER_MOVE) { /* - * Block concurrent DDL / TRUNCATE commands on the relation. Similarly, - * block concurrent citus_move_shard_placement() on any shard of - * the same relation. This is OK for now since we're executing shard - * moves sequentially anyway. + * Block concurrent DDL / TRUNCATE commands on the relation. while, + * allow concurrent citus_move_shard_placement() on the shards of + * the same relation. 
*/ LockColocatedRelationsForMove(colocatedTableList); } @@ -752,7 +751,7 @@ IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetN /* * LockColocatedRelationsForMove takes a list of relations, locks all of them - * using ShareUpdateExclusiveLock + * using ShareLock */ static void LockColocatedRelationsForMove(List *colocatedTableList) @@ -760,7 +759,7 @@ LockColocatedRelationsForMove(List *colocatedTableList) Oid colocatedTableId = InvalidOid; foreach_declared_oid(colocatedTableId, colocatedTableList) { - LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock); + LockRelationOid(colocatedTableId, ShareLock); } } diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 7189216d0..6ab73bdd3 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -132,7 +132,6 @@ static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command); static void WaitForMiliseconds(long timeout); static XLogRecPtr GetSubscriptionPosition( GroupedLogicalRepTargets *groupedLogicalRepTargets); -static void AcquireLogicalReplicationLock(void); static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode, List *shardIntervals); @@ -156,7 +155,6 @@ void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, char *targetNodeName, int targetNodePort) { - AcquireLogicalReplicationLock(); char *superUser = CitusExtensionOwnerName(); char *databaseName = get_database_name(MyDatabaseId); int connectionFlags = FORCE_NEW_CONNECTION; @@ -268,6 +266,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo */ CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash); CloseConnection(sourceConnection); + } @@ -497,25 +496,6 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList) } -/* - * AcquireLogicalReplicationLock tries to acquire a lock for logical - * replication. We need this lock, because at the start of logical replication - * we clean up old subscriptions and publications. Because of this cleanup it's - * not safe to run multiple logical replication based shard moves at the same - * time. If multiple logical replication moves would run at the same time, the - * second move might clean up subscriptions and publications that are in use by - * another move. - */ -static void -AcquireLogicalReplicationLock(void) -{ - LOCKTAG tag; - SET_LOCKTAG_LOGICAL_REPLICATION(tag); - - LockAcquire(&tag, ExclusiveLock, false, false); -} - - /* * PrepareReplicationSubscriptionList returns list of shards to be logically * replicated from given shard list. 
This is needed because Postgres does not diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index a34ab7e89..04537a8de 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -44,7 +44,7 @@ typedef enum AdvisoryLocktagClass ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8, ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9, ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID = 10, - ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12, + ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12, /* Not used anymore */ ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13, ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK = 14, ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION = 15 @@ -125,16 +125,6 @@ typedef enum CitusOperations (uint32) operationId, \ ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID) -/* reuse advisory lock, but with different, unused field 4 (12) - * Also it has the database hardcoded to MyDatabaseId, to ensure the locks - * are local to each database */ -#define SET_LOCKTAG_LOGICAL_REPLICATION(tag) \ - SET_LOCKTAG_ADVISORY(tag, \ - MyDatabaseId, \ - (uint32) 0, \ - (uint32) 0, \ - ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION) - /* reuse advisory lock, but with different, unused field 4 (14) * Also it has the database hardcoded to MyDatabaseId, to ensure the locks * are local to each database */ From e8361bcab8e49638b7303d7089603095a0e2e1d7 Mon Sep 17 00:00:00 2001 From: Muhammad Usama Date: Thu, 15 May 2025 14:43:05 +0500 Subject: [PATCH 3/3] Fix indentation suggestions --- .../distributed/operations/shard_rebalancer.c | 23 +++-- .../distributed/operations/shard_transfer.c | 97 +++++++++++-------- 2 files changed, 69 insertions(+), 51 deletions(-) diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index fe51fafa4..b6c12f7a4 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -768,6 +768,7 @@ AcquireRebalanceColocationLock(Oid relationId, const char *operationName) } } + /* * AcquireRebalanceOperationLock does not allow concurrent rebalance * operations. @@ -786,14 +787,15 @@ AcquireRebalanceOperationLock(const char *operationName) if (!lockAcquired) { ereport(ERROR, (errmsg("could not acquire the lock required for %s operation", - operationName), + operationName), errdetail("It means that either a concurrent shard move " - "or shard copy is happening."), + "or shard copy is happening."), errhint("Make sure that the concurrent operation has " "finished and re-run the command"))); } } + /* * AcquirePlacementColocationLock tries to acquire a lock for * rebalance/replication while moving/copying the placement. If this @@ -2033,13 +2035,14 @@ InitializeShardMoveDependencies() */ static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 *refTablesDepTaskIds, - int refTablesDepTaskIdsCount, + int refTablesDepTaskIdsCount, ShardMoveDependencies shardMoveDependencies, int *nDepends) { HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64, "shardMoveDependencyList", 0); bool found; + /* * Check if there exists moves scheduled earlier whose source node * overlaps with the current move's target node. 
@@ -2104,7 +2107,6 @@ static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId, ShardMoveDependencies shardMoveDependencies) { - bool found; ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search( shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER, @@ -2204,13 +2206,14 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo if (HasNodesWithMissingReferenceTables(&referenceTableIdList)) { - refTablesDepTaskIds = ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(jobId, TRANSFER_MODE_BLOCK_WRITES, &refTablesDepTaskIdsCount); + refTablesDepTaskIds = ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes( + jobId, TRANSFER_MODE_BLOCK_WRITES, &refTablesDepTaskIdsCount); ereport(DEBUG2, - (errmsg("%d dependent copy reference table tasks for job %ld", - refTablesDepTaskIdsCount, jobId), - errdetail("Rebalance scheduled as background job"), - errhint("To monitor progress, run: SELECT * FROM " - "citus_rebalance_status();"))); + (errmsg("%d dependent copy reference table tasks for job %ld", + refTablesDepTaskIdsCount, jobId), + errdetail("Rebalance scheduled as background job"), + errhint("To monitor progress, run: SELECT * FROM " + "citus_rebalance_status();"))); } PlacementUpdateEvent *move = NULL; diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 57c0f418b..3af46dde5 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -305,6 +305,7 @@ citus_copy_one_shard_placement(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + /* * citus_move_shard_placement moves given shard (and its co-located shards) from one * node to the other node. To accomplish this it entirely recreates the table structure @@ -378,6 +379,8 @@ citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + + /* * TransferShards is the function for shard transfers. 
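Once the job is scheduled, both the reference-table copy tasks and the shard-move tasks can be monitored from SQL. The first query below is the one the hint above points at; the other two are a sketch that assumes the pg_dist_background_task and pg_dist_background_task_depend catalogs available in recent Citus releases, with 17780 as a placeholder job id:

    SELECT * FROM citus_rebalance_status();

    SELECT task_id, status, command
    FROM pg_dist_background_task
    WHERE job_id = 17780
    ORDER BY task_id;

    SELECT task_id, depends_on
    FROM pg_dist_background_task_depend
    WHERE job_id = 17780;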
diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c
index 57c0f418b..3af46dde5 100644
--- a/src/backend/distributed/operations/shard_transfer.c
+++ b/src/backend/distributed/operations/shard_transfer.c
@@ -305,6 +305,7 @@ citus_copy_one_shard_placement(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+
 /*
  * citus_move_shard_placement moves given shard (and its co-located shards) from one
  * node to the other node. To accomplish this it entirely recreates the table structure
@@ -378,6 +379,8 @@ citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+
+
 /*
  * TransferShards is the function for shard transfers.
  */
@@ -464,9 +467,11 @@ TransferShards(int64 shardId, char *sourceNodeName,
 	}
 
 	bool transferAlreadyCompleted = TransferAlreadyCompleted(colocatedShardList,
-										sourceNodeName, sourceNodePort,
-										targetNodeName, targetNodePort,
-										transferType);
+															 sourceNodeName,
+															 sourceNodePort,
+															 targetNodeName,
+															 targetNodePort,
+															 transferType);
 
 	/*
 	 * If we just need to create the shard relationships,We don't need to do anything
@@ -482,15 +487,18 @@ TransferShards(int64 shardId, char *sourceNodeName,
 			 * the relationships, we can return right away
 			 */
			ereport(WARNING, (errmsg("shard is not present on node %s:%d",
-								targetNodeName, targetNodePort),
-						errdetail("%s may have not completed.",
+									 targetNodeName, targetNodePort),
+							  errdetail("%s may have not completed.",
 										operationNameCapitalized)));
 			return;
 		}
 
-		CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
-						targetNodePort, (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL),
+		CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName
+						,
+						targetNodePort, (shardReplicationMode ==
+										 TRANSFER_MODE_FORCE_LOGICAL),
 						operationFunctionName, optionFlags);
+
 		/* We don't need to do anything else, just return */
 		return;
 	}
@@ -499,8 +507,8 @@ TransferShards(int64 shardId, char *sourceNodeName,
 	{
 		/* if the transfer is already completed, we can return right away */
 		ereport(WARNING, (errmsg("shard is already present on node %s:%d",
-							targetNodeName, targetNodePort),
-					errdetail("%s may have already completed.",
+								 targetNodeName, targetNodePort),
+						  errdetail("%s may have already completed.",
 									operationNameCapitalized)));
 		return;
 	}
@@ -591,7 +599,8 @@ TransferShards(int64 shardId, char *sourceNodeName,
 	}
 
 	CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
-					targetNodePort, useLogicalReplication, operationFunctionName, optionFlags);
+					targetNodePort, useLogicalReplication, operationFunctionName,
+					optionFlags);
 
 	if (transferType == SHARD_TRANSFER_MOVE)
 	{
@@ -649,6 +658,7 @@ TransferShards(int64 shardId, char *sourceNodeName,
 	FinalizeCurrentProgressMonitor();
 }
 
+
 /*
  * Insert deferred cleanup records.
  * The shards will be dropped by background cleaner later.
@@ -1534,28 +1544,30 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
 	foreach_declared_ptr(shardInterval, shardIntervalList)
 	{
 		/*
-		 * For each shard we first create the shard table in a separate
-		 * transaction and then we copy the data and create the indexes in a
-		 * second separate transaction. The reason we don't do both in a single
-		 * transaction is so we can see the size of the new shard growing
-		 * during the copy when we run get_rebalance_progress in another
-		 * session. If we wouldn't split these two phases up, then the table
-		 * wouldn't be visible in the session that get_rebalance_progress uses.
-		 * So get_rebalance_progress would always report its size as 0.
-		 */
-		List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName,
-														   sourceNodePort);
+		 * For each shard we first create the shard table in a separate
+		 * transaction and then we copy the data and create the indexes in a
+		 * second separate transaction. The reason we don't do both in a single
+		 * transaction is so we can see the size of the new shard growing
+		 * during the copy when we run get_rebalance_progress in another
+		 * session. If we wouldn't split these two phases up, then the table
+		 * wouldn't be visible in the session that get_rebalance_progress uses.
+		 * So get_rebalance_progress would always report its size as 0.
+		 */
+		List *ddlCommandList = RecreateShardDDLCommandList(shardInterval,
+														   sourceNodeName,
+														   sourceNodePort);
 		char *tableOwner = TableOwner(shardInterval->relationId);
 
 		/* drop the shard we created on the target, in case of failure */
 		InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
-										  ConstructQualifiedShardName(shardInterval),
-										  GroupForNode(targetNodeName,
-													   targetNodePort),
-										  CLEANUP_ON_FAILURE);
+											  ConstructQualifiedShardName(
+												  shardInterval),
+											  GroupForNode(targetNodeName,
+														   targetNodePort),
+											  CLEANUP_ON_FAILURE);
 
 		SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
-											  tableOwner, ddlCommandList);
+												  tableOwner, ddlCommandList);
 	}
 
 	UpdatePlacementUpdateStatusForShardIntervalList(
@@ -1578,10 +1590,10 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
 	{
 		List *ddlCommandList = PostLoadShardCreationCommandList(shardInterval,
 																sourceNodeName,
-															sourceNodePort);
+																sourceNodePort);
 		char *tableOwner = TableOwner(shardInterval->relationId);
 		SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
-											  tableOwner, ddlCommandList);
+												  tableOwner, ddlCommandList);
 
 		MemoryContextReset(localContext);
 	}
@@ -1594,9 +1606,9 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
 	if (!(optionFlags & SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS))
 	{
 		/*
-		 * Once all shards are copied, we can recreate relationships between shards.
-		 * Create DDL commands to Attach child tables to their parents in a partitioning hierarchy.
-		 */
+		 * Once all shards are copied, we can recreate relationships between shards.
+		 * Create DDL commands to attach child tables to their parents in a partitioning hierarchy.
+		 */
 		List *shardIntervalWithDDCommandsList = NIL;
 		foreach_declared_ptr(shardInterval, shardIntervalList)
 		{
@@ -1609,7 +1621,7 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
 					shardInterval, list_make1(attachPartitionCommand));
 				shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
-													  shardCommandList);
+														  shardCommandList);
 			}
 		}
 
@@ -1620,24 +1632,26 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
 			PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS);
 
 		/*
-		 * Iterate through the colocated shards and create DDL commamnds
-		 * to create the foreign constraints.
-		 */
+		 * Iterate through the colocated shards and create DDL commands
+		 * to create the foreign constraints.
+		 */
 		foreach_declared_ptr(shardInterval, shardIntervalList)
 		{
 			List *shardForeignConstraintCommandList = NIL;
 			List *referenceTableForeignConstraintList = NIL;
 
 			CopyShardForeignConstraintCommandListGrouped(shardInterval,
-													 &shardForeignConstraintCommandList,
-													 &referenceTableForeignConstraintList);
+														 &
+														 shardForeignConstraintCommandList,
+														 &
+														 referenceTableForeignConstraintList);
 
 			ShardCommandList *shardCommandList = CreateShardCommandList(
 				shardInterval, list_concat(shardForeignConstraintCommandList,
 										   referenceTableForeignConstraintList));
 			shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
-												  shardCommandList);
+													  shardCommandList);
 		}
 
 		/* Now execute the Partitioning & Foreign constraints creation commads.
 		 */
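The comment re-indented above explains why each shard copy is split into two autocommit batches: the empty shard relation is created first so that a concurrent get_rebalance_progress() call can watch its size grow while the data loads. A minimal sketch of that two-phase structure, under assumed names, is shown below; the wrapper and its list parameters are illustrative, while the SendCommandListToWorkerOutsideTransaction() signature follows the calls in this diff and the includes of shard_transfer.c are assumed.

/*
 * Sketch only: two-phase shard copy so the new shard is visible (and its
 * size observable) to other sessions before the data load starts.
 */
static void
CopyOneShardInTwoPhasesSketch(char *targetNodeName, int targetNodePort,
							  char *tableOwner, List *createDdlCommandList,
							  List *dataLoadCommandList)
{
	/* phase 1: create the (still empty) shard so other sessions can see it */
	SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
											  tableOwner, createDdlCommandList);

	/*
	 * phase 2: load the data and build indexes; get_rebalance_progress() in
	 * another session can now observe the shard's size while this runs
	 */
	SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
											  tableOwner, dataLoadCommandList);
}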
@@ -1646,8 +1660,8 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
 		{
 			char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId);
 			SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
-												  tableOwner,
-												  shardCommandList->ddlCommandList);
+													  tableOwner,
+													  shardCommandList->ddlCommandList);
 		}
 
 		UpdatePlacementUpdateStatusForShardIntervalList(
@@ -1736,7 +1750,8 @@ CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardInte
 
 	ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
 									  MaxAdaptiveExecutorPoolSize,
-									  NULL /* jobIdList (ignored by API implementation) */);
+									  NULL /* jobIdList (ignored by API implementation) */
+									  );
 }
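The InsertCleanupRecordOutsideTransaction() call re-indented earlier in this diff registers the target-side shard for cleanup before the shard is created, so a transfer that fails part-way leaves nothing behind once the background cleaner runs. A condensed sketch of that ordering is shown below; the wrapper function is hypothetical, while the callee names and argument order follow the calls shown in the diff and the includes of shard_transfer.c are assumed.

/*
 * Hypothetical wrapper illustrating the "register cleanup first, create
 * second" ordering used in CopyShardTablesViaBlockWrites().
 */
static void
CreateShardOnTargetWithCleanupSketch(ShardInterval *shardInterval,
									 char *targetNodeName, int targetNodePort,
									 List *ddlCommandList)
{
	/* 1. record what to drop if the copy fails part-way */
	InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
										  ConstructQualifiedShardName(shardInterval),
										  GroupForNode(targetNodeName, targetNodePort),
										  CLEANUP_ON_FAILURE);

	/* 2. only then create the shard relation on the target node */
	SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
											  TableOwner(shardInterval->relationId),
											  ddlCommandList);
}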