From 4fcdff53ae89d702dcbab094b14bd88b9e399fb5 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Tue, 28 Jun 2022 16:45:26 -0700 Subject: [PATCH] Addressing open comments --- .../distributed/operations/shard_split.c | 16 +++++ .../operations/worker_shard_copy.c | 60 +++++++++---------- .../operations/worker_split_copy_udf.c | 9 +-- 3 files changed, 51 insertions(+), 34 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index aeeca3c1e..1b779f65b 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -575,6 +575,22 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, * 'sourceShardSplitInterval' : Source shard interval to be copied. * 'splitChildrenShardINnerIntervalList' : List of shard intervals for split children. * 'destinationWorkerNodesList' : List of workers for split children placement. + * Here is an example of a 2 way split copy : + * SELECT * from worker_split_copy( + * 81060000, -- source shard id to split copy + * ARRAY[ + * -- split copy info for split children 1 + * ROW(81060015, -- destination shard id + * -2147483648, -- split range begin + * 1073741823, --split range end + * 10 -- worker node id)::citus.split_copy_info, + * -- split copy info for split children 2 + * ROW(81060016, --destination shard id + * 1073741824, --split range begin + * 2147483647, --split range end + * 11 -- workef node id)::citus.split_copy_info + * ] + * ); */ static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index 71042d7f6..d488ef473 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -135,6 +135,36 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) } +/* + * CreateShardCopyDestReceiver creates a DestReceiver that copies into + * a destinationShardFullyQualifiedName on destinationNodeId. + */ +DestReceiver * +CreateShardCopyDestReceiver(EState *executorState, + List *destinationShardFullyQualifiedName, + uint32_t destinationNodeId) +{ + ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) palloc0( + sizeof(ShardCopyDestReceiver)); + + /* set up the DestReceiver function pointers */ + copyDest->pub.receiveSlot = ShardCopyDestReceiverReceive; + copyDest->pub.rStartup = ShardCopyDestReceiverStartup; + copyDest->pub.rShutdown = ShardCopyDestReceiverShutdown; + copyDest->pub.rDestroy = ShardCopyDestReceiverDestroy; + copyDest->pub.mydest = DestCopyOut; + copyDest->executorState = executorState; + + copyDest->destinationNodeId = destinationNodeId; + copyDest->destinationShardFullyQualifiedName = destinationShardFullyQualifiedName; + copyDest->tuplesSent = 0; + copyDest->connection = NULL; + copyDest->useLocalCopy = CanUseLocalCopy(destinationNodeId); + + return (DestReceiver *) copyDest; +} + + /* * ShardCopyDestReceiverReceive implements the receiveSlot function of * ShardCopyDestReceiver. It takes a TupleTableSlot and sends the contents to @@ -304,36 +334,6 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) } -/* - * CreateShardCopyDestReceiver creates a DestReceiver that copies into - * a destinationShardFullyQualifiedName on destinationNodeId. - */ -DestReceiver * -CreateShardCopyDestReceiver(EState *executorState, - List *destinationShardFullyQualifiedName, - uint32_t destinationNodeId) -{ - ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) palloc0( - sizeof(ShardCopyDestReceiver)); - - /* set up the DestReceiver function pointers */ - copyDest->pub.receiveSlot = ShardCopyDestReceiverReceive; - copyDest->pub.rStartup = ShardCopyDestReceiverStartup; - copyDest->pub.rShutdown = ShardCopyDestReceiverShutdown; - copyDest->pub.rDestroy = ShardCopyDestReceiverDestroy; - copyDest->pub.mydest = DestCopyOut; - copyDest->executorState = executorState; - - copyDest->destinationNodeId = destinationNodeId; - copyDest->destinationShardFullyQualifiedName = destinationShardFullyQualifiedName; - copyDest->tuplesSent = 0; - copyDest->connection = NULL; - copyDest->useLocalCopy = CanUseLocalCopy(destinationNodeId); - - return (DestReceiver *) copyDest; -} - - /* * ShardCopyDestReceiverDestroy frees the DestReceiver. */ diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index 524bdf7d5..0314875b0 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -60,9 +60,11 @@ worker_split_copy(PG_FUNCTION_ARGS) (errmsg("Shard Copy Info cannot have null values."))); } + const int slice_ndim = 0; + ArrayMetaState *mState = NULL; ArrayIterator copyInfo_iterator = array_create_iterator(splitCopyInfoArrayObject, - 0 /* slice_ndim */, - NULL /* mState */); + slice_ndim, + mState); Datum copyInfoDatum = 0; bool isnull = false; List *splitCopyInfoList = NULL; @@ -230,10 +232,9 @@ CreatePartitionedSplitCopyDestReceiver(EState *estate, ArrayType *minValuesArray = NULL; ArrayType *maxValuesArray = NULL; BuildMinMaxRangeArrays(splitCopyInfoList, &minValuesArray, &maxValuesArray); - char partitionMethod = PartitionMethodViaCatalog( - shardIntervalToSplitCopy->relationId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry( shardIntervalToSplitCopy->relationId); + char partitionMethod = cacheEntry->partitionMethod; Var *partitionColumn = cacheEntry->partitionColumn; CitusTableCacheEntry *shardSearchInfo =