From 49e56001fd92e37e1df22f56e85e8f820dd7fdd5 Mon Sep 17 00:00:00 2001 From: Muhammad Usama Date: Tue, 6 May 2025 12:31:53 +0500 Subject: [PATCH] Parallelize shard rebalancing to reduce rebalance time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit introduces the following changes: - Break out reference-table shard copies into independent background tasks - Introduce a new “single-shard” flag to the TransferShard APIs so they can either transfer all collected shards for a shard ID (existing behavior), or copy just the one shard specified by shardID - In citus_rebalance_start(), use this flag to spawn a separate background task for each reference-table shard, creating the table and loading its data without any constraints - Added a SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY flag so TransferShards() can defer foreign-key and other constraint creation - After all reference-table copies complete, schedule a final task that applies all deferred constraints in one batch - Introduce an advisory lock to serialize rebalance operations; downgrade previous colocation locks from ExclusiveLock to RowExclusiveLock so they don’t conflict with the rebalance - Remove intra-colocation-group dependencies so shards in the same group can move independently - Increase default citus.max_background_task_executors_per_node from 1 to 4 --- .../distributed/operations/shard_rebalancer.c | 107 ++--- .../distributed/operations/shard_transfer.c | 364 +++++++++++------- src/backend/distributed/shared_library_init.c | 2 +- .../distributed/sql/citus--13.0-1--13.1-1.sql | 1 + .../citus_copy_shard_placement/13.1-1.sql | 16 + .../distributed/utils/background_jobs.c | 2 +- .../distributed/utils/reference_table_utils.c | 294 +++++++++++++- .../distributed/reference_table_utils.h | 1 + src/include/distributed/resource_lock.h | 3 +- src/include/distributed/shard_transfer.h | 43 ++- 10 files changed, 641 insertions(+), 192 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_copy_shard_placement/13.1-1.sql diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 074f1bed0..fe51fafa4 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -220,7 +220,6 @@ typedef struct ShardMoveSourceNodeHashEntry */ typedef struct ShardMoveDependencies { - HTAB *colocationDependencies; HTAB *nodeDependencies; } ShardMoveDependencies; @@ -274,6 +273,7 @@ static int64 RebalanceTableShardsBackground(RebalanceOptions *options, Oid static void AcquireRebalanceColocationLock(Oid relationId, const char *operationName); static void ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, char *noticeOperation); +static void AcquireRebalanceOperationLock(const char *operationName); static float4 CalculateUtilization(float4 totalCost, float4 capacity); static Form_pg_dist_rebalance_strategy GetRebalanceStrategy(Name name); static void EnsureShardCostUDF(Oid functionOid); @@ -297,8 +297,9 @@ static void ErrorOnConcurrentRebalance(RebalanceOptions *); static List * GetSetCommandListForNewConnections(void); static int64 GetColocationId(PlacementUpdateEvent *move); static ShardMoveDependencies InitializeShardMoveDependencies(); -static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 - colocationId, +static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, + int64 *refTablesDepTaskIds, 
+ int refTablesDepTaskIdsCount, ShardMoveDependencies shardMoveDependencies, int *nDepends); static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, @@ -767,6 +768,31 @@ AcquireRebalanceColocationLock(Oid relationId, const char *operationName) } } +/* + * AcquireRebalanceOperationLock does not allow concurrent rebalance + * operations. + */ +static void +AcquireRebalanceOperationLock(const char *operationName) +{ + LOCKTAG tag; + const bool sessionLock = false; + const bool dontWait = true; + + SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_REBALANCE_OPERATION); + + LockAcquireResult lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock, + dontWait); + if (!lockAcquired) + { + ereport(ERROR, (errmsg("could not acquire the lock required for %s operation", + operationName), + errdetail("It means that either a concurrent shard move " + "or shard copy is happening."), + errhint("Make sure that the concurrent operation has " + "finished and re-run the command"))); + } +} /* * AcquirePlacementColocationLock tries to acquire a lock for @@ -1954,6 +1980,8 @@ ErrorOnConcurrentRebalance(RebalanceOptions *options) AcquireRebalanceColocationLock(relationId, options->operationName); } + AcquireRebalanceOperationLock(options->operationName); + int64 jobId = 0; if (HasNonTerminalJobOfType("rebalance", &jobId)) { @@ -1991,10 +2019,6 @@ static ShardMoveDependencies InitializeShardMoveDependencies() { ShardMoveDependencies shardMoveDependencies; - shardMoveDependencies.colocationDependencies = CreateSimpleHashWithNameAndSize(int64, - ShardMoveDependencyInfo, - "colocationDependencyHashMap", - 6); shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int32, ShardMoveSourceNodeHashEntry, "nodeDependencyHashMap", @@ -2008,23 +2032,14 @@ InitializeShardMoveDependencies() * the move must take a dependency on, given the shard move dependencies as input. */ static int64 * -GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, +GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 *refTablesDepTaskIds, + int refTablesDepTaskIdsCount, ShardMoveDependencies shardMoveDependencies, int *nDepends) { HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64, "shardMoveDependencyList", 0); bool found; - - /* Check if there exists a move in the same colocation group scheduled earlier. */ - ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( - shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, &found); - - if (found) - { - hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); - } - /* * Check if there exists moves scheduled earlier whose source node * overlaps with the current move's target node. 
@@ -2045,6 +2060,19 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, } } + /* + * shard copy can only start after finishing copy of reference table shards + * so each shard task will have a dependency on the task that indicates the + * copy complete of reference tables + */ + while (refTablesDepTaskIdsCount > 0) + { + int64 refTableTaskId = *refTablesDepTaskIds; + hash_search(dependsList, &refTableTaskId, HASH_ENTER, NULL); + refTablesDepTaskIds++; + refTablesDepTaskIdsCount--; + } + *nDepends = hash_get_num_entries(dependsList); int64 *dependsArray = NULL; @@ -2076,9 +2104,6 @@ static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId, ShardMoveDependencies shardMoveDependencies) { - ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( - shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL); - shardMoveDependencyInfo->taskId = taskId; bool found; ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search( @@ -2174,30 +2199,18 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo initStringInfo(&buf); List *referenceTableIdList = NIL; - int64 replicateRefTablesTaskId = 0; + int64 *refTablesDepTaskIds = NULL; + int refTablesDepTaskIdsCount = 0; if (HasNodesWithMissingReferenceTables(&referenceTableIdList)) { - if (shardTransferMode == TRANSFER_MODE_AUTOMATIC) - { - VerifyTablesHaveReplicaIdentity(referenceTableIdList); - } - - /* - * Reference tables need to be copied to (newly-added) nodes, this needs to be the - * first task before we can move any other table. - */ - appendStringInfo(&buf, - "SELECT pg_catalog.replicate_reference_tables(%s)", - quote_literal_cstr(shardTranferModeLabel)); - - int32 nodesInvolved[] = { 0 }; - - /* replicate_reference_tables permissions require superuser */ - Oid superUserId = CitusExtensionOwner(); - BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data, 0, - NULL, 0, nodesInvolved); - replicateRefTablesTaskId = task->taskid; + refTablesDepTaskIds = ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(jobId, TRANSFER_MODE_BLOCK_WRITES, &refTablesDepTaskIdsCount); + ereport(DEBUG2, + (errmsg("%d dependent copy reference table tasks for job %ld", + refTablesDepTaskIdsCount, jobId), + errdetail("Rebalance scheduled as background job"), + errhint("To monitor progress, run: SELECT * FROM " + "citus_rebalance_status();"))); } PlacementUpdateEvent *move = NULL; @@ -2219,17 +2232,11 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo int nDepends = 0; - int64 *dependsArray = GenerateTaskMoveDependencyList(move, colocationId, + int64 *dependsArray = GenerateTaskMoveDependencyList(move, refTablesDepTaskIds, + refTablesDepTaskIdsCount, shardMoveDependencies, &nDepends); - if (nDepends == 0 && replicateRefTablesTaskId > 0) - { - nDepends = 1; - dependsArray = palloc(nDepends * sizeof(int64)); - dependsArray[0] = replicateRefTablesTaskId; - } - int32 nodesInvolved[2] = { 0 }; nodesInvolved[0] = move->sourceNode->nodeId; nodesInvolved[1] = move->targetNode->nodeId; diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index b7d07b2cf..bcc1d0b19 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -107,16 +107,18 @@ static void ErrorIfSameNode(char *sourceNodeName, int sourceNodePort, static void CopyShardTables(List 
*shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, bool useLogicalReplication, - const char *operationName); + const char *operationName, uint32 optionFlags); static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort); + int32 targetNodePort, + uint32 optionFlags); static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, - char *targetNodeName, int32 targetNodePort); + char *targetNodeName, int32 targetNodePort, + uint32 optionFlags); static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort, const char *targetNodeName, int32 targetNodePort); @@ -174,6 +176,7 @@ PG_FUNCTION_INFO_V1(master_copy_shard_placement); PG_FUNCTION_INFO_V1(citus_move_shard_placement); PG_FUNCTION_INFO_V1(citus_move_shard_placement_with_nodeid); PG_FUNCTION_INFO_V1(master_move_shard_placement); +PG_FUNCTION_INFO_V1(citus_copy_one_shard_placement); double DesiredPercentFreeAfterMove = 10; bool CheckAvailableSpaceBeforeMove = true; @@ -203,7 +206,7 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } @@ -232,7 +235,7 @@ citus_copy_shard_placement_with_nodeid(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, targetNode->workerName, targetNode->workerPort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } @@ -267,13 +270,41 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } +/* + * citus_copy_one_shard_placement copies a single shard placement from the source node to the target node, honoring the given shard transfer option flags. + */ +Datum +citus_copy_one_shard_placement(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + + int64 shardId = PG_GETARG_INT64(0); + uint32 sourceNodeId = PG_GETARG_INT32(1); + uint32 targetNodeId = PG_GETARG_INT32(2); + uint32 flags = PG_GETARG_INT32(3); + Oid shardReplicationModeOid = PG_GETARG_OID(4); + + bool missingOk = false; + WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk); + WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk); + + char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); + + TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, + targetNode->workerName, targetNode->workerPort, + shardReplicationMode, SHARD_TRANSFER_COPY, flags); + + PG_RETURN_VOID(); +} + /* * citus_move_shard_placement moves given shard (and its co-located shards) from one * node to the other node.
To accomplish this it entirely recreates the table structure @@ -315,7 +346,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_MOVE); + shardReplicationMode, SHARD_TRANSFER_MOVE, 0); PG_RETURN_VOID(); } @@ -343,12 +374,10 @@ citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS) char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, targetNode->workerName, - targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE); + targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE, 0); PG_RETURN_VOID(); } - - /* * TransferShards is the function for shard transfers. */ @@ -356,7 +385,7 @@ void TransferShards(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, char shardReplicationMode, - ShardTransferType transferType) + ShardTransferType transferType, uint32 optionFlags) { /* strings to be used in log messages */ const char *operationName = ShardTransferTypeNames[transferType]; @@ -385,10 +414,25 @@ TransferShards(int64 shardId, char *sourceNodeName, ErrorIfTargetNodeIsNotSafeForTransfer(targetNodeName, targetNodePort, transferType); - AcquirePlacementColocationLock(distributedTableId, ExclusiveLock, operationName); + AcquirePlacementColocationLock(distributedTableId, RowExclusiveLock, operationName); - List *colocatedTableList = ColocatedTableList(distributedTableId); - List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + List *colocatedTableList; + List *colocatedShardList; + + /* + * If SHARD_TRANSFER_SINGLE_SHARD_ONLY is set, we only transfer the shard + * specified by shardId. Otherwise, we transfer all colocated shards. + */ + if (optionFlags & SHARD_TRANSFER_SINGLE_SHARD_ONLY) + { + colocatedTableList = list_make1_oid(distributedTableId); + colocatedShardList = list_make1(shardInterval); + } + else + { + colocatedTableList = ColocatedTableList(distributedTableId); + colocatedShardList = ColocatedShardIntervalList(shardInterval); + } EnsureTableListOwner(colocatedTableList); @@ -412,19 +456,52 @@ TransferShards(int64 shardId, char *sourceNodeName, /* * We sort shardIntervalList so that lock operations will not cause any - * deadlocks. + * deadlocks. But we do not need to do that if the list contains only one + * shard. */ - colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + if (!(optionFlags & SHARD_TRANSFER_SINGLE_SHARD_ONLY)) + { + colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + } - if (TransferAlreadyCompleted(colocatedShardList, - sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort, - transferType)) + bool transferAlreadyCompleted = TransferAlreadyCompleted(colocatedShardList, + sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort, + transferType); + + /* + * If we just need to create the shard relationships, we don't need to do anything + * else other than call CopyShardTables with the SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY + * flag.
+ */ + if (optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY) + { + if (!transferAlreadyCompleted) + { + /* + * if the transfer is not completed, and we are here just to create + * the relationships, we can return right away + */ + ereport(WARNING, (errmsg("shard is not present on node %s:%d", + targetNodeName, targetNodePort), + errdetail("%s may have not completed.", + operationNameCapitalized))); + return; + } + + CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName, + targetNodePort, (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL), + operationFunctionName, optionFlags); + /* We don't need to do anything else, just return */ + return; + } + + if (transferAlreadyCompleted) { /* if the transfer is already completed, we can return right away */ ereport(WARNING, (errmsg("shard is already present on node %s:%d", - targetNodeName, targetNodePort), - errdetail("%s may have already completed.", + targetNodeName, targetNodePort), + errdetail("%s may have already completed.", operationNameCapitalized))); return; } @@ -515,7 +592,7 @@ TransferShards(int64 shardId, char *sourceNodeName, } CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort, useLogicalReplication, operationFunctionName); + targetNodePort, useLogicalReplication, operationFunctionName, optionFlags); if (transferType == SHARD_TRANSFER_MOVE) { @@ -573,7 +650,6 @@ TransferShards(int64 shardId, char *sourceNodeName, FinalizeCurrentProgressMonitor(); } - /* * Insert deferred cleanup records. * The shards will be dropped by background cleaner later. @@ -1131,9 +1207,9 @@ BlockWritesToShardList(List *shardList) * asynchronous shard copy in case of cascading DML operations. */ LockReferencedReferenceShardDistributionMetadata(shard->shardId, - ExclusiveLock); + RowExclusiveLock); - LockShardDistributionMetadata(shard->shardId, ExclusiveLock); + LockShardDistributionMetadata(shard->shardId, RowExclusiveLock); } /* following code relies on the list to have at least one shard */ @@ -1156,7 +1232,7 @@ BlockWritesToShardList(List *shardList) * Even if users disable metadata sync, we cannot allow them not to * acquire the remote locks. Hence, we have !IsCoordinator() check. 
*/ - LockShardListMetadataOnWorkers(ExclusiveLock, shardList); + LockShardListMetadataOnWorkers(RowExclusiveLock, shardList); } } @@ -1333,7 +1409,7 @@ ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList) static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, bool useLogicalReplication, - const char *operationName) + const char *operationName, uint32 optionFlags) { if (list_length(shardIntervalList) < 1) { @@ -1347,12 +1423,12 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP { CopyShardTablesViaLogicalReplication(shardIntervalList, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort); + targetNodePort, optionFlags); } else { CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort); + targetNodeName, targetNodePort, optionFlags); } /* @@ -1369,7 +1445,7 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort) + int32 targetNodePort, uint32 optionFlags) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaLogicalReplication", @@ -1437,7 +1513,7 @@ CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList) static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort) + int32 targetNodePort, uint32 optionFlags) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaBlockWrites", @@ -1446,127 +1522,141 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); - - /* iterate through the colocated shards and copy each */ ShardInterval *shardInterval = NULL; - foreach_declared_ptr(shardInterval, shardIntervalList) - { - /* - * For each shard we first create the shard table in a separate - * transaction and then we copy the data and create the indexes in a - * second separate transaction. The reason we don't do both in a single - * transaction is so we can see the size of the new shard growing - * during the copy when we run get_rebalance_progress in another - * session. If we wouldn't split these two phases up, then the table - * wouldn't be visible in the session that get_rebalance_progress uses. - * So get_rebalance_progress would always report its size as 0. 
- */ - List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName, - sourceNodePort); - char *tableOwner = TableOwner(shardInterval->relationId); - - /* drop the shard we created on the target, in case of failure */ - InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, - ConstructQualifiedShardName(shardInterval), - GroupForNode(targetNodeName, - targetNodePort), - CLEANUP_ON_FAILURE); - - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); - } - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_COPYING_DATA); - - ConflictWithIsolationTestingBeforeCopy(); - CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); - ConflictWithIsolationTestingAfterCopy(); - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS); - - foreach_declared_ptr(shardInterval, shardIntervalList) - { - List *ddlCommandList = - PostLoadShardCreationCommandList(shardInterval, sourceNodeName, - sourceNodePort); - char *tableOwner = TableOwner(shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); - - MemoryContextReset(localContext); - } /* - * Once all shards are copied, we can recreate relationships between shards. - * Create DDL commands to Attach child tables to their parents in a partitioning hierarchy. + * If we’re only asked to create the relationships, the shards are already + * present and populated on the node. Skip the table‑setup and data‑loading + * steps and proceed straight to creating the relationships. */ - List *shardIntervalWithDDCommandsList = NIL; - foreach_declared_ptr(shardInterval, shardIntervalList) + if (!(optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY)) { - if (PartitionTable(shardInterval->relationId)) + /* iterate through the colocated shards and copy each */ + foreach_declared_ptr(shardInterval, shardIntervalList) { - char *attachPartitionCommand = - GenerateAttachShardPartitionCommand(shardInterval); + /* + * For each shard we first create the shard table in a separate + * transaction and then we copy the data and create the indexes in a + * second separate transaction. The reason we don't do both in a single + * transaction is so we can see the size of the new shard growing + * during the copy when we run get_rebalance_progress in another + * session. If we wouldn't split these two phases up, then the table + * wouldn't be visible in the session that get_rebalance_progress uses. + * So get_rebalance_progress would always report its size as 0. 
+ */ + List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName, + sourceNodePort); + char *tableOwner = TableOwner(shardInterval->relationId); - ShardCommandList *shardCommandList = CreateShardCommandList( - shardInterval, - list_make1(attachPartitionCommand)); - shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, - shardCommandList); + /* drop the shard we created on the target, in case of failure */ + InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, + ConstructQualifiedShardName(shardInterval), + GroupForNode(targetNodeName, + targetNodePort), + CLEANUP_ON_FAILURE); + + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); + } + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COPYING_DATA); + + ConflictWithIsolationTestingBeforeCopy(); + CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); + ConflictWithIsolationTestingAfterCopy(); + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS); + + foreach_declared_ptr(shardInterval, shardIntervalList) + { + List *ddlCommandList = + PostLoadShardCreationCommandList(shardInterval, sourceNodeName, + sourceNodePort); + char *tableOwner = TableOwner(shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); + + MemoryContextReset(localContext); } } - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS); - /* - * Iterate through the colocated shards and create DDL commamnds - * to create the foreign constraints. + * Skip creating shard relationships if the caller has requested that they + * not be created. */ - foreach_declared_ptr(shardInterval, shardIntervalList) + if (!(optionFlags & SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS)) { - List *shardForeignConstraintCommandList = NIL; - List *referenceTableForeignConstraintList = NIL; + /* + * Once all shards are copied, we can recreate relationships between shards. + * Create DDL commands to Attach child tables to their parents in a partitioning hierarchy. + */ + List *shardIntervalWithDDCommandsList = NIL; + foreach_declared_ptr(shardInterval, shardIntervalList) + { + if (PartitionTable(shardInterval->relationId)) + { + char *attachPartitionCommand = + GenerateAttachShardPartitionCommand(shardInterval); - CopyShardForeignConstraintCommandListGrouped(shardInterval, - &shardForeignConstraintCommandList, - &referenceTableForeignConstraintList); + ShardCommandList *shardCommandList = CreateShardCommandList( + shardInterval, + list_make1(attachPartitionCommand)); + shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, + shardCommandList); + } + } - ShardCommandList *shardCommandList = CreateShardCommandList( - shardInterval, - list_concat(shardForeignConstraintCommandList, - referenceTableForeignConstraintList)); - shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, - shardCommandList); + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS); + + /* + * Iterate through the colocated shards and create DDL commamnds + * to create the foreign constraints. 
+ */ + foreach_declared_ptr(shardInterval, shardIntervalList) + { + List *shardForeignConstraintCommandList = NIL; + List *referenceTableForeignConstraintList = NIL; + + CopyShardForeignConstraintCommandListGrouped(shardInterval, + &shardForeignConstraintCommandList, + &referenceTableForeignConstraintList); + + ShardCommandList *shardCommandList = CreateShardCommandList( + shardInterval, + list_concat(shardForeignConstraintCommandList, + referenceTableForeignConstraintList)); + shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, + shardCommandList); + } + + /* Now execute the Partitioning & Foreign constraints creation commads. */ + ShardCommandList *shardCommandList = NULL; + foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList) + { + char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, + shardCommandList->ddlCommandList); + } + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COMPLETING); } - - /* Now execute the Partitioning & Foreign constraints creation commads. */ - ShardCommandList *shardCommandList = NULL; - foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList) - { - char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, - shardCommandList->ddlCommandList); - } - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_COMPLETING); - MemoryContextReset(localContext); MemoryContextSwitchTo(oldContext); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 430eb8555..b19ed9231 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1925,7 +1925,7 @@ RegisterCitusConfigVariables(void) "for scheduled background tasks that involve a particular node"), NULL, &MaxBackgroundTaskExecutorsPerNode, - 1, 1, 128, + 4, 1, 128, PGC_SIGHUP, GUC_STANDARD, NULL, NULL, NULL); diff --git a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql index 4d0dc19a3..a5f2520cc 100644 --- a/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql +++ b/src/backend/distributed/sql/citus--13.0-1--13.1-1.sql @@ -51,3 +51,4 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits; #include "udfs/citus_stat_counters/13.1-1.sql" #include "udfs/citus_stat_counters_reset/13.1-1.sql" #include "udfs/citus_nodes/13.1-1.sql" +#include "udfs/citus_copy_shard_placement/13.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_copy_shard_placement/13.1-1.sql b/src/backend/distributed/sql/udfs/citus_copy_shard_placement/13.1-1.sql new file mode 100644 index 000000000..916269770 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_copy_shard_placement/13.1-1.sql @@ -0,0 +1,16 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_copy_one_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + flags integer, + transfer_mode citus.shard_transfer_mode default 'auto') +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_copy_one_shard_placement$$; +COMMENT ON FUNCTION pg_catalog.citus_copy_one_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, 
+ flags integer, + transfer_mode citus.shard_transfer_mode) +IS 'copy a single shard from the source node to the destination node'; diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 911880dc7..7a9d049b4 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -125,7 +125,7 @@ static volatile sig_atomic_t GotSighup = false; /* keeping track of parallel background tasks per node */ HTAB *ParallelTasksPerNode = NULL; -int MaxBackgroundTaskExecutorsPerNode = 1; +int MaxBackgroundTaskExecutorsPerNode = 4; PG_FUNCTION_INFO_V1(citus_job_cancel); PG_FUNCTION_INFO_V1(citus_job_wait); diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 8f0d89fc9..ed27dd20b 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -41,6 +41,8 @@ #include "distributed/transaction_management.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" +#include "distributed/shard_transfer.h" + /* local function forward declarations */ static List * WorkersWithoutReferenceTablePlacement(uint64 shardId, LOCKMODE lockMode); @@ -131,6 +133,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) * DROP TABLE and create_reference_table calls so that the list of reference tables we * operate on are stable. * + * * Since the changes to the reference table placements are made via loopback * connections we release the locks held at the end of this function. Due to Citus * only running transactions in READ COMMITTED mode we can be sure that other @@ -179,7 +182,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) if (list_length(newWorkersList) == 0) { /* - * All workers alreaddy have a copy of the reference tables, make sure that + * All workers already have a copy of the reference tables, make sure that * any locks obtained earlier are released. It will probably not matter, but * we release the locks in the reverse order we obtained them in. */ @@ -293,6 +296,295 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) } } +/* + * ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes is essentially a + * twin of EnsureReferenceTablesExistOnAllNodesExtended. The difference is that instead of + * copying the missing tables onto the worker nodes itself, this function creates a background + * task for each required copy operation and schedules it as part of the background job. + * Another difference is that instead of copying all the colocated shards sequentially, + * this function creates a separate background task for each shard, even when the shards + * are part of the same colocated shard group. + * + * To transfer the shards in parallel, the function creates a task for each shard + * copy and then schedules another task that creates the shard relationships (if any) + * between shards; that task waits for the completion of all shard transfer tasks. + * + * The function returns an array of the task ids created for building the shard + * relationships; completion of these tasks signals the completion of the + * reference table setup on the worker nodes. Any process that needs to wait for + * the completion of the reference table setup can wait for these tasks to complete. + * + * The transferMode passed to this function is ignored for now; only + * block_writes mode is used.
+ */ +int64 * +ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char transferMode, int *nDependTasks) +{ + List *referenceTableIdList = NIL; + uint64 shardId = INVALID_SHARD_ID; + List *newWorkersList = NIL; + int64 *dependsTaskArray = NULL; + const char *referenceTableName = NULL; + int colocationId = GetReferenceTableColocationId(); + + *nDependTasks = 0; + if (colocationId == INVALID_COLOCATION_ID) + { + /* we have no reference table yet. */ + return 0; + } + + /* + * Most of the time this function should result in a conclusion where we do not need + * to copy any reference tables. To prevent excessive locking the majority of the time + * we run our precondition checks first with a lower lock. If, after checking with the + * lower lock, that we might need to copy reference tables we check with a more + * aggressive and self conflicting lock. It is important to be self conflicting in the + * second run to make sure that two concurrent calls to this routine will actually not + * run concurrently after the initial check. + * + * If after two iterations of precondition checks we still find the need for copying + * reference tables we exit the loop with all locks held. This will prevent concurrent + * DROP TABLE and create_reference_table calls so that the list of reference tables we + * operate on are stable. + * + * + * Since the changes to the reference table placements are made via loopback + * connections we release the locks held at the end of this function. Due to Citus + * only running transactions in READ COMMITTED mode we can be sure that other + * transactions correctly find the metadata entries. + */ + LOCKMODE lockmodes[] = { AccessShareLock, ExclusiveLock }; + for (int lockmodeIndex = 0; lockmodeIndex < lengthof(lockmodes); lockmodeIndex++) + { + LockColocationId(colocationId, lockmodes[lockmodeIndex]); + + referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE); + if (referenceTableIdList == NIL) + { + /* + * No reference tables exist, make sure that any locks obtained earlier are + * released. It will probably not matter, but we release the locks in the + * reverse order we obtained them in. + */ + for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } + return 0; + } + + Oid referenceTableId = linitial_oid(referenceTableIdList); + referenceTableName = get_rel_name(referenceTableId); + List *shardIntervalList = LoadShardIntervalList(referenceTableId); + if (list_length(shardIntervalList) == 0) + { + /* check for corrupt metadata */ + ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard", + referenceTableName))); + } + + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + shardId = shardInterval->shardId; + + /* + * We only take an access share lock, otherwise we'll hold up citus_add_node. + * In case of create_reference_table() where we don't want concurrent writes + * to pg_dist_node, we have already acquired ShareLock on pg_dist_node. + */ + newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, AccessShareLock); + if (list_length(newWorkersList) == 0) + { + /* + * All workers already have a copy of the reference tables, make sure that + * any locks obtained earlier are released. It will probably not matter, but + * we release the locks in the reverse order we obtained them in. 
+ */ + for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } + return 0; + } + } + + /* + * citus_copy_shard_placement triggers metadata sync-up, which tries to + * acquire a ShareLock on pg_dist_node. We do master_copy_shard_placement + * in a separate connection. If we have modified pg_dist_node in the + * current backend, this will cause a deadlock. + */ + if (TransactionModifiedNodeMetadata) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate reference tables in a transaction " + "that modified node metadata"))); + } + + /* + * Modifications to reference tables in current transaction are not visible + * to citus_copy_shard_placement, since it is done in a separate backend. + */ + if (AnyRelationsModifiedInTransaction(referenceTableIdList)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate reference tables in a transaction " + "that modified a reference table"))); + } + + bool missingOk = false; + ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk); + if (sourceShardPlacement == NULL) + { + /* check for corrupt metadata */ + ereport(ERROR, (errmsg("reference table shard " + UINT64_FORMAT + " does not have an active shard placement", + shardId))); + } + + WorkerNode *newWorkerNode = NULL; + BackgroundTask *task = NULL; + StringInfoData buf = { 0 }; + initStringInfo(&buf); + List *depTasksList = NIL; + const char *transferModeString = "block_writes"; /* For now we only support block_writes */ + + foreach_declared_ptr(newWorkerNode, newWorkersList) + { + ereport(DEBUG2, (errmsg("replicating reference table '%s' to %s:%d ...", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort))); + + Oid relationId = InvalidOid; + List *nodeTasksList = NIL; + + foreach_declared_oid(relationId, referenceTableIdList) + { + referenceTableName = get_rel_name(relationId); + List *shardIntervalList = LoadShardIntervalList(relationId); + /* Reference tables are supposed to have only one shard */ + if (list_length(shardIntervalList) != 1) + { + ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard", + referenceTableName))); + } + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + shardId = shardInterval->shardId; + + resetStringInfo(&buf); + /* + * In the first step we just create and load data into the shards, but defer the creation + * of the shard relationships to the next step. + * The reason we want to defer the creation of the shard relationships is that + * we want to make sure that all the parallel shard copy tasks are finished + * before we create the relationships. Otherwise we might end up in + * a situation where a dependent shard copy task is still running and trying to + * create the shard relationships would result in an ERROR. + */ + uint32 shardTransferFlags = SHARD_TRANSFER_SINGLE_SHARD_ONLY | + SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS; + + appendStringInfo(&buf, + "SELECT pg_catalog.citus_copy_one_shard_placement(%ld,%u,%u,%u,%s)", + shardId, + sourceShardPlacement->nodeId, + newWorkerNode->nodeId, + shardTransferFlags, + quote_literal_cstr(transferModeString)); + + ereport(DEBUG2, + (errmsg("replicating reference table '%s' to %s:%d ...
QUERY= %s", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort,buf.data))); + + int nDepends = 0; + int64 *dependsArray = NULL; + int32 nodesInvolved[2] = { 0 }; + nodesInvolved[0] = sourceShardPlacement->nodeId; + nodesInvolved[1] = newWorkerNode->nodeId; + + task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data, + nDepends, + dependsArray, 2, + nodesInvolved); + + nodeTasksList = lappend(nodeTasksList,task); + } + /* Create a task to create reference table relations on this node */ + if (list_length(nodeTasksList) > 0) + { + int nDepends = list_length(nodeTasksList); + int32 nodesInvolved[2] = { 0 }; + nodesInvolved[0] = sourceShardPlacement->nodeId; + nodesInvolved[1] = newWorkerNode->nodeId; + int64 *dependsArray = palloc(sizeof(int64) * list_length(nodeTasksList)); + int idx = 0; + foreach_declared_ptr(task, nodeTasksList) + { + dependsArray[idx++] = task->taskid; + } + resetStringInfo(&buf); + uint32 shardTransferFlags = SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY; + + appendStringInfo(&buf, + "SELECT pg_catalog.citus_copy_one_shard_placement(%ld,%u,%u,%u,%s)", + shardId, + sourceShardPlacement->nodeId, + newWorkerNode->nodeId, + shardTransferFlags, + quote_literal_cstr(transferModeString)); + + ereport(DEBUG2, + (errmsg("creating relations for reference table '%s' on %s:%d ... QUERY= %s", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort,buf.data))); + + task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data, + nDepends, + dependsArray, 2, + nodesInvolved); + + depTasksList = lappend(depTasksList, task); + + pfree(dependsArray); + list_free(nodeTasksList); + nodeTasksList = NIL; + } + } + + /* + * compute a dependent task list array to be used to indicate the completion of all + * reference table shards copy, so that we can start with distributed shard copy + */ + if (list_length(depTasksList) > 0) + { + *nDependTasks = list_length(depTasksList); + dependsTaskArray = palloc(sizeof(int64) * *nDependTasks); + int idx = 0; + foreach_declared_ptr(task, depTasksList) + { + dependsTaskArray[idx++] = task->taskid; + } + list_free(depTasksList); + } + + /* + * Since reference tables have been copied via a loopback connection we do not have to + * retain our locks. Since Citus only runs well in READ COMMITTED mode we can be sure + * that other transactions will find the reference tables copied. + * We have obtained and held multiple locks, here we unlock them all in the reverse + * order we have obtained them in. 
+ */ + for (int releaseLockmodeIndex = lengthof(lockmodes) - 1; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } + return dependsTaskArray; +} /* * HasNodesWithMissingReferenceTables checks if all reference tables are already copied to diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index cf5a6fd02..c40cf852b 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -21,6 +21,7 @@ extern void EnsureReferenceTablesExistOnAllNodes(void); extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode); +extern int64 *ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char transferMode, int *nDependTasks); extern bool HasNodesWithMissingReferenceTables(List **referenceTableList); extern uint32 CreateReferenceTableColocationId(void); extern uint32 GetReferenceTableColocationId(void); diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 7d67b173d..a34ab7e89 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -57,7 +57,8 @@ typedef enum CitusOperations CITUS_NONBLOCKING_SPLIT = 1, CITUS_CREATE_DISTRIBUTED_TABLE_CONCURRENTLY = 2, CITUS_CREATE_COLOCATION_DEFAULT = 3, - CITUS_BACKGROUND_TASK_MONITOR = 4 + CITUS_BACKGROUND_TASK_MONITOR = 4, + CITUS_REBALANCE_OPERATION = 5 } CitusOperations; /* reuse advisory lock, but with different, unused field 4 (4)*/ diff --git a/src/include/distributed/shard_transfer.h b/src/include/distributed/shard_transfer.h index c1621879b..27e1f1ff9 100644 --- a/src/include/distributed/shard_transfer.h +++ b/src/include/distributed/shard_transfer.h @@ -23,10 +23,51 @@ typedef enum SHARD_TRANSFER_COPY = 2 } ShardTransferType; +/* + * ShardTransferOperationMode defines the flags that can be passed to the shard + * transfer function to control its behavior. + */ +typedef enum +{ + /* + * This flag instructs the transfer function to transfer only the single shard + * specified by shardId rather than all the colocated shards for the shard interval. + * Using this flag means we might temporarily break the colocated shard + * relationship on the target node. So this is only useful when setting up + * a new node and we are sure that the node will not be used until we have + * transferred all the shards. + * The reason we need this flag is that we want to be able to transfer + * colocated shards in parallel, and for now it is only used for the reference + * table shards. + * Finally, if you are using this flag, you should also consider deferring + * the creation of the relationships on the target node until all colocated + * shards are transferred (see: SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS). + */ + SHARD_TRANSFER_SINGLE_SHARD_ONLY = 1 << 0, + /* With this flag the shard transfer function does not create any constraints + * or foreign key relationships defined on the shard. This can be used to defer the + * creation of the relationships until all the shards are transferred. + * This is useful when we are transferring colocated shards in parallel and + * we want to avoid creating the relationships on the target node + * until all the shards are transferred.
*/ + SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS = 1 << 1, + + /* This flag indicates that the shard transfer function should + * only create the relationships on the target node and not transfer any data. + * It can be used to create the relationships that were deferred + * while the shards were being transferred. + */ + SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY = 1 << 2 +} ShardTransferOperationMode; + + extern void TransferShards(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, - char shardReplicationMode, ShardTransferType transferType); + char shardReplicationMode, ShardTransferType transferType, + uint32 optionFlags); extern uint64 ShardListSizeInBytes(List *colocatedShardList, char *workerNodeName, uint32 workerNodePort); extern void ErrorIfMoveUnsupportedTableType(Oid relationId);
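
The background tasks that ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes schedules are plain SQL calls to the new UDF: one call per reference table shard with SHARD_TRANSFER_SINGLE_SHARD_ONLY | SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS (flag value 3), followed by one call per node with SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY (flag value 4). A minimal sketch of that two-phase sequence as it would run on the coordinator; the shard id 102008, source node id 1, and target node id 3 are only example values:

    -- Phase 1: create the reference table shard on the target node and load its
    -- data, deferring foreign keys and other relationship creation (flags = 1 | 2 = 3).
    SELECT pg_catalog.citus_copy_one_shard_placement(102008, 1, 3, 3, 'block_writes');

    -- Phase 2: once every per-shard copy task has finished, create only the deferred
    -- relationships (flags = 4); no data is transferred in this step.
    SELECT pg_catalog.citus_copy_one_shard_placement(102008, 1, 3, 4, 'block_writes');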
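
With the default for citus.max_background_task_executors_per_node raised from 1 to 4, the per-node copy parallelism can be tuned further; the GUC is PGC_SIGHUP, so it takes a configuration reload rather than a session-level SET. A hedged sketch of raising the limit and monitoring the background rebalance job, assuming the value 8 is appropriate for the cluster:

    -- Raise the per-node background task executor limit (requires a reload).
    ALTER SYSTEM SET citus.max_background_task_executors_per_node = 8;
    SELECT pg_reload_conf();

    -- Start the rebalance as a background job and monitor its tasks.
    SELECT citus_rebalance_start();
    SELECT * FROM citus_rebalance_status();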