Reindent code

pull/6029/head
Nitish Upreti 2022-06-19 17:18:28 -07:00
parent 4a05f1f1e8
commit 29f0081169
10 changed files with 216 additions and 120 deletions

View File

@ -46,10 +46,12 @@ citus_split_shard_by_split_points(PG_FUNCTION_ARGS)
uint64 shardIdToSplit = DatumGetUInt64(PG_GETARG_DATUM(0));
ArrayType *splitPointsArrayObject = PG_GETARG_ARRAYTYPE_P(1);
List *shardSplitPointsList = TextArrayTypeToIntegerList(splitPointsArrayObject, INT4OID);
List *shardSplitPointsList = TextArrayTypeToIntegerList(splitPointsArrayObject,
INT4OID);
ArrayType *nodeIdsArrayObject = PG_GETARG_ARRAYTYPE_P(2);
List *nodeIdsForPlacementList = TextArrayTypeToIntegerList(nodeIdsArrayObject, INT4OID);
List *nodeIdsForPlacementList = TextArrayTypeToIntegerList(nodeIdsArrayObject,
INT4OID);
Oid shardSplitModeOid = PG_GETARG_OID(3);
SplitMode shardSplitMode = LookupSplitMode(shardSplitModeOid);
@ -64,6 +66,7 @@ citus_split_shard_by_split_points(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
* LookupSplitMode maps the oids of citus.shard_split_mode enum
* values to an enum.
@ -80,6 +83,7 @@ LookupSplitMode(Oid shardSplitModeOid)
{
shardSplitMode = BLOCKING_SPLIT;
}
/* Extend with other modes as we support them */
else
{

View File

@ -276,6 +276,7 @@ SplitShardByValue(ShardInterval *sourceShard, Datum distributionValueDatum)
return isolatedShardId;
}
/*
* CreateSplitOffShards gets a shard and a hashed value to pick the split point.
* First, it creates templates to create new shards. Then, for every colocated

View File

@ -38,13 +38,13 @@ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
ShardInterval *shardIntervalToSplit,
List *shardSplitPointsList,
List *nodeIdsForPlacementList);
static void CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList,
List **splitOffShardList);
static void CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList,
List **splitOffShardList);
static void CreateObjectOnPlacement(List *objectCreationCommandList,
WorkerNode *workerNode);
WorkerNode *workerNode);
static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
List *splitPointsForShard);
static void CreateSplitIntervalsForShard(ShardInterval *sourceShard,
@ -54,8 +54,13 @@ static void BlockingShardSplit(SplitOperation splitOperation,
ShardInterval *shardIntervalToSplit,
List *shardSplitPointsList,
List *workersForPlacementList);
static void DoSplitCopy(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList);
static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChildrenShardIntervalList, List* workersForPlacementList);
static void DoSplitCopy(WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
List *splitChildrenShardIntervalList,
List *workersForPlacementList);
/* Customize error message strings based on operation type */
static const char *const SplitOperationName[] =
@ -406,7 +411,7 @@ BlockingShardSplit(SplitOperation splitOperation,
/* Only single placement allowed (already validated by caller) */
List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId);
Assert(sourcePlacementList->length == 1);
WorkerNode* sourceShardToCopyNode = (WorkerNode *) linitial(sourcePlacementList);
WorkerNode *sourceShardToCopyNode = (WorkerNode *) linitial(sourcePlacementList);
/* Physically create split children and perform split copy */
List *splitOffShardList = NULL;
@ -437,9 +442,10 @@ BlockingShardSplit(SplitOperation splitOperation,
CitusInvalidateRelcacheByRelid(DistShardRelationId());
}
/* Create ShardGroup split children on a list of corresponding workers. */
static void
CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode,
CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList,
@ -471,9 +477,10 @@ CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode,
}
/* Perform Split Copy */
DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList);
DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList, workersForPlacementList);
// TODO(niupre) : Can we use Adaptive execution for creating multiple indexes parallely?
/* TODO(niupre) : Can we use Adaptive execution for creating multiple indexes parallely? */
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
ShardInterval *shardInterval = NULL;
@ -494,6 +501,7 @@ CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode,
}
}
/*
* Perform Split Copy from source shard(s) to split children.
* 'sourceShardNode' : Source shard worker node.
@ -502,7 +510,8 @@ CreateSplitShardsForShardGroup(WorkerNode* sourceShardNode,
* 'workersForPlacementList' : List of workers for split children placement.
*/
static void
DoSplitCopy(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList)
DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList)
{
ShardInterval *sourceShardIntervalToCopy = NULL;
List *splitShardIntervalList = NULL;
@ -510,9 +519,11 @@ DoSplitCopy(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList,
int taskId = 0;
List *splitCopyTaskList = NULL;
forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList,
splitShardIntervalList, shardGroupSplitIntervalListList)
splitShardIntervalList, shardGroupSplitIntervalListList)
{
StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy, splitShardIntervalList, destinationWorkerNodesList);
StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy,
splitShardIntervalList,
destinationWorkerNodesList);
Task *task = CreateBasicTask(
sourceShardIntervalToCopy->shardId, /* jobId */
@ -529,9 +540,11 @@ DoSplitCopy(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList,
taskId++;
}
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList, MaxAdaptiveExecutorPoolSize, NULL /* jobIdList */);
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList,
MaxAdaptiveExecutorPoolSize, NULL /* jobIdList */);
}
/*
* Create Copy command for a given shard source shard to be copied to corresponding split children.
* 'sourceShardSplitInterval' : Source shard interval to be copied.
@ -539,7 +552,9 @@ DoSplitCopy(WorkerNode* sourceShardNode, List *sourceColocatedShardIntervalList,
* 'destinationWorkerNodesList' : List of workers for split children placement.
*/
static StringInfo
CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChildrenShardIntervalList, List* destinationWorkerNodesList)
CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
List *splitChildrenShardIntervalList,
List *destinationWorkerNodesList)
{
StringInfo splitCopyInfoArray = makeStringInfo();
appendStringInfo(splitCopyInfoArray, "ARRAY[");
@ -547,19 +562,21 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChild
ShardInterval *splitChildShardInterval = NULL;
bool addComma = false;
WorkerNode *destinationWorkerNode = NULL;
forboth_ptr(splitChildShardInterval, splitChildrenShardIntervalList, destinationWorkerNode, destinationWorkerNodesList)
forboth_ptr(splitChildShardInterval, splitChildrenShardIntervalList,
destinationWorkerNode, destinationWorkerNodesList)
{
if(addComma)
if (addComma)
{
appendStringInfo(splitCopyInfoArray, ",");
}
StringInfo splitCopyInfoRow = makeStringInfo();
appendStringInfo(splitCopyInfoRow, "ROW(%lu, %lu, %lu, %u)::citus.split_copy_info",
splitChildShardInterval->shardId,
splitChildShardInterval->minValue,
splitChildShardInterval->maxValue,
destinationWorkerNode->nodeId);
appendStringInfo(splitCopyInfoRow,
"ROW(%lu, %lu, %lu, %u)::citus.split_copy_info",
splitChildShardInterval->shardId,
splitChildShardInterval->minValue,
splitChildShardInterval->maxValue,
destinationWorkerNode->nodeId);
appendStringInfo(splitCopyInfoArray, "%s", splitCopyInfoRow->data);
addComma = true;
@ -568,18 +585,19 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChild
StringInfo splitCopyUdf = makeStringInfo();
appendStringInfo(splitCopyUdf, "SELECT worker_split_copy(%lu, %s);",
sourceShardSplitInterval->shardId,
splitCopyInfoArray->data);
sourceShardSplitInterval->shardId,
splitCopyInfoArray->data);
return splitCopyUdf;
}
/*
* Create an object (shard/index) on a worker node.
*/
static void
CreateObjectOnPlacement(List *objectCreationCommandList,
WorkerNode *workerPlacementNode)
WorkerNode *workerPlacementNode)
{
char *currentUser = CurrentUserName();
SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName,
@ -588,6 +606,7 @@ CreateObjectOnPlacement(List *objectCreationCommandList,
objectCreationCommandList);
}
/*
* Create split children intervals for a shardgroup given list of split points.
*/
@ -657,6 +676,7 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard,
}
}
/*
* InsertSplitOffShardMetadata inserts new shard and shard placement data into
* catolog tables both the coordinator and mx nodes.
@ -710,6 +730,7 @@ InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList)
}
}
/*
* DropShardList drops shards and their metadata from both the coordinator and
* mx nodes.
@ -775,6 +796,7 @@ DropShardList(List *shardIntervalList)
}
}
/*
* CreateForeignConstraints creates the foreign constraints on the newly
* created shards via the tenant isolation.
@ -847,6 +869,7 @@ CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList)
}
}
/*
* ExecuteCommandListOnPlacements runs the given command list on the nodes of
* the given shard placement list. First, it creates connections. Then it sends

View File

@ -38,7 +38,7 @@ typedef struct ShardCopyDestReceiver
DestReceiver pub;
/* Destination Relation Name */
char* destinationShardFullyQualifiedName;
char *destinationShardFullyQualifiedName;
/* descriptor of the tuples that are sent to the worker */
TupleDesc tupleDescriptor;
@ -61,29 +61,32 @@ typedef struct ShardCopyDestReceiver
/*
* Connection for destination shard (NULL if useLocalCopy is true)
*/
*/
MultiConnection *connection;
} ShardCopyDestReceiver;
static bool ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
static void ShardCopyDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor);
TupleDesc inputTupleDescriptor);
static void ShardCopyDestReceiverShutdown(DestReceiver *destReceiver);
static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver);
static bool CanUseLocalCopy(uint64 destinationNodeId);
static StringInfo ConstructCopyStatement(char* destinationShardFullyQualifiedName, bool useBinaryFormat);
static StringInfo ConstructCopyStatement(char *destinationShardFullyQualifiedName, bool
useBinaryFormat);
static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest);
static bool ShouldSendCopyNow(StringInfo buffer);
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState);
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
localCopyOutState);
static bool CanUseLocalCopy(uint64 destinationNodeId)
static bool
CanUseLocalCopy(uint64 destinationNodeId)
{
/* If destination node is same as source, use local copy */
return GetLocalNodeId() == destinationNodeId;
}
/*
* ShouldSendCopyNow returns true if the given buffer size exceeds the
* local copy buffer size threshold.
@ -95,6 +98,7 @@ ShouldSendCopyNow(StringInfo buffer)
return buffer->len > LocalCopyFlushThresholdByte;
}
static bool
ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{
@ -109,19 +113,21 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
/* Create connection lazily */
if(copyDest->tuplesSent == 0 && (!copyDest->useLocalCopy))
if (copyDest->tuplesSent == 0 && (!copyDest->useLocalCopy))
{
int connectionFlags = OUTSIDE_TRANSACTION;
char *currentUser = CurrentUserName();
WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId, false /* missingOk */);
WorkerNode *workerNode = FindNodeWithNodeId(copyDest->destinationNodeId,
false /* missingOk */);
copyDest->connection = GetNodeUserDatabaseConnection(connectionFlags,
workerNode->workerName,
workerNode->workerPort,
currentUser,
NULL /* database (current) */);
workerNode->workerName,
workerNode->workerPort,
currentUser,
NULL /* database (current) */);
ClaimConnectionExclusively(copyDest->connection);
StringInfo copyStatement = ConstructCopyStatement(copyDest->destinationShardFullyQualifiedName,
StringInfo copyStatement = ConstructCopyStatement(
copyDest->destinationShardFullyQualifiedName,
copyDest->copyOutState->binary);
ExecuteCriticalRemoteCommand(copyDest->connection, copyStatement->data);
}
@ -131,7 +137,7 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
bool *columnNulls = slot->tts_isnull;
CopyOutState copyOutState = copyDest->copyOutState;
if(copyDest->useLocalCopy)
if (copyDest->useLocalCopy)
{
WriteLocalTuple(slot, copyDest);
if (ShouldSendCopyNow(copyOutState->fe_msgbuf))
@ -145,15 +151,18 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor,
copyOutState, columnOutputFunctions, NULL /* columnCoercionPaths */);
if (!PutRemoteCopyData(copyDest->connection, copyOutState->fe_msgbuf->data, copyOutState->fe_msgbuf->len))
copyOutState, columnOutputFunctions,
NULL /* columnCoercionPaths */);
if (!PutRemoteCopyData(copyDest->connection, copyOutState->fe_msgbuf->data,
copyOutState->fe_msgbuf->len))
{
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to COPY to shard %s,",
copyDest->destinationShardFullyQualifiedName),
errdetail("failed to send %d bytes %s on node %u", copyOutState->fe_msgbuf->len,
copyOutState->fe_msgbuf->data,
copyDest->destinationNodeId)));
errmsg("Failed to COPY to shard %s,",
copyDest->destinationShardFullyQualifiedName),
errdetail("failed to send %d bytes %s on node %u",
copyOutState->fe_msgbuf->len,
copyOutState->fe_msgbuf->data,
copyDest->destinationNodeId)));
}
}
@ -164,8 +173,10 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
return true;
}
static void
ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor)
ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
inputTupleDescriptor)
{
ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest;
copyDest->tupleDescriptor = inputTupleDescriptor;
@ -183,16 +194,17 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputT
copyOutState->delim = (char *) delimiterCharacter;
copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState);
copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary);
copyOutState->binary);
copyDest->copyOutState = copyOutState;
}
static void
ShardCopyDestReceiverShutdown(DestReceiver *dest)
{
ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest;
if(copyDest->useLocalCopy)
if (copyDest->useLocalCopy)
{
if (copyDest->copyOutState != NULL &&
copyDest->copyOutState->fe_msgbuf->len > 0)
@ -200,10 +212,10 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
LocalCopyToShard(copyDest, copyDest->copyOutState);
}
}
else if(copyDest->connection != NULL)
else if (copyDest->connection != NULL)
{
resetStringInfo(copyDest->copyOutState->fe_msgbuf);
if(copyDest->copyOutState->binary)
if (copyDest->copyOutState->binary)
{
AppendCopyBinaryFooters(copyDest->copyOutState);
}
@ -217,7 +229,8 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
}
/* check whether there were any COPY errors */
PGresult *result = GetRemoteCommandResult(copyDest->connection, true /* raiseInterrupts */);
PGresult *result = GetRemoteCommandResult(copyDest->connection,
true /* raiseInterrupts */);
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
ReportCopyError(copyDest->connection, result);
@ -229,10 +242,11 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
}
}
DestReceiver * CreateShardCopyDestReceiver(
EState *executorState,
char* destinationShardFullyQualifiedName,
uint32_t destinationNodeId)
DestReceiver *
CreateShardCopyDestReceiver(EState *executorState,
char *destinationShardFullyQualifiedName,
uint32_t destinationNodeId)
{
ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) palloc0(
sizeof(ShardCopyDestReceiver));
@ -254,6 +268,7 @@ DestReceiver * CreateShardCopyDestReceiver(
return (DestReceiver *) copyDest;
}
static void
ShardCopyDestReceiverDestroy(DestReceiver *dest)
{
@ -272,6 +287,7 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest)
pfree(copyDest);
}
/*
* ConstructCopyStatement constructs the text of a COPY statement
* for copying into a result table
@ -279,11 +295,11 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest)
static StringInfo
ConstructCopyStatement(char *destinationShardFullyQualifiedName, bool useBinaryFormat)
{
StringInfo command = makeStringInfo();
StringInfo command = makeStringInfo();
appendStringInfo(command, "COPY %s FROM STDIN",
destinationShardFullyQualifiedName);
destinationShardFullyQualifiedName);
if(useBinaryFormat)
if (useBinaryFormat)
{
appendStringInfo(command, "WITH (format binary);");
}
@ -295,7 +311,9 @@ ConstructCopyStatement(char *destinationShardFullyQualifiedName, bool useBinaryF
return command;
}
static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest)
static void
WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest)
{
CopyOutState localCopyOutState = copyDest->copyOutState;
@ -306,7 +324,8 @@ static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDes
SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);
bool isBinaryCopy = localCopyOutState->binary;
bool shouldAddBinaryHeaders = (isBinaryCopy && localCopyOutState->fe_msgbuf->len == 0);
bool shouldAddBinaryHeaders = (isBinaryCopy && localCopyOutState->fe_msgbuf->len ==
0);
if (shouldAddBinaryHeaders)
{
AppendCopyBinaryHeaders(localCopyOutState);
@ -321,6 +340,7 @@ static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDes
NULL /* columnCoercionPaths */);
}
/*
* LocalCopyToShard finishes local copy for the given destination shard.
*/
@ -342,10 +362,13 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState
char *destinationShardSchemaName = NULL;
char *destinationShardRelationName = NULL;
DeconstructQualifiedName(list_make1(copyDest->destinationShardFullyQualifiedName), &destinationShardSchemaName, &destinationShardRelationName);
DeconstructQualifiedName(list_make1(copyDest->destinationShardFullyQualifiedName),
&destinationShardSchemaName, &destinationShardRelationName);
Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false /* missing_ok */);
Oid destinationShardOid = get_relname_relid(destinationShardRelationName, destinationSchemaOid);
Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName,
false /* missing_ok */);
Oid destinationShardOid = get_relname_relid(destinationShardRelationName,
destinationSchemaOid);
DefElem *binaryFormatOption = NULL;
if (isBinaryCopy)
@ -356,7 +379,8 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState
Relation shard = table_open(destinationShardOid, RowExclusiveLock);
ParseState *pState = make_parsestate(NULL /* parentParseState */);
(void) addRangeTableEntryForRelation(pState, shard, AccessShareLock,
NULL /* alias */, false /* inh */, false /* inFromCl */);
NULL /* alias */, false /* inh */,
false /* inFromCl */);
CopyFromState cstate = BeginCopyFrom_compat(pState, shard,
NULL /* whereClause */,
NULL /* fileName */,
@ -373,6 +397,7 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState
free_parsestate(pState);
}
/*
* ReadFromLocalBufferCallback is the copy callback.
* It always tries to copy maxRead bytes.
@ -393,4 +418,3 @@ ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead)
return bytesRead;
}

View File

@ -45,13 +45,15 @@ typedef struct SplitCopyDestReceiver
static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor);
TupleDesc inputTupleDescriptor);
static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot,
DestReceiver *dest);
DestReceiver *dest);
static void SplitCopyDestReceiverShutdown(DestReceiver *dest);
static void SplitCopyDestReceiverDestroy(DestReceiver *copyDest);
DestReceiver * CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy, List* splitCopyInfoList)
DestReceiver *
CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy,
List *splitCopyInfoList)
{
SplitCopyDestReceiver *splitCopyDest =
palloc0(sizeof(SplitCopyDestReceiver));
@ -67,8 +69,10 @@ DestReceiver * CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceS
ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(sourceShardIdToCopy);
splitCopyDest->sourceShardRelationOid = shardIntervalToSplitCopy->relationId;
DestReceiver **shardCopyDests = palloc0(splitCopyDest->splitFactor * sizeof(DestReceiver *));
SplitCopyInfo **splitCopyInfos = palloc0(splitCopyDest->splitFactor * sizeof(SplitCopyInfo *));
DestReceiver **shardCopyDests = palloc0(splitCopyDest->splitFactor *
sizeof(DestReceiver *));
SplitCopyInfo **splitCopyInfos = palloc0(splitCopyDest->splitFactor *
sizeof(SplitCopyInfo *));
SplitCopyInfo *splitCopyInfo = NULL;
int index = 0;
@ -76,12 +80,15 @@ DestReceiver * CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceS
char *sourceShardNamePrefix = get_rel_name(shardIntervalToSplitCopy->relationId);
foreach_ptr(splitCopyInfo, splitCopyInfoList)
{
char *destinationShardSchemaName = get_namespace_name(get_rel_namespace(splitCopyDest->sourceShardRelationOid));;
char *destinationShardSchemaName = get_namespace_name(get_rel_namespace(
splitCopyDest->
sourceShardRelationOid));
char *destinationShardNameCopy = strdup(sourceShardNamePrefix);
AppendShardIdToName(&destinationShardNameCopy, splitCopyInfo->destinationShardId);
char *destinationShardFullyQualifiedName =
quote_qualified_identifier(destinationShardSchemaName, destinationShardNameCopy);
quote_qualified_identifier(destinationShardSchemaName,
destinationShardNameCopy);
DestReceiver *shardCopyDest = CreateShardCopyDestReceiver(
executorState,
@ -99,7 +106,10 @@ DestReceiver * CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceS
return (DestReceiver *) splitCopyDest;
}
static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor)
static void
SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
inputTupleDescriptor)
{
SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest;
@ -110,7 +120,9 @@ static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, Tupl
}
}
static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
static bool
SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{
SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest;
@ -119,7 +131,8 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *des
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(copyDest->sourceShardRelationOid);
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(
copyDest->sourceShardRelationOid);
if (cacheEntry == NULL)
{
ereport(ERROR, errmsg("Could not find shard %s for split copy.",
@ -137,21 +150,24 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *des
/* Partition Column Value cannot be null */
if (columnNulls[partitionColumnIndex])
{
ereport(ERROR, errmsg("Found null partition value for shard %s during split copy.",
get_rel_name(copyDest->sourceShardRelationOid)));
ereport(ERROR, errmsg(
"Found null partition value for shard %s during split copy.",
get_rel_name(copyDest->sourceShardRelationOid)));
}
Datum hashedValueDatum = FunctionCall1(hashFunction, columnValues[partitionColumnIndex]);
Datum hashedValueDatum = FunctionCall1(hashFunction,
columnValues[partitionColumnIndex]);
int32_t hashedValue = DatumGetInt32(hashedValueDatum);
for(int index = 0 ; index < copyDest->splitFactor; index++)
for (int index = 0; index < copyDest->splitFactor; index++)
{
SplitCopyInfo *splitCopyInfo = copyDest->splitCopyInfoArray[index];
if (splitCopyInfo->destinationShardMinHashValue <= hashedValue &&
splitCopyInfo->destinationShardMaxHashValue >= hashedValue)
{
DestReceiver *shardCopyDestReceiver = copyDest->shardCopyDestReceiverArray[index];
DestReceiver *shardCopyDestReceiver =
copyDest->shardCopyDestReceiverArray[index];
shardCopyDestReceiver->receiveSlot(slot, shardCopyDestReceiver);
}
}
@ -162,7 +178,9 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *des
return true;
}
static void SplitCopyDestReceiverShutdown(DestReceiver *dest)
static void
SplitCopyDestReceiverShutdown(DestReceiver *dest)
{
SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest;
@ -173,7 +191,9 @@ static void SplitCopyDestReceiverShutdown(DestReceiver *dest)
}
}
static void SplitCopyDestReceiverDestroy(DestReceiver *dest)
static void
SplitCopyDestReceiverDestroy(DestReceiver *dest)
{
SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest;

View File

@ -17,8 +17,8 @@
PG_FUNCTION_INFO_V1(worker_split_copy);
static void
ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo** splitCopyInfo);
static void ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum,
SplitCopyInfo **splitCopyInfo);
/*
*
@ -33,13 +33,15 @@ worker_split_copy(PG_FUNCTION_ARGS)
if (array_contains_nulls(splitCopyInfoArrayObject))
{
ereport(ERROR,
(errmsg("Shard Copy Info cannot have null values.")));
(errmsg("Shard Copy Info cannot have null values.")));
}
ArrayIterator copyInfo_iterator = array_create_iterator(splitCopyInfoArrayObject, 0 /* slice_ndim */, NULL /* mState */);
ArrayIterator copyInfo_iterator = array_create_iterator(splitCopyInfoArrayObject,
0 /* slice_ndim */,
NULL /* mState */);
Datum copyInfoDatum = 0;
bool isnull = false;
List* splitCopyInfoList = NULL;
List *splitCopyInfoList = NULL;
while (array_iterate(copyInfo_iterator, &copyInfoDatum, &isnull))
{
SplitCopyInfo *splitCopyInfo = NULL;
@ -49,56 +51,69 @@ worker_split_copy(PG_FUNCTION_ARGS)
}
EState *executor = CreateExecutorState();
DestReceiver *splitCopyDestReceiver = CreateSplitCopyDestReceiver(executor, shardIdToSplitCopy, splitCopyInfoList);
DestReceiver *splitCopyDestReceiver = CreateSplitCopyDestReceiver(executor,
shardIdToSplitCopy,
splitCopyInfoList);
StringInfo selectShardQueryForCopy = makeStringInfo();
appendStringInfo(selectShardQueryForCopy,
"SELECT * FROM %s;",
generate_qualified_relation_name(shardIntervalToSplitCopy->relationId));
"SELECT * FROM %s;",
generate_qualified_relation_name(
shardIntervalToSplitCopy->relationId));
ParamListInfo params = NULL;
ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, (DestReceiver *) splitCopyDestReceiver);
ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params,
(DestReceiver *) splitCopyDestReceiver);
FreeExecutorState(executor);
PG_RETURN_VOID();
}
static void
ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo** splitCopyInfo)
ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo)
{
HeapTupleHeader dataTuple = DatumGetHeapTupleHeader(splitCopyInfoDatum);
SplitCopyInfo *copyInfo = palloc0(sizeof(SplitCopyInfo));
bool isnull = false;
Datum destinationShardIdDatum = GetAttributeByName(dataTuple, "destination_shard_id", &isnull);
Datum destinationShardIdDatum = GetAttributeByName(dataTuple, "destination_shard_id",
&isnull);
if (isnull)
{
ereport(ERROR, (errmsg("destination_shard_id for split_copy_info cannot be null.")));
ereport(ERROR, (errmsg(
"destination_shard_id for split_copy_info cannot be null.")));
}
copyInfo->destinationShardId = DatumGetUInt64(destinationShardIdDatum);
Datum minValueDatum = GetAttributeByName(dataTuple, "destination_shard_min_value", &isnull);
Datum minValueDatum = GetAttributeByName(dataTuple, "destination_shard_min_value",
&isnull);
if (isnull)
{
ereport(ERROR, (errmsg("destination_shard_min_value for split_copy_info cannot be null.")));
ereport(ERROR, (errmsg(
"destination_shard_min_value for split_copy_info cannot be null.")));
}
char *destinationMinHash = text_to_cstring(DatumGetTextP(minValueDatum));
copyInfo->destinationShardMinHashValue = pg_strtoint64(destinationMinHash);
Datum maxValueDatum = GetAttributeByName(dataTuple, "destination_shard_max_value", &isnull);
Datum maxValueDatum = GetAttributeByName(dataTuple, "destination_shard_max_value",
&isnull);
if (isnull)
{
ereport(ERROR, (errmsg("destination_shard_max_value for split_copy_info cannot be null.")));
ereport(ERROR, (errmsg(
"destination_shard_max_value for split_copy_info cannot be null.")));
}
char *destinationMaxHash = text_to_cstring(DatumGetTextP(maxValueDatum));
copyInfo->destinationShardMaxHashValue = pg_strtoint64(destinationMaxHash);
Datum nodeIdDatum = GetAttributeByName(dataTuple, "destination_shard_node_id", &isnull);
Datum nodeIdDatum = GetAttributeByName(dataTuple, "destination_shard_node_id",
&isnull);
if (isnull)
{
ereport(ERROR, (errmsg("destination_shard_node_id for split_copy_info cannot be null.")));
ereport(ERROR, (errmsg(
"destination_shard_node_id for split_copy_info cannot be null.")));
}
copyInfo->destinationShardNodeId = DatumGetInt32(nodeIdDatum);

View File

@ -102,6 +102,7 @@ DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId)
return arrayObject;
}
/*
* Converts ArrayType to List.
*/
@ -120,10 +121,12 @@ IntegerArrayTypeToList(ArrayType *arrayObject)
return list;
}
/*
* Converts Text ArrayType to Integer List.
*/
extern List * TextArrayTypeToIntegerList(ArrayType *arrayObject, Oid datumTypeId)
extern List *
TextArrayTypeToIntegerList(ArrayType *arrayObject, Oid datumTypeId)
{
List *list = NULL;
Datum *datumObjectArray = DeconstructArrayObject(arrayObject);
@ -139,23 +142,26 @@ extern List * TextArrayTypeToIntegerList(ArrayType *arrayObject, Oid datumTypeId
{
int16_t *int16Value = palloc0(sizeof(int16_t));
*int16Value = pg_strtoint16(intAsStr);
list = lappend(list, (void*) int16Value);
list = lappend(list, (void *) int16Value);
break;
}
case INT4OID:
{
int32_t *int32Value = palloc0(sizeof(int32_t));
*int32Value = pg_strtoint32(intAsStr);
list = lappend(list, (void*) int32Value);
list = lappend(list, (void *) int32Value);
break;
}
case INT8OID:
{
int64_t *int64Value = palloc0(sizeof(int64_t));
*int64Value = pg_strtoint64(intAsStr);
list = lappend(list, (void*) int64Value);
list = lappend(list, (void *) int64Value);
break;
}
default:
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Unsupported datum type for array.")));

View File

@ -39,8 +39,10 @@ extern void SplitShard(SplitMode splitMode,
List *nodeIdsForPlacementList);
/* TODO(niupre): Make all these APIs private when all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API. */
extern void ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard);
extern void InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList);
extern void ErrorIfCannotSplitShard(SplitOperation splitOperation,
ShardInterval *sourceShard);
extern void InsertSplitOffShardMetadata(List *splitOffShardList,
List *sourcePlacementList);
extern void DropShardList(List *shardIntervalList);
extern void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList);
extern void ExecuteCommandListOnPlacements(List *commandList, List *placementList);

View File

@ -14,9 +14,8 @@
struct FullRelationName;
extern DestReceiver * CreateShardCopyDestReceiver(
EState *executorState,
char* destinationShardFullyQualifiedName,
uint32_t destinationNodeId);
extern DestReceiver * CreateShardCopyDestReceiver(EState *executorState,
char *destinationShardFullyQualifiedName,
uint32_t destinationNodeId);
#endif /* WORKER_SHARD_COPY_H_ */

View File

@ -14,12 +14,14 @@
typedef struct SplitCopyInfo
{
uint64 destinationShardId; /* destination shard id */
uint64 destinationShardId; /* destination shard id */
int32 destinationShardMinHashValue; /* min hash value of destination shard */
int32 destinationShardMaxHashValue; /* max hash value of destination shard */
uint32_t destinationShardNodeId; /* node where split child shard is to be placed */
uint32_t destinationShardNodeId; /* node where split child shard is to be placed */
} SplitCopyInfo;
extern DestReceiver* CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy, List* splitCopyInfoList);
extern DestReceiver * CreateSplitCopyDestReceiver(EState *executorState, uint64
sourceShardIdToCopy,
List *splitCopyInfoList);
#endif /* WORKER_SPLIT_COPY_H_ */