Fix indentation suggestions

pull/7983/head
Muhammad Usama 2025-05-15 14:43:05 +05:00
parent a8a5f34dc9
commit e8361bcab8
2 changed files with 69 additions and 51 deletions

View File

@@ -768,6 +768,7 @@ AcquireRebalanceColocationLock(Oid relationId, const char *operationName)
     }
 }

+
 /*
  * AcquireRebalanceOperationLock does not allow concurrent rebalance
  * operations.
@@ -786,14 +787,15 @@ AcquireRebalanceOperationLock(const char *operationName)
     if (!lockAcquired)
     {
         ereport(ERROR, (errmsg("could not acquire the lock required for %s operation",
                                operationName),
                         errdetail("It means that either a concurrent shard move "
                                   "or shard copy is happening."),
                         errhint("Make sure that the concurrent operation has "
                                 "finished and re-run the command")));
     }
 }

+
 /*
  * AcquirePlacementColocationLock tries to acquire a lock for
  * rebalance/replication while moving/copying the placement. If this
@@ -2033,13 +2035,14 @@ InitializeShardMoveDependencies()
  */
 static int64 *
 GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 *refTablesDepTaskIds,
                                int refTablesDepTaskIdsCount,
                                ShardMoveDependencies shardMoveDependencies, int *nDepends)
 {
     HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64,
                                                            "shardMoveDependencyList", 0);
     bool found;

+
     /*
      * Check if there exists moves scheduled earlier whose source node
      * overlaps with the current move's target node.
@@ -2104,7 +2107,6 @@ static void
 UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId,
                             ShardMoveDependencies shardMoveDependencies)
 {
     bool found;
-
     ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(
         shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER,
@@ -2204,13 +2206,14 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
     if (HasNodesWithMissingReferenceTables(&referenceTableIdList))
     {
-        refTablesDepTaskIds = ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(jobId, TRANSFER_MODE_BLOCK_WRITES, &refTablesDepTaskIdsCount);
+        refTablesDepTaskIds = ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(
+            jobId, TRANSFER_MODE_BLOCK_WRITES, &refTablesDepTaskIdsCount);

         ereport(DEBUG2,
                 (errmsg("%d dependent copy reference table tasks for job %ld",
                         refTablesDepTaskIdsCount, jobId),
                  errdetail("Rebalance scheduled as background job"),
                  errhint("To monitor progress, run: SELECT * FROM "
                          "citus_rebalance_status();")));
     }

     PlacementUpdateEvent *move = NULL;
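The errhint in the hunk above points at the SQL-level monitoring function for background rebalances. Illustrative usage only, not part of this commit; it assumes a Citus version that ships the background rebalancer functions citus_rebalance_start() and citus_rebalance_status():

    -- Kick off a rebalance as a background job, then poll its progress.
    SELECT citus_rebalance_start();
    SELECT * FROM citus_rebalance_status();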

View File

@@ -305,6 +305,7 @@ citus_copy_one_shard_placement(PG_FUNCTION_ARGS)
     PG_RETURN_VOID();
 }

+
 /*
  * citus_move_shard_placement moves given shard (and its co-located shards) from one
  * node to the other node. To accomplish this it entirely recreates the table structure
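The comment above describes citus_move_shard_placement, the user-facing UDF backed by this file. A usage sketch for reference, not part of this diff; the shard ID, node names and ports are made up, and the last argument is the shard_transfer_mode ('auto', 'block_writes' or 'force_logical'):

    -- Move shard 102008 (and its co-located shards) from worker-1 to worker-2.
    SELECT citus_move_shard_placement(102008, 'worker-1', 5432,
                                      'worker-2', 5432, 'force_logical');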
@@ -378,6 +379,8 @@ citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS)
     PG_RETURN_VOID();
 }

+
+
 /*
  * TransferShards is the function for shard transfers.
  */
@@ -464,9 +467,11 @@ TransferShards(int64 shardId, char *sourceNodeName,
     }

     bool transferAlreadyCompleted = TransferAlreadyCompleted(colocatedShardList,
-                                                             sourceNodeName, sourceNodePort,
-                                                             targetNodeName, targetNodePort,
-                                                             transferType);
+                                                             sourceNodeName,
+                                                             sourceNodePort,
+                                                             targetNodeName,
+                                                             targetNodePort,
+                                                             transferType);

     /*
      * If we just need to create the shard relationships,We don't need to do anything
@@ -482,15 +487,18 @@ TransferShards(int64 shardId, char *sourceNodeName,
              * the relationships, we can return right away
              */
             ereport(WARNING, (errmsg("shard is not present on node %s:%d",
                                      targetNodeName, targetNodePort),
                               errdetail("%s may have not completed.",
                                         operationNameCapitalized)));
             return;
         }

-        CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
-                        targetNodePort, (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL),
-                        operationFunctionName, optionFlags);
+        CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName
+                        ,
+                        targetNodePort, (shardReplicationMode ==
+                                         TRANSFER_MODE_FORCE_LOGICAL),
+                        operationFunctionName, optionFlags);

         /* We don't need to do anything else, just return */
         return;
     }
@@ -499,8 +507,8 @@ TransferShards(int64 shardId, char *sourceNodeName,
     {
         /* if the transfer is already completed, we can return right away */
         ereport(WARNING, (errmsg("shard is already present on node %s:%d",
                                  targetNodeName, targetNodePort),
                           errdetail("%s may have already completed.",
                                     operationNameCapitalized)));
         return;
     }
@@ -591,7 +599,8 @@ TransferShards(int64 shardId, char *sourceNodeName,
     }

     CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
-                    targetNodePort, useLogicalReplication, operationFunctionName, optionFlags);
+                    targetNodePort, useLogicalReplication, operationFunctionName,
+                    optionFlags);

     if (transferType == SHARD_TRANSFER_MOVE)
     {
@@ -649,6 +658,7 @@ TransferShards(int64 shardId, char *sourceNodeName,
         FinalizeCurrentProgressMonitor();
     }

+
     /*
      * Insert deferred cleanup records.
      * The shards will be dropped by background cleaner later.
@@ -1534,28 +1544,30 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
     foreach_declared_ptr(shardInterval, shardIntervalList)
     {
         /*
          * For each shard we first create the shard table in a separate
          * transaction and then we copy the data and create the indexes in a
          * second separate transaction. The reason we don't do both in a single
          * transaction is so we can see the size of the new shard growing
          * during the copy when we run get_rebalance_progress in another
          * session. If we wouldn't split these two phases up, then the table
          * wouldn't be visible in the session that get_rebalance_progress uses.
          * So get_rebalance_progress would always report its size as 0.
          */
-        List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName,
-                                                           sourceNodePort);
+        List *ddlCommandList = RecreateShardDDLCommandList(shardInterval,
+                                                           sourceNodeName,
+                                                           sourceNodePort);
         char *tableOwner = TableOwner(shardInterval->relationId);

         /* drop the shard we created on the target, in case of failure */
         InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
-                                              ConstructQualifiedShardName(shardInterval),
-                                              GroupForNode(targetNodeName,
-                                                           targetNodePort),
-                                              CLEANUP_ON_FAILURE);
+                                              ConstructQualifiedShardName(
+                                                  shardInterval),
+                                              GroupForNode(targetNodeName,
+                                                           targetNodePort),
+                                              CLEANUP_ON_FAILURE);

         SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                                                   tableOwner, ddlCommandList);
     }

     UpdatePlacementUpdateStatusForShardIntervalList(
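The block comment in the hunk above explains the two-transaction split: the shard table is created in one transaction and filled in another so that a different session can see its size grow. For reference, this is the check that other session would run (illustrative only, not part of this diff; get_rebalance_progress() is the existing Citus progress function named in the comment):

    -- While a move/rebalance is copying data, from a separate session:
    SELECT * FROM get_rebalance_progress();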
@@ -1578,10 +1590,10 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
     {
         List *ddlCommandList =
             PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
                                              sourceNodePort);
         char *tableOwner = TableOwner(shardInterval->relationId);

         SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                                                   tableOwner, ddlCommandList);
         MemoryContextReset(localContext);
     }
@@ -1594,9 +1606,9 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
     if (!(optionFlags & SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS))
     {
         /*
          * Once all shards are copied, we can recreate relationships between shards.
          * Create DDL commands to Attach child tables to their parents in a partitioning hierarchy.
          */
         List *shardIntervalWithDDCommandsList = NIL;
         foreach_declared_ptr(shardInterval, shardIntervalList)
         {
@@ -1609,7 +1621,7 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
                 shardInterval,
                 list_make1(attachPartitionCommand));

             shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
                                                       shardCommandList);
         }
     }
@@ -1620,24 +1632,26 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
         PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS);

     /*
      * Iterate through the colocated shards and create DDL commamnds
      * to create the foreign constraints.
      */
     foreach_declared_ptr(shardInterval, shardIntervalList)
     {
         List *shardForeignConstraintCommandList = NIL;
         List *referenceTableForeignConstraintList = NIL;

         CopyShardForeignConstraintCommandListGrouped(shardInterval,
-                                                     &shardForeignConstraintCommandList,
-                                                     &referenceTableForeignConstraintList);
+                                                     &
+                                                     shardForeignConstraintCommandList,
+                                                     &
+                                                     referenceTableForeignConstraintList);

         ShardCommandList *shardCommandList = CreateShardCommandList(
             shardInterval,
             list_concat(shardForeignConstraintCommandList,
                         referenceTableForeignConstraintList));
         shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
                                                   shardCommandList);
     }

     /* Now execute the Partitioning & Foreign constraints creation commads. */
@@ -1646,8 +1660,8 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
     {
         char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId);
         SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                                                   tableOwner,
                                                   shardCommandList->ddlCommandList);
     }

     UpdatePlacementUpdateStatusForShardIntervalList(
@@ -1736,7 +1750,8 @@ CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardInte
     ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
                                       MaxAdaptiveExecutorPoolSize,
-                                      NULL /* jobIdList (ignored by API implementation) */);
+                                      NULL /* jobIdList (ignored by API implementation) */
+                                      );
 }