diff --git a/src/backend/distributed/operations/isolate_shards.c b/src/backend/distributed/operations/isolate_shards.c
index c6d2a9131..f78e48cc2 100644
--- a/src/backend/distributed/operations/isolate_shards.c
+++ b/src/backend/distributed/operations/isolate_shards.c
@@ -63,6 +63,7 @@ static void InsertSplitOffShardMetadata(List *splitOffShardList,
 static void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList);
 static void ExecuteCommandListOnWorker(char *nodeName, int nodePort, List *commandList);
+
 
 /*
  * isolate_tenant_to_new_shard isolates a tenant to its own shard by spliting
  * the current matching shard.
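
For orientation, the isolate_tenant_to_new_shard path touched above is driven from SQL. A usage sketch (table name, tenant value, and cascade option are illustrative):

```sql
-- Isolate tenant 42 of the hash-distributed table 'orders' into its own shard.
-- 'CASCADE' also isolates the matching shards of colocated tables.
SELECT isolate_tenant_to_new_shard('orders', 42, 'CASCADE');
```
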
diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index 181a4c1d4..22368d957 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -304,12 +304,13 @@ ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
 
 
 /*
- * SplitShard API to split a given shard (or shard group) in blocking / non-blocking fashion
- * based on specified split points to a set of destination nodes.
+ * SplitShard API to split a given shard (or shard group) based on specified split points
+ * to a set of destination nodes.
+ * 'splitMode' : Mode of split operation.
  * 'splitOperation' : Customer operation that triggered split.
  * 'shardInterval' : Source shard interval to be split.
  * 'shardSplitPointsList' : Split Points list for the source 'shardInterval'.
- * 'workersForPlacementList' : Placement list corresponding to split children.
+ * 'nodeIdsForPlacementList' : Placement list corresponding to split children.
  */
 void
 SplitShard(SplitMode splitMode,
@@ -389,7 +390,7 @@ SplitShard(SplitMode splitMode,
  * SplitShard API to split a given shard (or shard group) in blocking fashion
  * based on specified split points to a set of destination nodes.
  * 'splitOperation' : Customer operation that triggered split.
- * 'shardInterval' : Source shard interval to be split.
+ * 'shardIntervalToSplit' : Source shard interval to be split.
  * 'shardSplitPointsList' : Split Points list for the source 'shardInterval'.
  * 'workersForPlacementList' : Placement list corresponding to split children.
  */
@@ -409,7 +410,7 @@ BlockingShardSplit(SplitOperation splitOperation,
 		sourceColocatedShardIntervalList,
 		shardSplitPointsList);
 
-	/* Only single placement allowed (already validated by caller) */
+	/* Only single placement allowed (already validated that RelationReplicationFactor = 1) */
 	List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId);
 	Assert(sourcePlacementList->length == 1);
 	ShardPlacement *sourceShardPlacement = (ShardPlacement *) linitial(
@@ -417,7 +418,11 @@ BlockingShardSplit(SplitOperation splitOperation,
 	WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
 														   false /* missingOk */);
 
-	/* Physically create split children and perform split copy */
+	/*
+	 * Physically create split children, perform split copy and create auxiliary structures.
+	 * This includes: indexes, replica identity, triggers and statistics.
+	 * Foreign key constraints are created after metadata changes (see CreateForeignKeyConstraints).
+	 */
 	CreateSplitShardsForShardGroup(
 		sourceShardToCopyNode,
 		sourceColocatedShardIntervalList,
@@ -453,10 +458,11 @@ CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
 							   List *shardGroupSplitIntervalListList,
 							   List *workersForPlacementList)
 {
-	/* Iterate on shard intervals for shard group */
+	/* Iterate on shard interval list for shard group */
 	List *shardIntervalList = NULL;
 	foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
 	{
+		/* Iterate on split shard interval list and corresponding placement worker */
 		ShardInterval *shardInterval = NULL;
 		WorkerNode *workerPlacementNode = NULL;
 		forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
@@ -481,11 +487,11 @@ CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
 					shardGroupSplitIntervalListList,
 					workersForPlacementList);
 	/*
-	 * Create Indexes post copy.
-	 * TODO(niupre) : Can we use Adaptive execution for creating multiple indexes parallely
+	 * Create auxiliary structures post copy.
 	 */
 	foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
 	{
+		/* Iterate on split shard interval list and corresponding placement worker */
 		ShardInterval *shardInterval = NULL;
 		WorkerNode *workerPlacementNode = NULL;
 		forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
@@ -528,7 +534,7 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
 			splitShardIntervalList,
 			destinationWorkerNodesList);
 
-		Task *task = CreateBasicTask(
+		Task *splitCopyTask = CreateBasicTask(
 			sourceShardIntervalToCopy->shardId, /* jobId */
 			taskId,
 			READ_TASK,
@@ -537,14 +543,15 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
 
 		ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
 		SetPlacementNodeMetadata(taskPlacement, sourceShardNode);
-		task->taskPlacementList = list_make1(taskPlacement);
+		splitCopyTask->taskPlacementList = list_make1(taskPlacement);
 
-		splitCopyTaskList = lappend(splitCopyTaskList, task);
+		splitCopyTaskList = lappend(splitCopyTaskList, splitCopyTask);
 		taskId++;
 	}
 
 	ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList,
-									  MaxAdaptiveExecutorPoolSize, NULL /* jobIdList */);
+									  MaxAdaptiveExecutorPoolSize,
+									  NULL /* jobIdList (ignored by API implementation) */);
 }
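
To make the split-point semantics concrete: a split point is the inclusive upper bound of a child range, so splitting a shard that covers [-2147483648, 2147483647] at '0' yields children covering [-2147483648, 0] and [1, 2147483647]. A hypothetical invocation through the citus_split_shard_by_split_points UDF developed alongside this code (shard id and node ids are made up):

```sql
SELECT citus_split_shard_by_split_points(
    102008,                -- hypothetical source shard
    ARRAY['0'],            -- inclusive upper bound of the first child
    ARRAY[16, 18],         -- node ids that receive the two children
    'block_writes');       -- the blocking mode implemented above
```
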
@@ -596,7 +603,7 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
 
 
 /*
- * Create an object (shard/index) on a worker node.
+ * Create an object on a worker node.
 */
 static void
 CreateObjectOnPlacement(List *objectCreationCommandList,
@@ -682,6 +689,7 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard,
 
 /*
  * Insert new shard and placement metadata.
+ * Sync the metadata to all nodes, if metadata syncing is enabled.
 */
 static void
 InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
@@ -709,7 +717,7 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
 				shardInterval->shardId,
 				INVALID_PLACEMENT_ID, /* triggers generation of new id */
 				SHARD_STATE_ACTIVE,
-				0, /* shard length */
+				0, /* shard length (zero for hash distributed tables) */
 				workerPlacementNode->groupId);
 		}
 
@@ -740,6 +748,7 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
 	List *shardIntervalList = NULL;
 	foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
 	{
+		/* Iterate on split children shards along with the respective placement workers */
 		ShardInterval *shardInterval = NULL;
 		WorkerNode *workerPlacementNode = NULL;
 		forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
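
Assuming a split like the one sketched earlier, the metadata written by InsertSplitChildrenShardMetadata can be observed directly; shard ids below are hypothetical, and shardlength is 0 because hash-distributed placements do not track size:

```sql
SELECT shardid, shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'orders'::regclass
ORDER BY shardminvalue::bigint;

SELECT shardid, shardstate, shardlength, groupid
FROM pg_dist_placement
WHERE shardid IN (102009, 102010);  -- hypothetical split children
```
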
diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c
index 76d751e2b..c4d69e9f2 100644
--- a/src/backend/distributed/operations/worker_shard_copy.c
+++ b/src/backend/distributed/operations/worker_shard_copy.c
@@ -1,7 +1,7 @@
 /*-------------------------------------------------------------------------
  *
  * worker_shard_copy.c
- *   Functions for copying a shard to desintaion with push copy.
+ *   Functions for copying a shard to a destination.
  *
  * Copyright (c) Citus Data, Inc.
  *
@@ -100,6 +100,7 @@ ShouldSendCopyNow(StringInfo buffer)
 }
 
 
+/* Connect to the node hosting the destination shard and start COPY there. */
 static void
 ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
 {
@@ -123,8 +124,7 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
 		ReportConnectionError(copyDest->connection, ERROR);
 	}
 
-	PGresult *result = GetRemoteCommandResult(copyDest->connection,
-											  true /* raiseInterrupts */);
+	PGresult *result = GetRemoteCommandResult(copyDest->connection, true /* raiseInterrupts */);
 	if (PQresultStatus(result) != PGRES_COPY_IN)
 	{
 		ReportResultError(copyDest->connection, result, ERROR);
@@ -134,6 +134,11 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
 }
 
 
+/*
+ * ShardCopyDestReceiverReceive implements the receiveSlot function of
+ * ShardCopyDestReceiver. It takes a TupleTableSlot and sends the contents to
+ * the appropriate destination node.
+ */
 static bool
 ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
 {
@@ -168,16 +173,17 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
 	}
 	else
 	{
-		FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
 		resetStringInfo(copyOutState->fe_msgbuf);
-
 		if (copyDest->copyOutState->binary && copyDest->tuplesSent == 0)
 		{
 			AppendCopyBinaryHeaders(copyDest->copyOutState);
 		}
 
-		AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor,
-						  copyOutState, columnOutputFunctions,
+		AppendCopyRowData(columnValues,
+						  columnNulls,
+						  copyDest->tupleDescriptor,
+						  copyOutState,
+						  copyDest->columnOutputFunctions,
 						  NULL /* columnCoercionPaths */);
 		if (!PutRemoteCopyData(copyDest->connection, copyOutState->fe_msgbuf->data,
 							   copyOutState->fe_msgbuf->len))
@@ -188,7 +194,6 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
 				copyDest->destinationShardFullyQualifiedName);
 
 			char *errorMessage = PQerrorMessage(copyDest->connection->pgConn);
-
 			ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
 							errmsg("Failed to COPY to shard %s.%s : %s,",
 								   destinationShardSchemaName,
@@ -209,6 +214,9 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
 }
 
 
+/*
+ * ShardCopyDestReceiverStartup implements the rStartup interface of ShardCopyDestReceiver.
+ */
 static void
 ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
 							 inputTupleDescriptor)
@@ -234,6 +242,11 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
 }
 
 
+/*
+ * ShardCopyDestReceiverShutdown implements the rShutdown interface of
+ * ShardCopyDestReceiver. It ends all open COPY operations, copying any pending
+ * data in the buffer.
+ */
 static void
 ShardCopyDestReceiverShutdown(DestReceiver *dest)
 {
@@ -244,6 +257,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
 		if (copyDest->copyOutState != NULL &&
 			copyDest->copyOutState->fe_msgbuf->len > 0)
 		{
+			/* end the COPY input */
 			LocalCopyToShard(copyDest, copyDest->copyOutState);
 		}
 	}
@@ -266,7 +280,11 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
 			ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
 							errmsg("Failed to COPY to destination shard %s.%s",
 								   destinationShardSchemaName,
-								   destinationShardRelationName)));
+								   destinationShardRelationName),
+							errdetail("failed to send %d bytes %s on node %u",
+									  copyDest->copyOutState->fe_msgbuf->len,
+									  copyDest->copyOutState->fe_msgbuf->data,
+									  copyDest->destinationNodeId)));
 		}
 
 		/* check whether there were any COPY errors */
@@ -284,6 +302,10 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
 }
 
 
+/*
+ * CreateShardCopyDestReceiver creates a DestReceiver that copies into
+ * a destinationShardFullyQualifiedName on destinationNodeId.
+ */
 DestReceiver *
 CreateShardCopyDestReceiver(EState *executorState,
 							List *destinationShardFullyQualifiedName,
@@ -309,7 +331,9 @@ CreateShardCopyDestReceiver(EState *executorState,
 	return (DestReceiver *) copyDest;
 }
 
-
+/*
+ * ShardCopyDestReceiverDestroy frees the DestReceiver.
+ */
 static void
 ShardCopyDestReceiverDestroy(DestReceiver *dest)
 {
@@ -356,6 +380,7 @@ ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool useBinaryF
 }
 
 
+/* Write tuple to local shard. */
 static void
 WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest)
 {
@@ -386,7 +411,7 @@ WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest)
 
 
 /*
- * LocalCopyToShard finishes local copy for the given destination shard.
+ * LocalCopyToShard performs local copy for the given destination shard.
 */
 static void
 LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState)
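
For reference, the statement that ConstructCopyStatement assembles for the destination is of roughly this shape (the shard name is hypothetical; the binary option depends on useBinaryFormat):

```sql
-- Sent over the connection opened by ConnectToRemoteAndStartCopy;
-- row data is then streamed via PutRemoteCopyData.
COPY public.orders_102009 FROM STDIN WITH (format binary);
```
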
diff --git a/src/backend/distributed/operations/worker_split_copy.c b/src/backend/distributed/operations/worker_split_copy.c
index 0dfc43c43..cf85a4595 100644
--- a/src/backend/distributed/operations/worker_split_copy.c
+++ b/src/backend/distributed/operations/worker_split_copy.c
@@ -51,6 +51,11 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot,
 static void SplitCopyDestReceiverShutdown(DestReceiver *dest);
 static void SplitCopyDestReceiverDestroy(DestReceiver *copyDest);
 
+/*
+ * CreateSplitCopyDestReceiver creates a DestReceiver that performs
+ * split copy for sourceShardIdToCopy to destination split children
+ * based on splitCopyInfoList.
+ */
 DestReceiver *
 CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy,
 							List *splitCopyInfoList)
@@ -76,7 +81,6 @@ CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy,
 
 	SplitCopyInfo *splitCopyInfo = NULL;
 	int index = 0;
-	char *sourceShardNamePrefix = get_rel_name(shardIntervalToSplitCopy->relationId);
 
 	foreach_ptr(splitCopyInfo, splitCopyInfoList)
 	{
@@ -103,6 +107,9 @@ CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy,
 }
 
 
+/*
+ * SplitCopyDestReceiverStartup implements the rStartup interface of SplitCopyDestReceiver.
+ */
 static void
 SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
 							 inputTupleDescriptor)
@@ -117,6 +124,11 @@ SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
 }
 
 
+/*
+ * SplitCopyDestReceiverReceive implements the receiveSlot function of
+ * SplitCopyDestReceiver. It takes a TupleTableSlot and sends the contents to
+ * the appropriate destination shard.
+ */
 static bool
 SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
 {
@@ -175,6 +187,10 @@ SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
 }
 
 
+/*
+ * SplitCopyDestReceiverShutdown implements the rShutdown interface of
+ * SplitCopyDestReceiver. It ends all open COPY operations.
+ */
 static void
 SplitCopyDestReceiverShutdown(DestReceiver *dest)
 {
@@ -188,6 +204,9 @@ SplitCopyDestReceiverShutdown(DestReceiver *dest)
 }
 
 
+/*
+ * SplitCopyDestReceiverDestroy frees the DestReceiver.
+ */
 static void
 SplitCopyDestReceiverDestroy(DestReceiver *dest)
 {
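
The receiver above is exercised through the worker_split_copy UDF changed below. A hypothetical invocation, assuming the split_copy_info fields are (destination shard id, min value, max value, destination node id) in that order:

```sql
-- Copy rows of source shard 102008 into two split children by hash range.
SELECT pg_catalog.worker_split_copy(
    102008,
    ARRAY[
        ROW(102009, '-2147483648', '0', 16)::citus.split_copy_info,
        ROW(102010, '1', '2147483647', 18)::citus.split_copy_info
    ]);
```
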
diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c
index 72894ce52..b782c2196 100644
--- a/src/backend/distributed/operations/worker_split_copy_udf.c
+++ b/src/backend/distributed/operations/worker_split_copy_udf.c
@@ -21,7 +21,10 @@ static void ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum,
 									SplitCopyInfo **splitCopyInfo);
 
 /*
- *
+ * worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[])
+ * UDF to split copy a shard to a list of destination shards.
+ * 'source_shard_id' : Source shard id to split copy.
+ * 'splitCopyInfos' : Array of split copy info (destination shard id, min/max ranges and node id).
 */
 Datum
 worker_split_copy(PG_FUNCTION_ARGS)
@@ -72,7 +75,7 @@ worker_split_copy(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
-
+/* Parse a single SplitCopyInfo tuple. */
 static void
 ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo)
 {
diff --git a/src/backend/distributed/sql/udfs/worker_split_copy/11.0-2.sql b/src/backend/distributed/sql/udfs/worker_split_copy/11.0-2.sql
index 9a69b3b68..7f35d2b4f 100644
--- a/src/backend/distributed/sql/udfs/worker_split_copy/11.0-2.sql
+++ b/src/backend/distributed/sql/udfs/worker_split_copy/11.0-2.sql
@@ -9,9 +9,9 @@ CREATE TYPE citus.split_copy_info AS (
 
 CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy(
     source_shard_id bigint,
-    splitCopyInfo citus.split_copy_info[])
+    splitCopyInfos citus.split_copy_info[])
 RETURNS void
 LANGUAGE C STRICT
 AS 'MODULE_PATHNAME', $$worker_split_copy$$;
-COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfo citus.split_copy_info[])
+COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[])
     IS 'Perform split copy for shard'
diff --git a/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql b/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql
index 9a69b3b68..7f35d2b4f 100644
--- a/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql
+++ b/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql
@@ -9,9 +9,9 @@ CREATE TYPE citus.split_copy_info AS (
 
 CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy(
     source_shard_id bigint,
-    splitCopyInfo citus.split_copy_info[])
+    splitCopyInfos citus.split_copy_info[])
 RETURNS void
 LANGUAGE C STRICT
 AS 'MODULE_PATHNAME', $$worker_split_copy$$;
-COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfo citus.split_copy_info[])
+COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[])
    IS 'Perform split copy for shard'
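
One way to sanity-check the parameter rename applied by these .sql files is to inspect the catalog (note that the unquoted identifier folds to lowercase):

```sql
-- Expect: {source_shard_id,splitcopyinfos}
SELECT proargnames
FROM pg_proc
WHERE proname = 'worker_split_copy'
  AND pronamespace = 'pg_catalog'::regnamespace;
```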