pull/6029/head
Nitish Upreti 2022-06-24 09:56:06 -07:00
parent f3a391b80f
commit 8b4956e9e9
4 changed files with 43 additions and 24 deletions

View File

@ -30,7 +30,7 @@ PG_FUNCTION_INFO_V1(citus_split_shard_by_split_points);
static SplitMode LookupSplitMode(Oid shardSplitModeOid);
/*
* citus_split_shard_by_split_points(shard_id bigint, split_points integer[], node_ids integer[])
* citus_split_shard_by_split_points(shard_id bigint, split_points text[], node_ids integer[], split_mode citus.split_mode)
* Split source shard into multiple shards using the given split points.
* 'shard_id' is the id of source shard to split.
* 'split_points' is an array that represents the split points.

View File

@ -60,8 +60,10 @@ static void DoSplitCopy(WorkerNode *sourceShardNode,
static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
List *splitChildrenShardIntervalList,
List *workersForPlacementList);
static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList);
static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList);
static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
/* Customize error message strings based on operation type */
static const char *const SplitOperationName[] =
@ -410,8 +412,10 @@ BlockingShardSplit(SplitOperation splitOperation,
/* Only single placement allowed (already validated by caller) */
List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId);
Assert(sourcePlacementList->length == 1);
ShardPlacement *sourceShardPlacement = (ShardPlacement *) linitial(sourcePlacementList);
WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId, false /* missingOk */);
ShardPlacement *sourceShardPlacement = (ShardPlacement *) linitial(
sourcePlacementList);
WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
false /* missingOk */);
/* Physically create split children and perform split copy */
CreateSplitShardsForShardGroup(
@ -428,7 +432,8 @@ BlockingShardSplit(SplitOperation splitOperation,
DropShardList(sourceColocatedShardIntervalList);
/* Insert new shard and placement metdata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, workersForPlacementList);
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);
/*
* Create foreign keys if exists after the metadata changes happening in
@ -679,7 +684,8 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard,
* Insert new shard and placement metadata.
*/
static void
InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList)
InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
{
/* Iterate on shard intervals for shard group */
List *shardIntervalList = NULL;
@ -727,10 +733,11 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *wo
* Create foreign key constraints on the split children shards.
*/
static void
CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList)
CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
{
/* Create constraints between shards */
List* shardIntervalList = NULL;
List *shardIntervalList = NULL;
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
ShardInterval *shardInterval = NULL;
@ -742,8 +749,10 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workers
List *referenceTableForeignConstraintList = NIL;
CopyShardForeignConstraintCommandListGrouped(shardInterval,
&shardForeignConstraintCommandList,
&referenceTableForeignConstraintList);
&
shardForeignConstraintCommandList,
&
referenceTableForeignConstraintList);
List *commandList = NIL;
commandList = list_concat(commandList, shardForeignConstraintCommandList);

View File

@ -99,6 +99,7 @@ ShouldSendCopyNow(StringInfo buffer)
return buffer->len > LocalCopyFlushThresholdByte;
}
static void
ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
{
@ -122,7 +123,8 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
ReportConnectionError(copyDest->connection, ERROR);
}
PGresult *result = GetRemoteCommandResult(copyDest->connection, true /* raiseInterrupts */);
PGresult *result = GetRemoteCommandResult(copyDest->connection,
true /* raiseInterrupts */);
if (PQresultStatus(result) != PGRES_COPY_IN)
{
ReportResultError(copyDest->connection, result, ERROR);
@ -131,6 +133,7 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
PQclear(result);
}
static bool
ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{
@ -179,8 +182,10 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
if (!PutRemoteCopyData(copyDest->connection, copyOutState->fe_msgbuf->data,
copyOutState->fe_msgbuf->len))
{
char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName);
char *destinationShardSchemaName = linitial(
copyDest->destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(
copyDest->destinationShardFullyQualifiedName);
char *errorMessage = PQerrorMessage(copyDest->connection->pgConn);
@ -253,12 +258,15 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
/* end the COPY input */
if (!PutRemoteCopyEnd(copyDest->connection, NULL /* errormsg */))
{
char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName);
char *destinationShardSchemaName = linitial(
copyDest->destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(
copyDest->destinationShardFullyQualifiedName);
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to COPY to destination shard %s.%s",
destinationShardSchemaName, destinationShardRelationName)));
destinationShardSchemaName,
destinationShardRelationName)));
}
/* check whether there were any COPY errors */
@ -396,8 +404,10 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState
*/
LocalCopyBuffer = localCopyOutState->fe_msgbuf;
char *destinationShardSchemaName = linitial(copyDest->destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(copyDest->destinationShardFullyQualifiedName);
char *destinationShardSchemaName = linitial(
copyDest->destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(
copyDest->destinationShardFullyQualifiedName);
Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName,
false /* missing_ok */);