From 0f1d2bbd1af911a36e82237af040f7068c7d5fee Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Thu, 16 Jun 2022 18:06:09 -0700 Subject: [PATCH] UDF calling --- .../distributed/operations/shard_split.c | 122 +++++++++++++----- src/include/distributed/listutils.h | 21 +++ 2 files changed, 114 insertions(+), 29 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 032c7a252..f1a0d04d2 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -16,6 +16,7 @@ #include "lib/stringinfo.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "distributed/adaptive_executor.h" #include "distributed/colocation_utils.h" #include "distributed/metadata_cache.h" #include "distributed/shardinterval_utils.h" @@ -25,10 +26,12 @@ #include "distributed/shard_split.h" #include "distributed/reference_table_utils.h" #include "distributed/multi_partitioning_utils.h" +#include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" #include "distributed/shared_library_init.h" #include "distributed/pg_dist_shard.h" #include "distributed/metadata_sync.h" +#include "distributed/multi_physical_planner.h" /* * These will be public function when citus-enterprise is merged @@ -45,9 +48,11 @@ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, ShardInterval *shardIntervalToSplit, List *shardSplitPointsList, List *nodeIdsForPlacementList); -static void CreateSplitShardsForShardGroup(List *splitChildrenShardIntervalListList, - List *workersForPlacementList, - List **splitOffShardList); +static void CreateSplitShardsForShardGroup(uint32_t sourceShardNodeId, + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *workersForPlacementList, + List **splitOffShardList); static void CreateObjectOnPlacement(List *objectCreationCommandList, WorkerNode *workerNode); static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList, @@ -59,6 +64,8 @@ static void BlockingShardSplit(SplitOperation splitOperation, ShardInterval *shardIntervalToSplit, List *shardSplitPointsList, List *workersForPlacementList); +static void DoSplitCopy(uint32_t sourceShardNodeId, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList); +static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChildrenShardIntervalList, List* workersForPlacementList); /* Customize error message strings based on operation type */ static const char *const SplitOperationName[] = @@ -408,16 +415,19 @@ BlockingShardSplit(SplitOperation splitOperation, sourceColocatedShardIntervalList, shardSplitPointsList); - /* Physically create split children and perform split copy */ - List *splitOffShardList = NIL; - CreateSplitShardsForShardGroup( - shardGroupSplitIntervalListList, - workersForPlacementList, - &splitOffShardList); - // Only single placement allowed (already validated by caller) List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId); Assert(sourcePlacementList->length == 1); + WorkerNode* sourceShardToCopyNode = (WorkerNode *) linitial(sourcePlacementList); + + /* Physically create split children and perform split copy */ + List *splitOffShardList = NULL; + CreateSplitShardsForShardGroup( + sourceShardToCopyNode->nodeId, + sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + workersForPlacementList, + &splitOffShardList); /* * Drop old shards and delete related metadata. Have to do that before @@ -441,7 +451,9 @@ BlockingShardSplit(SplitOperation splitOperation, /* Create ShardGroup split children on a list of corresponding workers. */ static void -CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, +CreateSplitShardsForShardGroup(uint32_t sourceShardNodeId, + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, List *workersForPlacementList, List **splitOffShardList) { @@ -471,28 +483,80 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, } /* Perform Split Copy */ - + DoSplitCopy(sourceShardNodeId, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList); // TODO(niupre) : Use Adaptive execution for creating multiple indexes parallely. - /* Create Indexes post copy */ - foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) - { - ShardInterval *shardInterval = NULL; - WorkerNode *workerPlacementNode = NULL; - forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, - workersForPlacementList) - { - List *indexCommandList = GetPostLoadTableCreationCommands( - shardInterval->relationId, - true /* includeIndexes */, - true /* includeReplicaIdentity */); - indexCommandList = WorkerApplyShardDDLCommandList( - indexCommandList, - shardInterval->shardId); +} - CreateObjectOnPlacement(indexCommandList, workerPlacementNode); - } +static void +DoSplitCopy(uint32_t sourceShardNodeId, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList) +{ + ShardInterval *sourceShardIntervalToCopy = NULL; + List *splitShardIntervalList = NULL; + + WorkerNode *workerNodeSource = NULL; + WorkerNode *workerNodeDestination = NULL; + int taskId = 0; + List *splitCopyTaskList = NULL; + forthree_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, + splitShardIntervalList, shardGroupSplitIntervalListList, + workerNodeDestination, workersForPlacementList) + { + StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy, splitShardIntervalList, workersForPlacementList); + + Task *task = CreateBasicTask( + sourceShardIntervalToCopy->shardId, /* jobId */ + taskId, + READ_TASK, + splitCopyUdfCommand->data); + + ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); + SetPlacementNodeMetadata(taskPlacement, workerNodeSource); + + task->taskPlacementList = list_make1(taskPlacement); + + splitCopyTaskList = lappend(splitCopyTaskList, task); + taskId++; } + + // TODO(niupre) : Pass appropriate MaxParallelShards value from GUC. + ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList, 1, NULL /* jobIdList */); +} + +static StringInfo +CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChildrenShardIntervalList, List* workersForPlacementList) +{ + StringInfo splitCopyInfoArray = makeStringInfo(); + appendStringInfo(splitCopyInfoArray, "ARRAY["); + + ShardInterval *splitChildShardInterval = NULL; + bool addComma = false; + WorkerNode *destinationWorkerNode = NULL; + forboth_ptr(splitChildShardInterval, splitChildrenShardIntervalList, destinationWorkerNode, workersForPlacementList) + { + if(addComma) + { + appendStringInfo(splitCopyInfoArray, ","); + } + + StringInfo splitCopyInfoRow = makeStringInfo(); + appendStringInfo(splitCopyInfoRow, "ROW(%lu, %lu, %lu, %u)::citus.split_copy_info", + splitChildShardInterval->shardId, + splitChildShardInterval->minValue, + splitChildShardInterval->maxValue, + destinationWorkerNode->nodeId); + appendStringInfo(splitCopyInfoArray, "%s", splitCopyInfoRow->data); + + addComma = true; + } + appendStringInfo(splitCopyInfoArray, "]"); + + StringInfo splitCopyUdf = makeStringInfo(); + appendStringInfo(splitCopyUdf, "SELECT worker_split_copy(%lu, %s);", + sourceShardSplitInterval->shardId, + splitCopyInfoArray->data); + + return splitCopyUdf; } /* diff --git a/src/include/distributed/listutils.h b/src/include/distributed/listutils.h index e4a185b4d..c1ea8c20f 100644 --- a/src/include/distributed/listutils.h +++ b/src/include/distributed/listutils.h @@ -97,6 +97,27 @@ typedef struct ListCellAndListWrapper var2 ## CellDoNotUse = lnext_compat(l2, var2 ## CellDoNotUse) \ ) +/* + * forthree_ptr - + * a convenience macro which loops through three lists of pointers at the same + * time, without needing a ListCell. It only needs three declared pointer + * variables to store the pointer of each of the three cells in. + */ +#define forthree_ptr(var1, l1, var2, l2, var3, l3) \ + for (ListCell *(var1 ## CellDoNotUse) = list_head(l1), \ + *(var2 ## CellDoNotUse) = list_head(l2), \ + *(var3 ## CellDoNotUse) = list_head(l3); \ + (var1 ## CellDoNotUse) != NULL && \ + (var2 ## CellDoNotUse) != NULL && \ + (var3 ## CellDoNotUse) != NULL && \ + (((var1) = lfirst(var1 ## CellDoNotUse)) || true) && \ + (((var2) = lfirst(var2 ## CellDoNotUse)) || true) && \ + (((var3) = lfirst(var3 ## CellDoNotUse)) || true); \ + var1 ## CellDoNotUse = lnext_compat(l1, var1 ## CellDoNotUse), \ + var2 ## CellDoNotUse = lnext_compat(l2, var2 ## CellDoNotUse), \ + var3 ## CellDoNotUse = lnext_compat(l3, var3 ## CellDoNotUse) \ + ) + /* * forboth_ptr_oid - * a convenience macro which loops through two lists at the same time. The