From 3d46860fbb20d5766e8a9c77d360c416bad4a450 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Thu, 25 Aug 2022 18:45:41 -0700 Subject: [PATCH] Reindent --- .../distributed/operations/shard_cleaner.c | 56 +++++++++++-------- .../distributed/operations/shard_split.c | 28 +++++----- src/include/distributed/pg_dist_cleanup.h | 1 - src/include/distributed/shard_cleaner.h | 4 +- 4 files changed, 49 insertions(+), 40 deletions(-) diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index b203b663a..5ed339bca 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -69,7 +69,8 @@ PG_FUNCTION_INFO_V1(citus_cleanup_orphaned_shards); PG_FUNCTION_INFO_V1(isolation_cleanup_orphaned_shards); static int DropOrphanedShardsForMove(bool waitForLocks); -static bool TryDropShardOutsideTransaction(char *qualifiedTableName, char *nodeName, int nodePort); +static bool TryDropShardOutsideTransaction(char *qualifiedTableName, char *nodeName, int + nodePort); static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode); /* Functions for cleanup infrastructure */ @@ -209,7 +210,7 @@ DropOrphanedShardsForCleanup() if (record->objectType != CLEANUP_SHARD_PLACEMENT) { ereport(WARNING, (errmsg("Invalid object type %d for cleanup record ", - record->objectType))); + record->objectType))); continue; } @@ -231,7 +232,7 @@ DropOrphanedShardsForCleanup() } if (TryDropShardOutsideTransaction(qualifiedTableName, workerNode->workerName, - workerNode->workerPort)) + workerNode->workerPort)) { /* delete the cleanup record */ DeleteCleanupRecordByRecordId(record->recordId); @@ -246,7 +247,8 @@ DropOrphanedShardsForCleanup() if (failedShardCountForCleanup > 0) { ereport(WARNING, (errmsg("Failed to drop %d cleanup shards out of %d", - failedShardCountForCleanup, list_length(cleanupRecordList)))); + failedShardCountForCleanup, list_length( + cleanupRecordList)))); } return removedShardCountForCleanup; @@ -321,7 +323,7 @@ DropOrphanedShardsForMove(bool waitForLocks) char *qualifiedTableName = ConstructQualifiedShardName(shardInterval); if (TryDropShardOutsideTransaction(qualifiedTableName, shardPlacement->nodeName, - shardPlacement->nodePort)) + shardPlacement->nodePort)) { /* delete the actual placement */ DeleteShardPlacementRow(placement->placementId); @@ -359,13 +361,13 @@ CompleteNewOperationNeedingCleanup(bool isSuccess) { /* * As part of operation completion: - * 1. Drop all resources of CurrentOperationId that are marked with 'CLEANUP_ALWAYS' policy and + * 1. Drop all resources of CurrentOperationId that are marked with 'CLEANUP_ALWAYS' policy and * the respective cleanup records in seperate transaction. * * 2. For all resources of CurrentOperationId that are marked with 'CLEANUP_ON_FAILURE': * a) If isSuccess = true, drop cleanup records as operation is nearing completion. * As the operation is nearing successful completion. This is done as part of the - * same transaction so will rollback in case of potential failure later. + * same transaction so will rollback in case of potential failure later. * * b) If isSuccess = false, drop resource and cleanup records in a seperate transaction. */ @@ -387,18 +389,18 @@ CompleteNewOperationNeedingCleanup(bool isSuccess) if (record->objectType != CLEANUP_SHARD_PLACEMENT) { ereport(WARNING, (errmsg("Invalid object type %d for cleanup record ", - record->objectType))); + record->objectType))); continue; } if (record->policy == CLEANUP_ALWAYS || - (record->policy == CLEANUP_ON_FAILURE && !isSuccess)) + (record->policy == CLEANUP_ON_FAILURE && !isSuccess)) { char *qualifiedTableName = record->objectName; WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId); if (TryDropShardOutsideTransaction(qualifiedTableName, workerNode->workerName, - workerNode->workerPort)) + workerNode->workerPort)) { DeleteCleanupRecordByRecordIdOutsideTransaction(record->recordId); removedShardCountOnComplete++; @@ -417,12 +419,14 @@ CompleteNewOperationNeedingCleanup(bool isSuccess) if (list_length(currentOperationRecordList) > 0) { ereport(LOG, (errmsg("Removed %d orphaned shards out of %d", - removedShardCountOnComplete, list_length(currentOperationRecordList)))); + removedShardCountOnComplete, list_length( + currentOperationRecordList)))); if (failedShardCountOnComplete > 0) { ereport(WARNING, (errmsg("Failed to drop %d cleanup shards out of %d", - failedShardCountOnComplete, list_length(currentOperationRecordList)))); + failedShardCountOnComplete, list_length( + currentOperationRecordList)))); } } } @@ -459,7 +463,7 @@ InsertCleanupRecordInCurrentTransaction(CleanupObject objectType, values[Anum_pg_dist_cleanup_object_type - 1] = Int32GetDatum(objectType); values[Anum_pg_dist_cleanup_object_name - 1] = CStringGetTextDatum(objectName); values[Anum_pg_dist_cleanup_node_group_id - 1] = Int32GetDatum(nodeGroupId); - values[Anum_pg_dist_cleanup_policy_type -1] = Int32GetDatum(policy); + values[Anum_pg_dist_cleanup_policy_type - 1] = Int32GetDatum(policy); /* open cleanup relation and insert new tuple */ Oid relationId = DistCleanupRelationId(); @@ -507,9 +511,9 @@ InsertCleanupRecordInSubtransaction(CleanupObject objectType, policy); SendCommandListToWorkerOutsideTransaction(LocalHostName, - PostPortNumber, - CitusExtensionOwnerName(), - list_make1(command->data)); + PostPortNumber, + CitusExtensionOwnerName(), + list_make1(command->data)); } @@ -525,9 +529,9 @@ DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId) recordId); SendCommandListToWorkerOutsideTransaction(LocalHostName, - PostPortNumber, - CitusExtensionOwnerName(), - list_make1(command->data)); + PostPortNumber, + CitusExtensionOwnerName(), + list_make1(command->data)); } @@ -621,8 +625,8 @@ GetNextOperationId() /* token location, or -1 if unknown */ const int location = -1; RangeVar *sequenceName = makeRangeVar(PG_CATALOG, - OPERATIONID_SEQUENCE_NAME, - location); + OPERATIONID_SEQUENCE_NAME, + location); bool missingOK = false; Oid sequenceId = RangeVarGetRelid(sequenceName, NoLock, missingOK); @@ -680,7 +684,8 @@ ListCleanupRecordsForCurrentOperation(void) int scanKeyCount = 1; Oid scanIndexId = InvalidOid; bool useIndex = false; - SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanup, scanIndexId, useIndex, NULL, + SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanup, scanIndexId, useIndex, + NULL, scanKeyCount, scanKey); HeapTuple heapTuple = NULL; @@ -697,6 +702,7 @@ ListCleanupRecordsForCurrentOperation(void) return recordList; } + /* * TupleToCleanupRecord converts a pg_dist_cleanup record tuple into a CleanupRecord struct. */ @@ -736,7 +742,7 @@ static bool CleanupRecordExists(uint64 recordId) { Relation pgDistCleanup = table_open(DistCleanupRelationId(), - AccessShareLock); + AccessShareLock); const int scanKeyCount = 1; ScanKeyData scanKey[1]; @@ -769,7 +775,7 @@ static void DeleteCleanupRecordByRecordId(uint64 recordId) { Relation pgDistCleanup = table_open(DistCleanupRelationId(), - RowExclusiveLock); + RowExclusiveLock); const int scanKeyCount = 1; ScanKeyData scanKey[1]; @@ -808,6 +814,7 @@ static uint64 GetNextCleanupRecordId(void) { uint64 recordId = INVALID_CLEANUP_RECORD_ID; + /* * In regression tests, we would like to generate record IDs consistently * even if the tests run in parallel. Instead of the sequence, we can use @@ -844,6 +851,7 @@ LockOperationId(OperationId operationId) (void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait); } + static bool TryLockOperationId(OperationId operationId) { diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index a77b30812..49699ed75 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -75,11 +75,10 @@ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, ShardInterval *shardIntervalToSplit, List *shardSplitPointsList, List *nodeIdsForPlacementList); -static void CreateAndCopySplitShardsForShardGroup( - WorkerNode *sourceShardNode, - List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, - List *workersForPlacementList); +static void CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode, + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *workersForPlacementList); static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList); static void CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList, @@ -133,7 +132,8 @@ static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList); -static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32 targetNodeId, +static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32 + targetNodeId, ShardInterval *shardInterval); static uint64 GetNextShardIdForSplitChild(void); @@ -581,7 +581,8 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, /* Log resource for cleanup in case of failure only. */ CleanupPolicy policy = CLEANUP_ON_FAILURE; InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT, - ConstructQualifiedShardName(shardInterval), + ConstructQualifiedShardName( + shardInterval), workerPlacementNode->groupId, policy); @@ -1132,7 +1133,6 @@ DropShardList(List *shardIntervalList) if (DeferShardDeleteOnSplit) { - /* Log shard in pg_dist_cleanup. * Parent shards are to be dropped only on sucess after split workflow is complete, * so mark the policy as 'CLEANUP_DEFERRED_ON_SUCCESS'. @@ -1151,12 +1151,12 @@ DropShardList(List *shardIntervalList) if (storageType == SHARD_STORAGE_TABLE) { appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, - qualifiedShardName); + qualifiedShardName); } else if (storageType == SHARD_STORAGE_FOREIGN) { appendStringInfo(dropQuery, DROP_FOREIGN_TABLE_COMMAND, - qualifiedShardName); + qualifiedShardName); } /* drop old shard */ @@ -1232,7 +1232,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, * information. */ HTAB *mapOfPlacementToDummyShardList = CreateSimpleHash(NodeAndOwner, - GroupedShardSplitInfos); + GroupedShardSplitInfos); CreateDummyShardsForShardGroup( mapOfPlacementToDummyShardList, sourceColocatedShardIntervalList, @@ -1465,7 +1465,8 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList, */ CleanupPolicy policy = CLEANUP_ALWAYS; InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT, - ConstructQualifiedShardName(shardInterval), + ConstructQualifiedShardName( + shardInterval), workerPlacementNode->groupId, policy); @@ -1509,7 +1510,8 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList, */ CleanupPolicy policy = CLEANUP_ALWAYS; InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT, - ConstructQualifiedShardName(shardInterval), + ConstructQualifiedShardName( + shardInterval), workerPlacementNode->groupId, policy); diff --git a/src/include/distributed/pg_dist_cleanup.h b/src/include/distributed/pg_dist_cleanup.h index 5f77911bf..5dfcc2074 100644 --- a/src/include/distributed/pg_dist_cleanup.h +++ b/src/include/distributed/pg_dist_cleanup.h @@ -31,4 +31,3 @@ #define CLEANUPRECORDID_SEQUENCE_NAME "pg_dist_cleanup_recordid_seq" #endif /* PG_DIST_CLEANUP_H */ - diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index 60470726c..e051bf77e 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -50,13 +50,13 @@ typedef enum CleanupPolicy /* * Resources that are cleanup only on failure. - * (Example: Split Children for Blocking/Non-Blocking splits) + * (Example: Split Children for Blocking/Non-Blocking splits) */ CLEANUP_ON_FAILURE = 1, /* * Resources that need 'deferred' clean up only on success . - * (Example: Parent child being split for Blocking/Non-Blocking splits) + * (Example: Parent child being split for Blocking/Non-Blocking splits) */ CLEANUP_DEFERRED_ON_SUCCESS = 2, } CleanupPolicy;