diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 074f1bed0..e04ddd26f 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -222,6 +222,7 @@ typedef struct ShardMoveDependencies { HTAB *colocationDependencies; HTAB *nodeDependencies; + bool parallelTransferColocatedShards; } ShardMoveDependencies; char *VariablesToBePassedToNewConnections = NULL; @@ -270,7 +271,9 @@ static ShardCost GetShardCost(uint64 shardId, void *context); static List * NonColocatedDistRelationIdList(void); static void RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid); static int64 RebalanceTableShardsBackground(RebalanceOptions *options, Oid - shardReplicationModeOid); + shardReplicationModeOid, + bool ParallelTransferReferenceTables, + bool ParallelTransferColocatedShards); static void AcquireRebalanceColocationLock(Oid relationId, const char *operationName); static void ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, char *noticeOperation); @@ -296,9 +299,12 @@ static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStati static void ErrorOnConcurrentRebalance(RebalanceOptions *); static List * GetSetCommandListForNewConnections(void); static int64 GetColocationId(PlacementUpdateEvent *move); -static ShardMoveDependencies InitializeShardMoveDependencies(); -static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 - colocationId, +static ShardMoveDependencies InitializeShardMoveDependencies(bool + ParallelTransferColocatedShards); +static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, + int64 colocationId, + int64 *refTablesDepTaskIds, + int refTablesDepTaskIdsCount, ShardMoveDependencies shardMoveDependencies, int *nDepends); static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, @@ -1014,6 +1020,12 @@ citus_rebalance_start(PG_FUNCTION_ARGS) PG_ENSURE_ARGNOTNULL(2, "shard_transfer_mode"); Oid shardTransferModeOid = PG_GETARG_OID(2); + PG_ENSURE_ARGNOTNULL(3, "parallel_transfer_reference_tables"); + bool ParallelTransferReferenceTables = PG_GETARG_BOOL(3); + + PG_ENSURE_ARGNOTNULL(4, "parallel_transfer_colocated_shards"); + bool ParallelTransferColocatedShards = PG_GETARG_BOOL(4); + RebalanceOptions options = { .relationIdList = relationIdList, .threshold = strategy->defaultThreshold, @@ -1023,7 +1035,9 @@ citus_rebalance_start(PG_FUNCTION_ARGS) .rebalanceStrategy = strategy, .improvementThreshold = strategy->improvementThreshold, }; - int jobId = RebalanceTableShardsBackground(&options, shardTransferModeOid); + int jobId = RebalanceTableShardsBackground(&options, shardTransferModeOid, + ParallelTransferReferenceTables, + ParallelTransferColocatedShards); if (jobId == 0) { @@ -1988,17 +2002,20 @@ GetColocationId(PlacementUpdateEvent *move) * given colocation group and the other one is for tracking source nodes of all moves. */ static ShardMoveDependencies -InitializeShardMoveDependencies() +InitializeShardMoveDependencies(bool ParallelTransferColocatedShards) { ShardMoveDependencies shardMoveDependencies; shardMoveDependencies.colocationDependencies = CreateSimpleHashWithNameAndSize(int64, ShardMoveDependencyInfo, "colocationDependencyHashMap", 6); + shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int32, ShardMoveSourceNodeHashEntry, "nodeDependencyHashMap", 6); + shardMoveDependencies.parallelTransferColocatedShards = + ParallelTransferColocatedShards; return shardMoveDependencies; } @@ -2009,6 +2026,7 @@ InitializeShardMoveDependencies() */ static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, + int64 *refTablesDepTaskIds, int refTablesDepTaskIdsCount, ShardMoveDependencies shardMoveDependencies, int *nDepends) { HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64, @@ -2016,13 +2034,17 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, bool found; - /* Check if there exists a move in the same colocation group scheduled earlier. */ - ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( - shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, &found); - - if (found) + if (!shardMoveDependencies.parallelTransferColocatedShards) { - hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); + /* Check if there exists a move in the same colocation group scheduled earlier. */ + ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( + shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, & + found); + + if (found) + { + hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); + } } /* @@ -2045,6 +2067,23 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, } } + *nDepends = hash_get_num_entries(dependsList); + if (*nDepends == 0) + { + /* + * shard copy can only start after finishing copy of reference table shards + * so each shard task will have a dependency on the task that indicates the + * copy complete of reference tables + */ + while (refTablesDepTaskIdsCount > 0) + { + int64 refTableTaskId = *refTablesDepTaskIds; + hash_search(dependsList, &refTableTaskId, HASH_ENTER, NULL); + refTablesDepTaskIds++; + refTablesDepTaskIdsCount--; + } + } + *nDepends = hash_get_num_entries(dependsList); int64 *dependsArray = NULL; @@ -2076,9 +2115,13 @@ static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId, ShardMoveDependencies shardMoveDependencies) { - ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( - shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL); - shardMoveDependencyInfo->taskId = taskId; + if (!shardMoveDependencies.parallelTransferColocatedShards) + { + ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( + shardMoveDependencies.colocationDependencies, &colocationId, + HASH_ENTER, NULL); + shardMoveDependencyInfo->taskId = taskId; + } bool found; ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search( @@ -2103,7 +2146,9 @@ UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int * background job+task infrastructure. */ static int64 -RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationModeOid) +RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationModeOid, + bool ParallelTransferReferenceTables, + bool ParallelTransferColocatedShards) { if (list_length(options->relationIdList) == 0) { @@ -2174,7 +2219,8 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo initStringInfo(&buf); List *referenceTableIdList = NIL; - int64 replicateRefTablesTaskId = 0; + int64 *refTablesDepTaskIds = NULL; + int refTablesDepTaskIdsCount = 0; if (HasNodesWithMissingReferenceTables(&referenceTableIdList)) { @@ -2187,22 +2233,41 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo * Reference tables need to be copied to (newly-added) nodes, this needs to be the * first task before we can move any other table. */ - appendStringInfo(&buf, - "SELECT pg_catalog.replicate_reference_tables(%s)", - quote_literal_cstr(shardTranferModeLabel)); + if (ParallelTransferReferenceTables) + { + refTablesDepTaskIds = + ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes( + jobId, shardTransferMode, &refTablesDepTaskIdsCount); + ereport(DEBUG2, + (errmsg("%d dependent copy reference table tasks for job %ld", + refTablesDepTaskIdsCount, jobId), + errdetail("Rebalance scheduled as background job"), + errhint("To monitor progress, run: SELECT * FROM " + "citus_rebalance_status();"))); + } + else + { + /* Move all reference tables as single task. Classical way */ + appendStringInfo(&buf, + "SELECT pg_catalog.replicate_reference_tables(%s)", + quote_literal_cstr(shardTranferModeLabel)); - int32 nodesInvolved[] = { 0 }; + int32 nodesInvolved[] = { 0 }; - /* replicate_reference_tables permissions require superuser */ - Oid superUserId = CitusExtensionOwner(); - BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data, 0, - NULL, 0, nodesInvolved); - replicateRefTablesTaskId = task->taskid; + /* replicate_reference_tables permissions require superuser */ + Oid superUserId = CitusExtensionOwner(); + BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data, 0, + NULL, 0, nodesInvolved); + refTablesDepTaskIds = palloc0(sizeof(int64)); + refTablesDepTaskIds[0] = task->taskid; + refTablesDepTaskIdsCount = 1; + } } PlacementUpdateEvent *move = NULL; - ShardMoveDependencies shardMoveDependencies = InitializeShardMoveDependencies(); + ShardMoveDependencies shardMoveDependencies = + InitializeShardMoveDependencies(ParallelTransferColocatedShards); foreach_declared_ptr(move, placementUpdateList) { @@ -2220,16 +2285,11 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo int nDepends = 0; int64 *dependsArray = GenerateTaskMoveDependencyList(move, colocationId, + refTablesDepTaskIds, + refTablesDepTaskIdsCount, shardMoveDependencies, &nDepends); - if (nDepends == 0 && replicateRefTablesTaskId > 0) - { - nDepends = 1; - dependsArray = palloc(nDepends * sizeof(int64)); - dependsArray[0] = replicateRefTablesTaskId; - } - int32 nodesInvolved[2] = { 0 }; nodesInvolved[0] = move->sourceNode->nodeId; nodesInvolved[1] = move->targetNode->nodeId; diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index b1202e648..8116b0b4e 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -1551,7 +1551,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, publicationInfoHash, logicalRepTargetList, groupedLogicalRepTargetsHash, - SHARD_SPLIT); + SHARD_SPLIT, false); /* * 10) Delete old shards metadata and mark the shards as to be deferred drop. diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index b7d07b2cf..404bac911 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -107,16 +107,18 @@ static void ErrorIfSameNode(char *sourceNodeName, int sourceNodePort, static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, bool useLogicalReplication, - const char *operationName); + const char *operationName, uint32 optionFlags); static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort); + int32 targetNodePort, + uint32 optionFlags); static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, - char *targetNodeName, int32 targetNodePort); + char *targetNodeName, int32 targetNodePort, + uint32 optionFlags); static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort, const char *targetNodeName, int32 targetNodePort); @@ -165,7 +167,8 @@ static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval, static ShardCommandList * CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList); static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode); - +static void AcquireShardPlacementLock(uint64_t shardId, int lockMode, Oid relationId, + const char *operationName); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_copy_shard_placement); @@ -174,7 +177,7 @@ PG_FUNCTION_INFO_V1(master_copy_shard_placement); PG_FUNCTION_INFO_V1(citus_move_shard_placement); PG_FUNCTION_INFO_V1(citus_move_shard_placement_with_nodeid); PG_FUNCTION_INFO_V1(master_move_shard_placement); - +PG_FUNCTION_INFO_V1(citus_internal_copy_single_shard_placement); double DesiredPercentFreeAfterMove = 10; bool CheckAvailableSpaceBeforeMove = true; @@ -203,7 +206,7 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } @@ -232,7 +235,7 @@ citus_copy_shard_placement_with_nodeid(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, targetNode->workerName, targetNode->workerPort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } @@ -267,13 +270,69 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } +/* + * citus_internal_copy_single_shard_placement is an internal function that + * copies a single shard placement from a source node to a target node. + * It has two main differences from citus_copy_shard_placement: + * 1. it copies only a single shard placement, not all colocated shards + * 2. It allows to defer the constraints creation and this same function + * can be used to create the constraints later. + * + * The primary use case for this function is to transfer the shards of + * reference tables. Since all reference tables are colocated together, + * and each reference table has only one shard, this function can be used + * to transfer the shards of reference tables in parallel. + * Furthermore, the reference tables could have relations with + * other reference tables, so we need to ensure that their constraints + * are also transferred after copying the shards to the target node. + * For this reason, we allow the caller to defer the constraints creation. + * + * This function is not supposed to be called by the user directly. + */ +Datum +citus_internal_copy_single_shard_placement(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + + int64 shardId = PG_GETARG_INT64(0); + uint32 sourceNodeId = PG_GETARG_INT32(1); + uint32 targetNodeId = PG_GETARG_INT32(2); + uint32 flags = PG_GETARG_INT32(3); + Oid shardReplicationModeOid = PG_GETARG_OID(4); + + bool missingOk = false; + WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk); + WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk); + + char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); + + /* + * This is an internal function that is used by the rebalancer. + * It is not supposed to be called by the user directly. + */ + if (!IsRebalancerInternalBackend()) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("This is an internal Citus function can only be " + "used in by a rebalancer task"))); + } + + TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, + targetNode->workerName, targetNode->workerPort, + shardReplicationMode, SHARD_TRANSFER_COPY, flags); + + PG_RETURN_VOID(); +} + + /* * citus_move_shard_placement moves given shard (and its co-located shards) from one * node to the other node. To accomplish this it entirely recreates the table structure @@ -315,7 +374,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_MOVE); + shardReplicationMode, SHARD_TRANSFER_MOVE, 0); PG_RETURN_VOID(); } @@ -343,12 +402,42 @@ citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS) char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, targetNode->workerName, - targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE); + targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE, 0); PG_RETURN_VOID(); } +/* + * AcquireShardPlacementLock tries to acquire a lock on the shardid + * while moving/copying the shard placement. If this + * is it not possible it fails instantly because this means + * another move/copy on same shard is currently happening. */ +static void +AcquireShardPlacementLock(uint64_t shardId, int lockMode, Oid relationId, + const char *operationName) +{ + LOCKTAG tag; + const bool sessionLock = false; + const bool dontWait = true; + + SET_LOCKTAG_SHARD_MOVE(tag, shardId); + + LockAcquireResult lockAcquired = LockAcquire(&tag, lockMode, sessionLock, dontWait); + if (!lockAcquired) + { + ereport(ERROR, (errmsg("could not acquire the lock required to %s %s", + operationName, + generate_qualified_relation_name(relationId)), + errdetail("It means that either a concurrent shard move " + "or colocated distributed table creation is " + "happening."), + errhint("Make sure that the concurrent operation has " + "finished and re-run the command"))); + } +} + + /* * TransferShards is the function for shard transfers. */ @@ -356,7 +445,7 @@ void TransferShards(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, char shardReplicationMode, - ShardTransferType transferType) + ShardTransferType transferType, uint32 optionFlags) { /* strings to be used in log messages */ const char *operationName = ShardTransferTypeNames[transferType]; @@ -385,20 +474,36 @@ TransferShards(int64 shardId, char *sourceNodeName, ErrorIfTargetNodeIsNotSafeForTransfer(targetNodeName, targetNodePort, transferType); - AcquirePlacementColocationLock(distributedTableId, ExclusiveLock, operationName); + AcquirePlacementColocationLock(distributedTableId, RowExclusiveLock, operationName); - List *colocatedTableList = ColocatedTableList(distributedTableId); - List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + List *colocatedTableList; + List *colocatedShardList; + + /* + * If SHARD_TRANSFER_SINGLE_SHARD_ONLY is set, we only transfer a single shard + * specified by shardId. Otherwise, we transfer all colocated shards. + */ + bool isSingleShardOnly = optionFlags & SHARD_TRANSFER_SINGLE_SHARD_ONLY; + + if (isSingleShardOnly) + { + colocatedTableList = list_make1_oid(distributedTableId); + colocatedShardList = list_make1(shardInterval); + } + else + { + colocatedTableList = ColocatedTableList(distributedTableId); + colocatedShardList = ColocatedShardIntervalList(shardInterval); + } EnsureTableListOwner(colocatedTableList); if (transferType == SHARD_TRANSFER_MOVE) { /* - * Block concurrent DDL / TRUNCATE commands on the relation. Similarly, - * block concurrent citus_move_shard_placement() on any shard of - * the same relation. This is OK for now since we're executing shard - * moves sequentially anyway. + * Block concurrent DDL / TRUNCATE commands on the relation. while, + * allow concurrent citus_move_shard_placement() on the shards of + * the same relation. */ LockColocatedRelationsForMove(colocatedTableList); } @@ -412,14 +517,66 @@ TransferShards(int64 shardId, char *sourceNodeName, /* * We sort shardIntervalList so that lock operations will not cause any - * deadlocks. + * deadlocks. But we do not need to do that if the list contain only one + * shard. */ - colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + if (!isSingleShardOnly) + { + colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + } - if (TransferAlreadyCompleted(colocatedShardList, - sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort, - transferType)) + /* We have pretty much covered the concurrent rebalance operations + * and we want to allow concurrent moves within the same colocation group. + * but at the same time we want to block the concurrent moves on the same shard + * placement. So we lock the shard moves before starting the transfer. + */ + foreach_declared_ptr(shardInterval, colocatedShardList) + { + int64 shardIdToLock = shardInterval->shardId; + AcquireShardPlacementLock(shardIdToLock, ExclusiveLock, distributedTableId, + operationName); + } + + bool transferAlreadyCompleted = TransferAlreadyCompleted(colocatedShardList, + sourceNodeName, + sourceNodePort, + targetNodeName, + targetNodePort, + transferType); + + /* + * If we just need to create the shard relationships,We don't need to do anything + * else other than calling CopyShardTables with SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY + * flag. + */ + bool createRelationshipsOnly = optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY; + + if (createRelationshipsOnly) + { + if (!transferAlreadyCompleted) + { + /* + * if the transfer is not completed, and we are here just to create + * the relationships, we can return right away + */ + ereport(WARNING, (errmsg("shard is not present on node %s:%d", + targetNodeName, targetNodePort), + errdetail("%s may have not completed.", + operationNameCapitalized))); + return; + } + + CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName + , + targetNodePort, (shardReplicationMode == + TRANSFER_MODE_FORCE_LOGICAL), + operationFunctionName, optionFlags); + + /* We don't need to do anything else, just return */ + return; + } + + if (transferAlreadyCompleted) { /* if the transfer is already completed, we can return right away */ ereport(WARNING, (errmsg("shard is already present on node %s:%d", @@ -515,7 +672,8 @@ TransferShards(int64 shardId, char *sourceNodeName, } CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort, useLogicalReplication, operationFunctionName); + targetNodePort, useLogicalReplication, operationFunctionName, + optionFlags); if (transferType == SHARD_TRANSFER_MOVE) { @@ -676,7 +834,7 @@ IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetN /* * LockColocatedRelationsForMove takes a list of relations, locks all of them - * using ShareUpdateExclusiveLock + * using ShareLock */ static void LockColocatedRelationsForMove(List *colocatedTableList) @@ -684,7 +842,7 @@ LockColocatedRelationsForMove(List *colocatedTableList) Oid colocatedTableId = InvalidOid; foreach_declared_oid(colocatedTableId, colocatedTableList) { - LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock); + LockRelationOid(colocatedTableId, RowExclusiveLock); } } @@ -1333,7 +1491,7 @@ ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList) static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, bool useLogicalReplication, - const char *operationName) + const char *operationName, uint32 optionFlags) { if (list_length(shardIntervalList) < 1) { @@ -1343,16 +1501,22 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP /* Start operation to prepare for generating cleanup records */ RegisterOperationNeedingCleanup(); - if (useLogicalReplication) + bool createRelationshipsOnly = optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY; + + /* + * If we're just going to create relationships only always use + * CopyShardTablesViaBlockWrites. + */ + if (useLogicalReplication && !createRelationshipsOnly) { CopyShardTablesViaLogicalReplication(shardIntervalList, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort); + targetNodePort, optionFlags); } else { CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort); + targetNodeName, targetNodePort, optionFlags); } /* @@ -1369,7 +1533,7 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort) + int32 targetNodePort, uint32 optionFlags) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaLogicalReplication", @@ -1407,9 +1571,13 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa MemoryContextSwitchTo(oldContext); + bool skipRelationshipCreation = (optionFlags & + SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS); + /* data copy is done seperately when logical replication is used */ LogicallyReplicateShards(shardIntervalList, sourceNodeName, - sourceNodePort, targetNodeName, targetNodePort); + sourceNodePort, targetNodeName, targetNodePort, + skipRelationshipCreation); } @@ -1437,7 +1605,7 @@ CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList) static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort) + int32 targetNodePort, uint32 optionFlags) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaBlockWrites", @@ -1446,127 +1614,150 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); - - /* iterate through the colocated shards and copy each */ ShardInterval *shardInterval = NULL; - foreach_declared_ptr(shardInterval, shardIntervalList) - { - /* - * For each shard we first create the shard table in a separate - * transaction and then we copy the data and create the indexes in a - * second separate transaction. The reason we don't do both in a single - * transaction is so we can see the size of the new shard growing - * during the copy when we run get_rebalance_progress in another - * session. If we wouldn't split these two phases up, then the table - * wouldn't be visible in the session that get_rebalance_progress uses. - * So get_rebalance_progress would always report its size as 0. - */ - List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName, - sourceNodePort); - char *tableOwner = TableOwner(shardInterval->relationId); - /* drop the shard we created on the target, in case of failure */ - InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, - ConstructQualifiedShardName(shardInterval), - GroupForNode(targetNodeName, - targetNodePort), - CLEANUP_ON_FAILURE); - - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); - } - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_COPYING_DATA); - - ConflictWithIsolationTestingBeforeCopy(); - CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); - ConflictWithIsolationTestingAfterCopy(); - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS); - - foreach_declared_ptr(shardInterval, shardIntervalList) - { - List *ddlCommandList = - PostLoadShardCreationCommandList(shardInterval, sourceNodeName, - sourceNodePort); - char *tableOwner = TableOwner(shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); - - MemoryContextReset(localContext); - } + bool createRelationshipsOnly = optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY; /* - * Once all shards are copied, we can recreate relationships between shards. - * Create DDL commands to Attach child tables to their parents in a partitioning hierarchy. + * If we’re only asked to create the relationships, the shards are already + * present and populated on the node. Skip the table‑setup and data‑loading + * steps and proceed straight to creating the relationships. */ - List *shardIntervalWithDDCommandsList = NIL; - foreach_declared_ptr(shardInterval, shardIntervalList) + if (!createRelationshipsOnly) { - if (PartitionTable(shardInterval->relationId)) + /* iterate through the colocated shards and copy each */ + foreach_declared_ptr(shardInterval, shardIntervalList) { - char *attachPartitionCommand = - GenerateAttachShardPartitionCommand(shardInterval); + /* + * For each shard we first create the shard table in a separate + * transaction and then we copy the data and create the indexes in a + * second separate transaction. The reason we don't do both in a single + * transaction is so we can see the size of the new shard growing + * during the copy when we run get_rebalance_progress in another + * session. If we wouldn't split these two phases up, then the table + * wouldn't be visible in the session that get_rebalance_progress uses. + * So get_rebalance_progress would always report its size as 0. + */ + List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, + sourceNodeName, + sourceNodePort); + char *tableOwner = TableOwner(shardInterval->relationId); - ShardCommandList *shardCommandList = CreateShardCommandList( - shardInterval, - list_make1(attachPartitionCommand)); - shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, - shardCommandList); + /* drop the shard we created on the target, in case of failure */ + InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, + ConstructQualifiedShardName( + shardInterval), + GroupForNode(targetNodeName, + targetNodePort), + CLEANUP_ON_FAILURE); + + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); + } + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COPYING_DATA); + + ConflictWithIsolationTestingBeforeCopy(); + CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); + ConflictWithIsolationTestingAfterCopy(); + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS); + + foreach_declared_ptr(shardInterval, shardIntervalList) + { + List *ddlCommandList = + PostLoadShardCreationCommandList(shardInterval, sourceNodeName, + sourceNodePort); + char *tableOwner = TableOwner(shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); + + MemoryContextReset(localContext); } } - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS); - /* - * Iterate through the colocated shards and create DDL commamnds - * to create the foreign constraints. + * Skip creating shard relationships if the caller has requested that they + * not be created. */ - foreach_declared_ptr(shardInterval, shardIntervalList) + bool skipRelationshipCreation = (optionFlags & + SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS); + + if (!skipRelationshipCreation) { - List *shardForeignConstraintCommandList = NIL; - List *referenceTableForeignConstraintList = NIL; + /* + * Once all shards are copied, we can recreate relationships between shards. + * Create DDL commands to Attach child tables to their parents in a partitioning hierarchy. + */ + List *shardIntervalWithDDCommandsList = NIL; + foreach_declared_ptr(shardInterval, shardIntervalList) + { + if (PartitionTable(shardInterval->relationId)) + { + char *attachPartitionCommand = + GenerateAttachShardPartitionCommand(shardInterval); - CopyShardForeignConstraintCommandListGrouped(shardInterval, - &shardForeignConstraintCommandList, - &referenceTableForeignConstraintList); + ShardCommandList *shardCommandList = CreateShardCommandList( + shardInterval, + list_make1(attachPartitionCommand)); + shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, + shardCommandList); + } + } - ShardCommandList *shardCommandList = CreateShardCommandList( - shardInterval, - list_concat(shardForeignConstraintCommandList, - referenceTableForeignConstraintList)); - shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, - shardCommandList); + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS); + + /* + * Iterate through the colocated shards and create DDL commamnds + * to create the foreign constraints. + */ + foreach_declared_ptr(shardInterval, shardIntervalList) + { + List *shardForeignConstraintCommandList = NIL; + List *referenceTableForeignConstraintList = NIL; + + CopyShardForeignConstraintCommandListGrouped(shardInterval, + & + shardForeignConstraintCommandList, + & + referenceTableForeignConstraintList); + + ShardCommandList *shardCommandList = CreateShardCommandList( + shardInterval, + list_concat(shardForeignConstraintCommandList, + referenceTableForeignConstraintList)); + shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, + shardCommandList); + } + + /* Now execute the Partitioning & Foreign constraints creation commads. */ + ShardCommandList *shardCommandList = NULL; + foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList) + { + char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, + shardCommandList->ddlCommandList); + } + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COMPLETING); } - - /* Now execute the Partitioning & Foreign constraints creation commads. */ - ShardCommandList *shardCommandList = NULL; - foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList) - { - char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, - shardCommandList->ddlCommandList); - } - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_COMPLETING); - MemoryContextReset(localContext); MemoryContextSwitchTo(oldContext); } @@ -1647,7 +1838,8 @@ CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardInte ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList, MaxAdaptiveExecutorPoolSize, - NULL /* jobIdList (ignored by API implementation) */); + NULL /* jobIdList (ignored by API implementation) */ + ); } diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 7189216d0..4c43d3513 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -118,7 +118,8 @@ static List * GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shard static List * GetIndexCommandListForShardBackingReplicaIdentity(Oid relationId, uint64 shardId); static void CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList, - LogicalRepType type); + LogicalRepType type, + bool skipInterShardRelationships); static void ExecuteCreateIndexCommands(List *logicalRepTargetList); static void ExecuteCreateConstraintsBackedByIndexCommands(List *logicalRepTargetList); static List * ConvertNonExistingPlacementDDLCommandsToTasks(List *shardCommandList, @@ -132,7 +133,6 @@ static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command); static void WaitForMiliseconds(long timeout); static XLogRecPtr GetSubscriptionPosition( GroupedLogicalRepTargets *groupedLogicalRepTargets); -static void AcquireLogicalReplicationLock(void); static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode, List *shardIntervals); @@ -154,9 +154,9 @@ static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition, */ void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, - char *targetNodeName, int targetNodePort) + char *targetNodeName, int targetNodePort, + bool skipInterShardRelationshipCreation) { - AcquireLogicalReplicationLock(); char *superUser = CitusExtensionOwnerName(); char *databaseName = get_database_name(MyDatabaseId); int connectionFlags = FORCE_NEW_CONNECTION; @@ -258,7 +258,8 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo publicationInfoHash, logicalRepTargetList, groupedLogicalRepTargetsHash, - SHARD_MOVE); + SHARD_MOVE, + skipInterShardRelationshipCreation); /* * We use these connections exclusively for subscription management, @@ -317,7 +318,8 @@ CompleteNonBlockingShardTransfer(List *shardList, HTAB *publicationInfoHash, List *logicalRepTargetList, HTAB *groupedLogicalRepTargetsHash, - LogicalRepType type) + LogicalRepType type, + bool skipInterShardRelationshipCreation) { /* Start applying the changes from the replication slots to catch up. */ EnableSubscriptions(logicalRepTargetList); @@ -345,7 +347,8 @@ CompleteNonBlockingShardTransfer(List *shardList, * and partitioning hierarchy. Once they are done, wait until the replication * catches up again. So we don't block writes too long. */ - CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList, type); + CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList, type, + skipInterShardRelationshipCreation); UpdatePlacementUpdateStatusForShardIntervalList( shardList, @@ -372,7 +375,7 @@ CompleteNonBlockingShardTransfer(List *shardList, WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); - if (type != SHARD_SPLIT) + if (type != SHARD_SPLIT && !skipInterShardRelationshipCreation) { UpdatePlacementUpdateStatusForShardIntervalList( shardList, @@ -497,25 +500,6 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList) } -/* - * AcquireLogicalReplicationLock tries to acquire a lock for logical - * replication. We need this lock, because at the start of logical replication - * we clean up old subscriptions and publications. Because of this cleanup it's - * not safe to run multiple logical replication based shard moves at the same - * time. If multiple logical replication moves would run at the same time, the - * second move might clean up subscriptions and publications that are in use by - * another move. - */ -static void -AcquireLogicalReplicationLock(void) -{ - LOCKTAG tag; - SET_LOCKTAG_LOGICAL_REPLICATION(tag); - - LockAcquire(&tag, ExclusiveLock, false, false); -} - - /* * PrepareReplicationSubscriptionList returns list of shards to be logically * replicated from given shard list. This is needed because Postgres does not @@ -675,7 +659,8 @@ GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shardId) */ static void CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList, - LogicalRepType type) + LogicalRepType type, + bool skipInterShardRelationships) { /* * We create indexes in 4 steps. @@ -705,7 +690,7 @@ CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList, /* * Creating the partitioning hierarchy errors out in shard splits when */ - if (type != SHARD_SPLIT) + if (type != SHARD_SPLIT && !skipInterShardRelationships) { /* create partitioning hierarchy, if any */ CreatePartitioningHierarchy(logicalRepTargetList); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 165aea05f..e6fd1c308 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1952,7 +1952,7 @@ RegisterCitusConfigVariables(void) "because total background worker count is shared by all background workers. The value " "represents the possible maximum number of task executors."), &MaxBackgroundTaskExecutors, - 4, 1, MAX_BG_TASK_EXECUTORS, + 1, 1, MAX_BG_TASK_EXECUTORS, PGC_SIGHUP, GUC_STANDARD, NULL, NULL, NULL); @@ -1964,7 +1964,7 @@ RegisterCitusConfigVariables(void) "for scheduled background tasks that involve a particular node"), NULL, &MaxBackgroundTaskExecutorsPerNode, - 1, 1, 128, + 4, 1, 128, PGC_SIGHUP, GUC_STANDARD, NULL, NULL, NULL); diff --git a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql index 2f507eb24..be4e0e62f 100644 --- a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql +++ b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql @@ -1,3 +1,8 @@ -- citus--13.1-1--13.2-1 + -- bump version to 13.2-1 #include "udfs/worker_last_saved_explain_analyze/13.2-1.sql" + +DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode); +#include "udfs/citus_rebalance_start/13.2-1.sql" +#include "udfs/citus_internal_copy_single_shard_placement/13.2-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql index de26b790a..cfdf1eb60 100644 --- a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql @@ -1,4 +1,7 @@ -- citus--13.2-1--13.1-1 +DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean); +DROP FUNCTION IF EXISTS citus_internal.citus_internal_copy_single_shard_placement(bigint, integer, integer, integer, citus.shard_transfer_mode); +#include "udfs/citus_rebalance_start/11.1-1.sql" -- downgrade version to 13.1-1 DROP FUNCTION IF EXISTS pg_catalog.worker_last_saved_explain_analyze(); diff --git a/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/13.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/13.2-1.sql new file mode 100644 index 000000000..bbb2fb63b --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/13.2-1.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION citus_internal.citus_internal_copy_single_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + flags integer, + transfer_mode citus.shard_transfer_mode default 'auto') +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_internal_copy_single_shard_placement$$; diff --git a/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/latest.sql new file mode 100644 index 000000000..bbb2fb63b --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/latest.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION citus_internal.citus_internal_copy_single_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + flags integer, + transfer_mode citus.shard_transfer_mode default 'auto') +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_internal_copy_single_shard_placement$$; diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql b/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql new file mode 100644 index 000000000..658e78dd6 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql @@ -0,0 +1,13 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start( + rebalance_strategy name DEFAULT NULL, + drain_only boolean DEFAULT false, + shard_transfer_mode citus.shard_transfer_mode default 'auto', + parallel_transfer_reference_tables boolean DEFAULT false, + parallel_transfer_colocated_shards boolean DEFAULT false + ) + RETURNS bigint + AS 'MODULE_PATHNAME' + LANGUAGE C VOLATILE; +COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean) + IS 'rebalance the shards in the cluster in the background'; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean) TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql b/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql index cc84d3142..658e78dd6 100644 --- a/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql @@ -1,11 +1,13 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start( rebalance_strategy name DEFAULT NULL, drain_only boolean DEFAULT false, - shard_transfer_mode citus.shard_transfer_mode default 'auto' + shard_transfer_mode citus.shard_transfer_mode default 'auto', + parallel_transfer_reference_tables boolean DEFAULT false, + parallel_transfer_colocated_shards boolean DEFAULT false ) RETURNS bigint AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE; -COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode) +COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean) IS 'rebalance the shards in the cluster in the background'; -GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode) TO PUBLIC; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean) TO PUBLIC; diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 2d0f03a4c..ba3bde84f 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -125,7 +125,7 @@ static volatile sig_atomic_t GotSighup = false; /* keeping track of parallel background tasks per node */ HTAB *ParallelTasksPerNode = NULL; -int MaxBackgroundTaskExecutorsPerNode = 1; +int MaxBackgroundTaskExecutorsPerNode = 4; PG_FUNCTION_INFO_V1(citus_job_cancel); PG_FUNCTION_INFO_V1(citus_job_wait); diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index e6bf3d00c..234acc53d 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -98,7 +98,7 @@ double DistributedDeadlockDetectionTimeoutFactor = 2.0; int Recover2PCInterval = 60000; int DeferShardDeleteInterval = 15000; int BackgroundTaskQueueCheckInterval = 5000; -int MaxBackgroundTaskExecutors = 4; +int MaxBackgroundTaskExecutors = 1; char *MainDb = ""; /* config variables for metadata sync timeout */ diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 8f0d89fc9..5a98c9cdd 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -41,16 +41,24 @@ #include "distributed/transaction_management.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" +#include "distributed/shard_transfer.h" +#include "distributed/hash_helpers.h" + /* local function forward declarations */ static List * WorkersWithoutReferenceTablePlacement(uint64 shardId, LOCKMODE lockMode); -static StringInfo CopyShardPlacementToWorkerNodeQuery( - ShardPlacement *sourceShardPlacement, - WorkerNode *workerNode, - char transferMode); +static StringInfo CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement + , + WorkerNode *workerNode, + char transferMode); static bool AnyRelationsModifiedInTransaction(List *relationIdList); static List * ReplicatedMetadataSyncedDistributedTableList(void); static bool NodeHasAllReferenceTableReplicas(WorkerNode *workerNode); +typedef struct ShardTaskEntry +{ + uint64 shardId; + int64 taskId; +} ShardTaskEntry; /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(upgrade_to_reference_table); @@ -131,6 +139,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) * DROP TABLE and create_reference_table calls so that the list of reference tables we * operate on are stable. * + * * Since the changes to the reference table placements are made via loopback * connections we release the locks held at the end of this function. Due to Citus * only running transactions in READ COMMITTED mode we can be sure that other @@ -179,7 +188,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) if (list_length(newWorkersList) == 0) { /* - * All workers alreaddy have a copy of the reference tables, make sure that + * All workers already have a copy of the reference tables, make sure that * any locks obtained earlier are released. It will probably not matter, but * we release the locks in the reverse order we obtained them in. */ @@ -230,7 +239,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) WorkerNode *newWorkerNode = NULL; foreach_declared_ptr(newWorkerNode, newWorkersList) { - ereport(NOTICE, (errmsg("replicating reference table '%s' to %s:%d ...", + ereport(DEBUG2, (errmsg("replicating reference table '%s' to %s:%d ...", referenceTableName, newWorkerNode->workerName, newWorkerNode->workerPort))); @@ -294,6 +303,381 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) } +/* + * ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes is essentially a + * twin of EnsureReferenceTablesExistOnAllNodesExtended. The difference is instead of + * copying the missing tables on to the worker nodes this function creates the background tasks + * for each required copy operation and schedule it in the background job. + * Another difference is that instead of moving all the colocated shards sequencially + * this function creates a seperate background task for each shard, even when the shards + * are part of same colocated shard group. + * + * For transfering the shards in parallel the function creates a task for each shard + * move and than schedules another task that creates the shard relationships (if any) + * between shards and that task wait for the completion of all shard transfer tasks. + * + * The function returns an array of task ids that are created for creating the shard + * relationships, effectively completion of these tasks signals the completion of + * of reference table setup on the worker nodes. Any process that needs to wait for + * the completion of the reference table setup can wait for these tasks to complete. + * + * The transferMode is passed to this function gets ignored for now and it only uses + * block write mode. + */ +int64 * +ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char transferMode + , + int *nDependTasks) +{ + List *referenceTableIdList = NIL; + uint64 shardId = INVALID_SHARD_ID; + List *newWorkersList = NIL; + int64 *dependsTaskArray = NULL; + const char *referenceTableName = NULL; + int colocationId = GetReferenceTableColocationId(); + + *nDependTasks = 0; + if (colocationId == INVALID_COLOCATION_ID) + { + /* we have no reference table yet. */ + return 0; + } + + /* + * Most of the time this function should result in a conclusion where we do not need + * to copy any reference tables. To prevent excessive locking the majority of the time + * we run our precondition checks first with a lower lock. If, after checking with the + * lower lock, that we might need to copy reference tables we check with a more + * aggressive and self conflicting lock. It is important to be self conflicting in the + * second run to make sure that two concurrent calls to this routine will actually not + * run concurrently after the initial check. + * + * If after two iterations of precondition checks we still find the need for copying + * reference tables we exit the loop with all locks held. This will prevent concurrent + * DROP TABLE and create_reference_table calls so that the list of reference tables we + * operate on are stable. + * + * + * Since the changes to the reference table placements are made via loopback + * connections we release the locks held at the end of this function. Due to Citus + * only running transactions in READ COMMITTED mode we can be sure that other + * transactions correctly find the metadata entries. + */ + LOCKMODE lockmodes[] = { AccessShareLock, ExclusiveLock }; + for (int lockmodeIndex = 0; lockmodeIndex < lengthof(lockmodes); lockmodeIndex++) + { + LockColocationId(colocationId, lockmodes[lockmodeIndex]); + + referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE); + if (referenceTableIdList == NIL) + { + /* + * No reference tables exist, make sure that any locks obtained earlier are + * released. It will probably not matter, but we release the locks in the + * reverse order we obtained them in. + */ + for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } + return 0; + } + + Oid referenceTableId = linitial_oid(referenceTableIdList); + referenceTableName = get_rel_name(referenceTableId); + List *shardIntervalList = LoadShardIntervalList(referenceTableId); + if (list_length(shardIntervalList) == 0) + { + /* check for corrupt metadata */ + ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard", + referenceTableName))); + } + + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + shardId = shardInterval->shardId; + + /* + * We only take an access share lock, otherwise we'll hold up citus_add_node. + * In case of create_reference_table() where we don't want concurrent writes + * to pg_dist_node, we have already acquired ShareLock on pg_dist_node. + */ + newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, AccessShareLock); + if (list_length(newWorkersList) == 0) + { + /* + * All workers already have a copy of the reference tables, make sure that + * any locks obtained earlier are released. It will probably not matter, but + * we release the locks in the reverse order we obtained them in. + */ + for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } + return 0; + } + } + + /* + * citus_copy_shard_placement triggers metadata sync-up, which tries to + * acquire a ShareLock on pg_dist_node. We do master_copy_shad_placement + * in a separate connection. If we have modified pg_dist_node in the + * current backend, this will cause a deadlock. + */ + if (TransactionModifiedNodeMetadata) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate reference tables in a transaction " + "that modified node metadata"))); + } + + /* + * Modifications to reference tables in current transaction are not visible + * to citus_copy_shard_placement, since it is done in a separate backend. + */ + if (AnyRelationsModifiedInTransaction(referenceTableIdList)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate reference tables in a transaction " + "that modified a reference table"))); + } + + bool missingOk = false; + ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk); + if (sourceShardPlacement == NULL) + { + /* check for corrupt metadata */ + ereport(ERROR, (errmsg("reference table shard " + UINT64_FORMAT + " does not have an active shard placement", + shardId))); + } + + WorkerNode *newWorkerNode = NULL; + BackgroundTask *task = NULL; + StringInfoData buf = { 0 }; + initStringInfo(&buf); + List *depTasksList = NIL; + const char *transferModeString = + transferMode == TRANSFER_MODE_BLOCK_WRITES ? "block_writes" : + transferMode == TRANSFER_MODE_FORCE_LOGICAL ? "force_logical" : + "auto"; + + + foreach_declared_ptr(newWorkerNode, newWorkersList) + { + ereport(DEBUG2, (errmsg("replicating reference table '%s' to %s:%d ...", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort))); + + Oid relationId = InvalidOid; + List *nodeTasksList = NIL; + HTAB *shardTaskMap = CreateSimpleHashWithNameAndSize(uint64, + ShardTaskEntry, + "Shard_Task_Map", + list_length( + referenceTableIdList)); + + foreach_declared_oid(relationId, referenceTableIdList) + { + referenceTableName = get_rel_name(relationId); + List *shardIntervalList = LoadShardIntervalList(relationId); + if (list_length(shardIntervalList) != 1) + { + ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard", + referenceTableName))); + } + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + shardId = shardInterval->shardId; + + resetStringInfo(&buf); + uint32 shardTransferFlags = SHARD_TRANSFER_SINGLE_SHARD_ONLY | + SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS; + + /* Temporary hack until we get background task config support PR */ + appendStringInfo(&buf, + "SET LOCAL application_name TO '%s%ld';", + CITUS_REBALANCER_APPLICATION_NAME_PREFIX, + GetGlobalPID()); + + /* + * In first step just create and load data in the shards but defer the creation + * of the shard relationships to the next step. + * The reason we want to defer the creation of the shard relationships is that + * we want to make sure that all the parallel shard copy task are finished + * before we create the relationships. Otherwise we might end up with + * a situation where the dependent-shard task is still running and trying to + * create the shard relationships will result in ERROR. + */ + appendStringInfo(&buf, + "SELECT citus_internal.citus_internal_copy_single_shard_placement(%ld,%u,%u,%u,%s)", + shardId, + sourceShardPlacement->nodeId, + newWorkerNode->nodeId, + shardTransferFlags, + quote_literal_cstr(transferModeString)); + + ereport(DEBUG2, + (errmsg("replicating reference table '%s' to %s:%d ... QUERY= %s", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort, buf.data))); + + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); + List *relatedRelations = list_concat(cacheEntry-> + referencedRelationsViaForeignKey, + cacheEntry-> + referencingRelationsViaForeignKey); + List *dependencyTaskList = NIL; + + Oid relatedRelationId = InvalidOid; + foreach_declared_oid(relatedRelationId, relatedRelations) + { + if (!list_member_oid(referenceTableIdList, relatedRelationId)) + { + continue; + } + + List *relatedShardIntervalList = LoadShardIntervalList( + relatedRelationId); + ShardInterval *relatedShardInterval = (ShardInterval *) linitial( + relatedShardIntervalList); + uint64 relatedShardId = relatedShardInterval->shardId; + bool taskFound = false; + ShardTaskEntry *taskEntry = hash_search(shardTaskMap, &relatedShardId, + HASH_FIND, &taskFound); + if (taskFound) + { + dependencyTaskList = lappend(dependencyTaskList, taskEntry); + } + } + + int nDepends = list_length(dependencyTaskList); + int64 *dependsArray = NULL; + if (nDepends > 0) + { + dependsArray = (int64 *) palloc(sizeof(int64) * nDepends); + int i = 0; + ListCell *lc; + foreach(lc, dependencyTaskList) + { + dependsArray[i++] = ((ShardTaskEntry *) lfirst(lc))->taskId; + } + } + + int32 nodesInvolved[2] = { 0 }; + nodesInvolved[0] = sourceShardPlacement->nodeId; + nodesInvolved[1] = newWorkerNode->nodeId; + + task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data, + nDepends, + dependsArray, 2, + nodesInvolved); + + bool found = false; + ShardTaskEntry *taskEntry = hash_search(shardTaskMap, &shardId, HASH_ENTER, + &found); + if (!found) + { + taskEntry->taskId = task->taskid; + ereport(DEBUG2, + (errmsg( + "Added hash entry in scheduled task hash with task %ld for shard %ld", + task->taskid, shardId))); + } + else + { + ereport(ERROR, (errmsg("failed to record task dependency for shard %ld", + shardId))); + } + + nodeTasksList = lappend(nodeTasksList, task); + if (dependsArray) + { + pfree(dependsArray); + } + list_free(dependencyTaskList); + } + + if (list_length(nodeTasksList) > 0) + { + int nDepends = list_length(nodeTasksList); + int32 nodesInvolved[2] = { 0 }; + nodesInvolved[0] = sourceShardPlacement->nodeId; + nodesInvolved[1] = newWorkerNode->nodeId; + int64 *dependsArray = palloc(sizeof(int64) * list_length(nodeTasksList)); + int idx = 0; + foreach_declared_ptr(task, nodeTasksList) + { + dependsArray[idx++] = task->taskid; + } + resetStringInfo(&buf); + uint32 shardTransferFlags = SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY; + appendStringInfo(&buf, + "SET LOCAL application_name TO '%s%ld';\n", + CITUS_REBALANCER_APPLICATION_NAME_PREFIX, + GetGlobalPID()); + appendStringInfo(&buf, + "SELECT citus_internal.citus_internal_copy_single_shard_placement(%ld,%u,%u,%u,%s)", + shardId, + sourceShardPlacement->nodeId, + newWorkerNode->nodeId, + shardTransferFlags, + quote_literal_cstr(transferModeString)); + + ereport(DEBUG2, + (errmsg( + "creating relations for reference table '%s' on %s:%d ... QUERY= %s", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort, buf.data))); + + task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data, + nDepends, + dependsArray, 2, + nodesInvolved); + + depTasksList = lappend(depTasksList, task); + + pfree(dependsArray); + list_free(nodeTasksList); + nodeTasksList = NIL; + } + + hash_destroy(shardTaskMap); + } + + /* + * compute a dependent task list array to be used to indicate the completion of all + * reference table shards copy, so that we can start with distributed shard copy + */ + if (list_length(depTasksList) > 0) + { + *nDependTasks = list_length(depTasksList); + dependsTaskArray = palloc(sizeof(int64) * *nDependTasks); + int idx = 0; + foreach_declared_ptr(task, depTasksList) + { + dependsTaskArray[idx++] = task->taskid; + } + list_free(depTasksList); + } + + /* + * Since reference tables have been copied via a loopback connection we do not have to + * retain our locks. Since Citus only runs well in READ COMMITTED mode we can be sure + * that other transactions will find the reference tables copied. + * We have obtained and held multiple locks, here we unlock them all in the reverse + * order we have obtained them in. + */ + for (int releaseLockmodeIndex = lengthof(lockmodes) - 1; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } + return dependsTaskArray; +} + + /* * HasNodesWithMissingReferenceTables checks if all reference tables are already copied to * all nodes. When a node doesn't have a copy of the reference tables we call them missing @@ -424,7 +808,7 @@ CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement, appendStringInfo(queryString, "SELECT pg_catalog.citus_copy_shard_placement(" UINT64_FORMAT ", %d, %d, " - "transfer_mode := %s)", + "transfer_mode := %s)", sourceShardPlacement->shardId, sourceShardPlacement->nodeId, workerNode->nodeId, @@ -614,8 +998,9 @@ DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly) * connections. */ void -DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext( - MetadataSyncContext *context, int32 groupId, bool localOnly) +DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext(MetadataSyncContext * + context, int32 groupId, + bool localOnly) { List *replicatedPlacementListForGroup = ReplicatedPlacementsForNodeGroup(groupId); diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 2a57c0224..7dcdd5bd3 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -131,7 +131,8 @@ typedef enum LogicalRepType extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, char *targetNodeName, - int targetNodePort); + int targetNodePort, + bool skipInterShardRelationshipCreation); extern void ConflictWithIsolationTestingBeforeCopy(void); extern void ConflictWithIsolationTestingAfterCopy(void); @@ -177,7 +178,8 @@ extern void CompleteNonBlockingShardTransfer(List *shardList, HTAB *publicationInfoHash, List *logicalRepTargetList, HTAB *groupedLogicalRepTargetsHash, - LogicalRepType type); + LogicalRepType type, + bool skipInterShardRelationshipCreation); extern void CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList); extern void CreatePartitioningHierarchy(List *logicalRepTargetList); diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index cf5a6fd02..9fd44b71b 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -21,6 +21,11 @@ extern void EnsureReferenceTablesExistOnAllNodes(void); extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode); +extern int64 * ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, + char + transferMode, + int * + nDependTasks); extern bool HasNodesWithMissingReferenceTables(List **referenceTableList); extern uint32 CreateReferenceTableColocationId(void); extern uint32 GetReferenceTableColocationId(void); diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index dfadc0263..0696ef6e8 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -44,10 +44,11 @@ typedef enum AdvisoryLocktagClass ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8, ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9, ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID = 10, - ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12, + ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12, /* Not used anymore */ ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13, ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK = 14, - ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION = 15 + ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION = 15, + ADV_LOCKTAG_CLASS_CITUS_SHARD_MOVE = 16 } AdvisoryLocktagClass; /* CitusOperations has constants for citus operations */ @@ -84,6 +85,16 @@ typedef enum CitusOperations (uint32) (shardid), \ ADV_LOCKTAG_CLASS_CITUS_SHARD) +/* advisory lock for citus shard move/copy operations, + * also it has the database hardcoded to MyDatabaseId, + * to ensure the locks are local to each database */ +#define SET_LOCKTAG_SHARD_MOVE(tag, shardid) \ + SET_LOCKTAG_ADVISORY(tag, \ + MyDatabaseId, \ + (uint32) ((shardid) >> 32), \ + (uint32) (shardid), \ + ADV_LOCKTAG_CLASS_CITUS_SHARD_MOVE) + /* reuse advisory lock, but with different, unused field 4 (7) * Also it has the database hardcoded to MyDatabaseId, to ensure the locks * are local to each database */ @@ -124,16 +135,6 @@ typedef enum CitusOperations (uint32) operationId, \ ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID) -/* reuse advisory lock, but with different, unused field 4 (12) - * Also it has the database hardcoded to MyDatabaseId, to ensure the locks - * are local to each database */ -#define SET_LOCKTAG_LOGICAL_REPLICATION(tag) \ - SET_LOCKTAG_ADVISORY(tag, \ - MyDatabaseId, \ - (uint32) 0, \ - (uint32) 0, \ - ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION) - /* reuse advisory lock, but with different, unused field 4 (14) * Also it has the database hardcoded to MyDatabaseId, to ensure the locks * are local to each database */ diff --git a/src/include/distributed/shard_transfer.h b/src/include/distributed/shard_transfer.h index c1621879b..ae33c4ec3 100644 --- a/src/include/distributed/shard_transfer.h +++ b/src/include/distributed/shard_transfer.h @@ -23,10 +23,52 @@ typedef enum SHARD_TRANSFER_COPY = 2 } ShardTransferType; +/* + * ShardTransferOperationMode is used to pass flags to the shard transfer + * function. The flags are used to control the behavior of the transfer + * function. + */ +typedef enum +{ + /* + * This flag instructs the transfer function to only transfer single shard + * rather than transfer all the colocated shards for the shard interval. + * Using this flag mean we might break the colocated shard + * relationship on the source node. So this is only usefull when setting up + * the new node and we are sure that the node would not be used until we have + * transfered all the shards. + * The reason we need this flag is that we want to be able to transfer + * colocated shards in parallel and for now it is only used for the reference + * table shards. + * Finally if you are using this flag, you should also use consider defering + * the creation of the relationships on the source node until all colocated + * shards are transfered (see: SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS). + */ + SHARD_TRANSFER_SINGLE_SHARD_ONLY = 1 << 0, + + /* With this flag the shard transfer function does not create any constrainsts + * or foreign relations defined on the shard, This can be used to defer the + * creation of the relationships until all the shards are transfered. + * This is usefull when we are transfering colocated shards in parallel and + * we want to avoid the creation of the relationships on the source node + * until all the shards are transfered. + */ + SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS = 1 << 1, + + /* This flag is used to indicate that the shard transfer function should + * only create the relationships on the target node and not transfer any data. + * This is can be used to create the relationships that were defered + * during the transfering of shards. + */ + SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY = 1 << 2 +} ShardTransferOperationMode; + + extern void TransferShards(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, - char shardReplicationMode, ShardTransferType transferType); + char shardReplicationMode, ShardTransferType transferType, + uint32 optionFlags); extern uint64 ShardListSizeInBytes(List *colocatedShardList, char *workerNodeName, uint32 workerNodePort); extern void ErrorIfMoveUnsupportedTableType(Oid relationId);