Cleaning up and adding comments

pull/6029/head
Nitish Upreti 2022-06-24 11:54:07 -07:00
parent 8b4956e9e9
commit a836a322c4
7 changed files with 90 additions and 33 deletions

View File

@ -63,6 +63,7 @@ static void InsertSplitOffShardMetadata(List *splitOffShardList,
static void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList); static void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList);
static void ExecuteCommandListOnWorker(char *nodeName, int nodePort, List *commandList); static void ExecuteCommandListOnWorker(char *nodeName, int nodePort, List *commandList);
/* /*
* isolate_tenant_to_new_shard isolates a tenant to its own shard by splitting * the current matching shard.
* the current matching shard. * the current matching shard.

View File

@ -304,12 +304,13 @@ ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
/* /*
* SplitShard API to split a given shard (or shard group) in blocking / non-blocking fashion * SplitShard API to split a given shard (or shard group) based on specified split points
* based on specified split points to a set of destination nodes. * to a set of destination nodes.
* 'splitMode' : Mode of split operation.
* 'splitOperation' : Customer operation that triggered split. * 'splitOperation' : Customer operation that triggered split.
* 'shardInterval' : Source shard interval to be split. * 'shardInterval' : Source shard interval to be split.
* 'shardSplitPointsList' : Split Points list for the source 'shardInterval'. * 'shardSplitPointsList' : Split Points list for the source 'shardInterval'.
* 'workersForPlacementList' : Placement list corresponding to split children. * 'nodeIdsForPlacementList' : Placement list corresponding to split children.
*/ */
void void
SplitShard(SplitMode splitMode, SplitShard(SplitMode splitMode,
@ -389,7 +390,7 @@ SplitShard(SplitMode splitMode,
* SplitShard API to split a given shard (or shard group) in blocking fashion * SplitShard API to split a given shard (or shard group) in blocking fashion
* based on specified split points to a set of destination nodes. * based on specified split points to a set of destination nodes.
* 'splitOperation' : Customer operation that triggered split. * 'splitOperation' : Customer operation that triggered split.
* 'shardInterval' : Source shard interval to be split. * 'shardIntervalToSplit' : Source shard interval to be split.
* 'shardSplitPointsList' : Split Points list for the source 'shardInterval'. * 'shardSplitPointsList' : Split Points list for the source 'shardInterval'.
* 'workersForPlacementList' : Placement list corresponding to split children. * 'workersForPlacementList' : Placement list corresponding to split children.
*/ */
@ -409,7 +410,7 @@ BlockingShardSplit(SplitOperation splitOperation,
sourceColocatedShardIntervalList, sourceColocatedShardIntervalList,
shardSplitPointsList); shardSplitPointsList);
/* Only single placement allowed (already validated by caller) */ /* Only single placement allowed (already validated RelationReplicationFactor = 1) */
List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId); List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId);
Assert(sourcePlacementList->length == 1); Assert(sourcePlacementList->length == 1);
ShardPlacement *sourceShardPlacement = (ShardPlacement *) linitial( ShardPlacement *sourceShardPlacement = (ShardPlacement *) linitial(
@ -417,7 +418,11 @@ BlockingShardSplit(SplitOperation splitOperation,
WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId, WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
false /* missingOk */); false /* missingOk */);
/* Physically create split children and perform split copy */ /*
* Physically create split children, perform split copy and create auxiliary structures.
* This includes: indexes, replicaIdentity, triggers and statistics.
* Foreign key constraints are created after Metadata changes (see CreateForeignKeyConstraints).
*/
CreateSplitShardsForShardGroup( CreateSplitShardsForShardGroup(
sourceShardToCopyNode, sourceShardToCopyNode,
sourceColocatedShardIntervalList, sourceColocatedShardIntervalList,
@ -453,10 +458,11 @@ CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
List *shardGroupSplitIntervalListList, List *shardGroupSplitIntervalListList,
List *workersForPlacementList) List *workersForPlacementList)
{ {
/* Iterate on shard intervals for shard group */ /* Iterate on shard interval list for shard group */
List *shardIntervalList = NULL; List *shardIntervalList = NULL;
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{ {
/* Iterate on split shard interval list and corresponding placement worker */
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
WorkerNode *workerPlacementNode = NULL; WorkerNode *workerPlacementNode = NULL;
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
@ -481,11 +487,11 @@ CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
shardGroupSplitIntervalListList, workersForPlacementList); shardGroupSplitIntervalListList, workersForPlacementList);
/* /*
* Create Indexes post copy. * Create auxiliary structures post copy.
* TODO(niupre) : Can we use Adaptive execution for creating multiple indexes parallely
*/ */
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{ {
/* Iterate on split shard interval list and corresponding placement worker */
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
WorkerNode *workerPlacementNode = NULL; WorkerNode *workerPlacementNode = NULL;
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
@ -528,7 +534,7 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
splitShardIntervalList, splitShardIntervalList,
destinationWorkerNodesList); destinationWorkerNodesList);
Task *task = CreateBasicTask( Task *splitCopyTask = CreateBasicTask(
sourceShardIntervalToCopy->shardId, /* jobId */ sourceShardIntervalToCopy->shardId, /* jobId */
taskId, taskId,
READ_TASK, READ_TASK,
@ -537,14 +543,15 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
SetPlacementNodeMetadata(taskPlacement, sourceShardNode); SetPlacementNodeMetadata(taskPlacement, sourceShardNode);
task->taskPlacementList = list_make1(taskPlacement); splitCopyTask->taskPlacementList = list_make1(taskPlacement);
splitCopyTaskList = lappend(splitCopyTaskList, task); splitCopyTaskList = lappend(splitCopyTaskList, splitCopyTask);
taskId++; taskId++;
} }
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList, ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList,
MaxAdaptiveExecutorPoolSize, NULL /* jobIdList */); MaxAdaptiveExecutorPoolSize,
NULL /* jobIdList (ignored by API implementation) */);
} }
@ -596,7 +603,7 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
/* /*
* Create an object (shard/index) on a worker node. * Create an object on a worker node.
*/ */
static void static void
CreateObjectOnPlacement(List *objectCreationCommandList, CreateObjectOnPlacement(List *objectCreationCommandList,
@ -682,6 +689,7 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard,
/* /*
* Insert new shard and placement metadata. * Insert new shard and placement metadata.
* Sync the Metadata with all nodes if enabled.
*/ */
static void static void
InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
@ -709,7 +717,7 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
shardInterval->shardId, shardInterval->shardId,
INVALID_PLACEMENT_ID, /* triggers generation of new id */ INVALID_PLACEMENT_ID, /* triggers generation of new id */
SHARD_STATE_ACTIVE, SHARD_STATE_ACTIVE,
0, /* shard length */ 0, /* shard length (zero for HashDistributed Table) */
workerPlacementNode->groupId); workerPlacementNode->groupId);
} }
@ -740,6 +748,7 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
List *shardIntervalList = NULL; List *shardIntervalList = NULL;
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{ {
/* Iterate on split children shards along with the respective placement workers */
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
WorkerNode *workerPlacementNode = NULL; WorkerNode *workerPlacementNode = NULL;
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,

View File

@ -1,7 +1,7 @@
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* worker_shard_copy.c * worker_shard_copy.c
* Functions for copying a shard to desintaion with push copy. * Functions for copying a shard to destination.
* *
* Copyright (c) Citus Data, Inc. * Copyright (c) Citus Data, Inc.
* *
@ -100,6 +100,7 @@ ShouldSendCopyNow(StringInfo buffer)
} }
/* Connect to node with source shard and trigger copy start. */
static void static void
ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
{ {
@ -123,8 +124,7 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
ReportConnectionError(copyDest->connection, ERROR); ReportConnectionError(copyDest->connection, ERROR);
} }
PGresult *result = GetRemoteCommandResult(copyDest->connection, PGresult *result = GetRemoteCommandResult(copyDest->connection, true /* raiseInterrupts */);
true /* raiseInterrupts */);
if (PQresultStatus(result) != PGRES_COPY_IN) if (PQresultStatus(result) != PGRES_COPY_IN)
{ {
ReportResultError(copyDest->connection, result, ERROR); ReportResultError(copyDest->connection, result, ERROR);
@ -134,6 +134,11 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
} }
/*
* ShardCopyDestReceiverReceive implements the receiveSlot function of
* ShardCopyDestReceiver. It takes a TupleTableSlot and sends the contents to
* the appropriate destination node.
*/
static bool static bool
ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{ {
@ -168,16 +173,17 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
} }
else else
{ {
FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
resetStringInfo(copyOutState->fe_msgbuf); resetStringInfo(copyOutState->fe_msgbuf);
if (copyDest->copyOutState->binary && copyDest->tuplesSent == 0) if (copyDest->copyOutState->binary && copyDest->tuplesSent == 0)
{ {
AppendCopyBinaryHeaders(copyDest->copyOutState); AppendCopyBinaryHeaders(copyDest->copyOutState);
} }
AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor, AppendCopyRowData(columnValues,
copyOutState, columnOutputFunctions, columnNulls,
copyDest->tupleDescriptor,
copyOutState,
copyDest->columnOutputFunctions,
NULL /* columnCoercionPaths */); NULL /* columnCoercionPaths */);
if (!PutRemoteCopyData(copyDest->connection, copyOutState->fe_msgbuf->data, if (!PutRemoteCopyData(copyDest->connection, copyOutState->fe_msgbuf->data,
copyOutState->fe_msgbuf->len)) copyOutState->fe_msgbuf->len))
@ -188,7 +194,6 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
copyDest->destinationShardFullyQualifiedName); copyDest->destinationShardFullyQualifiedName);
char *errorMessage = PQerrorMessage(copyDest->connection->pgConn); char *errorMessage = PQerrorMessage(copyDest->connection->pgConn);
ereport(ERROR, (errcode(ERRCODE_IO_ERROR), ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to COPY to shard %s.%s : %s,", errmsg("Failed to COPY to shard %s.%s : %s,",
destinationShardSchemaName, destinationShardSchemaName,
@ -209,6 +214,9 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
} }
/*
* ShardCopyDestReceiverStartup implements the rStartup interface of ShardCopyDestReceiver.
*/
static void static void
ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
inputTupleDescriptor) inputTupleDescriptor)
@ -234,6 +242,11 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
} }
/*
* ShardCopyDestReceiverShutdown implements the rShutdown interface of
* ShardCopyDestReceiver. It ends all open COPY operations, copying any pending
* data in buffer.
*/
static void static void
ShardCopyDestReceiverShutdown(DestReceiver *dest) ShardCopyDestReceiverShutdown(DestReceiver *dest)
{ {
@ -244,6 +257,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
if (copyDest->copyOutState != NULL && if (copyDest->copyOutState != NULL &&
copyDest->copyOutState->fe_msgbuf->len > 0) copyDest->copyOutState->fe_msgbuf->len > 0)
{ {
/* end the COPY input */
LocalCopyToShard(copyDest, copyDest->copyOutState); LocalCopyToShard(copyDest, copyDest->copyOutState);
} }
} }
@ -266,7 +280,11 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
ereport(ERROR, (errcode(ERRCODE_IO_ERROR), ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to COPY to destination shard %s.%s", errmsg("Failed to COPY to destination shard %s.%s",
destinationShardSchemaName, destinationShardSchemaName,
destinationShardRelationName))); destinationShardRelationName),
errdetail("failed to send %d bytes %s on node %u",
copyDest->copyOutState->fe_msgbuf->len,
copyDest->copyOutState->fe_msgbuf->data,
copyDest->destinationNodeId)));
} }
/* check whether there were any COPY errors */ /* check whether there were any COPY errors */
@ -284,6 +302,10 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
} }
/*
* CreateShardCopyDestReceiver creates a DestReceiver that copies into
* a destinationShardFullyQualifiedName on destinationNodeId.
*/
DestReceiver * DestReceiver *
CreateShardCopyDestReceiver(EState *executorState, CreateShardCopyDestReceiver(EState *executorState,
List *destinationShardFullyQualifiedName, List *destinationShardFullyQualifiedName,
@ -309,7 +331,9 @@ CreateShardCopyDestReceiver(EState *executorState,
return (DestReceiver *) copyDest; return (DestReceiver *) copyDest;
} }
/*
* ShardCopyDestReceiverDestroy frees the DestReceiver.
*/
static void static void
ShardCopyDestReceiverDestroy(DestReceiver *dest) ShardCopyDestReceiverDestroy(DestReceiver *dest)
{ {
@ -356,6 +380,7 @@ ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool useBinaryF
} }
/* Write Tuple to Local Shard. */
static void static void
WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest) WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest)
{ {
@ -386,7 +411,7 @@ WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest)
/* /*
* LocalCopyToShard finishes local copy for the given destination shard. * LocalCopyToShard performs local copy for the given destination shard.
*/ */
static void static void
LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState) LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState)

View File

@ -51,6 +51,11 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot,
static void SplitCopyDestReceiverShutdown(DestReceiver *dest); static void SplitCopyDestReceiverShutdown(DestReceiver *dest);
static void SplitCopyDestReceiverDestroy(DestReceiver *copyDest); static void SplitCopyDestReceiverDestroy(DestReceiver *copyDest);
/*
* CreateSplitCopyDestReceiver creates a DestReceiver that performs
* split copy for sourceShardIdToCopy to destination split children
* based on splitCopyInfoList.
*/
DestReceiver * DestReceiver *
CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy, CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy,
List *splitCopyInfoList) List *splitCopyInfoList)
@ -76,7 +81,6 @@ CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy,
SplitCopyInfo *splitCopyInfo = NULL; SplitCopyInfo *splitCopyInfo = NULL;
int index = 0; int index = 0;
char *sourceShardNamePrefix = get_rel_name(shardIntervalToSplitCopy->relationId); char *sourceShardNamePrefix = get_rel_name(shardIntervalToSplitCopy->relationId);
foreach_ptr(splitCopyInfo, splitCopyInfoList) foreach_ptr(splitCopyInfo, splitCopyInfoList)
{ {
@ -103,6 +107,9 @@ CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy,
} }
/*
* SplitCopyDestReceiverStartup implements the rStartup interface of SplitCopyDestReceiver.
*/
static void static void
SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
inputTupleDescriptor) inputTupleDescriptor)
@ -117,6 +124,11 @@ SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
} }
/*
* SplitCopyDestReceiverReceive implements the receiveSlot function of
* SplitCopyDestReceiver. It takes a TupleTableSlot and sends the contents to
* the appropriate destination shard.
*/
static bool static bool
SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{ {
@ -175,6 +187,10 @@ SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
} }
/*
* SplitCopyDestReceiverShutdown implements the rShutdown interface of
* SplitCopyDestReceiver. It ends all open COPY operations.
*/
static void static void
SplitCopyDestReceiverShutdown(DestReceiver *dest) SplitCopyDestReceiverShutdown(DestReceiver *dest)
{ {
@ -188,6 +204,9 @@ SplitCopyDestReceiverShutdown(DestReceiver *dest)
} }
/*
* SplitCopyDestReceiverDestroy frees the DestReceiver.
*/
static void static void
SplitCopyDestReceiverDestroy(DestReceiver *dest) SplitCopyDestReceiverDestroy(DestReceiver *dest)
{ {

View File

@ -21,7 +21,10 @@ static void ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum,
SplitCopyInfo **splitCopyInfo); SplitCopyInfo **splitCopyInfo);
/* /*
* * worker_split_copy(source_shard_id bigint, splitCopyInfo citus.split_copy_info[])
* UDF to split copy shard to list of destination shards.
* 'source_shard_id' : Source ShardId to split copy.
* 'splitCopyInfos' : Array of Split Copy Info (destination_shard's id, min/max ranges and node_id)
*/ */
Datum Datum
worker_split_copy(PG_FUNCTION_ARGS) worker_split_copy(PG_FUNCTION_ARGS)
@ -72,7 +75,7 @@ worker_split_copy(PG_FUNCTION_ARGS)
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
/* Parse a single SplitCopyInfo Tuple */
static void static void
ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo) ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo)
{ {

View File

@ -9,9 +9,9 @@ CREATE TYPE citus.split_copy_info AS (
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy( CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy(
source_shard_id bigint, source_shard_id bigint,
splitCopyInfo citus.split_copy_info[]) splitCopyInfos citus.split_copy_info[])
RETURNS void RETURNS void
LANGUAGE C STRICT LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_copy$$; AS 'MODULE_PATHNAME', $$worker_split_copy$$;
COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfo citus.split_copy_info[]) COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[])
IS 'Perform split copy for shard' IS 'Perform split copy for shard'

View File

@ -9,9 +9,9 @@ CREATE TYPE citus.split_copy_info AS (
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy( CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy(
source_shard_id bigint, source_shard_id bigint,
splitCopyInfo citus.split_copy_info[]) splitCopyInfos citus.split_copy_info[])
RETURNS void RETURNS void
LANGUAGE C STRICT LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_copy$$; AS 'MODULE_PATHNAME', $$worker_split_copy$$;
COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfo citus.split_copy_info[]) COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[])
IS 'Perform split copy for shard' IS 'Perform split copy for shard'