From 8b4956e9e92e9b2ee697cfb21dd7786666d2fa56 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Fri, 24 Jun 2022 09:56:06 -0700 Subject: [PATCH] Reindent --- .../citus_split_shard_by_split_points.c | 2 +- .../distributed/operations/shard_split.c | 29 ++++++++++------ .../operations/worker_shard_copy.c | 34 ++++++++++++------- .../operations/worker_split_copy_udf.c | 2 +- 4 files changed, 43 insertions(+), 24 deletions(-) diff --git a/src/backend/distributed/operations/citus_split_shard_by_split_points.c b/src/backend/distributed/operations/citus_split_shard_by_split_points.c index 1392f40fc..848597e38 100644 --- a/src/backend/distributed/operations/citus_split_shard_by_split_points.c +++ b/src/backend/distributed/operations/citus_split_shard_by_split_points.c @@ -30,7 +30,7 @@ PG_FUNCTION_INFO_V1(citus_split_shard_by_split_points); static SplitMode LookupSplitMode(Oid shardSplitModeOid); /* - * citus_split_shard_by_split_points(shard_id bigint, split_points integer[], node_ids integer[]) + * citus_split_shard_by_split_points(shard_id bigint, split_points text[], node_ids integer[], split_mode citus.split_mode) * Split source shard into multiple shards using the given split points. * 'shard_id' is the id of source shard to split. * 'split_points' is an array that represents the split points. diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index ecd201720..181a4c1d4 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -60,8 +60,10 @@ static void DoSplitCopy(WorkerNode *sourceShardNode, static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List *splitChildrenShardIntervalList, List *workersForPlacementList); -static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList); -static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList); +static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, + List *workersForPlacementList); +static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, + List *workersForPlacementList); /* Customize error message strings based on operation type */ static const char *const SplitOperationName[] = @@ -410,8 +412,10 @@ BlockingShardSplit(SplitOperation splitOperation, /* Only single placement allowed (already validated by caller) */ List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId); Assert(sourcePlacementList->length == 1); - ShardPlacement *sourceShardPlacement = (ShardPlacement *) linitial(sourcePlacementList); - WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId, false /* missingOk */); + ShardPlacement *sourceShardPlacement = (ShardPlacement *) linitial( + sourcePlacementList); + WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId, + false /* missingOk */); /* Physically create split children and perform split copy */ CreateSplitShardsForShardGroup( @@ -428,7 +432,8 @@ BlockingShardSplit(SplitOperation splitOperation, DropShardList(sourceColocatedShardIntervalList); /* Insert new shard and placement metdata */ - InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, workersForPlacementList); + InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, + workersForPlacementList); /* * Create foreign keys if exists after the metadata changes happening in @@ -679,7 +684,8 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard, * Insert new shard and placement metadata. */ static void -InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList) +InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, + List *workersForPlacementList) { /* Iterate on shard intervals for shard group */ List *shardIntervalList = NULL; @@ -727,10 +733,11 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *wo * Create foreign key constraints on the split children shards. */ static void -CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList) +CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, + List *workersForPlacementList) { /* Create constraints between shards */ - List* shardIntervalList = NULL; + List *shardIntervalList = NULL; foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { ShardInterval *shardInterval = NULL; @@ -742,8 +749,10 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workers List *referenceTableForeignConstraintList = NIL; CopyShardForeignConstraintCommandListGrouped(shardInterval, - &shardForeignConstraintCommandList, - &referenceTableForeignConstraintList); + & + shardForeignConstraintCommandList, + & + referenceTableForeignConstraintList); List *commandList = NIL; commandList = list_concat(commandList, shardForeignConstraintCommandList); diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index b6f146faf..76d751e2b 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -99,6 +99,7 @@ ShouldSendCopyNow(StringInfo buffer) return buffer->len > LocalCopyFlushThresholdByte; } + static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) { @@ -107,10 +108,10 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, false /* missingOk */); copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, - workerNode->workerName, - workerNode->workerPort, - currentUser, - NULL /* database (current) */); + workerNode->workerName, + workerNode->workerPort, + currentUser, + NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); StringInfo copyStatement = ConstructCopyStatement( @@ -122,7 +123,8 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) ReportConnectionError(copyDest->connection, ERROR); } - PGresult *result = GetRemoteCommandResult(copyDest->connection, true /* raiseInterrupts */); + PGresult *result = GetRemoteCommandResult(copyDest->connection, + true /* raiseInterrupts */); if (PQresultStatus(result) != PGRES_COPY_IN) { ReportResultError(copyDest->connection, result, ERROR); @@ -131,6 +133,7 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) PQclear(result); } + static bool ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { @@ -179,8 +182,10 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) if (!PutRemoteCopyData(copyDest->connection, copyOutState->fe_msgbuf->data, copyOutState->fe_msgbuf->len)) { - char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName); - char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName); + char *destinationShardSchemaName = linitial( + copyDest->destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond( + copyDest->destinationShardFullyQualifiedName); char *errorMessage = PQerrorMessage(copyDest->connection->pgConn); @@ -253,12 +258,15 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) /* end the COPY input */ if (!PutRemoteCopyEnd(copyDest->connection, NULL /* errormsg */)) { - char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName); - char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName); + char *destinationShardSchemaName = linitial( + copyDest->destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond( + copyDest->destinationShardFullyQualifiedName); ereport(ERROR, (errcode(ERRCODE_IO_ERROR), errmsg("Failed to COPY to destination shard %s.%s", - destinationShardSchemaName, destinationShardRelationName))); + destinationShardSchemaName, + destinationShardRelationName))); } /* check whether there were any COPY errors */ @@ -396,8 +404,10 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState */ LocalCopyBuffer = localCopyOutState->fe_msgbuf; - char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName); - char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName); + char *destinationShardSchemaName = linitial( + copyDest->destinationShardFullyQualifiedName); + char *destinationShardRelationName = lsecond( + copyDest->destinationShardFullyQualifiedName); Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false /* missing_ok */); diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index aff3e1239..72894ce52 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -56,7 +56,7 @@ worker_split_copy(PG_FUNCTION_ARGS) splitCopyInfoList); char *sourceShardToCopyName = generate_qualified_relation_name( - shardIntervalToSplitCopy->relationId); + shardIntervalToSplitCopy->relationId); AppendShardIdToName(&sourceShardToCopyName, shardIdToSplitCopy); StringInfo selectShardQueryForCopy = makeStringInfo();