diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 074f1bed0..e04ddd26f 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -222,6 +222,7 @@ typedef struct ShardMoveDependencies { HTAB *colocationDependencies; HTAB *nodeDependencies; + bool parallelTransferColocatedShards; } ShardMoveDependencies; char *VariablesToBePassedToNewConnections = NULL; @@ -270,7 +271,9 @@ static ShardCost GetShardCost(uint64 shardId, void *context); static List * NonColocatedDistRelationIdList(void); static void RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid); static int64 RebalanceTableShardsBackground(RebalanceOptions *options, Oid - shardReplicationModeOid); + shardReplicationModeOid, + bool ParallelTransferReferenceTables, + bool ParallelTransferColocatedShards); static void AcquireRebalanceColocationLock(Oid relationId, const char *operationName); static void ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, char *noticeOperation); @@ -296,9 +299,12 @@ static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStati static void ErrorOnConcurrentRebalance(RebalanceOptions *); static List * GetSetCommandListForNewConnections(void); static int64 GetColocationId(PlacementUpdateEvent *move); -static ShardMoveDependencies InitializeShardMoveDependencies(); -static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 - colocationId, +static ShardMoveDependencies InitializeShardMoveDependencies(bool + ParallelTransferColocatedShards); +static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, + int64 colocationId, + int64 *refTablesDepTaskIds, + int refTablesDepTaskIdsCount, ShardMoveDependencies shardMoveDependencies, int *nDepends); static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, @@ -1014,6 +1020,12 @@ citus_rebalance_start(PG_FUNCTION_ARGS) PG_ENSURE_ARGNOTNULL(2, "shard_transfer_mode"); Oid shardTransferModeOid = PG_GETARG_OID(2); + PG_ENSURE_ARGNOTNULL(3, "parallel_transfer_reference_tables"); + bool ParallelTransferReferenceTables = PG_GETARG_BOOL(3); + + PG_ENSURE_ARGNOTNULL(4, "parallel_transfer_colocated_shards"); + bool ParallelTransferColocatedShards = PG_GETARG_BOOL(4); + RebalanceOptions options = { .relationIdList = relationIdList, .threshold = strategy->defaultThreshold, @@ -1023,7 +1035,9 @@ citus_rebalance_start(PG_FUNCTION_ARGS) .rebalanceStrategy = strategy, .improvementThreshold = strategy->improvementThreshold, }; - int jobId = RebalanceTableShardsBackground(&options, shardTransferModeOid); + int jobId = RebalanceTableShardsBackground(&options, shardTransferModeOid, + ParallelTransferReferenceTables, + ParallelTransferColocatedShards); if (jobId == 0) { @@ -1988,17 +2002,20 @@ GetColocationId(PlacementUpdateEvent *move) * given colocation group and the other one is for tracking source nodes of all moves. 
*/ static ShardMoveDependencies -InitializeShardMoveDependencies() +InitializeShardMoveDependencies(bool ParallelTransferColocatedShards) { ShardMoveDependencies shardMoveDependencies; shardMoveDependencies.colocationDependencies = CreateSimpleHashWithNameAndSize(int64, ShardMoveDependencyInfo, "colocationDependencyHashMap", 6); + shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int32, ShardMoveSourceNodeHashEntry, "nodeDependencyHashMap", 6); + shardMoveDependencies.parallelTransferColocatedShards = + ParallelTransferColocatedShards; return shardMoveDependencies; } @@ -2009,6 +2026,7 @@ InitializeShardMoveDependencies() */ static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, + int64 *refTablesDepTaskIds, int refTablesDepTaskIdsCount, ShardMoveDependencies shardMoveDependencies, int *nDepends) { HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64, @@ -2016,13 +2034,17 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, bool found; - /* Check if there exists a move in the same colocation group scheduled earlier. */ - ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( - shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, &found); - - if (found) + if (!shardMoveDependencies.parallelTransferColocatedShards) { - hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); + /* Check if there exists a move in the same colocation group scheduled earlier. */ + ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( + shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, & + found); + + if (found) + { + hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL); + } } /* @@ -2045,6 +2067,23 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId, } } + *nDepends = hash_get_num_entries(dependsList); + if (*nDepends == 0) + { + /* + * shard copy can only start after finishing copy of reference table shards + * so each shard task will have a dependency on the task that indicates the + * copy complete of reference tables + */ + while (refTablesDepTaskIdsCount > 0) + { + int64 refTableTaskId = *refTablesDepTaskIds; + hash_search(dependsList, &refTableTaskId, HASH_ENTER, NULL); + refTablesDepTaskIds++; + refTablesDepTaskIdsCount--; + } + } + *nDepends = hash_get_num_entries(dependsList); int64 *dependsArray = NULL; @@ -2076,9 +2115,13 @@ static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId, ShardMoveDependencies shardMoveDependencies) { - ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( - shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL); - shardMoveDependencyInfo->taskId = taskId; + if (!shardMoveDependencies.parallelTransferColocatedShards) + { + ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search( + shardMoveDependencies.colocationDependencies, &colocationId, + HASH_ENTER, NULL); + shardMoveDependencyInfo->taskId = taskId; + } bool found; ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search( @@ -2103,7 +2146,9 @@ UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int * background job+task infrastructure. 
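To make the dependency policy above concrete, here is a minimal sketch in plain C of how a move task's dependency array is assembled under the two modes. It deliberately replaces the Citus hash tables with simple arrays and omits the source-node dependencies; BuildMoveDependencies, lastColocatedTaskId and refTaskIds are illustrative names, not Citus APIs.

#include <stddef.h>

/*
 * Returns the number of dependencies written into dependsOut.
 * lastColocatedTaskId is the latest move already scheduled for the same
 * colocation group (0 if none); refTaskIds are the task ids that signal
 * completion of the reference table setup.
 */
static size_t
BuildMoveDependencies(int parallelColocatedShards,
                      long lastColocatedTaskId,
                      const long *refTaskIds, size_t refTaskIdCount,
                      long *dependsOut)
{
    size_t nDepends = 0;

    /* serialize moves within a colocation group only in the classic mode */
    if (!parallelColocatedShards && lastColocatedTaskId > 0)
    {
        dependsOut[nDepends++] = lastColocatedTaskId;
    }

    /*
     * A move with no other dependency still has to wait for the reference
     * tables; with parallel reference table transfer there can be several
     * such tasks (one per new node) instead of a single task id.
     */
    if (nDepends == 0)
    {
        for (size_t i = 0; i < refTaskIdCount; i++)
        {
            dependsOut[nDepends++] = refTaskIds[i];
        }
    }

    return nDepends;
}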
*/ static int64 -RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationModeOid) +RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationModeOid, + bool ParallelTransferReferenceTables, + bool ParallelTransferColocatedShards) { if (list_length(options->relationIdList) == 0) { @@ -2174,7 +2219,8 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo initStringInfo(&buf); List *referenceTableIdList = NIL; - int64 replicateRefTablesTaskId = 0; + int64 *refTablesDepTaskIds = NULL; + int refTablesDepTaskIdsCount = 0; if (HasNodesWithMissingReferenceTables(&referenceTableIdList)) { @@ -2187,22 +2233,41 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo * Reference tables need to be copied to (newly-added) nodes, this needs to be the * first task before we can move any other table. */ - appendStringInfo(&buf, - "SELECT pg_catalog.replicate_reference_tables(%s)", - quote_literal_cstr(shardTranferModeLabel)); + if (ParallelTransferReferenceTables) + { + refTablesDepTaskIds = + ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes( + jobId, shardTransferMode, &refTablesDepTaskIdsCount); + ereport(DEBUG2, + (errmsg("%d dependent copy reference table tasks for job %ld", + refTablesDepTaskIdsCount, jobId), + errdetail("Rebalance scheduled as background job"), + errhint("To monitor progress, run: SELECT * FROM " + "citus_rebalance_status();"))); + } + else + { + /* Move all reference tables as single task. Classical way */ + appendStringInfo(&buf, + "SELECT pg_catalog.replicate_reference_tables(%s)", + quote_literal_cstr(shardTranferModeLabel)); - int32 nodesInvolved[] = { 0 }; + int32 nodesInvolved[] = { 0 }; - /* replicate_reference_tables permissions require superuser */ - Oid superUserId = CitusExtensionOwner(); - BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data, 0, - NULL, 0, nodesInvolved); - replicateRefTablesTaskId = task->taskid; + /* replicate_reference_tables permissions require superuser */ + Oid superUserId = CitusExtensionOwner(); + BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data, 0, + NULL, 0, nodesInvolved); + refTablesDepTaskIds = palloc0(sizeof(int64)); + refTablesDepTaskIds[0] = task->taskid; + refTablesDepTaskIdsCount = 1; + } } PlacementUpdateEvent *move = NULL; - ShardMoveDependencies shardMoveDependencies = InitializeShardMoveDependencies(); + ShardMoveDependencies shardMoveDependencies = + InitializeShardMoveDependencies(ParallelTransferColocatedShards); foreach_declared_ptr(move, placementUpdateList) { @@ -2220,16 +2285,11 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo int nDepends = 0; int64 *dependsArray = GenerateTaskMoveDependencyList(move, colocationId, + refTablesDepTaskIds, + refTablesDepTaskIdsCount, shardMoveDependencies, &nDepends); - if (nDepends == 0 && replicateRefTablesTaskId > 0) - { - nDepends = 1; - dependsArray = palloc(nDepends * sizeof(int64)); - dependsArray[0] = replicateRefTablesTaskId; - } - int32 nodesInvolved[2] = { 0 }; nodesInvolved[0] = move->sourceNode->nodeId; nodesInvolved[1] = move->targetNode->nodeId; diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index b1202e648..d62f225ba 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -1546,12 +1546,15 @@ NonBlockingShardSplit(SplitOperation splitOperation, * 9) Logically 
replicate all the changes and do most of the table DDL, * like index and foreign key creation. */ + bool skipInterShardRelationshipCreation = false; + CompleteNonBlockingShardTransfer(sourceColocatedShardIntervalList, sourceConnection, publicationInfoHash, logicalRepTargetList, groupedLogicalRepTargetsHash, - SHARD_SPLIT); + SHARD_SPLIT, + skipInterShardRelationshipCreation); /* * 10) Delete old shards metadata and mark the shards as to be deferred drop. diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index b7d07b2cf..410280360 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -107,16 +107,18 @@ static void ErrorIfSameNode(char *sourceNodeName, int sourceNodePort, static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, bool useLogicalReplication, - const char *operationName); + const char *operationName, uint32 optionFlags); static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort); + int32 targetNodePort, + uint32 optionFlags); static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, - char *targetNodeName, int32 targetNodePort); + char *targetNodeName, int32 targetNodePort, + uint32 optionFlags); static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort, const char *targetNodeName, int32 targetNodePort); @@ -165,7 +167,8 @@ static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval, static ShardCommandList * CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList); static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode); - +static void AcquireShardPlacementLock(uint64_t shardId, int lockMode, Oid relationId, + const char *operationName); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_copy_shard_placement); @@ -174,7 +177,7 @@ PG_FUNCTION_INFO_V1(master_copy_shard_placement); PG_FUNCTION_INFO_V1(citus_move_shard_placement); PG_FUNCTION_INFO_V1(citus_move_shard_placement_with_nodeid); PG_FUNCTION_INFO_V1(master_move_shard_placement); - +PG_FUNCTION_INFO_V1(citus_internal_copy_single_shard_placement); double DesiredPercentFreeAfterMove = 10; bool CheckAvailableSpaceBeforeMove = true; @@ -203,7 +206,7 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } @@ -232,7 +235,7 @@ citus_copy_shard_placement_with_nodeid(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, targetNode->workerName, targetNode->workerPort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } @@ -267,13 +270,69 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_COPY); + shardReplicationMode, SHARD_TRANSFER_COPY, 0); PG_RETURN_VOID(); } +/* + * citus_internal_copy_single_shard_placement is an internal function that + * copies a single shard placement from a source node to a target node. 
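As a purely illustrative example of how the rebalancer's background tasks end up invoking this UDF once the scheduler later in the patch builds the command strings (the shard id, node ids and the numeric flag value here are made up, and calling it outside a rebalancer task errors out by design):

-- copy one reference table shard, deferring FK / partition creation
SELECT citus_internal.citus_internal_copy_single_shard_placement(
           102008,           -- shard_id
           1,                -- source_node_id
           3,                -- target_node_id
           3,                -- flags: SINGLE_SHARD_ONLY | SKIP_CREATE_RELATIONSHIPS
                             --        (actual bit values are assumed here)
           'block_writes');  -- transfer_mode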
+ * It has two main differences from citus_copy_shard_placement: + * 1. it copies only a single shard placement, not all colocated shards + * 2. It allows to defer the constraints creation and this same function + * can be used to create the constraints later. + * + * The primary use case for this function is to transfer the shards of + * reference tables. Since all reference tables are colocated together, + * and each reference table has only one shard, this function can be used + * to transfer the shards of reference tables in parallel. + * Furthermore, the reference tables could have relations with + * other reference tables, so we need to ensure that their constraints + * are also transferred after copying the shards to the target node. + * For this reason, we allow the caller to defer the constraints creation. + * + * This function is not supposed to be called by the user directly. + */ +Datum +citus_internal_copy_single_shard_placement(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureCoordinator(); + + int64 shardId = PG_GETARG_INT64(0); + uint32 sourceNodeId = PG_GETARG_INT32(1); + uint32 targetNodeId = PG_GETARG_INT32(2); + uint32 flags = PG_GETARG_INT32(3); + Oid shardReplicationModeOid = PG_GETARG_OID(4); + + bool missingOk = false; + WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk); + WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk); + + char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); + + /* + * This is an internal function that is used by the rebalancer. + * It is not supposed to be called by the user directly. + */ + if (!IsRebalancerInternalBackend()) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("This is an internal Citus function that can only" + " be used by a rebalancer task"))); + } + + TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, + targetNode->workerName, targetNode->workerPort, + shardReplicationMode, SHARD_TRANSFER_COPY, flags); + + PG_RETURN_VOID(); +} + + /* * citus_move_shard_placement moves given shard (and its co-located shards) from one * node to the other node. To accomplish this it entirely recreates the table structure @@ -315,7 +374,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); TransferShards(shardId, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - shardReplicationMode, SHARD_TRANSFER_MOVE); + shardReplicationMode, SHARD_TRANSFER_MOVE, 0); PG_RETURN_VOID(); } @@ -343,20 +402,77 @@ citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS) char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, targetNode->workerName, - targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE); + targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE, 0); PG_RETURN_VOID(); } /* - * TransferShards is the function for shard transfers. + * AcquireShardPlacementLock tries to acquire a lock on the shardid + * while moving/copying the shard placement. If this + * is it not possible it fails instantly because this means + * another move/copy on same shard is currently happening. 
*/ +static void +AcquireShardPlacementLock(uint64_t shardId, int lockMode, Oid relationId, + const char *operationName) +{ + LOCKTAG tag; + const bool sessionLock = false; + const bool dontWait = true; + + SET_LOCKTAG_SHARD_MOVE(tag, shardId); + + LockAcquireResult lockAcquired = LockAcquire(&tag, lockMode, sessionLock, dontWait); + if (!lockAcquired) + { + ereport(ERROR, (errmsg("could not acquire the lock required to %s %s", + operationName, + generate_qualified_relation_name(relationId)), + errdetail("It means that either a concurrent shard move " + "or colocated distributed table creation is " + "happening."), + errhint("Make sure that the concurrent operation has " + "finished and re-run the command"))); + } +} + + +/* + * TransferShards is responsible for handling shard transfers. + * + * The optionFlags parameter controls the transfer behavior: + * + * - By default, shard colocation groups are treated as a single unit. This works + * well for distributed tables, since they can contain multiple colocated shards + * on the same node, and shard transfers can still be parallelized at the group level. + * + * - Reference tables are different: every reference table belongs to the same + * colocation group but has only a single shard. To parallelize reference table + * transfers, we must bypass the colocation group. The + * SHARD_TRANSFER_SINGLE_SHARD_ONLY flag enables this behavior by transferring + * only the specific shardId passed into the function, ignoring colocated shards. + * + * - Reference tables may also define foreign key relationships with each other. + * Since we cannot create those relationships until all shards have been moved, + * the SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS flag is used to defer their + * creation until shard transfer completes. + * + * - After shards are transferred, the SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY + * flag is used to create the foreign key relationships for already-transferred + * reference tables. + * + * Currently, optionFlags are only used to customize reference table transfers. + * For distributed tables, optionFlags should always be set to 0. + * passing 0 as optionFlags means that the default behavior will be used for + * all aspects of the shard transfer. That is to consider all colocated shards + * as a single unit and return after creating the necessary relationships. */ void TransferShards(int64 shardId, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, char shardReplicationMode, - ShardTransferType transferType) + ShardTransferType transferType, uint32 optionFlags) { /* strings to be used in log messages */ const char *operationName = ShardTransferTypeNames[transferType]; @@ -385,20 +501,36 @@ TransferShards(int64 shardId, char *sourceNodeName, ErrorIfTargetNodeIsNotSafeForTransfer(targetNodeName, targetNodePort, transferType); - AcquirePlacementColocationLock(distributedTableId, ExclusiveLock, operationName); + AcquirePlacementColocationLock(distributedTableId, RowExclusiveLock, operationName); - List *colocatedTableList = ColocatedTableList(distributedTableId); - List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + List *colocatedTableList; + List *colocatedShardList; + + /* + * If SHARD_TRANSFER_SINGLE_SHARD_ONLY is set, we only transfer a single shard + * specified by shardId. Otherwise, we transfer all colocated shards. 
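A compact sketch of the three option bits and how TransferShards interprets them; the numeric values below are placeholders, since the real constants are declared in shard_transfer.h, which is outside the hunks shown here:

#include <stdbool.h>
#include <stdint.h>

typedef enum ShardTransferOptionFlags
{
    /* transfer only the given shard, not its whole colocation group */
    SHARD_TRANSFER_SINGLE_SHARD_ONLY = 1 << 0,

    /* copy schema and data, but defer FK / partition attachment creation */
    SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS = 1 << 1,

    /* shards are already in place; only create the deferred relationships */
    SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY = 1 << 2
} ShardTransferOptionFlags;

/* optionFlags == 0 keeps the classic behavior used for distributed tables */
static bool
TransfersWholeColocationGroup(uint32_t optionFlags)
{
    return (optionFlags & SHARD_TRANSFER_SINGLE_SHARD_ONLY) == 0;
}

static bool
DefersRelationshipCreation(uint32_t optionFlags)
{
    return (optionFlags & SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS) != 0;
}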
+ */ + bool isSingleShardOnly = optionFlags & SHARD_TRANSFER_SINGLE_SHARD_ONLY; + + if (isSingleShardOnly) + { + colocatedTableList = list_make1_oid(distributedTableId); + colocatedShardList = list_make1(shardInterval); + } + else + { + colocatedTableList = ColocatedTableList(distributedTableId); + colocatedShardList = ColocatedShardIntervalList(shardInterval); + } EnsureTableListOwner(colocatedTableList); if (transferType == SHARD_TRANSFER_MOVE) { /* - * Block concurrent DDL / TRUNCATE commands on the relation. Similarly, - * block concurrent citus_move_shard_placement() on any shard of - * the same relation. This is OK for now since we're executing shard - * moves sequentially anyway. + * Block concurrent DDL / TRUNCATE commands on the relation. while, + * allow concurrent citus_move_shard_placement() on the shards of + * the same relation. */ LockColocatedRelationsForMove(colocatedTableList); } @@ -412,14 +544,66 @@ TransferShards(int64 shardId, char *sourceNodeName, /* * We sort shardIntervalList so that lock operations will not cause any - * deadlocks. + * deadlocks. But we do not need to do that if the list contain only one + * shard. */ - colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + if (!isSingleShardOnly) + { + colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + } - if (TransferAlreadyCompleted(colocatedShardList, - sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort, - transferType)) + /* We have pretty much covered the concurrent rebalance operations + * and we want to allow concurrent moves within the same colocation group. + * but at the same time we want to block the concurrent moves on the same shard + * placement. So we lock the shard moves before starting the transfer. + */ + foreach_declared_ptr(shardInterval, colocatedShardList) + { + int64 shardIdToLock = shardInterval->shardId; + AcquireShardPlacementLock(shardIdToLock, ExclusiveLock, distributedTableId, + operationName); + } + + bool transferAlreadyCompleted = TransferAlreadyCompleted(colocatedShardList, + sourceNodeName, + sourceNodePort, + targetNodeName, + targetNodePort, + transferType); + + /* + * If we just need to create the shard relationships,We don't need to do anything + * else other than calling CopyShardTables with SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY + * flag. 
+ */ + bool createRelationshipsOnly = optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY; + + if (createRelationshipsOnly) + { + if (!transferAlreadyCompleted) + { + /* + * if the transfer is not completed, and we are here just to create + * the relationships, we can return right away + */ + ereport(WARNING, (errmsg("shard is not present on node %s:%d", + targetNodeName, targetNodePort), + errdetail("%s may have not completed.", + operationNameCapitalized))); + return; + } + + CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName + , + targetNodePort, (shardReplicationMode == + TRANSFER_MODE_FORCE_LOGICAL), + operationFunctionName, optionFlags); + + /* We don't need to do anything else, just return */ + return; + } + + if (transferAlreadyCompleted) { /* if the transfer is already completed, we can return right away */ ereport(WARNING, (errmsg("shard is already present on node %s:%d", @@ -515,7 +699,8 @@ TransferShards(int64 shardId, char *sourceNodeName, } CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort, useLogicalReplication, operationFunctionName); + targetNodePort, useLogicalReplication, operationFunctionName, + optionFlags); if (transferType == SHARD_TRANSFER_MOVE) { @@ -676,7 +861,7 @@ IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetN /* * LockColocatedRelationsForMove takes a list of relations, locks all of them - * using ShareUpdateExclusiveLock + * using ShareLock */ static void LockColocatedRelationsForMove(List *colocatedTableList) @@ -684,7 +869,7 @@ LockColocatedRelationsForMove(List *colocatedTableList) Oid colocatedTableId = InvalidOid; foreach_declared_oid(colocatedTableId, colocatedTableList) { - LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock); + LockRelationOid(colocatedTableId, RowExclusiveLock); } } @@ -1333,7 +1518,7 @@ ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList) static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, bool useLogicalReplication, - const char *operationName) + const char *operationName, uint32 optionFlags) { if (list_length(shardIntervalList) < 1) { @@ -1343,16 +1528,22 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP /* Start operation to prepare for generating cleanup records */ RegisterOperationNeedingCleanup(); - if (useLogicalReplication) + bool createRelationshipsOnly = optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY; + + /* + * If we're just going to create relationships only always use + * CopyShardTablesViaBlockWrites. 
+ */ + if (useLogicalReplication && !createRelationshipsOnly) { CopyShardTablesViaLogicalReplication(shardIntervalList, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort); + targetNodePort, optionFlags); } else { CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort); + targetNodeName, targetNodePort, optionFlags); } /* @@ -1369,7 +1560,7 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort) + int32 targetNodePort, uint32 optionFlags) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaLogicalReplication", @@ -1407,9 +1598,13 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa MemoryContextSwitchTo(oldContext); + bool skipRelationshipCreation = (optionFlags & + SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS); + /* data copy is done seperately when logical replication is used */ LogicallyReplicateShards(shardIntervalList, sourceNodeName, - sourceNodePort, targetNodeName, targetNodePort); + sourceNodePort, targetNodeName, targetNodePort, + skipRelationshipCreation); } @@ -1437,7 +1632,7 @@ CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList) static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort) + int32 targetNodePort, uint32 optionFlags) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaBlockWrites", @@ -1446,127 +1641,150 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); - - /* iterate through the colocated shards and copy each */ ShardInterval *shardInterval = NULL; - foreach_declared_ptr(shardInterval, shardIntervalList) - { - /* - * For each shard we first create the shard table in a separate - * transaction and then we copy the data and create the indexes in a - * second separate transaction. The reason we don't do both in a single - * transaction is so we can see the size of the new shard growing - * during the copy when we run get_rebalance_progress in another - * session. If we wouldn't split these two phases up, then the table - * wouldn't be visible in the session that get_rebalance_progress uses. - * So get_rebalance_progress would always report its size as 0. 
- */ - List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName, - sourceNodePort); - char *tableOwner = TableOwner(shardInterval->relationId); - /* drop the shard we created on the target, in case of failure */ - InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, - ConstructQualifiedShardName(shardInterval), - GroupForNode(targetNodeName, - targetNodePort), - CLEANUP_ON_FAILURE); - - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); - } - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_COPYING_DATA); - - ConflictWithIsolationTestingBeforeCopy(); - CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); - ConflictWithIsolationTestingAfterCopy(); - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS); - - foreach_declared_ptr(shardInterval, shardIntervalList) - { - List *ddlCommandList = - PostLoadShardCreationCommandList(shardInterval, sourceNodeName, - sourceNodePort); - char *tableOwner = TableOwner(shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); - - MemoryContextReset(localContext); - } + bool createRelationshipsOnly = optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY; /* - * Once all shards are copied, we can recreate relationships between shards. - * Create DDL commands to Attach child tables to their parents in a partitioning hierarchy. + * If we’re only asked to create the relationships, the shards are already + * present and populated on the node. Skip the table‑setup and data‑loading + * steps and proceed straight to creating the relationships. */ - List *shardIntervalWithDDCommandsList = NIL; - foreach_declared_ptr(shardInterval, shardIntervalList) + if (!createRelationshipsOnly) { - if (PartitionTable(shardInterval->relationId)) + /* iterate through the colocated shards and copy each */ + foreach_declared_ptr(shardInterval, shardIntervalList) { - char *attachPartitionCommand = - GenerateAttachShardPartitionCommand(shardInterval); + /* + * For each shard we first create the shard table in a separate + * transaction and then we copy the data and create the indexes in a + * second separate transaction. The reason we don't do both in a single + * transaction is so we can see the size of the new shard growing + * during the copy when we run get_rebalance_progress in another + * session. If we wouldn't split these two phases up, then the table + * wouldn't be visible in the session that get_rebalance_progress uses. + * So get_rebalance_progress would always report its size as 0. 
+ */ + List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, + sourceNodeName, + sourceNodePort); + char *tableOwner = TableOwner(shardInterval->relationId); - ShardCommandList *shardCommandList = CreateShardCommandList( - shardInterval, - list_make1(attachPartitionCommand)); - shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, - shardCommandList); + /* drop the shard we created on the target, in case of failure */ + InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, + ConstructQualifiedShardName( + shardInterval), + GroupForNode(targetNodeName, + targetNodePort), + CLEANUP_ON_FAILURE); + + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); + } + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COPYING_DATA); + + ConflictWithIsolationTestingBeforeCopy(); + CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL); + ConflictWithIsolationTestingAfterCopy(); + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS); + + foreach_declared_ptr(shardInterval, shardIntervalList) + { + List *ddlCommandList = + PostLoadShardCreationCommandList(shardInterval, sourceNodeName, + sourceNodePort); + char *tableOwner = TableOwner(shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); + + MemoryContextReset(localContext); } } - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS); - /* - * Iterate through the colocated shards and create DDL commamnds - * to create the foreign constraints. + * Skip creating shard relationships if the caller has requested that they + * not be created. */ - foreach_declared_ptr(shardInterval, shardIntervalList) + bool skipRelationshipCreation = (optionFlags & + SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS); + + if (!skipRelationshipCreation) { - List *shardForeignConstraintCommandList = NIL; - List *referenceTableForeignConstraintList = NIL; + /* + * Once all shards are copied, we can recreate relationships between shards. + * Create DDL commands to Attach child tables to their parents in a partitioning hierarchy. 
+ */ + List *shardIntervalWithDDCommandsList = NIL; + foreach_declared_ptr(shardInterval, shardIntervalList) + { + if (PartitionTable(shardInterval->relationId)) + { + char *attachPartitionCommand = + GenerateAttachShardPartitionCommand(shardInterval); - CopyShardForeignConstraintCommandListGrouped(shardInterval, - &shardForeignConstraintCommandList, - &referenceTableForeignConstraintList); + ShardCommandList *shardCommandList = CreateShardCommandList( + shardInterval, + list_make1(attachPartitionCommand)); + shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, + shardCommandList); + } + } - ShardCommandList *shardCommandList = CreateShardCommandList( - shardInterval, - list_concat(shardForeignConstraintCommandList, - referenceTableForeignConstraintList)); - shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, - shardCommandList); + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS); + + /* + * Iterate through the colocated shards and create DDL commamnds + * to create the foreign constraints. + */ + foreach_declared_ptr(shardInterval, shardIntervalList) + { + List *shardForeignConstraintCommandList = NIL; + List *referenceTableForeignConstraintList = NIL; + + CopyShardForeignConstraintCommandListGrouped(shardInterval, + & + shardForeignConstraintCommandList, + & + referenceTableForeignConstraintList); + + ShardCommandList *shardCommandList = CreateShardCommandList( + shardInterval, + list_concat(shardForeignConstraintCommandList, + referenceTableForeignConstraintList)); + shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList, + shardCommandList); + } + + /* Now execute the Partitioning & Foreign constraints creation commads. */ + ShardCommandList *shardCommandList = NULL; + foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList) + { + char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, + shardCommandList->ddlCommandList); + } + + UpdatePlacementUpdateStatusForShardIntervalList( + shardIntervalList, + sourceNodeName, + sourceNodePort, + PLACEMENT_UPDATE_STATUS_COMPLETING); } - - /* Now execute the Partitioning & Foreign constraints creation commads. 
*/ - ShardCommandList *shardCommandList = NULL; - foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList) - { - char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, - shardCommandList->ddlCommandList); - } - - UpdatePlacementUpdateStatusForShardIntervalList( - shardIntervalList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_COMPLETING); - MemoryContextReset(localContext); MemoryContextSwitchTo(oldContext); } @@ -1647,7 +1865,8 @@ CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardInte ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList, MaxAdaptiveExecutorPoolSize, - NULL /* jobIdList (ignored by API implementation) */); + NULL /* jobIdList (ignored by API implementation) */ + ); } diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 7189216d0..4c43d3513 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -118,7 +118,8 @@ static List * GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shard static List * GetIndexCommandListForShardBackingReplicaIdentity(Oid relationId, uint64 shardId); static void CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList, - LogicalRepType type); + LogicalRepType type, + bool skipInterShardRelationships); static void ExecuteCreateIndexCommands(List *logicalRepTargetList); static void ExecuteCreateConstraintsBackedByIndexCommands(List *logicalRepTargetList); static List * ConvertNonExistingPlacementDDLCommandsToTasks(List *shardCommandList, @@ -132,7 +133,6 @@ static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command); static void WaitForMiliseconds(long timeout); static XLogRecPtr GetSubscriptionPosition( GroupedLogicalRepTargets *groupedLogicalRepTargets); -static void AcquireLogicalReplicationLock(void); static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode, List *shardIntervals); @@ -154,9 +154,9 @@ static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition, */ void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, - char *targetNodeName, int targetNodePort) + char *targetNodeName, int targetNodePort, + bool skipInterShardRelationshipCreation) { - AcquireLogicalReplicationLock(); char *superUser = CitusExtensionOwnerName(); char *databaseName = get_database_name(MyDatabaseId); int connectionFlags = FORCE_NEW_CONNECTION; @@ -258,7 +258,8 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo publicationInfoHash, logicalRepTargetList, groupedLogicalRepTargetsHash, - SHARD_MOVE); + SHARD_MOVE, + skipInterShardRelationshipCreation); /* * We use these connections exclusively for subscription management, @@ -317,7 +318,8 @@ CompleteNonBlockingShardTransfer(List *shardList, HTAB *publicationInfoHash, List *logicalRepTargetList, HTAB *groupedLogicalRepTargetsHash, - LogicalRepType type) + LogicalRepType type, + bool skipInterShardRelationshipCreation) { /* Start applying the changes from the replication slots to catch up. */ EnableSubscriptions(logicalRepTargetList); @@ -345,7 +347,8 @@ CompleteNonBlockingShardTransfer(List *shardList, * and partitioning hierarchy. Once they are done, wait until the replication * catches up again. 
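Two things are worth noting in this hunk. First, the cluster-wide AcquireLogicalReplicationLock() is removed, so logical-replication based moves are no longer serialized globally; the patch presumably relies on the per-placement ExclusiveLock now taken in TransferShards (AcquireShardPlacementLock) to keep two transfers of the same placement from racing. Second, the new skipInterShardRelationshipCreation flag threads through the non-blocking path roughly as follows (an informal call chain, signatures abbreviated):

    CopyShardTablesViaLogicalReplication(..., optionFlags)
      -> LogicallyReplicateShards(..., skipInterShardRelationshipCreation)
         -> CompleteNonBlockingShardTransfer(..., skipInterShardRelationshipCreation)
            -> CreatePostLogicalReplicationDataLoadObjects(..., skipInterShardRelationships)

so a copy scheduled with SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS defers partitioning hierarchy and foreign key creation to the later CREATE_RELATIONSHIPS_ONLY task.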
So we don't block writes too long. */ - CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList, type); + CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList, type, + skipInterShardRelationshipCreation); UpdatePlacementUpdateStatusForShardIntervalList( shardList, @@ -372,7 +375,7 @@ CompleteNonBlockingShardTransfer(List *shardList, WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); - if (type != SHARD_SPLIT) + if (type != SHARD_SPLIT && !skipInterShardRelationshipCreation) { UpdatePlacementUpdateStatusForShardIntervalList( shardList, @@ -497,25 +500,6 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList) } -/* - * AcquireLogicalReplicationLock tries to acquire a lock for logical - * replication. We need this lock, because at the start of logical replication - * we clean up old subscriptions and publications. Because of this cleanup it's - * not safe to run multiple logical replication based shard moves at the same - * time. If multiple logical replication moves would run at the same time, the - * second move might clean up subscriptions and publications that are in use by - * another move. - */ -static void -AcquireLogicalReplicationLock(void) -{ - LOCKTAG tag; - SET_LOCKTAG_LOGICAL_REPLICATION(tag); - - LockAcquire(&tag, ExclusiveLock, false, false); -} - - /* * PrepareReplicationSubscriptionList returns list of shards to be logically * replicated from given shard list. This is needed because Postgres does not @@ -675,7 +659,8 @@ GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shardId) */ static void CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList, - LogicalRepType type) + LogicalRepType type, + bool skipInterShardRelationships) { /* * We create indexes in 4 steps. @@ -705,7 +690,7 @@ CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList, /* * Creating the partitioning hierarchy errors out in shard splits when */ - if (type != SHARD_SPLIT) + if (type != SHARD_SPLIT && !skipInterShardRelationships) { /* create partitioning hierarchy, if any */ CreatePartitioningHierarchy(logicalRepTargetList); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index e2d21d074..0ad977d30 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1967,7 +1967,7 @@ RegisterCitusConfigVariables(void) "because total background worker count is shared by all background workers. 
The value " "represents the possible maximum number of task executors."), &MaxBackgroundTaskExecutors, - 4, 1, MAX_BG_TASK_EXECUTORS, + 1, 1, MAX_BG_TASK_EXECUTORS, PGC_SIGHUP, GUC_STANDARD, NULL, NULL, NULL); diff --git a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql index f415fff88..624abf5f9 100644 --- a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql +++ b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql @@ -1,7 +1,11 @@ -- citus--13.1-1--13.2-1 + -- bump version to 13.2-1 #include "udfs/worker_last_saved_explain_analyze/13.2-1.sql" +#include "udfs/citus_rebalance_start/13.2-1.sql" +#include "udfs/citus_internal_copy_single_shard_placement/13.2-1.sql" + #include "udfs/citus_finish_pg_upgrade/13.2-1.sql" DO $drop_leftover_old_columnar_objects$ diff --git a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql index 032c45e60..23e55feae 100644 --- a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql @@ -1,5 +1,9 @@ -- citus--13.2-1--13.1-1 -- downgrade version to 13.1-1 +DROP FUNCTION IF EXISTS citus_internal.citus_internal_copy_single_shard_placement(bigint, integer, integer, integer, citus.shard_transfer_mode); + +DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean); +#include "../udfs/citus_rebalance_start/11.1-1.sql" DROP FUNCTION IF EXISTS pg_catalog.worker_last_saved_explain_analyze(); #include "../udfs/worker_last_saved_explain_analyze/9.4-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/13.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/13.2-1.sql new file mode 100644 index 000000000..bbb2fb63b --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/13.2-1.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION citus_internal.citus_internal_copy_single_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + flags integer, + transfer_mode citus.shard_transfer_mode default 'auto') +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_internal_copy_single_shard_placement$$; diff --git a/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/latest.sql new file mode 100644 index 000000000..bbb2fb63b --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_copy_single_shard_placement/latest.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION citus_internal.citus_internal_copy_single_shard_placement( + shard_id bigint, + source_node_id integer, + target_node_id integer, + flags integer, + transfer_mode citus.shard_transfer_mode default 'auto') +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_internal_copy_single_shard_placement$$; diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql b/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql new file mode 100644 index 000000000..b3886f7a2 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql @@ -0,0 +1,15 @@ +DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode); + +CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start( + 
rebalance_strategy name DEFAULT NULL, + drain_only boolean DEFAULT false, + shard_transfer_mode citus.shard_transfer_mode default 'auto', + parallel_transfer_reference_tables boolean DEFAULT false, + parallel_transfer_colocated_shards boolean DEFAULT false + ) + RETURNS bigint + AS 'MODULE_PATHNAME' + LANGUAGE C VOLATILE; +COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean) + IS 'rebalance the shards in the cluster in the background'; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean) TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql b/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql index cc84d3142..b3886f7a2 100644 --- a/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql @@ -1,11 +1,15 @@ +DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode); + CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start( rebalance_strategy name DEFAULT NULL, drain_only boolean DEFAULT false, - shard_transfer_mode citus.shard_transfer_mode default 'auto' + shard_transfer_mode citus.shard_transfer_mode default 'auto', + parallel_transfer_reference_tables boolean DEFAULT false, + parallel_transfer_colocated_shards boolean DEFAULT false ) RETURNS bigint AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE; -COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode) +COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean) IS 'rebalance the shards in the cluster in the background'; -GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode) TO PUBLIC; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean) TO PUBLIC; diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 9ce29a99c..4dcc201d2 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -99,7 +99,7 @@ double DistributedDeadlockDetectionTimeoutFactor = 2.0; int Recover2PCInterval = 60000; int DeferShardDeleteInterval = 15000; int BackgroundTaskQueueCheckInterval = 5000; -int MaxBackgroundTaskExecutors = 4; +int MaxBackgroundTaskExecutors = 1; char *MainDb = ""; /* config variables for metadata sync timeout */ diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 8f0d89fc9..3e7ccef6c 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -27,6 +27,7 @@ #include "distributed/colocation_utils.h" #include "distributed/commands.h" #include "distributed/coordinator_protocol.h" +#include "distributed/hash_helpers.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" @@ -37,20 +38,27 @@ #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" +#include "distributed/shard_transfer.h" #include "distributed/shardinterval_utils.h" #include "distributed/transaction_management.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" + /* local 
function forward declarations */ static List * WorkersWithoutReferenceTablePlacement(uint64 shardId, LOCKMODE lockMode); -static StringInfo CopyShardPlacementToWorkerNodeQuery( - ShardPlacement *sourceShardPlacement, - WorkerNode *workerNode, - char transferMode); +static StringInfo CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement + , + WorkerNode *workerNode, + char transferMode); static bool AnyRelationsModifiedInTransaction(List *relationIdList); static List * ReplicatedMetadataSyncedDistributedTableList(void); static bool NodeHasAllReferenceTableReplicas(WorkerNode *workerNode); +typedef struct ShardTaskEntry +{ + uint64 shardId; + int64 taskId; +} ShardTaskEntry; /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(upgrade_to_reference_table); @@ -179,7 +187,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) if (list_length(newWorkersList) == 0) { /* - * All workers alreaddy have a copy of the reference tables, make sure that + * All workers already have a copy of the reference tables, make sure that * any locks obtained earlier are released. It will probably not matter, but * we release the locks in the reverse order we obtained them in. */ @@ -230,7 +238,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) WorkerNode *newWorkerNode = NULL; foreach_declared_ptr(newWorkerNode, newWorkersList) { - ereport(NOTICE, (errmsg("replicating reference table '%s' to %s:%d ...", + ereport(DEBUG2, (errmsg("replicating reference table '%s' to %s:%d ...", referenceTableName, newWorkerNode->workerName, newWorkerNode->workerPort))); @@ -279,6 +287,386 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) CloseConnection(connection); } + /* + * Since reference tables have been copied via a loopback connection we do not have + * to retain our locks. Since Citus only runs well in READ COMMITTED mode we can be + * sure that other transactions will find the reference tables copied. + * We have obtained and held multiple locks, here we unlock them all in the reverse + * order we have obtained them in. + */ + for (int releaseLockmodeIndex = lengthof(lockmodes) - 1; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } +} + + +/* + * ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes is essentially a + * twin of EnsureReferenceTablesExistOnAllNodesExtended. The difference is instead of + * copying the missing tables on to the worker nodes this function creates the background + * tasks for each required copy operation and schedule it in the background job. + * Another difference is that instead of moving all the colocated shards sequencially + * this function creates a seperate background task for each shard, even when the shards + * are part of same colocated shard group. + * + * For transfering the shards in parallel the function creates a task for each shard + * move and than schedules another task that creates the shard relationships (if any) + * between shards and that task wait for the completion of all shard transfer tasks. + * + * The function returns an array of task ids that are created for creating the shard + * relationships, effectively completion of these tasks signals the completion of + * of reference table setup on the worker nodes. Any process that needs to wait for + * the completion of the reference table setup can wait for these tasks to complete. 
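For one newly added node and reference tables A, B and C, where B has a foreign key to A, the task graph this builds looks roughly like the following (copy tasks for FK-related tables are additionally chained onto each other, as implemented below):

    copy shard of A ----+----------------------------+
                        +--> copy shard of B --------+
    copy shard of C ---------------------------------+--> create relationships on the node
                                                           (FKs and partition hierarchy)

One "create relationships" task is scheduled per new node, and the array returned to the rebalancer contains exactly those task ids; every distributed-shard move that has no other dependency is then made to depend on them.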
+ * + * The transferMode is passed to this function gets ignored for now and it only uses + * block write mode. + */ +int64 * +ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char transferMode + , + int *nDependTasks) +{ + List *referenceTableIdList = NIL; + uint64 shardId = INVALID_SHARD_ID; + List *newWorkersList = NIL; + int64 *dependsTaskArray = NULL; + const char *referenceTableName = NULL; + int colocationId = GetReferenceTableColocationId(); + + *nDependTasks = 0; + if (colocationId == INVALID_COLOCATION_ID) + { + /* we have no reference table yet. */ + return 0; + } + + /* + * Most of the time this function should result in a conclusion where we do not need + * to copy any reference tables. To prevent excessive locking the majority of the time + * we run our precondition checks first with a lower lock. If, after checking with the + * lower lock, that we might need to copy reference tables we check with a more + * aggressive and self conflicting lock. It is important to be self conflicting in the + * second run to make sure that two concurrent calls to this routine will actually not + * run concurrently after the initial check. + * + * If after two iterations of precondition checks we still find the need for copying + * reference tables we exit the loop with all locks held. This will prevent concurrent + * DROP TABLE and create_reference_table calls so that the list of reference tables we + * operate on are stable. + * + * + * Since the changes to the reference table placements are made via loopback + * connections we release the locks held at the end of this function. Due to Citus + * only running transactions in READ COMMITTED mode we can be sure that other + * transactions correctly find the metadata entries. + */ + LOCKMODE lockmodes[] = { AccessShareLock, ExclusiveLock }; + for (int lockmodeIndex = 0; lockmodeIndex < lengthof(lockmodes); lockmodeIndex++) + { + LockColocationId(colocationId, lockmodes[lockmodeIndex]); + + referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE); + if (referenceTableIdList == NIL) + { + /* + * No reference tables exist, make sure that any locks obtained earlier are + * released. It will probably not matter, but we release the locks in the + * reverse order we obtained them in. + */ + for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } + return 0; + } + + Oid referenceTableId = linitial_oid(referenceTableIdList); + referenceTableName = get_rel_name(referenceTableId); + List *shardIntervalList = LoadShardIntervalList(referenceTableId); + if (list_length(shardIntervalList) == 0) + { + /* check for corrupt metadata */ + ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard", + referenceTableName))); + } + + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + shardId = shardInterval->shardId; + + /* + * We only take an access share lock, otherwise we'll hold up citus_add_node. + * In case of create_reference_table() where we don't want concurrent writes + * to pg_dist_node, we have already acquired ShareLock on pg_dist_node. + */ + newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, AccessShareLock); + if (list_length(newWorkersList) == 0) + { + /* + * All workers already have a copy of the reference tables, make sure that + * any locks obtained earlier are released. 
It will probably not matter, but + * we release the locks in the reverse order we obtained them in. + */ + for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0; + releaseLockmodeIndex--) + { + UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); + } + return 0; + } + } + + /* + * citus_copy_shard_placement triggers metadata sync-up, which tries to + * acquire a ShareLock on pg_dist_node. We do master_copy_shad_placement + * in a separate connection. If we have modified pg_dist_node in the + * current backend, this will cause a deadlock. + */ + if (TransactionModifiedNodeMetadata) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate reference tables in a transaction " + "that modified node metadata"))); + } + + /* + * Modifications to reference tables in current transaction are not visible + * to citus_copy_shard_placement, since it is done in a separate backend. + */ + if (AnyRelationsModifiedInTransaction(referenceTableIdList)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate reference tables in a transaction " + "that modified a reference table"))); + } + + bool missingOk = false; + ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk); + if (sourceShardPlacement == NULL) + { + /* check for corrupt metadata */ + ereport(ERROR, (errmsg("reference table shard " + UINT64_FORMAT + " does not have an active shard placement", + shardId))); + } + + WorkerNode *newWorkerNode = NULL; + BackgroundTask *task = NULL; + StringInfoData buf = { 0 }; + initStringInfo(&buf); + List *depTasksList = NIL; + const char *transferModeString = + transferMode == TRANSFER_MODE_BLOCK_WRITES ? "block_writes" : + transferMode == TRANSFER_MODE_FORCE_LOGICAL ? "force_logical" : + "auto"; + + + foreach_declared_ptr(newWorkerNode, newWorkersList) + { + ereport(DEBUG2, (errmsg("replicating reference table '%s' to %s:%d ...", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort))); + + Oid relationId = InvalidOid; + List *nodeTasksList = NIL; + HTAB *shardTaskMap = CreateSimpleHashWithNameAndSize(uint64, + ShardTaskEntry, + "Shard_Task_Map", + list_length( + referenceTableIdList)); + + foreach_declared_oid(relationId, referenceTableIdList) + { + referenceTableName = get_rel_name(relationId); + List *shardIntervalList = LoadShardIntervalList(relationId); + if (list_length(shardIntervalList) != 1) + { + ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard", + referenceTableName))); + } + ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); + shardId = shardInterval->shardId; + + resetStringInfo(&buf); + uint32 shardTransferFlags = SHARD_TRANSFER_SINGLE_SHARD_ONLY | + SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS; + + /* Temporary hack until we get background task config support PR */ + appendStringInfo(&buf, + "SET LOCAL application_name TO '%s%ld';", + CITUS_REBALANCER_APPLICATION_NAME_PREFIX, + GetGlobalPID()); + + /* + * In first step just create and load data in the shards but defer the + * creation of the shard relationships to the next step. + * The reason we want to defer the creation of the shard relationships is that + * we want to make sure that all the parallel shard copy task are finished + * before we create the relationships. Otherwise we might end up with + * a situation where the dependent-shard task is still running and trying to + * create the shard relationships will result in ERROR. 
+ */ + appendStringInfo(&buf, + "SELECT " + "citus_internal.citus_internal_copy_single_shard_placement" + "(%ld,%u,%u,%u,%s)", + shardId, + sourceShardPlacement->nodeId, + newWorkerNode->nodeId, + shardTransferFlags, + quote_literal_cstr(transferModeString)); + + ereport(DEBUG2, + (errmsg("replicating reference table '%s' to %s:%d ... QUERY= %s", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort, buf.data))); + + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); + List *relatedRelations = list_concat(cacheEntry-> + referencedRelationsViaForeignKey, + cacheEntry-> + referencingRelationsViaForeignKey); + List *dependencyTaskList = NIL; + + Oid relatedRelationId = InvalidOid; + foreach_declared_oid(relatedRelationId, relatedRelations) + { + if (!list_member_oid(referenceTableIdList, relatedRelationId)) + { + continue; + } + + List *relatedShardIntervalList = LoadShardIntervalList( + relatedRelationId); + ShardInterval *relatedShardInterval = (ShardInterval *) linitial( + relatedShardIntervalList); + uint64 relatedShardId = relatedShardInterval->shardId; + bool taskFound = false; + ShardTaskEntry *taskEntry = hash_search(shardTaskMap, &relatedShardId, + HASH_FIND, &taskFound); + if (taskFound) + { + dependencyTaskList = lappend(dependencyTaskList, taskEntry); + } + } + + int nDepends = list_length(dependencyTaskList); + int64 *dependsArray = NULL; + if (nDepends > 0) + { + dependsArray = (int64 *) palloc(sizeof(int64) * nDepends); + int i = 0; + ListCell *lc; + foreach(lc, dependencyTaskList) + { + dependsArray[i++] = ((ShardTaskEntry *) lfirst(lc))->taskId; + } + } + + int32 nodesInvolved[2] = { 0 }; + nodesInvolved[0] = sourceShardPlacement->nodeId; + nodesInvolved[1] = newWorkerNode->nodeId; + + task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data, + nDepends, + dependsArray, 2, + nodesInvolved); + + bool found = false; + ShardTaskEntry *taskEntry = hash_search(shardTaskMap, &shardId, HASH_ENTER, + &found); + if (!found) + { + taskEntry->taskId = task->taskid; + ereport(DEBUG2, + (errmsg( + "Added hash entry in scheduled task hash " + "with task %ld for shard %ld", + task->taskid, shardId))); + } + else + { + ereport(ERROR, (errmsg("failed to record task dependency for shard %ld", + shardId))); + } + + nodeTasksList = lappend(nodeTasksList, task); + if (dependsArray) + { + pfree(dependsArray); + } + list_free(dependencyTaskList); + } + + if (list_length(nodeTasksList) > 0) + { + int nDepends = list_length(nodeTasksList); + int32 nodesInvolved[2] = { 0 }; + nodesInvolved[0] = sourceShardPlacement->nodeId; + nodesInvolved[1] = newWorkerNode->nodeId; + int64 *dependsArray = palloc(sizeof(int64) * list_length(nodeTasksList)); + int idx = 0; + foreach_declared_ptr(task, nodeTasksList) + { + dependsArray[idx++] = task->taskid; + } + resetStringInfo(&buf); + uint32 shardTransferFlags = SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY; + appendStringInfo(&buf, + "SET LOCAL application_name TO '%s%ld';\n", + CITUS_REBALANCER_APPLICATION_NAME_PREFIX, + GetGlobalPID()); + appendStringInfo(&buf, + "SELECT " + "citus_internal.citus_internal_copy_single_shard_placement" + "(%ld,%u,%u,%u,%s)", + shardId, + sourceShardPlacement->nodeId, + newWorkerNode->nodeId, + shardTransferFlags, + quote_literal_cstr(transferModeString)); + + ereport(DEBUG2, + (errmsg( + "creating relations for reference table '%s' on %s:%d ... 
" + "QUERY= %s", + referenceTableName, newWorkerNode->workerName, + newWorkerNode->workerPort, buf.data))); + + task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data, + nDepends, + dependsArray, 2, + nodesInvolved); + + depTasksList = lappend(depTasksList, task); + + pfree(dependsArray); + list_free(nodeTasksList); + nodeTasksList = NIL; + } + + hash_destroy(shardTaskMap); + } + + /* + * compute a dependent task list array to be used to indicate the completion of all + * reference table shards copy, so that we can start with distributed shard copy + */ + if (list_length(depTasksList) > 0) + { + *nDependTasks = list_length(depTasksList); + dependsTaskArray = palloc(sizeof(int64) * *nDependTasks); + int idx = 0; + foreach_declared_ptr(task, depTasksList) + { + dependsTaskArray[idx++] = task->taskid; + } + list_free(depTasksList); + } + /* * Since reference tables have been copied via a loopback connection we do not have to * retain our locks. Since Citus only runs well in READ COMMITTED mode we can be sure @@ -291,6 +679,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) { UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]); } + return dependsTaskArray; } @@ -614,8 +1003,9 @@ DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly) * connections. */ void -DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext( - MetadataSyncContext *context, int32 groupId, bool localOnly) +DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext(MetadataSyncContext * + context, int32 groupId, + bool localOnly) { List *replicatedPlacementListForGroup = ReplicatedPlacementsForNodeGroup(groupId); diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 2a57c0224..7dcdd5bd3 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -131,7 +131,8 @@ typedef enum LogicalRepType extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, char *targetNodeName, - int targetNodePort); + int targetNodePort, + bool skipInterShardRelationshipCreation); extern void ConflictWithIsolationTestingBeforeCopy(void); extern void ConflictWithIsolationTestingAfterCopy(void); @@ -177,7 +178,8 @@ extern void CompleteNonBlockingShardTransfer(List *shardList, HTAB *publicationInfoHash, List *logicalRepTargetList, HTAB *groupedLogicalRepTargetsHash, - LogicalRepType type); + LogicalRepType type, + bool skipInterShardRelationshipCreation); extern void CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList); extern void CreatePartitioningHierarchy(List *logicalRepTargetList); diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index cf5a6fd02..9fd44b71b 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -21,6 +21,11 @@ extern void EnsureReferenceTablesExistOnAllNodes(void); extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode); +extern int64 * ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, + char + transferMode, + int * + nDependTasks); extern bool HasNodesWithMissingReferenceTables(List **referenceTableList); extern uint32 CreateReferenceTableColocationId(void); extern uint32 GetReferenceTableColocationId(void); diff --git a/src/include/distributed/resource_lock.h 
diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h
index dfadc0263..0696ef6e8 100644
--- a/src/include/distributed/resource_lock.h
+++ b/src/include/distributed/resource_lock.h
@@ -44,10 +44,11 @@ typedef enum AdvisoryLocktagClass
 	ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
 	ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
 	ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID = 10,
-	ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12,
+	ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12, /* Not used anymore */
 	ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13,
 	ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK = 14,
-	ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION = 15
+	ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION = 15,
+	ADV_LOCKTAG_CLASS_CITUS_SHARD_MOVE = 16
 } AdvisoryLocktagClass;
 
 /* CitusOperations has constants for citus operations */
@@ -84,6 +85,16 @@ typedef enum CitusOperations
 						 (uint32) (shardid), \
 						 ADV_LOCKTAG_CLASS_CITUS_SHARD)
 
+/* advisory lock for citus shard move/copy operations;
+ * Also it has the database hardcoded to MyDatabaseId,
+ * to ensure the locks are local to each database */
+#define SET_LOCKTAG_SHARD_MOVE(tag, shardid) \
+	SET_LOCKTAG_ADVISORY(tag, \
+						 MyDatabaseId, \
+						 (uint32) ((shardid) >> 32), \
+						 (uint32) (shardid), \
+						 ADV_LOCKTAG_CLASS_CITUS_SHARD_MOVE)
+
 /* reuse advisory lock, but with different, unused field 4 (7)
  * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
  * are local to each database */
@@ -124,16 +135,6 @@ typedef enum CitusOperations
 						 (uint32) operationId, \
 						 ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID)
 
-/* reuse advisory lock, but with different, unused field 4 (12)
- * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
- * are local to each database */
-#define SET_LOCKTAG_LOGICAL_REPLICATION(tag) \
-	SET_LOCKTAG_ADVISORY(tag, \
-						 MyDatabaseId, \
-						 (uint32) 0, \
-						 (uint32) 0, \
-						 ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION)
-
 /* reuse advisory lock, but with different, unused field 4 (14)
  * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
  * are local to each database */
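The new locktag class and the SET_LOCKTAG_SHARD_MOVE macro are only declared in this hunk; their call sites live elsewhere in the patch. A minimal sketch, assuming a caller serializes concurrent moves or copies of the same shard id with PostgreSQL's lock manager; the helper names below are assumptions, not part of the patch.

#include "postgres.h"
#include "miscadmin.h"                      /* MyDatabaseId, used by the macro */
#include "storage/lock.h"                   /* LOCKTAG, LockAcquire, LockRelease */
#include "distributed/resource_lock.h"      /* SET_LOCKTAG_SHARD_MOVE */

/* hypothetical helpers, shown only to illustrate the new locktag class */
static void
AcquireShardMoveLock(uint64 shardId)
{
	LOCKTAG tag;
	SET_LOCKTAG_SHARD_MOVE(tag, shardId);

	/* self-conflicting: a concurrent move/copy of the same shard id waits here */
	(void) LockAcquire(&tag, ExclusiveLock, false, false);
}

static void
ReleaseShardMoveLock(uint64 shardId)
{
	LOCKTAG tag;
	SET_LOCKTAG_SHARD_MOVE(tag, shardId);
	LockRelease(&tag, ExclusiveLock, false);
}

Because the tag embeds MyDatabaseId, moves of the same shard id in different databases would not block each other, in line with the comment in the hunk above.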
diff --git a/src/include/distributed/shard_transfer.h b/src/include/distributed/shard_transfer.h
index c1621879b..2f253dc6d 100644
--- a/src/include/distributed/shard_transfer.h
+++ b/src/include/distributed/shard_transfer.h
@@ -23,10 +23,54 @@ typedef enum
 	SHARD_TRANSFER_COPY = 2
 } ShardTransferType;
 
+/*
+ * ShardTransferOperationMode is used to pass flags to the shard transfer
+ * function; the flags control the behavior of the transfer.
+ * Currently, optionFlags are only used to customize reference table transfers.
+ * For distributed tables, optionFlags should always be set to 0.
+ */
+typedef enum
+{
+	/*
+	 * This flag instructs the transfer function to transfer only a single shard
+	 * rather than all the colocated shards for the shard interval.
+	 * Using this flag means we might break the colocated shard
+	 * relationship on the source node, so it is only useful when setting up
+	 * a new node and we are sure that the node will not be used until we have
+	 * transferred all the shards.
+	 * The reason we need this flag is that we want to be able to transfer
+	 * colocated shards in parallel; for now it is only used for the reference
+	 * table shards.
+	 * Finally, if you are using this flag, you should also consider deferring
+	 * the creation of the relationships on the source node until all colocated
+	 * shards are transferred (see SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS).
+	 */
+	SHARD_TRANSFER_SINGLE_SHARD_ONLY = 1 << 0,
+
+	/* With this flag the shard transfer function does not create any constraints
+	 * or foreign key relationships defined on the shard. This can be used to defer
+	 * the creation of the relationships until all the shards are transferred,
+	 * which is useful when we are transferring colocated shards in parallel and
+	 * we want to avoid creating the relationships on the source node
+	 * until all the shards are transferred.
+	 */
+	SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS = 1 << 1,
+
+	/* This flag indicates that the shard transfer function should
+	 * only create the relationships on the target node and not transfer any data.
+	 * It can be used to create the relationships that were deferred
+	 * while the shards were being transferred.
+	 */
+	SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY = 1 << 2
+} ShardTransferOperationMode;
+
+
 extern void TransferShards(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
 						   char *targetNodeName, int32 targetNodePort,
-						   char shardReplicationMode, ShardTransferType transferType);
+						   char shardReplicationMode, ShardTransferType transferType,
+						   uint32 optionFlags);
 extern uint64 ShardListSizeInBytes(List *colocatedShardList, char *workerNodeName,
 								   uint32 workerNodePort);
 extern void ErrorIfMoveUnsupportedTableType(Oid relationId);
diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py
index 507577c5f..6ce4b73ca 100755
--- a/src/test/regress/citus_tests/run_test.py
+++ b/src/test/regress/citus_tests/run_test.py
@@ -140,6 +140,12 @@ DEPS = {
     "background_rebalance_parallel": TestDeps(
         None, ["multi_test_helpers", "multi_cluster_management"], worker_count=6
     ),
+    "background_rebalance_parallel_reference_tables": TestDeps(
+        None,
+        ["multi_test_helpers", "multi_cluster_management"],
+        repeatable=False,
+        worker_count=6,
+    ),
     "function_propagation": TestDeps("minimal_schedule"),
     "citus_shards": TestDeps("minimal_schedule"),
     "grant_on_foreign_server_propagation": TestDeps("minimal_schedule"),
diff --git a/src/test/regress/expected/background_rebalance_parallel.out b/src/test/regress/expected/background_rebalance_parallel.out
index cc3470de9..90f13d701 100644
--- a/src/test/regress/expected/background_rebalance_parallel.out
+++ b/src/test/regress/expected/background_rebalance_parallel.out
@@ -534,7 +534,7 @@ FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
 job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
 17779 | 1013 | done | {50,56}
- 17779 | 1014 | running | {50,57}
+ 17779 | 1014 | done | {50,57}
 17779 | 1015 | running | {50,56}
 17779 | 1016 | blocked | {50,57}
 17779 | 1017 | runnable | {50,56}
diff --git a/src/test/regress/expected/background_rebalance_parallel_reference_tables.out b/src/test/regress/expected/background_rebalance_parallel_reference_tables.out
new file mode 100644
index 000000000..811add5a4
--- /dev/null
+++ b/src/test/regress/expected/background_rebalance_parallel_reference_tables.out
@@ -0,0 +1,561 @@
+--
+-- BACKGROUND_REBALANCE_PARALLEL_REFERENCE_TABLES
+--
+-- Test to check if the background tasks scheduled for moving reference tables
+-- shards in parallel by the background rebalancer have the correct dependencies
+--
+CREATE SCHEMA background_rebalance_parallel;
+SET search_path TO background_rebalance_parallel;
+SET citus.next_shard_id TO 85674000;
+SET citus.shard_replication_factor TO 1;
+SET client_min_messages TO ERROR;
+ALTER SEQUENCE
pg_dist_background_job_job_id_seq RESTART 17777; +ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050; +SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset +SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50; +SELECT 1 FROM master_remove_node('localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_add_node('localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +ALTER SYSTEM SET citus.max_background_task_executors_per_node = 5; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +-- Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group +CREATE TABLE table1_colg1 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg1 (b int PRIMARY KEY); +SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group +CREATE TABLE table1_colg2 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg2 (b int primary key); +SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group +CREATE TABLE table1_colg3 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg3 (b int primary key); +SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Create reference tables with primary-foreign key relationships +CREATE TABLE customers ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + email TEXT UNIQUE NOT NULL +); +CREATE TABLE orders ( + id SERIAL PRIMARY KEY, + customer_id INTEGER NOT NULL REFERENCES customers(id), + order_date DATE NOT NULL DEFAULT CURRENT_DATE +); +CREATE TABLE order_items ( + id SERIAL PRIMARY KEY, + order_id 
INTEGER NOT NULL REFERENCES orders(id), + product_name TEXT NOT NULL, + quantity INTEGER NOT NULL, + price NUMERIC(10, 2) NOT NULL +); +SELECT create_reference_table('customers'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('orders'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('order_items'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- INSERT SOME DATA +-- Insert 10 customers +INSERT INTO customers (name, email) +SELECT + 'Customer ' || i, + 'customer' || i || '@example.com' +FROM generate_series(1, 10) AS i; +-- Insert 30 orders: each customer gets 3 orders +INSERT INTO orders (customer_id, order_date) +SELECT + (i % 10) + 1, -- customer_id between 1 and 10 + CURRENT_DATE - (i % 7) +FROM generate_series(1, 30) AS i; +-- Insert 90 order_items: each order has 3 items +INSERT INTO order_items (order_id, product_name, quantity, price) +SELECT + (i % 30) + 1, -- order_id between 1 and 30 + 'Product ' || (i % 5 + 1), + (i % 10) + 1, + round((random() * 100 + 10)::numeric, 2) +FROM generate_series(1, 90) AS i; +SELECT + c.id AS customer_id, + c.name AS customer_name, + c.email AS customer_email, + COUNT(oi.id) AS total_order_items +FROM customers c +JOIN orders o + ON c.id = o.customer_id +JOIN order_items oi + ON o.id = oi.order_id +GROUP BY c.id, c.name, c.email +ORDER BY c.id; + customer_id | customer_name | customer_email | total_order_items +--------------------------------------------------------------------- + 1 | Customer 1 | customer1@example.com | 9 + 2 | Customer 2 | customer2@example.com | 9 + 3 | Customer 3 | customer3@example.com | 9 + 4 | Customer 4 | customer4@example.com | 9 + 5 | Customer 5 | customer5@example.com | 9 + 6 | Customer 6 | customer6@example.com | 9 + 7 | Customer 7 | customer7@example.com | 9 + 8 | Customer 8 | customer8@example.com | 9 + 9 | Customer 9 | customer9@example.com | 9 + 10 | Customer 10 | customer10@example.com | 9 +(10 rows) + +-- Add two new nodes so that we can rebalance +SELECT 1 FROM citus_add_node('localhost', :worker_3_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_4_port); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid; + table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport +--------------------------------------------------------------------- + table1_colg1 | 85674000 | 0 | localhost | 57637 | localhost | 57640 + table1_colg1 | 85674001 | 0 | localhost | 57638 | localhost | 57639 + table2_colg1 | 85674004 | 0 | localhost | 57637 | localhost | 57640 + table2_colg1 | 85674005 | 0 | localhost | 57638 | localhost | 57639 + table1_colg2 | 85674008 | 0 | localhost | 57637 | localhost | 57640 + table1_colg2 | 85674009 | 0 | localhost | 57638 | localhost | 57639 + table2_colg2 | 85674012 | 0 | localhost | 57637 | localhost | 57640 + table2_colg2 | 85674013 | 0 | localhost | 57638 | localhost | 57639 + table1_colg3 | 85674016 | 0 | localhost | 57637 | localhost | 57640 + table1_colg3 | 85674017 | 0 | localhost | 57638 | localhost | 57639 + table2_colg3 | 85674020 | 0 | localhost | 57637 | localhost | 57640 + table2_colg3 | 85674021 | 0 | localhost | 57638 | localhost | 57639 +(12 rows) + +SET client_min_messages TO DEBUG1; +SELECT citus_rebalance_start AS job_id from citus_rebalance_start( + shard_transfer_mode := 'force_logical', + parallel_transfer_colocated_shards := true, + parallel_transfer_reference_tables := true) \gset +DEBUG: skipping child tables for relation named: table1_colg1 +DEBUG: skipping child tables for relation named: table1_colg1 +DEBUG: skipping child tables for relation named: table1_colg1 +DEBUG: skipping child tables for relation named: table1_colg1 +DEBUG: skipping child tables for relation named: table1_colg2 +DEBUG: skipping child tables for relation named: table1_colg2 +DEBUG: skipping child tables for relation named: table1_colg2 +DEBUG: skipping child tables for relation named: table1_colg2 +DEBUG: skipping child tables for relation named: table1_colg3 +DEBUG: skipping child tables for relation named: table1_colg3 +DEBUG: skipping child tables for relation named: table1_colg3 +DEBUG: skipping child tables for relation named: table1_colg3 +NOTICE: Scheduled 6 moves as job xxx +DETAIL: Rebalance scheduled as background job +HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status(); +SET client_min_messages TO ERROR; +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +-- see the dependencies of the tasks scheduled by the background rebalancer +SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on; + job_id | task_id | depends_on +--------------------------------------------------------------------- + 17777 | 1001 | 1000 + 17777 | 1002 | 1000 + 17777 | 1002 | 1001 + 17777 | 1003 | 1000 + 17777 | 1003 | 1001 + 17777 | 1003 | 1002 + 17777 | 1005 | 1004 + 17777 | 1006 | 1004 + 17777 | 1006 | 1005 + 17777 | 1007 | 1004 + 17777 | 1007 | 1005 + 17777 | 1007 | 1006 + 17777 | 1008 | 1003 + 17777 | 1008 | 1007 + 17777 | 1009 | 1003 + 17777 | 1009 | 1007 + 17777 | 1010 | 1003 + 17777 | 1010 | 1007 + 17777 | 1011 | 1003 + 17777 | 1011 | 1007 + 17777 | 1012 | 1003 + 17777 | 1012 | 1007 + 17777 | 1013 | 1003 + 17777 | 1013 | 1007 +(24 rows) + +-- Temporary hack to eliminate SET application name from command until we get the +-- background job enhancement 
done. +SELECT D.task_id, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + task_id | command | depends_on | command +--------------------------------------------------------------------- + 1001 | 85674025 | 1000 | 85674024 + 1002 | 85674026 | 1000 | 85674024 + 1002 | 85674026 | 1001 | 85674025 + 1003 | 85674026 | 1000 | 85674024 + 1003 | 85674026 | 1001 | 85674025 + 1003 | 85674026 | 1002 | 85674026 + 1005 | 85674025 | 1004 | 85674024 + 1006 | 85674026 | 1004 | 85674024 + 1006 | 85674026 | 1005 | 85674025 + 1007 | 85674026 | 1004 | 85674024 + 1007 | 85674026 | 1005 | 85674025 + 1007 | 85674026 | 1006 | 85674026 + 1008 | 85674001 | 1003 | 85674026 + 1008 | 85674001 | 1007 | 85674026 + 1009 | 85674000 | 1003 | 85674026 + 1009 | 85674000 | 1007 | 85674026 + 1010 | 85674009 | 1003 | 85674026 + 1010 | 85674009 | 1007 | 85674026 + 1011 | 85674008 | 1003 | 85674026 + 1011 | 85674008 | 1007 | 85674026 + 1012 | 85674017 | 1003 | 85674026 + 1012 | 85674017 | 1007 | 85674026 + 1013 | 85674016 | 1003 | 85674026 + 1013 | 85674016 | 1007 | 85674026 +(24 rows) + +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; +-- Drain worker_3 so that we can move only one colocation group to worker_3 +-- to create an unbalance that would cause parallel rebalancing. +SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); +-- Move all the shards of Colocation group 3 to worker_3. +SELECT +master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port +ORDER BY + shardid; + master_move_shard_placement +--------------------------------------------------------------------- + + + + +(4 rows) + +CALL citus_cleanup_orphaned_resources(); +SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_5_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_6_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid; + table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport +--------------------------------------------------------------------- + table1_colg1 | 85674001 | 0 | localhost | 57637 | localhost | 57642 + table1_colg1 | 85674003 | 0 | localhost | 57638 | localhost | 57641 + table2_colg1 | 85674005 | 0 | localhost | 57637 | localhost | 57642 + table2_colg1 | 85674007 | 0 | localhost | 57638 | localhost | 57641 + table1_colg3 | 85674016 | 0 | localhost | 57639 | localhost | 57641 + table1_colg3 | 85674017 | 0 | localhost | 57639 | localhost | 57642 + table2_colg3 | 85674020 | 0 | localhost | 57639 | localhost | 57641 + table2_colg3 | 85674021 | 0 | localhost | 57639 | localhost | 57642 +(8 rows) + +SET client_min_messages TO DEBUG1; +SELECT citus_rebalance_start AS job_id from citus_rebalance_start( + shard_transfer_mode := 'block_writes', + parallel_transfer_colocated_shards := true, + parallel_transfer_reference_tables := true) \gset +DEBUG: skipping child tables for relation named: table1_colg3 +DEBUG: skipping child tables for relation named: table1_colg1 +DEBUG: skipping child tables for relation named: table1_colg1 +DEBUG: skipping child tables for relation named: table1_colg1 +DEBUG: skipping child tables for relation named: table1_colg2 +DEBUG: skipping child tables for relation named: table1_colg3 +DEBUG: skipping child tables for relation named: table1_colg2 +DEBUG: skipping child tables for relation named: table1_colg2 +DEBUG: skipping child tables for relation named: table1_colg3 +DEBUG: skipping child tables for relation named: table1_colg3 +DEBUG: skipping child tables for relation named: table1_colg1 +DEBUG: skipping child tables for relation named: table1_colg2 +NOTICE: Scheduled 4 moves as job xxx +DETAIL: Rebalance scheduled as background job +HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status(); +SET client_min_messages TO ERROR; +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +-- see the dependencies of the tasks scheduled by the background rebalancer +SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on; + job_id | task_id | depends_on +--------------------------------------------------------------------- + 17778 | 1015 | 1014 + 17778 | 1016 | 1014 + 17778 | 1016 | 1015 + 17778 | 1017 | 1014 + 17778 | 1017 | 1015 + 17778 | 1017 | 1016 + 17778 | 1019 | 1018 + 17778 | 1020 | 1018 + 17778 | 1020 | 1019 + 17778 | 1021 | 1018 + 17778 | 1021 | 1019 + 17778 | 1021 | 1020 + 17778 | 1022 | 1017 + 17778 | 1022 | 1021 + 17778 | 1023 | 1017 + 17778 | 1023 | 1021 + 17778 | 1024 | 1017 + 17778 | 1024 | 1021 + 17778 | 1025 | 1017 + 17778 | 1025 | 1021 +(20 rows) + +-- Temporary hack to eliminate SET application name from command until we get the +-- background job 
enhancement done. +SELECT D.task_id, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + task_id | command | depends_on | command +--------------------------------------------------------------------- + 1015 | 85674025 | 1014 | 85674024 + 1016 | 85674026 | 1014 | 85674024 + 1016 | 85674026 | 1015 | 85674025 + 1017 | 85674026 | 1014 | 85674024 + 1017 | 85674026 | 1015 | 85674025 + 1017 | 85674026 | 1016 | 85674026 + 1019 | 85674025 | 1018 | 85674024 + 1020 | 85674026 | 1018 | 85674024 + 1020 | 85674026 | 1019 | 85674025 + 1021 | 85674026 | 1018 | 85674024 + 1021 | 85674026 | 1019 | 85674025 + 1021 | 85674026 | 1020 | 85674026 + 1022 | 85674016 | 1017 | 85674026 + 1022 | 85674016 | 1021 | 85674026 + 1023 | 85674017 | 1017 | 85674026 + 1023 | 85674017 | 1021 | 85674026 + 1024 | 85674003 | 1017 | 85674026 + 1024 | 85674003 | 1021 | 85674026 + 1025 | 85674001 | 1017 | 85674026 + 1025 | 85674001 | 1021 | 85674026 +(20 rows) + +SELECT + c.id AS customer_id, + c.name AS customer_name, + c.email AS customer_email, + COUNT(oi.id) AS total_order_items +FROM customers c +JOIN orders o + ON c.id = o.customer_id +JOIN order_items oi + ON o.id = oi.order_id +GROUP BY c.id, c.name, c.email +ORDER BY c.id; + customer_id | customer_name | customer_email | total_order_items +--------------------------------------------------------------------- + 1 | Customer 1 | customer1@example.com | 9 + 2 | Customer 2 | customer2@example.com | 9 + 3 | Customer 3 | customer3@example.com | 9 + 4 | Customer 4 | customer4@example.com | 9 + 5 | Customer 5 | customer5@example.com | 9 + 6 | Customer 6 | customer6@example.com | 9 + 7 | Customer 7 | customer7@example.com | 9 + 8 | Customer 8 | customer8@example.com | 9 + 9 | Customer 9 | customer9@example.com | 9 + 10 | Customer 10 | customer10@example.com | 9 +(10 rows) + +DROP SCHEMA background_rebalance_parallel CASCADE; +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_3_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_4_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_5_port); + citus_remove_node 
+--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_6_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; diff --git a/src/test/regress/expected/background_task_queue_monitor.out b/src/test/regress/expected/background_task_queue_monitor.out index 1d4006377..082238086 100644 --- a/src/test/regress/expected/background_task_queue_monitor.out +++ b/src/test/regress/expected/background_task_queue_monitor.out @@ -252,9 +252,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable) job_id | task_id | status --------------------------------------------------------------------- - 1450005 | 1450009 | running - 1450005 | 1450010 | running - 1450005 | 1450011 | running + 1450005 | 1450009 | done + 1450005 | 1450010 | done + 1450005 | 1450011 | done 1450006 | 1450012 | running 1450007 | 1450013 | runnable (5 rows) @@ -282,9 +282,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that last task is running job_id | task_id | status --------------------------------------------------------------------- - 1450005 | 1450009 | running - 1450005 | 1450010 | running - 1450005 | 1450011 | running + 1450005 | 1450009 | done + 1450005 | 1450010 | done + 1450005 | 1450011 | done 1450006 | 1450012 | cancelled 1450007 | 1450013 | running (5 rows) @@ -318,9 +318,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that multiple cancels worked job_id | task_id | status --------------------------------------------------------------------- - 1450005 | 1450009 | cancelled - 1450005 | 1450010 | cancelled - 1450005 | 1450011 | cancelled + 1450005 | 1450009 | done + 1450005 | 1450010 | done + 1450005 | 1450011 | done 1450006 | 1450012 | cancelled 1450007 | 1450013 | cancelled (5 rows) @@ -372,9 +372,9 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; -- show that last task is not running but ready to run(runnable) task_id | status --------------------------------------------------------------------- - 1450014 | running - 1450015 | running - 1450016 | running + 1450014 | done + 1450015 | done + 1450016 | done 1450017 | running 1450018 | runnable (5 rows) @@ -397,9 +397,9 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; -- show that last task is running task_id | status --------------------------------------------------------------------- - 1450014 | running - 1450015 | running - 1450016 | running + 1450014 | done + 1450015 | done + 1450016 | done 1450017 | running 1450018 | running (5 rows) @@ -445,9 +445,9 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; -- show that all tasks are cancelled task_id | status --------------------------------------------------------------------- - 1450014 | cancelled - 1450015 | cancelled - 1450016 | cancelled + 1450014 | done + 1450015 | done + 1450016 | done 1450017 | cancelled 1450018 | cancelled (5 rows) @@ -825,15 +825,15 @@ SELECT pg_reload_conf(); -- if pg_cancel_backend is called on one of the running task PIDs -- task doesn't restart because it's not allowed anymore by the limit. 
-- node with id 1 can be used only once, unless there are previously running tasks -SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset -SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process +SELECT pid AS task_id7_pid FROM pg_dist_background_task WHERE task_id IN (:task_id7) \gset +SELECT pg_cancel_backend(:task_id7_pid); -- cancel task_id7 process pg_cancel_backend --------------------------------------------------------------------- t (1 row) -- task goes to only runnable state, not running anymore. -SELECT citus_task_wait(:task_id6, desired_status => 'runnable'); +SELECT citus_task_wait(:task_id7, desired_status => 'runnable'); citus_task_wait --------------------------------------------------------------------- @@ -851,8 +851,8 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task 1450017 | 1450027 | done | {1,2} 1450017 | 1450028 | done | {1,3} 1450017 | 1450029 | done | {2,4} - 1450017 | 1450030 | runnable | {1,2} - 1450017 | 1450031 | running | {1,3} + 1450017 | 1450030 | running | {1,2} + 1450017 | 1450031 | runnable | {1,3} 1450017 | 1450032 | running | {1,4} (8 rows) @@ -868,7 +868,7 @@ SELECT citus_task_wait(:task_id8, desired_status => 'done'); (1 row) -SELECT citus_task_wait(:task_id6, desired_status => 'running'); +SELECT citus_task_wait(:task_id6, desired_status => 'done'); citus_task_wait --------------------------------------------------------------------- @@ -880,16 +880,16 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5, :task_id6, :task_id7, :task_id8) ORDER BY job_id, task_id; - job_id | task_id | status | nodes_involved + job_id | task_id | status | nodes_involved --------------------------------------------------------------------- - 1450017 | 1450025 | done | {1,2} - 1450017 | 1450026 | done | {3,4} - 1450017 | 1450027 | done | {1,2} - 1450017 | 1450028 | done | {1,3} - 1450017 | 1450029 | done | {2,4} - 1450017 | 1450030 | running | {1,2} - 1450017 | 1450031 | done | {1,3} - 1450017 | 1450032 | done | {1,4} + 1450017 | 1450025 | done | {1,2} + 1450017 | 1450026 | done | {3,4} + 1450017 | 1450027 | done | {1,2} + 1450017 | 1450028 | done | {1,3} + 1450017 | 1450029 | done | {2,4} + 1450017 | 1450030 | done | {1,2} + 1450017 | 1450031 | done | {1,3} + 1450017 | 1450032 | done | {1,4} (8 rows) SELECT citus_job_cancel(:job_id1); diff --git a/src/test/regress/expected/isolation_logical_replication_single_shard_commands.out b/src/test/regress/expected/isolation_logical_replication_single_shard_commands.out index cbb5e3d8d..979d10c14 100644 --- a/src/test/regress/expected/isolation_logical_replication_single_shard_commands.out +++ b/src/test/regress/expected/isolation_logical_replication_single_shard_commands.out @@ -594,10 +594,15 @@ step s2-move-placement: SELECT master_move_shard_placement( get_shard_id_for_distribution_column('logical_replicate_placement', 4), 'localhost', 57637, 'localhost', 57638); + +step s1-end: + COMMIT; -ERROR: could not acquire the lock required to move public.logical_replicate_placement -step s1-end: - COMMIT; +step s2-move-placement: <... 
completed> +master_move_shard_placement +--------------------------------------------------------------------- + +(1 row) step s2-end: COMMIT; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index b9f4b621c..bbc894dad 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1638,9 +1638,12 @@ ALTER EXTENSION citus UPDATE TO '13.2-1'; SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- + function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) bigint | function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision) | + | function citus_internal.citus_internal_copy_single_shard_placement(bigint,integer,integer,integer,citus.shard_transfer_mode) void + | function citus_rebalance_start(name,boolean,citus.shard_transfer_mode,boolean,boolean) bigint | function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision) -(2 rows) +(5 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 8848a489d..270747958 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -30,7 +30,7 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND pg_catalog.pg_describe_object(classid, objid, 0) != 'function any_value(anyelement)' AND pg_catalog.pg_describe_object(classid, objid, 0) != 'function any_value_agg(anyelement,anyelement)' ORDER BY 1; - description + description --------------------------------------------------------------------- event trigger citus_cascade_to_partition function alter_distributed_table(regclass,text,integer,text,boolean) @@ -86,6 +86,7 @@ ORDER BY 1; function citus_internal.add_shard_metadata(regclass,bigint,"char",text,text) function citus_internal.add_tenant_schema(oid,integer) function citus_internal.adjust_local_clock_to_remote(cluster_clock) + function citus_internal.citus_internal_copy_single_shard_placement(bigint,integer,integer,integer,citus.shard_transfer_mode) function citus_internal.database_command(text) function citus_internal.delete_colocation_metadata(integer) function citus_internal.delete_partition_metadata(regclass) @@ -156,7 +157,7 @@ ORDER BY 1; function citus_pid_for_gpid(bigint) function citus_prepare_pg_upgrade() function citus_query_stats() - function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) + function citus_rebalance_start(name,boolean,citus.shard_transfer_mode,boolean,boolean) function citus_rebalance_status(boolean) function citus_rebalance_stop() function citus_rebalance_wait() @@ -394,6 +395,6 @@ ORDER BY 1; view citus_tables view pg_dist_shard_placement view time_partitions -(362 rows) +(363 rows) DROP TABLE extension_basic_types; diff --git a/src/test/regress/operations_schedule b/src/test/regress/operations_schedule index 6dbc303c2..5639ea4d6 100644 --- a/src/test/regress/operations_schedule +++ b/src/test/regress/operations_schedule @@ -13,3 +13,4 @@ test: multi_colocated_shard_rebalance test: cpu_priority test: check_mx test: citus_drain_node +test: 
background_rebalance_parallel_reference_tables diff --git a/src/test/regress/sql/background_rebalance_parallel_reference_tables.sql b/src/test/regress/sql/background_rebalance_parallel_reference_tables.sql new file mode 100644 index 000000000..f79961411 --- /dev/null +++ b/src/test/regress/sql/background_rebalance_parallel_reference_tables.sql @@ -0,0 +1,267 @@ +-- +-- BACKGROUND_REBALANCE_PARALLEL_REFERENCE_TABLES +-- +-- Test to check if the background tasks scheduled for moving reference tables +-- shards in parallel by the background rebalancer have the correct dependencies +-- +CREATE SCHEMA background_rebalance_parallel; +SET search_path TO background_rebalance_parallel; +SET citus.next_shard_id TO 85674000; +SET citus.shard_replication_factor TO 1; +SET client_min_messages TO ERROR; + +ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; +ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050; + +SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset +SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50; + +SELECT 1 FROM master_remove_node('localhost', :worker_1_port); +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); + +SELECT 1 FROM master_add_node('localhost', :worker_1_port); +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +ALTER SYSTEM SET citus.max_background_task_executors_per_node = 5; +SELECT pg_reload_conf(); + +-- Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group +CREATE TABLE table1_colg1 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none'); + +CREATE TABLE table2_colg1 (b int PRIMARY KEY); + +SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); + +-- Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group +CREATE TABLE table1_colg2 (a int PRIMARY KEY); + +SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none'); + +CREATE TABLE table2_colg2 (b int primary key); + +SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); + +-- Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group +CREATE TABLE table1_colg3 (a int PRIMARY KEY); + +SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none'); + +CREATE TABLE table2_colg3 (b int primary key); + +SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); + +-- Create reference tables with primary-foreign key relationships + +CREATE TABLE customers ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + email TEXT UNIQUE NOT NULL +); + +CREATE TABLE orders ( + id SERIAL PRIMARY KEY, + customer_id INTEGER NOT NULL REFERENCES customers(id), + order_date DATE NOT NULL DEFAULT CURRENT_DATE +); + +CREATE TABLE order_items ( + id SERIAL PRIMARY KEY, + order_id INTEGER NOT NULL REFERENCES orders(id), + product_name TEXT NOT NULL, + quantity INTEGER NOT NULL, + price NUMERIC(10, 2) NOT NULL +); + +SELECT create_reference_table('customers'); +SELECT create_reference_table('orders'); +SELECT create_reference_table('order_items'); + +-- INSERT SOME DATA +-- 
Insert 10 customers +INSERT INTO customers (name, email) +SELECT + 'Customer ' || i, + 'customer' || i || '@example.com' +FROM generate_series(1, 10) AS i; + +-- Insert 30 orders: each customer gets 3 orders +INSERT INTO orders (customer_id, order_date) +SELECT + (i % 10) + 1, -- customer_id between 1 and 10 + CURRENT_DATE - (i % 7) +FROM generate_series(1, 30) AS i; + +-- Insert 90 order_items: each order has 3 items +INSERT INTO order_items (order_id, product_name, quantity, price) +SELECT + (i % 30) + 1, -- order_id between 1 and 30 + 'Product ' || (i % 5 + 1), + (i % 10) + 1, + round((random() * 100 + 10)::numeric, 2) +FROM generate_series(1, 90) AS i; + + + +SELECT + c.id AS customer_id, + c.name AS customer_name, + c.email AS customer_email, + COUNT(oi.id) AS total_order_items +FROM customers c +JOIN orders o + ON c.id = o.customer_id +JOIN order_items oi + ON o.id = oi.order_id +GROUP BY c.id, c.name, c.email +ORDER BY c.id; + +-- Add two new nodes so that we can rebalance +SELECT 1 FROM citus_add_node('localhost', :worker_3_port); +SELECT 1 FROM citus_add_node('localhost', :worker_4_port); + +SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid; + +SET client_min_messages TO DEBUG1; + +SELECT citus_rebalance_start AS job_id from citus_rebalance_start( + shard_transfer_mode := 'force_logical', + parallel_transfer_colocated_shards := true, + parallel_transfer_reference_tables := true) \gset + +SET client_min_messages TO ERROR; + +SELECT citus_rebalance_wait(); + +SELECT citus_rebalance_wait(); + +-- see the dependencies of the tasks scheduled by the background rebalancer +SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on; + +-- Temporary hack to eliminate SET application name from command until we get the +-- background job enhancement done. +SELECT D.task_id, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + + +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; + +-- Drain worker_3 so that we can move only one colocation group to worker_3 +-- to create an unbalance that would cause parallel rebalancing. +SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); +SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true); + +CALL citus_cleanup_orphaned_resources(); + +-- Move all the shards of Colocation group 3 to worker_3. 
+SELECT +master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port +ORDER BY + shardid; + +CALL citus_cleanup_orphaned_resources(); + +SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); +SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true); + +SELECT 1 FROM citus_add_node('localhost', :worker_5_port); +SELECT 1 FROM citus_add_node('localhost', :worker_6_port); + +SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid; + +SET client_min_messages TO DEBUG1; + +SELECT citus_rebalance_start AS job_id from citus_rebalance_start( + shard_transfer_mode := 'block_writes', + parallel_transfer_colocated_shards := true, + parallel_transfer_reference_tables := true) \gset + + +SET client_min_messages TO ERROR; + +SELECT citus_rebalance_wait(); + +-- see the dependencies of the tasks scheduled by the background rebalancer +SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on; +-- Temporary hack to eliminate SET application name from command until we get the +-- background job enhancement done. +SELECT D.task_id, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + +SELECT + c.id AS customer_id, + c.name AS customer_name, + c.email AS customer_email, + COUNT(oi.id) AS total_order_items +FROM customers c +JOIN orders o + ON c.id = o.customer_id +JOIN order_items oi + ON o.id = oi.order_id +GROUP BY c.id, c.name, c.email +ORDER BY c.id; + +DROP SCHEMA background_rebalance_parallel CASCADE; +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; +SELECT public.wait_for_resource_cleanup(); +select citus_remove_node('localhost', :worker_3_port); +select citus_remove_node('localhost', :worker_4_port); +select citus_remove_node('localhost', :worker_5_port); +select citus_remove_node('localhost', :worker_6_port); + +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; diff --git a/src/test/regress/sql/background_task_queue_monitor.sql b/src/test/regress/sql/background_task_queue_monitor.sql index 9f6abb73a..6c6a3b078 100644 --- a/src/test/regress/sql/background_task_queue_monitor.sql +++ b/src/test/regress/sql/background_task_queue_monitor.sql @@ -345,11 +345,11 @@ SELECT pg_reload_conf(); -- if pg_cancel_backend is called on one of the running task PIDs -- 
task doesn't restart because it's not allowed anymore by the limit. -- node with id 1 can be used only once, unless there are previously running tasks -SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset -SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process +SELECT pid AS task_id7_pid FROM pg_dist_background_task WHERE task_id IN (:task_id7) \gset +SELECT pg_cancel_backend(:task_id7_pid); -- cancel task_id7 process -- task goes to only runnable state, not running anymore. -SELECT citus_task_wait(:task_id6, desired_status => 'runnable'); +SELECT citus_task_wait(:task_id7, desired_status => 'runnable'); -- show that cancelled task hasn't restarted because limit doesn't allow it SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task @@ -359,7 +359,7 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task SELECT citus_task_wait(:task_id7, desired_status => 'done'); SELECT citus_task_wait(:task_id8, desired_status => 'done'); -SELECT citus_task_wait(:task_id6, desired_status => 'running'); +SELECT citus_task_wait(:task_id6, desired_status => 'done'); -- show that the 6th task has restarted only after both 6 and 7 are done -- since we have a limit of 1 background task executor per node with id 1