From b86fde352038cea4b1853c7adc94a946066a2d51 Mon Sep 17 00:00:00 2001 From: Muhammad Usama Date: Tue, 6 May 2025 12:31:53 +0500 Subject: [PATCH] Parallelize Shard Rebalancing & Enable Concurrent Logical Shard Movement MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Citus’ shard rebalancer has some key performance bottlenecks: - Sequential Movement of Reference Tables: Reference tables are often assumed to be small, but in real-world deployments, they can grow significantly large. Previously, reference table shards were transferred as a single unit, making the process monolithic and time-consuming. - No Parallelism Within a Colocation Group: Although Citus distributes data using colocated shards, shard movements within the same colocation group were serialized. In environments with hundreds of distributed tables colocated together, this serialization significantly slowed down rebalance operations. - Excessive Locking: Rebalancer used restrictive locks and redundant logical replication guards, further limiting concurrency. The goal of this commit is to eliminate these inefficiencies and enable maximum parallelism during rebalance, without compromising correctness or compatibility. Parallelize shard rebalancing to reduce rebalance time Feature Summary: 1. Parallel Reference Table Rebalancing Each reference-table shard is now copied in its own background task. Foreign key and other constraints are deferred until all shards are copied. For single shard movement without considering colocation a new internal-only UDF 'citus_internal_copy_single_shard_placement' is introduced to allow single-shard copy/move operations. Since this function is internal, we do not allow users to call it directly. **Temporary Hack to Set Background Task Context** Background tasks cannot currently set custom GUCs like application_name before executing internal-only functions. To work around this, we inject a SET LOCAL application_name TO 'citus_rebalancer ...' statement as a prefix in the task command. This is a temporary hack to label internal tasks until proper GUC injection support is added to the background task executor. 2. Changes in Locking Strategy - Drop the leftover replication lock that previously serialized shard moves performed via logical replication. This lock was only needed when we used to drop and recreate the subscriptions/publications before each move. Since Citus now removes those objects later as part of the “unused distributed objects” cleanup, shard moves via logical replication can safely run in parallel without additional locking. - Introduced a per-shard advisory lock to prevent concurrent operations on the same shard while allowing maximum parallelism elsewhere. - Change the lock mode in AcquirePlacementColocationLock from ExclusiveLock to RowExclusiveLock to allow concurrent updates within the same colocation group, while still preventing concurrent DDL operations. 3. citus_rebalance_start() enhancements The citus_rebalance_start() function now accepts two new optional parameters: - parallel_transfer_colocated_shards BOOLEAN DEFAULT false, - parallel_transfer_reference_tables BOOLEAN DEFAULT false This ensures backward compatibility by preserving the existing behavior and avoiding any disruption to user expectations and when both are set to true, the rebalancer operates with full parallelism. Previous Rebalancer Behavior: SELECT citus_rebalance_start(shard_transfer_mode := 'force_logical'); This would: Start a single background task for replicating all reference tables Then, move all shards serially, one at a time. Task 1: replicate_reference_tables() ↓ Task 2: move_shard_1() ↓ Task 3: move_shard_2() ↓ Task 4: move_shard_3() Slow and sequential. Reference table copy is a bottleneck. Colocated shards must wait for each other. New Parallel Rebalancer: SELECT citus_rebalance_start( shard_transfer_mode := 'force_logical', parallel_transfer_colocated_shards := true, parallel_transfer_reference_tables := true ); This would: - Schedule independent background tasks for each reference-table shard. - Move colocated shards in parallel, while still maintaining dependency order. - Defer constraint application until all reference shards are in place. Task 1: copy_ref_shard_1() Task 2: copy_ref_shard_2() Task 3: copy_ref_shard_3() → Task 4: apply_constraints() ↓ Task 5: copy_shard_1() Task 6: copy_shard_2() Task 7: copy_shard_3() ↓ Task 8-10: move_shard_1..3() Each operation is scheduled independently and can run as soon as dependencies are satisfied. --- .../distributed/operations/shard_rebalancer.c | 128 +++-- .../distributed/operations/shard_split.c | 2 +- .../distributed/operations/shard_transfer.c | 470 ++++++++++++------ .../replication/multi_logical_replication.c | 43 +- src/backend/distributed/shared_library_init.c | 4 +- .../distributed/sql/citus--13.1-1--13.2-1.sql | 5 + .../sql/downgrades/citus--13.2-1--13.1-1.sql | 3 + .../13.2-1.sql | 9 + .../latest.sql | 9 + .../sql/udfs/citus_rebalance_start/13.2-1.sql | 13 + .../sql/udfs/citus_rebalance_start/latest.sql | 8 +- .../distributed/utils/background_jobs.c | 2 +- src/backend/distributed/utils/maintenanced.c | 2 +- .../distributed/utils/reference_table_utils.c | 403 ++++++++++++++- .../distributed/multi_logical_replication.h | 6 +- .../distributed/reference_table_utils.h | 5 + src/include/distributed/resource_lock.h | 25 +- src/include/distributed/shard_transfer.h | 44 +- 18 files changed, 947 insertions(+), 234 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/13.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 074f1bed0..e04ddd26f 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -222,6 +222,7 @@ typedef struct ShardMoveDependencies { HTAB *colocationDependencies; HTAB *nodeDependencies; + bool parallelTransferColocatedShards; } ShardMoveDependencies; char *VariablesToBePassedToNewConnections = NULL; @@ -270,7 +271,9 @@ static ShardCost GetShardCost(uint64 shardId, void *context); static List * NonColocatedDistRelationIdList(void); static void RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid); static int64 RebalanceTableShardsBackground(RebalanceOptions *options, Oid - shardReplicationModeOid); + shardReplicationModeOid, + bool ParallelTransferReferenceTables, + bool ParallelTransferColocatedShards); static void AcquireRebalanceColocationLock(Oid relationId, const char *operationName); static void ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, char *noticeOperation); @@ -296,9 +299,12 @@ static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStati static void ErrorOnConcurrentRebalance(RebalanceOptions *); static List * GetSetCommandListForNewConnections(void); static int64 GetColocationId(PlacementUpdateEvent *move); -static ShardMoveDependencies InitializeShardMoveDependencies(); -static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 - colocationId, +static ShardMoveDependencies InitializeShardMoveDependencies(bool + ParallelTransferColocatedShards); +static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, + int64 colocationId, + int64 *refTablesDepTaskIds, + int refTablesDepTaskIdsCount, ShardMoveDependencies shardMoveDependencies, int *nDepends); static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, @@ -1014,6 +1020,12 @@ citus_rebalance_start(PG_FUNCTION_ARGS) PG_ENSURE_ARGNOTNULL(2, "shard_transfer_mode"); Oid shardTransferModeOid = PG_GETARG_OID(2); + PG_ENSURE_ARGNOTNULL(3, "parallel_transfer_reference_tables"); + bool ParallelTransferReferenceTables = PG_GETARG_BOOL(3); + + PG_ENSURE_ARGNOTNULL(4, "parallel_transfer_colocated_shards"); + bool ParallelTransferColocatedShards = PG_GETARG_BOOL(4); + RebalanceOptions options = { .relationIdList = relationIdList, .threshold = strategy->defaultThreshold, @@ -1023,7 +1035,9 @@ citus_rebalance_start(PG_FUNCTION_ARGS) .rebalanceStrategy = strategy, .improvementThreshold = strategy->improvementThreshold, }; - int jobId = RebalanceTableShardsBackground(&options, shardTransferModeOid); + int jobId = RebalanceTableShardsBackground(&options, shardTransferModeOid, + ParallelTransferReferenceTables, + ParallelTransferColocatedShards); if (jobId == 0) { @@ -1988,17 +2002,20 @@ GetColocationId(PlacementUpdateEvent *move) * given colocation group and the other one is for tracking source nodes of all moves. */ static ShardMoveDependencies -InitializeShardMoveDependencies() +InitializeShardMoveDependencies(bool ParallelTransferColocatedShards) { ShardMoveDependencies shardMoveDependencies; shardMoveDependencies.colocationDependencies = CreateSimpleHashWithNameAndSize(int64, ShardMoveDependencyInfo, "colocationDependencyHashMap", 6); + shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int32, ShardMoveSourceNodeHashEntry, "nodeDependencyHashMap", 6); + shardMoveDependencies.parallelTransferColocatedShards = + ParallelTransferColocatedShards; return shardMoveDependencies; } @@ -2009,6 +2026,7 @@ InitializeShardMoveDependencies() */ static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, + int64 *refTablesDepTaskIds, int refTablesDepTaskIdsCount, ShardMoveDependencies shardMoveDependencies, int *nDepends) { HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64, @@ -2016,13 +2034,17 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, bool found; - /* Check if there exists a move in the same colocation group scheduled earlier. */ - ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( - shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, &found); - - if (found) + if (!shardMoveDependencies.parallelTransferColocatedShards) { - hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); + /* Check if there exists a move in the same colocation group scheduled earlier. */ + ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( + shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, & + found); + + if (found) + { + hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); + } } /* @@ -2045,6 +2067,23 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, } } + *nDepends = hash_get_num_entries(dependsList); + if (*nDepends == 0) + { + /* + * shard copy can only start after finishing copy of reference table shards + * so each shard task will have a dependency on the task that indicates the + * copy complete of reference tables + */ + while (refTablesDepTaskIdsCount > 0) + { + int64 refTableTaskId = *refTablesDepTaskIds; + hash_search(dependsList, &refTableTaskId, HASH_ENTER, NULL); + refTablesDepTaskIds++; + refTablesDepTaskIdsCount--; + } + } + *nDepends = hash_get_num_entries(dependsList); int64 *dependsArray = NULL; @@ -2076,9 +2115,13 @@ static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId, ShardMoveDependencies shardMoveDependencies) { - ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( - shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL); - shardMoveDependencyInfo->taskId = taskId; + if (!shardMoveDependencies.parallelTransferColocatedShards) + { + ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( + shardMoveDependencies.colocationDependencies, &colocationId, + HASH_ENTER, NULL); + shardMoveDependencyInfo->taskId = taskId; + } bool found; ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search( @@ -2103,7 +2146,9 @@ UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int * background job+task infrastructure. */ static int64 -RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationModeOid) +RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationModeOid, + bool ParallelTransferReferenceTables, + bool ParallelTransferColocatedShards) { if (list_length(options->relationIdList) == 0) { @@ -2174,7 +2219,8 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo initStringInfo(&buf); List *referenceTableIdList = NIL; - int64 replicateRefTablesTaskId = 0; + int64 *refTablesDepTaskIds = NULL; + int refTablesDepTaskIdsCount = 0; if (HasNodesWithMissingReferenceTables(&referenceTableIdList)) { @@ -2187,22 +2233,41 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo * Reference tables need to be copied to (newly-added) nodes, this needs to be the * first task before we can move any other table. */ - appendStringInfo(&buf, - "SELECT pg_catalog.replicate_reference_tables(%s)", - quote_literal_cstr(shardTranferModeLabel)); + if (ParallelTransferReferenceTables) + { + refTablesDepTaskIds = + ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes( + jobId, shardTransferMode, &refTablesDepTaskIdsCount); + ereport(DEBUG2, + (errmsg("%d dependent copy reference table tasks for job %ld", + refTablesDepTaskIdsCount, jobId), + errdetail("Rebalance scheduled as background job"), + errhint("To monitor progress, run: SELECT * FROM " + "citus_rebalance_status();"))); + } + else + { + /* Move all reference tables as single task. Classical way */ + appendStringInfo(&buf, + "SELECT pg_catalog.replicate_reference_tables(%s)", + quote_literal_cstr(shardTranferModeLabel)); - int32 nodesInvolved[] = { 0 }; + int32 nodesInvolved[] = { 0 }; - /* replicate_reference_tables permissions require superuser */ - Oid superUserId = CitusExtensionOwner(); - BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data, 0, - NULL, 0, nodesInvolved); - replicateRefTablesTaskId = task->taskid; + /* replicate_reference_tables permissions require superuser */ + Oid superUserId = CitusExtensionOwner(); + BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data, 0, + NULL, 0, nodesInvolved); + refTablesDepTaskIds = palloc0(sizeof(int64)); + refTablesDepTaskIds[0] = task->taskid; + refTablesDepTaskIdsCount = 1; + } } PlacementUpdateEvent *move = NULL; - ShardMoveDependencies shardMoveDependencies = InitializeShardMoveDependencies(); + ShardMoveDependencies shardMoveDependencies = + InitializeShardMoveDependencies(ParallelTransferColocatedShards); foreach_declared_ptr(move, placementUpdateList) { @@ -2220,16 +2285,11 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo int nDepends = 0; int64 *dependsArray = GenerateTaskMoveDependencyList(move, colocationId, + refTablesDepTaskIds, + refTablesDepTaskIdsCount, shardMoveDependencies, &nDepends); - if (nDepends == 0 && replicateRefTablesTaskId > 0) - { - nDepends = 1; - dependsArray = palloc(nDepends * sizeof(int64)); - dependsArray[0] = replicateRefTablesTaskId; - } - int32 nodesInvolved[2] = { 0 }; nodesInvolved[0] = move->sourceNode->nodeId; nodesInvolved[1] = move->targetNode->nodeId; diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index b1202e648..8116b0b4e 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -1551,7 +1551,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, publicationInfoHash, logicalRepTargetList, groupedLogicalRepTargetsHash, - SHARD_SPLIT); + SHARD_SPLIT, false); /* * 10) Delete old shards metadata and mark the shards as to be deferred drop. diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index b7d07b2cf..404bac911 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -107,16 +107,18 @@ static void ErrorIfSameNode(char *sourceNodeName, int sourceNodePort, static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, bool useLogicalReplication, - const char *operationName); + const char *operationName, uint32 optionFlags); static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort); + int32 targetNodePort, + uint32 optionFlags); static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, - char *targetNodeName, int32 targetNodePort); + char *targetNodeName, int32 targetNodePort, + uint32 optionFlags); static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort, const char *targetNodeName, int32 targetNodePort); @@ -165,7 +167,8 @@ static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval, static ShardCommandList * CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList); static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode); - +static void AcquireShardPlacementLock(uint64_t shardId, int lockMode, Oid relationId, + const char *operationName); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_copy_shard_placement); @@ -174,7 +177,7 @@ PG_FUNCTION_INFO_V1(master_copy_shard_placement); PG_FUNCTION_INFO_V1(citus_move_shard_placement); PG_FUNCTION_INFO_V1(citus_move_shard_placement_with_nodeid); PG_FUNCTION_INFO_V1(master_move_shard_placement); - +PG_FUNCTION_INFO_V1(citus_internal_copy_single_shard_placement); double DesiredPercentFreeAfterMove = 10; bool CheckAvailableSpaceBeforeMove = true; @@ -203,7 +206,7 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } @@ -232,7 +235,7 @@ citus_copy_shard_placement_with_nodeid(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, targetNode->workerName, targetNode->workerPort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } @@ -267,13 +270,69 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } +/* + * citus_internal_copy_single_shard_placement is an internal function that + * copies a single shard placement from a source node to a target node. + * It has two main differences from citus_copy_shard_placement: + * 1. it copies only a single shard placement, not all colocated shards + * 2. It allows to defer the constraints creation and this same function + * can be used to create the constraints later. + * + * The primary use case for this function is to transfer the shards of + * reference tables. Since all reference tables are colocated together, + * and each reference table has only one shard, this function can be used + * to transfer the shards of reference tables in parallel. + * Furthermore, the reference tables could have relations with + * other reference tables, so we need to ensure that their constraints + * are also transferred after copying the shards to the target node. + * For this reason, we allow the caller to defer the constraints creation. + * + * This function is not supposed to be called by the user directly. + */ +Datum +citus_internal_copy_single_shard_placement(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + + int64 shardId = PG_GETARG_INT64(0); + uint32 sourceNodeId = PG_GETARG_INT32(1); + uint32 targetNodeId = PG_GETARG_INT32(2); + uint32 flags = PG_GETARG_INT32(3); + Oid shardReplicationModeOid = PG_GETARG_OID(4); + + bool missingOk = false; + WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk); + WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk); + + char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); + + /* + * This is an internal function that is used by the rebalancer. + * It is not supposed to be called by the user directly. + */ + if (!IsRebalancerInternalBackend()) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("This is an internal Citus function can only be " + "used in by a rebalancer task"))); + } + + TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, + targetNode->workerName, targetNode->workerPort, + shardReplicationMode, SHARD_TRANSFER_COPY, flags); + + PG_RETURN_VOID(); +} + + /* * citus_move_shard_placement moves given shard (and its co-located shards) from one * node to the other node. To accomplish this it entirely recreates the table structure @@ -315,7 +374,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_MOVE); + shardReplicationMode, SHARD_TRANSFER_MOVE, 0); PG_RETURN_VOID(); } @@ -343,12 +402,42 @@ citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS) char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, targetNode->workerName, - targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE); + targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE, 0); PG_RETURN_VOID(); } +/* + * AcquireShardPlacementLock tries to acquire a lock on the shardid + * while moving/copying the shard placement. If this + * is it not possible it fails instantly because this means + * another move/copy on same shard is currently happening. */ +static void +AcquireShardPlacementLock(uint64_t shardId, int lockMode, Oid relationId, + const char *operationName) +{ + LOCKTAG tag; + const bool sessionLock = false; + const bool dontWait = true; + + SET_LOCKTAG_SHARD_MOVE(tag, shardId); + + LockAcquireResult lockAcquired = LockAcquire(&tag, lockMode, sessionLock, dontWait); + if (!lockAcquired) + { + ereport(ERROR, (errmsg("could not acquire the lock required to %s %s", + operationName, + generate_qualified_relation_name(relationId)), + errdetail("It means that either a concurrent shard move " + "or colocated distributed table creation is " + "happening."), + errhint("Make sure that the concurrent operation has " + "finished and re-run the command"))); + } +} + + /* * TransferShards is the function for shard transfers. */ @@ -356,7 +445,7 @@ void TransferShards(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, char shardReplicationMode, - ShardTransferType transferType) + ShardTransferType transferType, uint32 optionFlags) { /* strings to be used in log messages */ const char *operationName = ShardTransferTypeNames[transferType]; @@ -385,20 +474,36 @@ TransferShards(int64 shardId, char *sourceNodeName, ErrorIfTargetNodeIsNotSafeForTransfer(targetNodeName, targetNodePort, transferType); - AcquirePlacementColocationLock(distributedTableId, ExclusiveLock, operationName); + AcquirePlacementColocationLock(distributedTableId, RowExclusiveLock, operationName); - List *colocatedTableList = ColocatedTableList(distributedTableId); - List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + List *colocatedTableList; + List *colocatedShardList; + + /* + * If SHARD_TRANSFER_SINGLE_SHARD_ONLY is set, we only transfer a single shard + * specified by shardId. Otherwise, we transfer all colocated shards. + */ + bool isSingleShardOnly = optionFlags & SHARD_TRANSFER_SINGLE_SHARD_ONLY; + + if (isSingleShardOnly) + { + colocatedTableList = list_make1_oid(distributedTableId); + colocatedShardList = list_make1(shardInterval); + } + else + { + colocatedTableList = ColocatedTableList(distributedTableId); + colocatedShardList = ColocatedShardIntervalList(shardInterval); + } EnsureTableListOwner(colocatedTableList); if (transferType == SHARD_TRANSFER_MOVE) { /* - * Block concurrent DDL / TRUNCATE commands on the relation. Similarly, - * block concurrent citus_move_shard_placement() on any shard of - * the same relation. This is OK for now since we're executing shard - * moves sequentially anyway. + * Block concurrent DDL / TRUNCATE commands on the relation. while, + * allow concurrent citus_move_shard_placement() on the shards of + * the same relation. */ LockColocatedRelationsForMove(colocatedTableList); } @@ -412,14 +517,66 @@ TransferShards(int64 shardId, char *sourceNodeName, /* * We sort shardIntervalList so that lock operations will not cause any - * deadlocks. + * deadlocks. But we do not need to do that if the list contain only one + * shard. */ - colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + if (!isSingleShardOnly) + { + colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + } - if (TransferAlreadyCompleted(colocatedShardList, - sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort, - transferType)) + /* We have pretty much covered the concurrent rebalance operations + * and we want to allow concurrent moves within the same colocation group. + * but at the same time we want to block the concurrent moves on the same shard + * placement. So we lock the shard moves before starting the transfer. + */ + foreach_declared_ptr(shardInterval, colocatedShardList) + { + int64 shardIdToLock = shardInterval->shardId; + AcquireShardPlacementLock(shardIdToLock, ExclusiveLock, distributedTableId, + operationName); + } + + bool transferAlreadyCompleted = TransferAlreadyCompleted(colocatedShardList, + sourceNodeName, + sourceNodePort, + targetNodeName, + targetNodePort, + transferType); + + /* + * If we just need to create the shard relationships,We don't need to do anything + * else other than calling CopyShardTables with SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY + * flag. + */ + bool createRelationshipsOnly = optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY; + + if (createRelationshipsOnly) + { + if (!transferAlreadyCompleted) + { + /* + * if the transfer is not completed, and we are here just to create + * the relationships, we can return right away + */ + ereport(WARNING, (errmsg("shard is not present on node %s:%d", + targetNodeName, targetNodePort), + errdetail("%s may have not completed.", + operationNameCapitalized))); + return; + } + + CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName + , + targetNodePort, (shardReplicationMode == + TRANSFER_MODE_FORCE_LOGICAL), + operationFunctionName, optionFlags); + + /* We don't need to do anything else, just return */ + return; + } + + if (transferAlreadyCompleted) { /* if the transfer is already completed, we can return right away */ ereport(WARNING, (errmsg("shard is already present on node %s:%d", @@ -515,7 +672,8 @@ TransferShards(int64 shardId, char *sourceNodeName, } CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort, useLogicalReplication, operationFunctionName); + targetNodePort, useLogicalReplication, operationFunctionName, + optionFlags); if (transferType == SHARD_TRANSFER_MOVE) { @@ -676,7 +834,7 @@ IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetN /* * LockColocatedRelationsForMove takes a list of relations, locks all of them - * using ShareUpdateExclusiveLock + * using ShareLock */ static void LockColocatedRelationsForMove(List *colocatedTableList) @@ -684,7 +842,7 @@ LockColocatedRelationsForMove(List *colocatedTableList) Oid colocatedTableId = InvalidOid; foreach_declared_oid(colocatedTableId, colocatedTableList) { - LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock); + LockRelationOid(colocatedTableId, RowExclusiveLock); } } @@ -1333,7 +1491,7 @@ ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList) static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, bool useLogicalReplication, - const char *operationName) + const char *operationName, uint32 optionFlags) { if (list_length(shardIntervalList) < 1) { @@ -1343,16 +1501,22 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP /* Start operation to prepare for generating cleanup records */ RegisterOperationNeedingCleanup(); - if (useLogicalReplication) + bool createRelationshipsOnly = optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY; + + /* + * If we're just going to create relationships only always use + * CopyShardTablesViaBlockWrites. + */ + if (useLogicalReplication && !createRelationshipsOnly) { CopyShardTablesViaLogicalReplication(shardIntervalList, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort); + targetNodePort, optionFlags); } else { CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort); + targetNodeName, targetNodePort, optionFlags); } /* @@ -1369,7 +1533,7 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort) + int32 targetNodePort, uint32 optionFlags) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaLogicalReplication", @@ -1407,9 +1571,13 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa MemoryContextSwitchTo(oldContext); + bool skipRelationshipCreation = (optionFlags & + SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS); + /* data copy is done seperately when logical replication is used */ LogicallyReplicateShards(shardIntervalList, sourceNodeName, - sourceNodePort, targetNodeName, targetNodePort); + sourceNodePort, targetNodeName, targetNodePort, + skipRelationshipCreation); } @@ -1437,7 +1605,7 @@ CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList) static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort) + int32 targetNodePort, uint32 optionFlags) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaBlockWrites", @@ -1446,127 +1614,150 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); - - /* iterate through the colocated shards and copy each */ ShardInterval *shardInterval = NULL; - foreach_declared_ptr(shardInterval, shardIntervalList) - { - /* - * For each shard we first create the shard table in a separate - * transaction and then we copy the data and create the indexes in a - * second separate transaction. The reason we don't do both in a single - * transaction is so we can see the size of the new shard growing - * during the copy when we run get_rebalance_progress in another - * session. If we wouldn't split these two phases up, then the table - * wouldn't be visible in the session that get_rebalance_progress uses. - * So get_rebalance_progress would always report its size as 0. - */ - List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName, - sourceNodePort); - char *tableOwner = TableOwner(shardInterval->relationId); - /* drop the shard we created on the target, in case of failure */ - InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, - ConstructQualifiedShardName(shardInterval), - GroupForNode(targetNodeName, - targetNodePort), - CLEANUP_ON_FAILURE); - - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); - } - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_COPYING_DATA); - - ConflictWithIsolationTestingBeforeCopy(); - CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); - ConflictWithIsolationTestingAfterCopy(); - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS); - - foreach_declared_ptr(shardInterval, shardIntervalList) - { - List *ddlCommandList = - PostLoadShardCreationCommandList(shardInterval, sourceNodeName, - sourceNodePort); - char *tableOwner = TableOwner(shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); - - MemoryContextReset(localContext); - } + bool createRelationshipsOnly = optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY; /* - * Once all shards are copied, we can recreate relationships between shards. - * Create DDL commands to Attach child tables to their parents in a partitioning hierarchy. + * If we’re only asked to create the relationships, the shards are already + * present and populated on the node. Skip the table‑setup and data‑loading + * steps and proceed straight to creating the relationships. */ - List *shardIntervalWithDDCommandsList = NIL; - foreach_declared_ptr(shardInterval, shardIntervalList) + if (!createRelationshipsOnly) { - if (PartitionTable(shardInterval->relationId)) + /* iterate through the colocated shards and copy each */ + foreach_declared_ptr(shardInterval, shardIntervalList) { - char *attachPartitionCommand = - GenerateAttachShardPartitionCommand(shardInterval); + /* + * For each shard we first create the shard table in a separate + * transaction and then we copy the data and create the indexes in a + * second separate transaction. The reason we don't do both in a single + * transaction is so we can see the size of the new shard growing + * during the copy when we run get_rebalance_progress in another + * session. If we wouldn't split these two phases up, then the table + * wouldn't be visible in the session that get_rebalance_progress uses. + * So get_rebalance_progress would always report its size as 0. + */ + List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, + sourceNodeName, + sourceNodePort); + char *tableOwner = TableOwner(shardInterval->relationId); - ShardCommandList *shardCommandList = CreateShardCommandList( - shardInterval, - list_make1(attachPartitionCommand)); - shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, - shardCommandList); + /* drop the shard we created on the target, in case of failure */ + InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, + ConstructQualifiedShardName( + shardInterval), + GroupForNode(targetNodeName, + targetNodePort), + CLEANUP_ON_FAILURE); + + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); + } + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COPYING_DATA); + + ConflictWithIsolationTestingBeforeCopy(); + CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); + ConflictWithIsolationTestingAfterCopy(); + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS); + + foreach_declared_ptr(shardInterval, shardIntervalList) + { + List *ddlCommandList = + PostLoadShardCreationCommandList(shardInterval, sourceNodeName, + sourceNodePort); + char *tableOwner = TableOwner(shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); + + MemoryContextReset(localContext); } } - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS); - /* - * Iterate through the colocated shards and create DDL commamnds - * to create the foreign constraints. + * Skip creating shard relationships if the caller has requested that they + * not be created. */ - foreach_declared_ptr(shardInterval, shardIntervalList) + bool skipRelationshipCreation = (optionFlags & + SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS); + + if (!skipRelationshipCreation) { - List *shardForeignConstraintCommandList = NIL; - List *referenceTableForeignConstraintList = NIL; + /* + * Once all shards are copied, we can recreate relationships between shards. + * Create DDL commands to Attach child tables to their parents in a partitioning hierarchy. + */ + List *shardIntervalWithDDCommandsList = NIL; + foreach_declared_ptr(shardInterval, shardIntervalList) + { + if (PartitionTable(shardInterval->relationId)) + { + char *attachPartitionCommand = + GenerateAttachShardPartitionCommand(shardInterval); - CopyShardForeignConstraintCommandListGrouped(shardInterval, - &shardForeignConstraintCommandList, - &referenceTableForeignConstraintList); + ShardCommandList *shardCommandList = CreateShardCommandList( + shardInterval, + list_make1(attachPartitionCommand)); + shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, + shardCommandList); + } + } - ShardCommandList *shardCommandList = CreateShardCommandList( - shardInterval, - list_concat(shardForeignConstraintCommandList, - referenceTableForeignConstraintList)); - shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, - shardCommandList); + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS); + + /* + * Iterate through the colocated shards and create DDL commamnds + * to create the foreign constraints. + */ + foreach_declared_ptr(shardInterval, shardIntervalList) + { + List *shardForeignConstraintCommandList = NIL; + List *referenceTableForeignConstraintList = NIL; + + CopyShardForeignConstraintCommandListGrouped(shardInterval, + & + shardForeignConstraintCommandList, + & + referenceTableForeignConstraintList); + + ShardCommandList *shardCommandList = CreateShardCommandList( + shardInterval, + list_concat(shardForeignConstraintCommandList, + referenceTableForeignConstraintList)); + shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, + shardCommandList); + } + + /* Now execute the Partitioning & Foreign constraints creation commads. */ + ShardCommandList *shardCommandList = NULL; + foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList) + { + char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, + shardCommandList->ddlCommandList); + } + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COMPLETING); } - - /* Now execute the Partitioning & Foreign constraints creation commads. */ - ShardCommandList *shardCommandList = NULL; - foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList) - { - char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, - shardCommandList->ddlCommandList); - } - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_COMPLETING); - MemoryContextReset(localContext); MemoryContextSwitchTo(oldContext); } @@ -1647,7 +1838,8 @@ CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardInte ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList, MaxAdaptiveExecutorPoolSize, - NULL /* jobIdList (ignored by API implementation) */); + NULL /* jobIdList (ignored by API implementation) */ + ); } diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 7189216d0..4c43d3513 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -118,7 +118,8 @@ static List * GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shard static List * GetIndexCommandListForShardBackingReplicaIdentity(Oid relationId, uint64 shardId); static void CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList, - LogicalRepType type); + LogicalRepType type, + bool skipInterShardRelationships); static void ExecuteCreateIndexCommands(List *logicalRepTargetList); static void ExecuteCreateConstraintsBackedByIndexCommands(List *logicalRepTargetList); static List * ConvertNonExistingPlacementDDLCommandsToTasks(List *shardCommandList, @@ -132,7 +133,6 @@ static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command); static void WaitForMiliseconds(long timeout); static XLogRecPtr GetSubscriptionPosition( GroupedLogicalRepTargets *groupedLogicalRepTargets); -static void AcquireLogicalReplicationLock(void); static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode, List *shardIntervals); @@ -154,9 +154,9 @@ static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition, */ void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, - char *targetNodeName, int targetNodePort) + char *targetNodeName, int targetNodePort, + bool skipInterShardRelationshipCreation) { - AcquireLogicalReplicationLock(); char *superUser = CitusExtensionOwnerName(); char *databaseName = get_database_name(MyDatabaseId); int connectionFlags = FORCE_NEW_CONNECTION; @@ -258,7 +258,8 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo publicationInfoHash, logicalRepTargetList, groupedLogicalRepTargetsHash, - SHARD_MOVE); + SHARD_MOVE, + skipInterShardRelationshipCreation); /* * We use these connections exclusively for subscription management, @@ -317,7 +318,8 @@ CompleteNonBlockingShardTransfer(List *shardList, HTAB *publicationInfoHash, List *logicalRepTargetList, HTAB *groupedLogicalRepTargetsHash, - LogicalRepType type) + LogicalRepType type, + bool skipInterShardRelationshipCreation) { /* Start applying the changes from the replication slots to catch up. */ EnableSubscriptions(logicalRepTargetList); @@ -345,7 +347,8 @@ CompleteNonBlockingShardTransfer(List *shardList, * and partitioning hierarchy. Once they are done, wait until the replication * catches up again. So we don't block writes too long. */ - CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList, type); + CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList, type, + skipInterShardRelationshipCreation); UpdatePlacementUpdateStatusForShardIntervalList( shardList, @@ -372,7 +375,7 @@ CompleteNonBlockingShardTransfer(List *shardList, WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); - if (type != SHARD_SPLIT) + if (type != SHARD_SPLIT && !skipInterShardRelationshipCreation) { UpdatePlacementUpdateStatusForShardIntervalList( shardList, @@ -497,25 +500,6 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList) } -/* - * AcquireLogicalReplicationLock tries to acquire a lock for logical - * replication. We need this lock, because at the start of logical replication - * we clean up old subscriptions and publications. Because of this cleanup it's - * not safe to run multiple logical replication based shard moves at the same - * time. If multiple logical replication moves would run at the same time, the - * second move might clean up subscriptions and publications that are in use by - * another move. - */ -static void -AcquireLogicalReplicationLock(void) -{ - LOCKTAG tag; - SET_LOCKTAG_LOGICAL_REPLICATION(tag); - - LockAcquire(&tag, ExclusiveLock, false, false); -} - - /* * PrepareReplicationSubscriptionList returns list of shards to be logically * replicated from given shard list. This is needed because Postgres does not @@ -675,7 +659,8 @@ GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shardId) */ static void CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList, - LogicalRepType type) + LogicalRepType type, + bool skipInterShardRelationships) { /* * We create indexes in 4 steps. @@ -705,7 +690,7 @@ CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList, /* * Creating the partitioning hierarchy errors out in shard splits when */ - if (type != SHARD_SPLIT) + if (type != SHARD_SPLIT && !skipInterShardRelationships) { /* create partitioning hierarchy, if any */ CreatePartitioningHierarchy(logicalRepTargetList); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 165aea05f..e6fd1c308 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1952,7 +1952,7 @@ RegisterCitusConfigVariables(void) "because total background worker count is shared by all background workers. The value " "represents the possible maximum number of task executors."), &MaxBackgroundTaskExecutors, - 4, 1, MAX_BG_TASK_EXECUTORS, + 1, 1, MAX_BG_TASK_EXECUTORS, PGC_SIGHUP, GUC_STANDARD, NULL, NULL, NULL); @@ -1964,7 +1964,7 @@ RegisterCitusConfigVariables(void) "for scheduled background tasks that involve a particular node"), NULL, &MaxBackgroundTaskExecutorsPerNode, - 1, 1, 128, + 4, 1, 128, PGC_SIGHUP, GUC_STANDARD, NULL, NULL, NULL); diff --git a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql index 2f507eb24..be4e0e62f 100644 --- a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql +++ b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql @@ -1,3 +1,8 @@ -- citus--13.1-1--13.2-1 + -- bump version to 13.2-1 #include "udfs/worker_last_saved_explain_analyze/13.2-1.sql" + +DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode); +#include "udfs/citus_rebalance_start/13.2-1.sql" +#include "udfs/citus_internal_copy_single_shard_placement/13.2-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql index de26b790a..cfdf1eb60 100644 --- a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql @@ -1,4 +1,7 @@ -- citus--13.2-1--13.1-1 +DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean); +DROP FUNCTION IF EXISTS citus_internal.citus_internal_copy_single_shard_placement(bigint, integer, integer, integer, citus.shard_transfer_mode); +#include "udfs/citus_rebalance_start/11.1-1.sql" -- downgrade version to 13.1-1 DROP FUNCTION IF EXISTS pg_catalog.worker_last_saved_explain_analyze(); diff --git a/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/13.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/13.2-1.sql new file mode 100644 index 000000000..bbb2fb63b --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/13.2-1.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION citus_internal.citus_internal_copy_single_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + flags integer, + transfer_mode citus.shard_transfer_mode default 'auto') +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_internal_copy_single_shard_placement$$; diff --git a/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/latest.sql new file mode 100644 index 000000000..bbb2fb63b --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/latest.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION citus_internal.citus_internal_copy_single_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + flags integer, + transfer_mode citus.shard_transfer_mode default 'auto') +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_internal_copy_single_shard_placement$$; diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql b/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql new file mode 100644 index 000000000..658e78dd6 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql @@ -0,0 +1,13 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start( + rebalance_strategy name DEFAULT NULL, + drain_only boolean DEFAULT false, + shard_transfer_mode citus.shard_transfer_mode default 'auto', + parallel_transfer_reference_tables boolean DEFAULT false, + parallel_transfer_colocated_shards boolean DEFAULT false + ) + RETURNS bigint + AS 'MODULE_PATHNAME' + LANGUAGE C VOLATILE; +COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean) + IS 'rebalance the shards in the cluster in the background'; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean) TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql b/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql index cc84d3142..658e78dd6 100644 --- a/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql @@ -1,11 +1,13 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start( rebalance_strategy name DEFAULT NULL, drain_only boolean DEFAULT false, - shard_transfer_mode citus.shard_transfer_mode default 'auto' + shard_transfer_mode citus.shard_transfer_mode default 'auto', + parallel_transfer_reference_tables boolean DEFAULT false, + parallel_transfer_colocated_shards boolean DEFAULT false ) RETURNS bigint AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE; -COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode) +COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean) IS 'rebalance the shards in the cluster in the background'; -GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode) TO PUBLIC; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean) TO PUBLIC; diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 2d0f03a4c..ba3bde84f 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -125,7 +125,7 @@ static volatile sig_atomic_t GotSighup = false; /* keeping track of parallel background tasks per node */ HTAB *ParallelTasksPerNode = NULL; -int MaxBackgroundTaskExecutorsPerNode = 1; +int MaxBackgroundTaskExecutorsPerNode = 4; PG_FUNCTION_INFO_V1(citus_job_cancel); PG_FUNCTION_INFO_V1(citus_job_wait); diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index e6bf3d00c..234acc53d 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -98,7 +98,7 @@ double DistributedDeadlockDetectionTimeoutFactor = 2.0; int Recover2PCInterval = 60000; int DeferShardDeleteInterval = 15000; int BackgroundTaskQueueCheckInterval = 5000; -int MaxBackgroundTaskExecutors = 4; +int MaxBackgroundTaskExecutors = 1; char *MainDb = ""; /* config variables for metadata sync timeout */ diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 8f0d89fc9..5a98c9cdd 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -41,16 +41,24 @@ #include "distributed/transaction_management.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" +#include "distributed/shard_transfer.h" +#include "distributed/hash_helpers.h" + /* local function forward declarations */ static List * WorkersWithoutReferenceTablePlacement(uint64 shardId, LOCKMODE lockMode); -static StringInfo CopyShardPlacementToWorkerNodeQuery( - ShardPlacement *sourceShardPlacement, - WorkerNode *workerNode, - char transferMode); +static StringInfo CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement + , + WorkerNode *workerNode, + char transferMode); static bool AnyRelationsModifiedInTransaction(List *relationIdList); static List * ReplicatedMetadataSyncedDistributedTableList(void); static bool NodeHasAllReferenceTableReplicas(WorkerNode *workerNode); +typedef struct ShardTaskEntry +{ + uint64 shardId; + int64 taskId; +} ShardTaskEntry; /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(upgrade_to_reference_table); @@ -131,6 +139,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) * DROP TABLE and create_reference_table calls so that the list of reference tables we * operate on are stable. * + * * Since the changes to the reference table placements are made via loopback * connections we release the locks held at the end of this function. Due to Citus * only running transactions in READ COMMITTED mode we can be sure that other @@ -179,7 +188,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) if (list_length(newWorkersList) == 0) { /* - * All workers alreaddy have a copy of the reference tables, make sure that + * All workers already have a copy of the reference tables, make sure that * any locks obtained earlier are released. It will probably not matter, but * we release the locks in the reverse order we obtained them in. */ @@ -230,7 +239,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) WorkerNode *newWorkerNode = NULL; foreach_declared_ptr(newWorkerNode, newWorkersList) { - ereport(NOTICE, (errmsg("replicating reference table '%s' to %s:%d ...", + ereport(DEBUG2, (errmsg("replicating reference table '%s' to %s:%d ...", referenceTableName, newWorkerNode->workerName, newWorkerNode->workerPort))); @@ -294,6 +303,381 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) } +/* + * ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes is essentially a + * twin of EnsureReferenceTablesExistOnAllNodesExtended. The difference is instead of + * copying the missing tables on to the worker nodes this function creates the background tasks + * for each required copy operation and schedule it in the background job. + * Another difference is that instead of moving all the colocated shards sequencially + * this function creates a seperate background task for each shard, even when the shards + * are part of same colocated shard group. + * + * For transfering the shards in parallel the function creates a task for each shard + * move and than schedules another task that creates the shard relationships (if any) + * between shards and that task wait for the completion of all shard transfer tasks. + * + * The function returns an array of task ids that are created for creating the shard + * relationships, effectively completion of these tasks signals the completion of + * of reference table setup on the worker nodes. Any process that needs to wait for + * the completion of the reference table setup can wait for these tasks to complete. + * + * The transferMode is passed to this function gets ignored for now and it only uses + * block write mode. + */ +int64 * +ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char transferMode + , + int *nDependTasks) +{ + List *referenceTableIdList = NIL; + uint64 shardId = INVALID_SHARD_ID; + List *newWorkersList = NIL; + int64 *dependsTaskArray = NULL; + const char *referenceTableName = NULL; + int colocationId = GetReferenceTableColocationId(); + + *nDependTasks = 0; + if (colocationId == INVALID_COLOCATION_ID) + { + /* we have no reference table yet. */ + return 0; + } + + /* + * Most of the time this function should result in a conclusion where we do not need + * to copy any reference tables. To prevent excessive locking the majority of the time + * we run our precondition checks first with a lower lock. If, after checking with the + * lower lock, that we might need to copy reference tables we check with a more + * aggressive and self conflicting lock. It is important to be self conflicting in the + * second run to make sure that two concurrent calls to this routine will actually not + * run concurrently after the initial check. + * + * If after two iterations of precondition checks we still find the need for copying + * reference tables we exit the loop with all locks held. This will prevent concurrent + * DROP TABLE and create_reference_table calls so that the list of reference tables we + * operate on are stable. + * + * + * Since the changes to the reference table placements are made via loopback + * connections we release the locks held at the end of this function. Due to Citus + * only running transactions in READ COMMITTED mode we can be sure that other + * transactions correctly find the metadata entries. + */ + LOCKMODE lockmodes[] = { AccessShareLock, ExclusiveLock }; + for (int lockmodeIndex = 0; lockmodeIndex < lengthof(lockmodes); lockmodeIndex++) + { + LockColocationId(colocationId, lockmodes[lockmodeIndex]); + + referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE); + if (referenceTableIdList == NIL) + { + /* + * No reference tables exist, make sure that any locks obtained earlier are + * released. It will probably not matter, but we release the locks in the + * reverse order we obtained them in. + */ + for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } + return 0; + } + + Oid referenceTableId = linitial_oid(referenceTableIdList); + referenceTableName = get_rel_name(referenceTableId); + List *shardIntervalList = LoadShardIntervalList(referenceTableId); + if (list_length(shardIntervalList) == 0) + { + /* check for corrupt metadata */ + ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard", + referenceTableName))); + } + + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + shardId = shardInterval->shardId; + + /* + * We only take an access share lock, otherwise we'll hold up citus_add_node. + * In case of create_reference_table() where we don't want concurrent writes + * to pg_dist_node, we have already acquired ShareLock on pg_dist_node. + */ + newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, AccessShareLock); + if (list_length(newWorkersList) == 0) + { + /* + * All workers already have a copy of the reference tables, make sure that + * any locks obtained earlier are released. It will probably not matter, but + * we release the locks in the reverse order we obtained them in. + */ + for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } + return 0; + } + } + + /* + * citus_copy_shard_placement triggers metadata sync-up, which tries to + * acquire a ShareLock on pg_dist_node. We do master_copy_shad_placement + * in a separate connection. If we have modified pg_dist_node in the + * current backend, this will cause a deadlock. + */ + if (TransactionModifiedNodeMetadata) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate reference tables in a transaction " + "that modified node metadata"))); + } + + /* + * Modifications to reference tables in current transaction are not visible + * to citus_copy_shard_placement, since it is done in a separate backend. + */ + if (AnyRelationsModifiedInTransaction(referenceTableIdList)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate reference tables in a transaction " + "that modified a reference table"))); + } + + bool missingOk = false; + ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk); + if (sourceShardPlacement == NULL) + { + /* check for corrupt metadata */ + ereport(ERROR, (errmsg("reference table shard " + UINT64_FORMAT + " does not have an active shard placement", + shardId))); + } + + WorkerNode *newWorkerNode = NULL; + BackgroundTask *task = NULL; + StringInfoData buf = { 0 }; + initStringInfo(&buf); + List *depTasksList = NIL; + const char *transferModeString = + transferMode == TRANSFER_MODE_BLOCK_WRITES ? "block_writes" : + transferMode == TRANSFER_MODE_FORCE_LOGICAL ? "force_logical" : + "auto"; + + + foreach_declared_ptr(newWorkerNode, newWorkersList) + { + ereport(DEBUG2, (errmsg("replicating reference table '%s' to %s:%d ...", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort))); + + Oid relationId = InvalidOid; + List *nodeTasksList = NIL; + HTAB *shardTaskMap = CreateSimpleHashWithNameAndSize(uint64, + ShardTaskEntry, + "Shard_Task_Map", + list_length( + referenceTableIdList)); + + foreach_declared_oid(relationId, referenceTableIdList) + { + referenceTableName = get_rel_name(relationId); + List *shardIntervalList = LoadShardIntervalList(relationId); + if (list_length(shardIntervalList) != 1) + { + ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard", + referenceTableName))); + } + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + shardId = shardInterval->shardId; + + resetStringInfo(&buf); + uint32 shardTransferFlags = SHARD_TRANSFER_SINGLE_SHARD_ONLY | + SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS; + + /* Temporary hack until we get background task config support PR */ + appendStringInfo(&buf, + "SET LOCAL application_name TO '%s%ld';", + CITUS_REBALANCER_APPLICATION_NAME_PREFIX, + GetGlobalPID()); + + /* + * In first step just create and load data in the shards but defer the creation + * of the shard relationships to the next step. + * The reason we want to defer the creation of the shard relationships is that + * we want to make sure that all the parallel shard copy task are finished + * before we create the relationships. Otherwise we might end up with + * a situation where the dependent-shard task is still running and trying to + * create the shard relationships will result in ERROR. + */ + appendStringInfo(&buf, + "SELECT citus_internal.citus_internal_copy_single_shard_placement(%ld,%u,%u,%u,%s)", + shardId, + sourceShardPlacement->nodeId, + newWorkerNode->nodeId, + shardTransferFlags, + quote_literal_cstr(transferModeString)); + + ereport(DEBUG2, + (errmsg("replicating reference table '%s' to %s:%d ... QUERY= %s", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort, buf.data))); + + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); + List *relatedRelations = list_concat(cacheEntry-> + referencedRelationsViaForeignKey, + cacheEntry-> + referencingRelationsViaForeignKey); + List *dependencyTaskList = NIL; + + Oid relatedRelationId = InvalidOid; + foreach_declared_oid(relatedRelationId, relatedRelations) + { + if (!list_member_oid(referenceTableIdList, relatedRelationId)) + { + continue; + } + + List *relatedShardIntervalList = LoadShardIntervalList( + relatedRelationId); + ShardInterval *relatedShardInterval = (ShardInterval *) linitial( + relatedShardIntervalList); + uint64 relatedShardId = relatedShardInterval->shardId; + bool taskFound = false; + ShardTaskEntry *taskEntry = hash_search(shardTaskMap, &relatedShardId, + HASH_FIND, &taskFound); + if (taskFound) + { + dependencyTaskList = lappend(dependencyTaskList, taskEntry); + } + } + + int nDepends = list_length(dependencyTaskList); + int64 *dependsArray = NULL; + if (nDepends > 0) + { + dependsArray = (int64 *) palloc(sizeof(int64) * nDepends); + int i = 0; + ListCell *lc; + foreach(lc, dependencyTaskList) + { + dependsArray[i++] = ((ShardTaskEntry *) lfirst(lc))->taskId; + } + } + + int32 nodesInvolved[2] = { 0 }; + nodesInvolved[0] = sourceShardPlacement->nodeId; + nodesInvolved[1] = newWorkerNode->nodeId; + + task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data, + nDepends, + dependsArray, 2, + nodesInvolved); + + bool found = false; + ShardTaskEntry *taskEntry = hash_search(shardTaskMap, &shardId, HASH_ENTER, + &found); + if (!found) + { + taskEntry->taskId = task->taskid; + ereport(DEBUG2, + (errmsg( + "Added hash entry in scheduled task hash with task %ld for shard %ld", + task->taskid, shardId))); + } + else + { + ereport(ERROR, (errmsg("failed to record task dependency for shard %ld", + shardId))); + } + + nodeTasksList = lappend(nodeTasksList, task); + if (dependsArray) + { + pfree(dependsArray); + } + list_free(dependencyTaskList); + } + + if (list_length(nodeTasksList) > 0) + { + int nDepends = list_length(nodeTasksList); + int32 nodesInvolved[2] = { 0 }; + nodesInvolved[0] = sourceShardPlacement->nodeId; + nodesInvolved[1] = newWorkerNode->nodeId; + int64 *dependsArray = palloc(sizeof(int64) * list_length(nodeTasksList)); + int idx = 0; + foreach_declared_ptr(task, nodeTasksList) + { + dependsArray[idx++] = task->taskid; + } + resetStringInfo(&buf); + uint32 shardTransferFlags = SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY; + appendStringInfo(&buf, + "SET LOCAL application_name TO '%s%ld';\n", + CITUS_REBALANCER_APPLICATION_NAME_PREFIX, + GetGlobalPID()); + appendStringInfo(&buf, + "SELECT citus_internal.citus_internal_copy_single_shard_placement(%ld,%u,%u,%u,%s)", + shardId, + sourceShardPlacement->nodeId, + newWorkerNode->nodeId, + shardTransferFlags, + quote_literal_cstr(transferModeString)); + + ereport(DEBUG2, + (errmsg( + "creating relations for reference table '%s' on %s:%d ... QUERY= %s", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort, buf.data))); + + task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data, + nDepends, + dependsArray, 2, + nodesInvolved); + + depTasksList = lappend(depTasksList, task); + + pfree(dependsArray); + list_free(nodeTasksList); + nodeTasksList = NIL; + } + + hash_destroy(shardTaskMap); + } + + /* + * compute a dependent task list array to be used to indicate the completion of all + * reference table shards copy, so that we can start with distributed shard copy + */ + if (list_length(depTasksList) > 0) + { + *nDependTasks = list_length(depTasksList); + dependsTaskArray = palloc(sizeof(int64) * *nDependTasks); + int idx = 0; + foreach_declared_ptr(task, depTasksList) + { + dependsTaskArray[idx++] = task->taskid; + } + list_free(depTasksList); + } + + /* + * Since reference tables have been copied via a loopback connection we do not have to + * retain our locks. Since Citus only runs well in READ COMMITTED mode we can be sure + * that other transactions will find the reference tables copied. + * We have obtained and held multiple locks, here we unlock them all in the reverse + * order we have obtained them in. + */ + for (int releaseLockmodeIndex = lengthof(lockmodes) - 1; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } + return dependsTaskArray; +} + + /* * HasNodesWithMissingReferenceTables checks if all reference tables are already copied to * all nodes. When a node doesn't have a copy of the reference tables we call them missing @@ -424,7 +808,7 @@ CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement, appendStringInfo(queryString, "SELECT pg_catalog.citus_copy_shard_placement(" UINT64_FORMAT ", %d, %d, " - "transfer_mode := %s)", + "transfer_mode := %s)", sourceShardPlacement->shardId, sourceShardPlacement->nodeId, workerNode->nodeId, @@ -614,8 +998,9 @@ DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly) * connections. */ void -DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext( - MetadataSyncContext *context, int32 groupId, bool localOnly) +DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext(MetadataSyncContext * + context, int32 groupId, + bool localOnly) { List *replicatedPlacementListForGroup = ReplicatedPlacementsForNodeGroup(groupId); diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 2a57c0224..7dcdd5bd3 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -131,7 +131,8 @@ typedef enum LogicalRepType extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, char *targetNodeName, - int targetNodePort); + int targetNodePort, + bool skipInterShardRelationshipCreation); extern void ConflictWithIsolationTestingBeforeCopy(void); extern void ConflictWithIsolationTestingAfterCopy(void); @@ -177,7 +178,8 @@ extern void CompleteNonBlockingShardTransfer(List *shardList, HTAB *publicationInfoHash, List *logicalRepTargetList, HTAB *groupedLogicalRepTargetsHash, - LogicalRepType type); + LogicalRepType type, + bool skipInterShardRelationshipCreation); extern void CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList); extern void CreatePartitioningHierarchy(List *logicalRepTargetList); diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index cf5a6fd02..9fd44b71b 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -21,6 +21,11 @@ extern void EnsureReferenceTablesExistOnAllNodes(void); extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode); +extern int64 * ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, + char + transferMode, + int * + nDependTasks); extern bool HasNodesWithMissingReferenceTables(List **referenceTableList); extern uint32 CreateReferenceTableColocationId(void); extern uint32 GetReferenceTableColocationId(void); diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index dfadc0263..0696ef6e8 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -44,10 +44,11 @@ typedef enum AdvisoryLocktagClass ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8, ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9, ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID = 10, - ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12, + ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12, /* Not used anymore */ ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13, ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK = 14, - ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION = 15 + ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION = 15, + ADV_LOCKTAG_CLASS_CITUS_SHARD_MOVE = 16 } AdvisoryLocktagClass; /* CitusOperations has constants for citus operations */ @@ -84,6 +85,16 @@ typedef enum CitusOperations (uint32) (shardid), \ ADV_LOCKTAG_CLASS_CITUS_SHARD) +/* advisory lock for citus shard move/copy operations, + * also it has the database hardcoded to MyDatabaseId, + * to ensure the locks are local to each database */ +#define SET_LOCKTAG_SHARD_MOVE(tag, shardid) \ + SET_LOCKTAG_ADVISORY(tag, \ + MyDatabaseId, \ + (uint32) ((shardid) >> 32), \ + (uint32) (shardid), \ + ADV_LOCKTAG_CLASS_CITUS_SHARD_MOVE) + /* reuse advisory lock, but with different, unused field 4 (7) * Also it has the database hardcoded to MyDatabaseId, to ensure the locks * are local to each database */ @@ -124,16 +135,6 @@ typedef enum CitusOperations (uint32) operationId, \ ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID) -/* reuse advisory lock, but with different, unused field 4 (12) - * Also it has the database hardcoded to MyDatabaseId, to ensure the locks - * are local to each database */ -#define SET_LOCKTAG_LOGICAL_REPLICATION(tag) \ - SET_LOCKTAG_ADVISORY(tag, \ - MyDatabaseId, \ - (uint32) 0, \ - (uint32) 0, \ - ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION) - /* reuse advisory lock, but with different, unused field 4 (14) * Also it has the database hardcoded to MyDatabaseId, to ensure the locks * are local to each database */ diff --git a/src/include/distributed/shard_transfer.h b/src/include/distributed/shard_transfer.h index c1621879b..ae33c4ec3 100644 --- a/src/include/distributed/shard_transfer.h +++ b/src/include/distributed/shard_transfer.h @@ -23,10 +23,52 @@ typedef enum SHARD_TRANSFER_COPY = 2 } ShardTransferType; +/* + * ShardTransferOperationMode is used to pass flags to the shard transfer + * function. The flags are used to control the behavior of the transfer + * function. + */ +typedef enum +{ + /* + * This flag instructs the transfer function to only transfer single shard + * rather than transfer all the colocated shards for the shard interval. + * Using this flag mean we might break the colocated shard + * relationship on the source node. So this is only usefull when setting up + * the new node and we are sure that the node would not be used until we have + * transfered all the shards. + * The reason we need this flag is that we want to be able to transfer + * colocated shards in parallel and for now it is only used for the reference + * table shards. + * Finally if you are using this flag, you should also use consider defering + * the creation of the relationships on the source node until all colocated + * shards are transfered (see: SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS). + */ + SHARD_TRANSFER_SINGLE_SHARD_ONLY = 1 << 0, + + /* With this flag the shard transfer function does not create any constrainsts + * or foreign relations defined on the shard, This can be used to defer the + * creation of the relationships until all the shards are transfered. + * This is usefull when we are transfering colocated shards in parallel and + * we want to avoid the creation of the relationships on the source node + * until all the shards are transfered. + */ + SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS = 1 << 1, + + /* This flag is used to indicate that the shard transfer function should + * only create the relationships on the target node and not transfer any data. + * This is can be used to create the relationships that were defered + * during the transfering of shards. + */ + SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY = 1 << 2 +} ShardTransferOperationMode; + + extern void TransferShards(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, - char shardReplicationMode, ShardTransferType transferType); + char shardReplicationMode, ShardTransferType transferType, + uint32 optionFlags); extern uint64 ShardListSizeInBytes(List *colocatedShardList, char *workerNodeName, uint32 workerNodePort); extern void ErrorIfMoveUnsupportedTableType(Oid relationId);