diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c
index fe51fafa4..b6c12f7a4 100644
--- a/src/backend/distributed/operations/shard_rebalancer.c
+++ b/src/backend/distributed/operations/shard_rebalancer.c
@@ -768,6 +768,7 @@ AcquireRebalanceColocationLock(Oid relationId, const char *operationName)
 	}
 }
 
+
 /*
  * AcquireRebalanceOperationLock does not allow concurrent rebalance
  * operations.
@@ -786,14 +787,15 @@ AcquireRebalanceOperationLock(const char *operationName)
 	if (!lockAcquired)
 	{
 		ereport(ERROR, (errmsg("could not acquire the lock required for %s operation",
-					operationName),
+							   operationName),
 						errdetail("It means that either a concurrent shard move "
-				"or shard copy is happening."),
+								  "or shard copy is happening."),
 						errhint("Make sure that the concurrent operation has "
 								"finished and re-run the command")));
 	}
 }
 
+
 /*
  * AcquirePlacementColocationLock tries to acquire a lock for
  * rebalance/replication while moving/copying the placement. If this
@@ -2033,13 +2035,14 @@ InitializeShardMoveDependencies()
  */
 static int64 *
 GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 *refTablesDepTaskIds,
-		int refTablesDepTaskIdsCount,
+							   int refTablesDepTaskIdsCount,
 							   ShardMoveDependencies shardMoveDependencies, int *nDepends)
 {
 	HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64,
 														   "shardMoveDependencyList", 0);
 
 	bool found;
+
 	/*
 	 * Check if there exists moves scheduled earlier whose source node
 	 * overlaps with the current move's target node.
@@ -2104,7 +2107,6 @@ static void
 UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64
 							taskId, ShardMoveDependencies shardMoveDependencies)
 {
-	bool found;
 	ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(
 		shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER,
@@ -2204,13 +2206,14 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
 
 	if (HasNodesWithMissingReferenceTables(&referenceTableIdList))
 	{
-		refTablesDepTaskIds = ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(jobId, TRANSFER_MODE_BLOCK_WRITES, &refTablesDepTaskIdsCount);
+		refTablesDepTaskIds = ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(
+			jobId, TRANSFER_MODE_BLOCK_WRITES, &refTablesDepTaskIdsCount);
 		ereport(DEBUG2,
-				(errmsg("%d dependent copy reference table tasks for job %ld",
-					refTablesDepTaskIdsCount, jobId),
-				errdetail("Rebalance scheduled as background job"),
-				errhint("To monitor progress, run: SELECT * FROM "
-					"citus_rebalance_status();")));
+				(errmsg("%d dependent copy reference table tasks for job %ld",
+						refTablesDepTaskIdsCount, jobId),
+				 errdetail("Rebalance scheduled as background job"),
+				 errhint("To monitor progress, run: SELECT * FROM "
+						 "citus_rebalance_status();")));
 	}
 
 	PlacementUpdateEvent *move = NULL;
diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c
index 57c0f418b..3af46dde5 100644
--- a/src/backend/distributed/operations/shard_transfer.c
+++ b/src/backend/distributed/operations/shard_transfer.c
@@ -305,6 +305,7 @@ citus_copy_one_shard_placement(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+
 /*
  * citus_move_shard_placement moves given shard (and its co-located shards) from one
  * node to the other node. To accomplish this it entirely recreates the table structure
@@ -378,6 +379,8 @@ citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+
+
 /*
  * TransferShards is the function for shard transfers.
  */
@@ -464,9 +467,11 @@ TransferShards(int64 shardId, char *sourceNodeName,
 	}
 
 	bool transferAlreadyCompleted = TransferAlreadyCompleted(colocatedShardList,
-															 sourceNodeName, sourceNodePort,
-															 targetNodeName, targetNodePort,
-															 transferType);
+															 sourceNodeName,
+															 sourceNodePort,
+															 targetNodeName,
+															 targetNodePort,
+															 transferType);
 
 	/*
	 * If we just need to create the shard relationships, we don't need to do anything
@@ -482,15 +487,18 @@ TransferShards(int64 shardId, char *sourceNodeName,
			 * the relationships, we can return right away
			 */
			ereport(WARNING, (errmsg("shard is not present on node %s:%d",
-						targetNodeName, targetNodePort),
-					errdetail("%s may have not completed.",
+									 targetNodeName, targetNodePort),
+							  errdetail("%s may not have completed.",
										operationNameCapitalized)));
			return;
		}
 
-		CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
-						targetNodePort, (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL),
+		CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort,
+						targetNodeName,
+						targetNodePort,
+						(shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL),
						operationFunctionName, optionFlags);
+
		/* We don't need to do anything else, just return */
		return;
	}
@@ -499,8 +507,8 @@ TransferShards(int64 shardId, char *sourceNodeName,
	{
		/* if the transfer is already completed, we can return right away */
		ereport(WARNING, (errmsg("shard is already present on node %s:%d",
-					targetNodeName, targetNodePort),
-				errdetail("%s may have already completed.",
+								 targetNodeName, targetNodePort),
+						  errdetail("%s may have already completed.",
								   operationNameCapitalized)));
		return;
	}
@@ -591,7 +599,8 @@ TransferShards(int64 shardId, char *sourceNodeName,
	}
 
	CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
-					targetNodePort, useLogicalReplication, operationFunctionName, optionFlags);
+					targetNodePort, useLogicalReplication, operationFunctionName,
+					optionFlags);
 
	if (transferType == SHARD_TRANSFER_MOVE)
	{
@@ -649,6 +658,7 @@ TransferShards(int64 shardId, char *sourceNodeName,
	FinalizeCurrentProgressMonitor();
 }
 
+
 /*
  * Insert deferred cleanup records.
  * The shards will be dropped by background cleaner later.
@@ -1534,28 +1544,30 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
	foreach_declared_ptr(shardInterval, shardIntervalList)
	{
		/*
-		 * For each shard we first create the shard table in a separate
-		 * transaction and then we copy the data and create the indexes in a
-		 * second separate transaction. The reason we don't do both in a single
-		 * transaction is so we can see the size of the new shard growing
-		 * during the copy when we run get_rebalance_progress in another
-		 * session. If we wouldn't split these two phases up, then the table
-		 * wouldn't be visible in the session that get_rebalance_progress uses.
-		 * So get_rebalance_progress would always report its size as 0.
-		 */
-		List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName,
-														   sourceNodePort);
+		 * For each shard we first create the shard table in a separate
+		 * transaction and then we copy the data and create the indexes in a
+		 * second separate transaction. The reason we don't do both in a single
+		 * transaction is so we can see the size of the new shard growing
+		 * during the copy when we run get_rebalance_progress in another
+		 * session. If we wouldn't split these two phases up, then the table
+		 * wouldn't be visible in the session that get_rebalance_progress uses.
+		 * So get_rebalance_progress would always report its size as 0.
+		 */
+		List *ddlCommandList = RecreateShardDDLCommandList(shardInterval,
+														   sourceNodeName,
+														   sourceNodePort);
		char *tableOwner = TableOwner(shardInterval->relationId);
 
		/* drop the shard we created on the target, in case of failure */
		InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
-											  ConstructQualifiedShardName(shardInterval),
-											  GroupForNode(targetNodeName,
-														   targetNodePort),
-											  CLEANUP_ON_FAILURE);
+											  ConstructQualifiedShardName(
+												  shardInterval),
+											  GroupForNode(targetNodeName,
+														   targetNodePort),
+											  CLEANUP_ON_FAILURE);
		SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
-			tableOwner, ddlCommandList);
+												  tableOwner, ddlCommandList);
	}
 
	UpdatePlacementUpdateStatusForShardIntervalList(
@@ -1578,10 +1590,10 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
	{
		List *ddlCommandList = PostLoadShardCreationCommandList(shardInterval,
																sourceNodeName,
-			sourceNodePort);
+																sourceNodePort);
		char *tableOwner = TableOwner(shardInterval->relationId);
		SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
-			tableOwner, ddlCommandList);
+												  tableOwner, ddlCommandList);
 
		MemoryContextReset(localContext);
	}
@@ -1594,9 +1606,9 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
	if (!(optionFlags & SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS))
	{
		/*
-		 * Once all shards are copied, we can recreate relationships between shards.
-		 * Create DDL commands to Attach child tables to their parents in a partitioning hierarchy.
-		 */
+		 * Once all shards are copied, we can recreate relationships between shards.
+		 * Create DDL commands to attach child tables to their parents in a partitioning hierarchy.
+		 */
		List *shardIntervalWithDDCommandsList = NIL;
		foreach_declared_ptr(shardInterval, shardIntervalList)
		{
@@ -1609,7 +1621,7 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
					shardInterval, list_make1(attachPartitionCommand));
				shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
-					shardCommandList);
+														  shardCommandList);
			}
		}
 
@@ -1620,24 +1632,24 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
			PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS);
 
		/*
-		 * Iterate through the colocated shards and create DDL commamnds
-		 * to create the foreign constraints.
-		 */
+		 * Iterate through the colocated shards and create DDL commands
+		 * to create the foreign constraints.
+		 */
		foreach_declared_ptr(shardInterval, shardIntervalList)
		{
			List *shardForeignConstraintCommandList = NIL;
			List *referenceTableForeignConstraintList = NIL;
 
			CopyShardForeignConstraintCommandListGrouped(shardInterval,
														 &shardForeignConstraintCommandList,
														 &referenceTableForeignConstraintList);
 
			ShardCommandList *shardCommandList = CreateShardCommandList(
				shardInterval,
				list_concat(shardForeignConstraintCommandList,
							referenceTableForeignConstraintList));
			shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
-				shardCommandList);
+													  shardCommandList);
		}
 
		/* Now execute the Partitioning & Foreign constraints creation commands. */
@@ -1646,8 +1658,8 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
		{
			char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId);
			SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
-				tableOwner,
-				shardCommandList->ddlCommandList);
+													  tableOwner,
+													  shardCommandList->ddlCommandList);
		}
 
		UpdatePlacementUpdateStatusForShardIntervalList(
@@ -1736,7 +1748,8 @@ CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardInte
 
	ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
									  MaxAdaptiveExecutorPoolSize,
-									  NULL /* jobIdList (ignored by API implementation) */);
+									  NULL /* jobIdList (ignored by API implementation) */
+									  );
 }
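
Usage note (not part of the patch): the comment in CopyShardTablesViaBlockWrites explains
why each shard table is created in one transaction while its data and indexes are copied
in a second one: so that another session can watch the new placement grow. A minimal way
to observe this from a separate session, assuming the stock Citus entry point
citus_rebalance_start() is used to drive RebalanceTableShardsBackground:

    -- schedule a rebalance as a background job, blocking writes as this patch
    -- does for the dependent reference table copy tasks
    SELECT citus_rebalance_start(shard_transfer_mode := 'block_writes');

    -- high-level job state, as suggested by the errhint added in shard_rebalancer.c
    SELECT * FROM citus_rebalance_status();

    -- per-move detail; the target shard size starts at 0 and grows during the copy,
    -- which is exactly what the two-transaction split makes visible
    SELECT * FROM get_rebalance_progress();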