From 29f00811695b8fb50f6f7079cabeb4a3fbd11507 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Sun, 19 Jun 2022 17:18:28 -0700 Subject: [PATCH] Reindent code --- .../citus_split_shard_by_split_points.c | 8 +- .../distributed/operations/isolate_shards.c | 1 + .../distributed/operations/shard_split.c | 77 +++++++----- .../operations/worker_shard_copy.c | 110 +++++++++++------- .../operations/worker_split_copy.c | 54 ++++++--- .../operations/worker_split_copy_udf.c | 51 +++++--- src/backend/distributed/utils/array_type.c | 14 ++- src/include/distributed/shard_split.h | 6 +- src/include/distributed/worker_shard_copy.h | 7 +- src/include/distributed/worker_split_copy.h | 8 +- 10 files changed, 216 insertions(+), 120 deletions(-) diff --git a/src/backend/distributed/operations/citus_split_shard_by_split_points.c b/src/backend/distributed/operations/citus_split_shard_by_split_points.c index 071a41073..f9a802858 100644 --- a/src/backend/distributed/operations/citus_split_shard_by_split_points.c +++ b/src/backend/distributed/operations/citus_split_shard_by_split_points.c @@ -46,10 +46,12 @@ citus_split_shard_by_split_points(PG_FUNCTION_ARGS) uint64 shardIdToSplit = DatumGetUInt64(PG_GETARG_DATUM(0)); ArrayType *splitPointsArrayObject = PG_GETARG_ARRAYTYPE_P(1); - List *shardSplitPointsList = TextArrayTypeToIntegerList(splitPointsArrayObject, INT4OID); + List *shardSplitPointsList = TextArrayTypeToIntegerList(splitPointsArrayObject, + INT4OID); ArrayType *nodeIdsArrayObject = PG_GETARG_ARRAYTYPE_P(2); - List *nodeIdsForPlacementList = TextArrayTypeToIntegerList(nodeIdsArrayObject, INT4OID); + List *nodeIdsForPlacementList = TextArrayTypeToIntegerList(nodeIdsArrayObject, + INT4OID); Oid shardSplitModeOid = PG_GETARG_OID(3); SplitMode shardSplitMode = LookupSplitMode(shardSplitModeOid); @@ -64,6 +66,7 @@ citus_split_shard_by_split_points(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + /* * LookupSplitMode maps the oids of citus.shard_split_mode enum * values to an enum. @@ -80,6 +83,7 @@ LookupSplitMode(Oid shardSplitModeOid) { shardSplitMode = BLOCKING_SPLIT; } + /* Extend with other modes as we support them */ else { diff --git a/src/backend/distributed/operations/isolate_shards.c b/src/backend/distributed/operations/isolate_shards.c index 4f377a3b3..52ef9e9f1 100644 --- a/src/backend/distributed/operations/isolate_shards.c +++ b/src/backend/distributed/operations/isolate_shards.c @@ -276,6 +276,7 @@ SplitShardByValue(ShardInterval *sourceShard, Datum distributionValueDatum) return isolatedShardId; } + /* * CreateSplitOffShards gets a shard and a hashed value to pick the split point. * First, it creates templates to create new shards. Then, for every colocated diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index b8cbc56fb..2fe862480 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -38,13 +38,13 @@ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, ShardInterval *shardIntervalToSplit, List *shardSplitPointsList, List *nodeIdsForPlacementList); -static void CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode, - List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, - List *workersForPlacementList, - List **splitOffShardList); +static void CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode, + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *workersForPlacementList, + List **splitOffShardList); static void CreateObjectOnPlacement(List *objectCreationCommandList, - WorkerNode *workerNode); + WorkerNode *workerNode); static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList, List *splitPointsForShard); static void CreateSplitIntervalsForShard(ShardInterval *sourceShard, @@ -54,8 +54,13 @@ static void BlockingShardSplit(SplitOperation splitOperation, ShardInterval *shardIntervalToSplit, List *shardSplitPointsList, List *workersForPlacementList); -static void DoSplitCopy(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList); -static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChildrenShardIntervalList, List* workersForPlacementList); +static void DoSplitCopy(WorkerNode *sourceShardNode, + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *workersForPlacementList); +static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, + List *splitChildrenShardIntervalList, + List *workersForPlacementList); /* Customize error message strings based on operation type */ static const char *const SplitOperationName[] = @@ -406,7 +411,7 @@ BlockingShardSplit(SplitOperation splitOperation, /* Only single placement allowed (already validated by caller) */ List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId); Assert(sourcePlacementList->length == 1); - WorkerNode* sourceShardToCopyNode = (WorkerNode *) linitial(sourcePlacementList); + WorkerNode *sourceShardToCopyNode = (WorkerNode *) linitial(sourcePlacementList); /* Physically create split children and perform split copy */ List *splitOffShardList = NULL; @@ -437,9 +442,10 @@ BlockingShardSplit(SplitOperation splitOperation, CitusInvalidateRelcacheByRelid(DistShardRelationId()); } + /* Create ShardGroup split children on a list of corresponding workers. */ static void -CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode, +CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList, @@ -471,9 +477,10 @@ CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode, } /* Perform Split Copy */ - DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList); + DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, workersForPlacementList); - // TODO(niupre) : Can we use Adaptive execution for creating multiple indexes parallely? + /* TODO(niupre) : Can we use Adaptive execution for creating multiple indexes parallely? */ foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { ShardInterval *shardInterval = NULL; @@ -494,6 +501,7 @@ CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode, } } + /* * Perform Split Copy from source shard(s) to split children. * 'sourceShardNode' : Source shard worker node. @@ -502,7 +510,8 @@ CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode, * 'workersForPlacementList' : List of workers for split children placement. */ static void -DoSplitCopy(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList) +DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList) { ShardInterval *sourceShardIntervalToCopy = NULL; List *splitShardIntervalList = NULL; @@ -510,9 +519,11 @@ DoSplitCopy(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList, int taskId = 0; List *splitCopyTaskList = NULL; forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, - splitShardIntervalList, shardGroupSplitIntervalListList) + splitShardIntervalList, shardGroupSplitIntervalListList) { - StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy, splitShardIntervalList, destinationWorkerNodesList); + StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy, + splitShardIntervalList, + destinationWorkerNodesList); Task *task = CreateBasicTask( sourceShardIntervalToCopy->shardId, /* jobId */ @@ -529,9 +540,11 @@ DoSplitCopy(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList, taskId++; } - ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList, MaxAdaptiveExecutorPoolSize, NULL /* jobIdList */); + ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList, + MaxAdaptiveExecutorPoolSize, NULL /* jobIdList */); } + /* * Create Copy command for a given shard source shard to be copied to corresponding split children. * 'sourceShardSplitInterval' : Source shard interval to be copied. @@ -539,7 +552,9 @@ DoSplitCopy(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList, * 'destinationWorkerNodesList' : List of workers for split children placement. */ static StringInfo -CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChildrenShardIntervalList, List* destinationWorkerNodesList) +CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, + List *splitChildrenShardIntervalList, + List *destinationWorkerNodesList) { StringInfo splitCopyInfoArray = makeStringInfo(); appendStringInfo(splitCopyInfoArray, "ARRAY["); @@ -547,19 +562,21 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChild ShardInterval *splitChildShardInterval = NULL; bool addComma = false; WorkerNode *destinationWorkerNode = NULL; - forboth_ptr(splitChildShardInterval, splitChildrenShardIntervalList, destinationWorkerNode, destinationWorkerNodesList) + forboth_ptr(splitChildShardInterval, splitChildrenShardIntervalList, + destinationWorkerNode, destinationWorkerNodesList) { - if(addComma) + if (addComma) { appendStringInfo(splitCopyInfoArray, ","); } StringInfo splitCopyInfoRow = makeStringInfo(); - appendStringInfo(splitCopyInfoRow, "ROW(%lu, %lu, %lu, %u)::citus.split_copy_info", - splitChildShardInterval->shardId, - splitChildShardInterval->minValue, - splitChildShardInterval->maxValue, - destinationWorkerNode->nodeId); + appendStringInfo(splitCopyInfoRow, + "ROW(%lu, %lu, %lu, %u)::citus.split_copy_info", + splitChildShardInterval->shardId, + splitChildShardInterval->minValue, + splitChildShardInterval->maxValue, + destinationWorkerNode->nodeId); appendStringInfo(splitCopyInfoArray, "%s", splitCopyInfoRow->data); addComma = true; @@ -568,18 +585,19 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChild StringInfo splitCopyUdf = makeStringInfo(); appendStringInfo(splitCopyUdf, "SELECT worker_split_copy(%lu, %s);", - sourceShardSplitInterval->shardId, - splitCopyInfoArray->data); + sourceShardSplitInterval->shardId, + splitCopyInfoArray->data); return splitCopyUdf; } + /* * Create an object (shard/index) on a worker node. */ static void CreateObjectOnPlacement(List *objectCreationCommandList, - WorkerNode *workerPlacementNode) + WorkerNode *workerPlacementNode) { char *currentUser = CurrentUserName(); SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName, @@ -588,6 +606,7 @@ CreateObjectOnPlacement(List *objectCreationCommandList, objectCreationCommandList); } + /* * Create split children intervals for a shardgroup given list of split points. */ @@ -657,6 +676,7 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard, } } + /* * InsertSplitOffShardMetadata inserts new shard and shard placement data into * catolog tables both the coordinator and mx nodes. @@ -710,6 +730,7 @@ InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList) } } + /* * DropShardList drops shards and their metadata from both the coordinator and * mx nodes. @@ -775,6 +796,7 @@ DropShardList(List *shardIntervalList) } } + /* * CreateForeignConstraints creates the foreign constraints on the newly * created shards via the tenant isolation. @@ -847,6 +869,7 @@ CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList) } } + /* * ExecuteCommandListOnPlacements runs the given command list on the nodes of * the given shard placement list. First, it creates connections. Then it sends diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 2f74b2417..3a1539cbf 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -38,7 +38,7 @@ typedef struct ShardCopyDestReceiver DestReceiver pub; /* Destination Relation Name */ - char* destinationShardFullyQualifiedName; + char *destinationShardFullyQualifiedName; /* descriptor of the tuples that are sent to the worker */ TupleDesc tupleDescriptor; @@ -61,29 +61,32 @@ typedef struct ShardCopyDestReceiver /* * Connection for destination shard (NULL if useLocalCopy is true) - */ + */ MultiConnection *connection; - } ShardCopyDestReceiver; static bool ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); static void ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, - TupleDesc inputTupleDescriptor); + TupleDesc inputTupleDescriptor); static void ShardCopyDestReceiverShutdown(DestReceiver *destReceiver); static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver); static bool CanUseLocalCopy(uint64 destinationNodeId); -static StringInfo ConstructCopyStatement(char* destinationShardFullyQualifiedName, bool useBinaryFormat); +static StringInfo ConstructCopyStatement(char *destinationShardFullyQualifiedName, bool + useBinaryFormat); static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest); static bool ShouldSendCopyNow(StringInfo buffer); -static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); -static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); +static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); +static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState + localCopyOutState); -static bool CanUseLocalCopy(uint64 destinationNodeId) +static bool +CanUseLocalCopy(uint64 destinationNodeId) { /* If destination node is same as source, use local copy */ return GetLocalNodeId() == destinationNodeId; } + /* * ShouldSendCopyNow returns true if the given buffer size exceeds the * local copy buffer size threshold. @@ -95,6 +98,7 @@ ShouldSendCopyNow(StringInfo buffer) return buffer->len > LocalCopyFlushThresholdByte; } + static bool ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { @@ -109,19 +113,21 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); /* Create connection lazily */ - if(copyDest->tuplesSent == 0 && (!copyDest->useLocalCopy)) + if (copyDest->tuplesSent == 0 && (!copyDest->useLocalCopy)) { int connectionFlags = OUTSIDE_TRANSACTION; char *currentUser = CurrentUserName(); - WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, false /* missingOk */); + WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, + false /* missingOk */); copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags, - workerNode->workerName, - workerNode->workerPort, - currentUser, - NULL /* database (current) */); + workerNode->workerName, + workerNode->workerPort, + currentUser, + NULL /* database (current) */); ClaimConnectionExclusively(copyDest->connection); - StringInfo copyStatement = ConstructCopyStatement(copyDest->destinationShardFullyQualifiedName, + StringInfo copyStatement = ConstructCopyStatement( + copyDest->destinationShardFullyQualifiedName, copyDest->copyOutState->binary); ExecuteCriticalRemoteCommand(copyDest->connection, copyStatement->data); } @@ -131,7 +137,7 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) bool *columnNulls = slot->tts_isnull; CopyOutState copyOutState = copyDest->copyOutState; - if(copyDest->useLocalCopy) + if (copyDest->useLocalCopy) { WriteLocalTuple(slot, copyDest); if (ShouldSendCopyNow(copyOutState->fe_msgbuf)) @@ -145,15 +151,18 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) resetStringInfo(copyOutState->fe_msgbuf); AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor, - copyOutState, columnOutputFunctions, NULL /* columnCoercionPaths */); - if (!PutRemoteCopyData(copyDest->connection, copyOutState->fe_msgbuf->data, copyOutState->fe_msgbuf->len)) + copyOutState, columnOutputFunctions, + NULL /* columnCoercionPaths */); + if (!PutRemoteCopyData(copyDest->connection, copyOutState->fe_msgbuf->data, + copyOutState->fe_msgbuf->len)) { ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("Failed to COPY to shard %s,", - copyDest->destinationShardFullyQualifiedName), - errdetail("failed to send %d bytes %s on node %u", copyOutState->fe_msgbuf->len, - copyOutState->fe_msgbuf->data, - copyDest->destinationNodeId))); + errmsg("Failed to COPY to shard %s,", + copyDest->destinationShardFullyQualifiedName), + errdetail("failed to send %d bytes %s on node %u", + copyOutState->fe_msgbuf->len, + copyOutState->fe_msgbuf->data, + copyDest->destinationNodeId))); } } @@ -164,8 +173,10 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) return true; } + static void -ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor) +ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc + inputTupleDescriptor) { ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest; copyDest->tupleDescriptor = inputTupleDescriptor; @@ -183,16 +194,17 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputT copyOutState->delim = (char *) delimiterCharacter; copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState); copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, - copyOutState->binary); + copyOutState->binary); copyDest->copyOutState = copyOutState; } + static void ShardCopyDestReceiverShutdown(DestReceiver *dest) { ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest; - if(copyDest->useLocalCopy) + if (copyDest->useLocalCopy) { if (copyDest->copyOutState != NULL && copyDest->copyOutState->fe_msgbuf->len > 0) @@ -200,10 +212,10 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) LocalCopyToShard(copyDest, copyDest->copyOutState); } } - else if(copyDest->connection != NULL) + else if (copyDest->connection != NULL) { resetStringInfo(copyDest->copyOutState->fe_msgbuf); - if(copyDest->copyOutState->binary) + if (copyDest->copyOutState->binary) { AppendCopyBinaryFooters(copyDest->copyOutState); } @@ -217,7 +229,8 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) } /* check whether there were any COPY errors */ - PGresult *result = GetRemoteCommandResult(copyDest->connection, true /* raiseInterrupts */); + PGresult *result = GetRemoteCommandResult(copyDest->connection, + true /* raiseInterrupts */); if (PQresultStatus(result) != PGRES_COMMAND_OK) { ReportCopyError(copyDest->connection, result); @@ -229,10 +242,11 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) } } -DestReceiver * CreateShardCopyDestReceiver( - EState *executorState, - char* destinationShardFullyQualifiedName, - uint32_t destinationNodeId) + +DestReceiver * +CreateShardCopyDestReceiver(EState *executorState, + char *destinationShardFullyQualifiedName, + uint32_t destinationNodeId) { ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) palloc0( sizeof(ShardCopyDestReceiver)); @@ -254,6 +268,7 @@ DestReceiver * CreateShardCopyDestReceiver( return (DestReceiver *) copyDest; } + static void ShardCopyDestReceiverDestroy(DestReceiver *dest) { @@ -272,6 +287,7 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest) pfree(copyDest); } + /* * ConstructCopyStatement constructs the text of a COPY statement * for copying into a result table @@ -279,11 +295,11 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest) static StringInfo ConstructCopyStatement(char *destinationShardFullyQualifiedName, bool useBinaryFormat) { - StringInfo command = makeStringInfo(); + StringInfo command = makeStringInfo(); appendStringInfo(command, "COPY %s FROM STDIN", - destinationShardFullyQualifiedName); + destinationShardFullyQualifiedName); - if(useBinaryFormat) + if (useBinaryFormat) { appendStringInfo(command, "WITH (format binary);"); } @@ -295,7 +311,9 @@ ConstructCopyStatement(char *destinationShardFullyQualifiedName, bool useBinaryF return command; } -static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest) + +static void +WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest) { CopyOutState localCopyOutState = copyDest->copyOutState; @@ -306,7 +324,8 @@ static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDes SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); bool isBinaryCopy = localCopyOutState->binary; - bool shouldAddBinaryHeaders = (isBinaryCopy && localCopyOutState->fe_msgbuf->len == 0); + bool shouldAddBinaryHeaders = (isBinaryCopy && localCopyOutState->fe_msgbuf->len == + 0); if (shouldAddBinaryHeaders) { AppendCopyBinaryHeaders(localCopyOutState); @@ -321,6 +340,7 @@ static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDes NULL /* columnCoercionPaths */); } + /* * LocalCopyToShard finishes local copy for the given destination shard. */ @@ -342,10 +362,13 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState char *destinationShardSchemaName = NULL; char *destinationShardRelationName = NULL; - DeconstructQualifiedName(list_make1(copyDest->destinationShardFullyQualifiedName), &destinationShardSchemaName, &destinationShardRelationName); + DeconstructQualifiedName(list_make1(copyDest->destinationShardFullyQualifiedName), + &destinationShardSchemaName, &destinationShardRelationName); - Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false /* missing_ok */); - Oid destinationShardOid = get_relname_relid(destinationShardRelationName, destinationSchemaOid); + Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, + false /* missing_ok */); + Oid destinationShardOid = get_relname_relid(destinationShardRelationName, + destinationSchemaOid); DefElem *binaryFormatOption = NULL; if (isBinaryCopy) @@ -356,7 +379,8 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState Relation shard = table_open(destinationShardOid, RowExclusiveLock); ParseState *pState = make_parsestate(NULL /* parentParseState */); (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, - NULL /* alias */, false /* inh */, false /* inFromCl */); + NULL /* alias */, false /* inh */, + false /* inFromCl */); CopyFromState cstate = BeginCopyFrom_compat(pState, shard, NULL /* whereClause */, NULL /* fileName */, @@ -373,6 +397,7 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState free_parsestate(pState); } + /* * ReadFromLocalBufferCallback is the copy callback. * It always tries to copy maxRead bytes. @@ -393,4 +418,3 @@ ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead) return bytesRead; } - diff --git a/src/backend/distributed/operations/worker_split_copy.c b/src/backend/distributed/operations/worker_split_copy.c index 4df59e62a..7c22fa485 100644 --- a/src/backend/distributed/operations/worker_split_copy.c +++ b/src/backend/distributed/operations/worker_split_copy.c @@ -45,13 +45,15 @@ typedef struct SplitCopyDestReceiver static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, - TupleDesc inputTupleDescriptor); + TupleDesc inputTupleDescriptor); static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, - DestReceiver *dest); + DestReceiver *dest); static void SplitCopyDestReceiverShutdown(DestReceiver *dest); static void SplitCopyDestReceiverDestroy(DestReceiver *copyDest); -DestReceiver * CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy, List* splitCopyInfoList) +DestReceiver * +CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy, + List *splitCopyInfoList) { SplitCopyDestReceiver *splitCopyDest = palloc0(sizeof(SplitCopyDestReceiver)); @@ -67,8 +69,10 @@ DestReceiver * CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceS ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(sourceShardIdToCopy); splitCopyDest->sourceShardRelationOid = shardIntervalToSplitCopy->relationId; - DestReceiver **shardCopyDests = palloc0(splitCopyDest->splitFactor * sizeof(DestReceiver *)); - SplitCopyInfo **splitCopyInfos = palloc0(splitCopyDest->splitFactor * sizeof(SplitCopyInfo *)); + DestReceiver **shardCopyDests = palloc0(splitCopyDest->splitFactor * + sizeof(DestReceiver *)); + SplitCopyInfo **splitCopyInfos = palloc0(splitCopyDest->splitFactor * + sizeof(SplitCopyInfo *)); SplitCopyInfo *splitCopyInfo = NULL; int index = 0; @@ -76,12 +80,15 @@ DestReceiver * CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceS char *sourceShardNamePrefix = get_rel_name(shardIntervalToSplitCopy->relationId); foreach_ptr(splitCopyInfo, splitCopyInfoList) { - char *destinationShardSchemaName = get_namespace_name(get_rel_namespace(splitCopyDest->sourceShardRelationOid));; + char *destinationShardSchemaName = get_namespace_name(get_rel_namespace( + splitCopyDest-> + sourceShardRelationOid)); char *destinationShardNameCopy = strdup(sourceShardNamePrefix); AppendShardIdToName(&destinationShardNameCopy, splitCopyInfo->destinationShardId); char *destinationShardFullyQualifiedName = - quote_qualified_identifier(destinationShardSchemaName, destinationShardNameCopy); + quote_qualified_identifier(destinationShardSchemaName, + destinationShardNameCopy); DestReceiver *shardCopyDest = CreateShardCopyDestReceiver( executorState, @@ -99,7 +106,10 @@ DestReceiver * CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceS return (DestReceiver *) splitCopyDest; } -static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor) + +static void +SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc + inputTupleDescriptor) { SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest; @@ -110,7 +120,9 @@ static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, Tupl } } -static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) + +static bool +SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest; @@ -119,7 +131,8 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *des MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); - CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(copyDest->sourceShardRelationOid); + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry( + copyDest->sourceShardRelationOid); if (cacheEntry == NULL) { ereport(ERROR, errmsg("Could not find shard %s for split copy.", @@ -137,21 +150,24 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *des /* Partition Column Value cannot be null */ if (columnNulls[partitionColumnIndex]) { - ereport(ERROR, errmsg("Found null partition value for shard %s during split copy.", - get_rel_name(copyDest->sourceShardRelationOid))); + ereport(ERROR, errmsg( + "Found null partition value for shard %s during split copy.", + get_rel_name(copyDest->sourceShardRelationOid))); } - Datum hashedValueDatum = FunctionCall1(hashFunction, columnValues[partitionColumnIndex]); + Datum hashedValueDatum = FunctionCall1(hashFunction, + columnValues[partitionColumnIndex]); int32_t hashedValue = DatumGetInt32(hashedValueDatum); - for(int index = 0 ; index < copyDest->splitFactor; index++) + for (int index = 0; index < copyDest->splitFactor; index++) { SplitCopyInfo *splitCopyInfo = copyDest->splitCopyInfoArray[index]; if (splitCopyInfo->destinationShardMinHashValue <= hashedValue && splitCopyInfo->destinationShardMaxHashValue >= hashedValue) { - DestReceiver *shardCopyDestReceiver = copyDest->shardCopyDestReceiverArray[index]; + DestReceiver *shardCopyDestReceiver = + copyDest->shardCopyDestReceiverArray[index]; shardCopyDestReceiver->receiveSlot(slot, shardCopyDestReceiver); } } @@ -162,7 +178,9 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *des return true; } -static void SplitCopyDestReceiverShutdown(DestReceiver *dest) + +static void +SplitCopyDestReceiverShutdown(DestReceiver *dest) { SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest; @@ -173,7 +191,9 @@ static void SplitCopyDestReceiverShutdown(DestReceiver *dest) } } -static void SplitCopyDestReceiverDestroy(DestReceiver *dest) + +static void +SplitCopyDestReceiverDestroy(DestReceiver *dest) { SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest; diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index 10272fd8d..0193a2827 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -17,8 +17,8 @@ PG_FUNCTION_INFO_V1(worker_split_copy); -static void -ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo** splitCopyInfo); +static void ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, + SplitCopyInfo **splitCopyInfo); /* * @@ -33,13 +33,15 @@ worker_split_copy(PG_FUNCTION_ARGS) if (array_contains_nulls(splitCopyInfoArrayObject)) { ereport(ERROR, - (errmsg("Shard Copy Info cannot have null values."))); + (errmsg("Shard Copy Info cannot have null values."))); } - ArrayIterator copyInfo_iterator = array_create_iterator(splitCopyInfoArrayObject, 0 /* slice_ndim */, NULL /* mState */); + ArrayIterator copyInfo_iterator = array_create_iterator(splitCopyInfoArrayObject, + 0 /* slice_ndim */, + NULL /* mState */); Datum copyInfoDatum = 0; bool isnull = false; - List* splitCopyInfoList = NULL; + List *splitCopyInfoList = NULL; while (array_iterate(copyInfo_iterator, ©InfoDatum, &isnull)) { SplitCopyInfo *splitCopyInfo = NULL; @@ -49,56 +51,69 @@ worker_split_copy(PG_FUNCTION_ARGS) } EState *executor = CreateExecutorState(); - DestReceiver *splitCopyDestReceiver = CreateSplitCopyDestReceiver(executor, shardIdToSplitCopy, splitCopyInfoList); + DestReceiver *splitCopyDestReceiver = CreateSplitCopyDestReceiver(executor, + shardIdToSplitCopy, + splitCopyInfoList); StringInfo selectShardQueryForCopy = makeStringInfo(); appendStringInfo(selectShardQueryForCopy, - "SELECT * FROM %s;", - generate_qualified_relation_name(shardIntervalToSplitCopy->relationId)); + "SELECT * FROM %s;", + generate_qualified_relation_name( + shardIntervalToSplitCopy->relationId)); ParamListInfo params = NULL; - ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, (DestReceiver *) splitCopyDestReceiver); + ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, + (DestReceiver *) splitCopyDestReceiver); FreeExecutorState(executor); PG_RETURN_VOID(); } + static void -ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo** splitCopyInfo) +ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo) { HeapTupleHeader dataTuple = DatumGetHeapTupleHeader(splitCopyInfoDatum); SplitCopyInfo *copyInfo = palloc0(sizeof(SplitCopyInfo)); bool isnull = false; - Datum destinationShardIdDatum = GetAttributeByName(dataTuple, "destination_shard_id", &isnull); + Datum destinationShardIdDatum = GetAttributeByName(dataTuple, "destination_shard_id", + &isnull); if (isnull) { - ereport(ERROR, (errmsg("destination_shard_id for split_copy_info cannot be null."))); + ereport(ERROR, (errmsg( + "destination_shard_id for split_copy_info cannot be null."))); } copyInfo->destinationShardId = DatumGetUInt64(destinationShardIdDatum); - Datum minValueDatum = GetAttributeByName(dataTuple, "destination_shard_min_value", &isnull); + Datum minValueDatum = GetAttributeByName(dataTuple, "destination_shard_min_value", + &isnull); if (isnull) { - ereport(ERROR, (errmsg("destination_shard_min_value for split_copy_info cannot be null."))); + ereport(ERROR, (errmsg( + "destination_shard_min_value for split_copy_info cannot be null."))); } char *destinationMinHash = text_to_cstring(DatumGetTextP(minValueDatum)); copyInfo->destinationShardMinHashValue = pg_strtoint64(destinationMinHash); - Datum maxValueDatum = GetAttributeByName(dataTuple, "destination_shard_max_value", &isnull); + Datum maxValueDatum = GetAttributeByName(dataTuple, "destination_shard_max_value", + &isnull); if (isnull) { - ereport(ERROR, (errmsg("destination_shard_max_value for split_copy_info cannot be null."))); + ereport(ERROR, (errmsg( + "destination_shard_max_value for split_copy_info cannot be null."))); } char *destinationMaxHash = text_to_cstring(DatumGetTextP(maxValueDatum)); copyInfo->destinationShardMaxHashValue = pg_strtoint64(destinationMaxHash); - Datum nodeIdDatum = GetAttributeByName(dataTuple, "destination_shard_node_id", &isnull); + Datum nodeIdDatum = GetAttributeByName(dataTuple, "destination_shard_node_id", + &isnull); if (isnull) { - ereport(ERROR, (errmsg("destination_shard_node_id for split_copy_info cannot be null."))); + ereport(ERROR, (errmsg( + "destination_shard_node_id for split_copy_info cannot be null."))); } copyInfo->destinationShardNodeId = DatumGetInt32(nodeIdDatum); diff --git a/src/backend/distributed/utils/array_type.c b/src/backend/distributed/utils/array_type.c index aca06de99..ca84caf44 100644 --- a/src/backend/distributed/utils/array_type.c +++ b/src/backend/distributed/utils/array_type.c @@ -102,6 +102,7 @@ DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId) return arrayObject; } + /* * Converts ArrayType to List. */ @@ -120,10 +121,12 @@ IntegerArrayTypeToList(ArrayType *arrayObject) return list; } + /* * Converts Text ArrayType to Integer List. */ -extern List * TextArrayTypeToIntegerList(ArrayType *arrayObject, Oid datumTypeId) +extern List * +TextArrayTypeToIntegerList(ArrayType *arrayObject, Oid datumTypeId) { List *list = NULL; Datum *datumObjectArray = DeconstructArrayObject(arrayObject); @@ -139,23 +142,26 @@ extern List * TextArrayTypeToIntegerList(ArrayType *arrayObject, Oid datumTypeId { int16_t *int16Value = palloc0(sizeof(int16_t)); *int16Value = pg_strtoint16(intAsStr); - list = lappend(list, (void*) int16Value); + list = lappend(list, (void *) int16Value); break; } + case INT4OID: { int32_t *int32Value = palloc0(sizeof(int32_t)); *int32Value = pg_strtoint32(intAsStr); - list = lappend(list, (void*) int32Value); + list = lappend(list, (void *) int32Value); break; } + case INT8OID: { int64_t *int64Value = palloc0(sizeof(int64_t)); *int64Value = pg_strtoint64(intAsStr); - list = lappend(list, (void*) int64Value); + list = lappend(list, (void *) int64Value); break; } + default: ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("Unsupported datum type for array."))); diff --git a/src/include/distributed/shard_split.h b/src/include/distributed/shard_split.h index d67fb6036..5452e0224 100644 --- a/src/include/distributed/shard_split.h +++ b/src/include/distributed/shard_split.h @@ -39,8 +39,10 @@ extern void SplitShard(SplitMode splitMode, List *nodeIdsForPlacementList); /* TODO(niupre): Make all these APIs private when all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API. */ -extern void ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard); -extern void InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList); +extern void ErrorIfCannotSplitShard(SplitOperation splitOperation, + ShardInterval *sourceShard); +extern void InsertSplitOffShardMetadata(List *splitOffShardList, + List *sourcePlacementList); extern void DropShardList(List *shardIntervalList); extern void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList); extern void ExecuteCommandListOnPlacements(List *commandList, List *placementList); diff --git a/src/include/distributed/worker_shard_copy.h b/src/include/distributed/worker_shard_copy.h index ce2eda51c..7c4a68f30 100644 --- a/src/include/distributed/worker_shard_copy.h +++ b/src/include/distributed/worker_shard_copy.h @@ -14,9 +14,8 @@ struct FullRelationName; -extern DestReceiver * CreateShardCopyDestReceiver( - EState *executorState, - char* destinationShardFullyQualifiedName, - uint32_t destinationNodeId); +extern DestReceiver * CreateShardCopyDestReceiver(EState *executorState, + char *destinationShardFullyQualifiedName, + uint32_t destinationNodeId); #endif /* WORKER_SHARD_COPY_H_ */ diff --git a/src/include/distributed/worker_split_copy.h b/src/include/distributed/worker_split_copy.h index 97f863164..ac81a79ca 100644 --- a/src/include/distributed/worker_split_copy.h +++ b/src/include/distributed/worker_split_copy.h @@ -14,12 +14,14 @@ typedef struct SplitCopyInfo { - uint64 destinationShardId; /* destination shard id */ + uint64 destinationShardId; /* destination shard id */ int32 destinationShardMinHashValue; /* min hash value of destination shard */ int32 destinationShardMaxHashValue; /* max hash value of destination shard */ - uint32_t destinationShardNodeId; /* node where split child shard is to be placed */ + uint32_t destinationShardNodeId; /* node where split child shard is to be placed */ } SplitCopyInfo; -extern DestReceiver* CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy, List* splitCopyInfoList); +extern DestReceiver * CreateSplitCopyDestReceiver(EState *executorState, uint64 + sourceShardIdToCopy, + List *splitCopyInfoList); #endif /* WORKER_SPLIT_COPY_H_ */