UDF calling

pull/6029/head
Nitish Upreti 2022-06-16 18:06:09 -07:00
parent 77253cdafb
commit 0f1d2bbd1a
2 changed files with 114 additions and 29 deletions

View File

@ -16,6 +16,7 @@
#include "lib/stringinfo.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "distributed/adaptive_executor.h"
#include "distributed/colocation_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/shardinterval_utils.h"
@ -25,10 +26,12 @@
#include "distributed/shard_split.h"
#include "distributed/reference_table_utils.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "distributed/shared_library_init.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_physical_planner.h"
/*
* These will be public function when citus-enterprise is merged
@ -45,9 +48,11 @@ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
ShardInterval *shardIntervalToSplit,
List *shardSplitPointsList,
List *nodeIdsForPlacementList);
static void CreateSplitShardsForShardGroup(List *splitChildrenShardIntervalListList,
List *workersForPlacementList,
List **splitOffShardList);
static void CreateSplitShardsForShardGroup(uint32_t sourceShardNodeId,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList,
List **splitOffShardList);
static void CreateObjectOnPlacement(List *objectCreationCommandList,
WorkerNode *workerNode);
static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
@ -59,6 +64,8 @@ static void BlockingShardSplit(SplitOperation splitOperation,
ShardInterval *shardIntervalToSplit,
List *shardSplitPointsList,
List *workersForPlacementList);
static void DoSplitCopy(uint32_t sourceShardNodeId, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList);
static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChildrenShardIntervalList, List* workersForPlacementList);
/* Customize error message strings based on operation type */
static const char *const SplitOperationName[] =
@ -408,16 +415,19 @@ BlockingShardSplit(SplitOperation splitOperation,
sourceColocatedShardIntervalList,
shardSplitPointsList);
/* Physically create split children and perform split copy */
List *splitOffShardList = NIL;
CreateSplitShardsForShardGroup(
shardGroupSplitIntervalListList,
workersForPlacementList,
&splitOffShardList);
// Only single placement allowed (already validated by caller)
List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId);
Assert(sourcePlacementList->length == 1);
WorkerNode* sourceShardToCopyNode = (WorkerNode *) linitial(sourcePlacementList);
/* Physically create split children and perform split copy */
List *splitOffShardList = NULL;
CreateSplitShardsForShardGroup(
sourceShardToCopyNode->nodeId,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
workersForPlacementList,
&splitOffShardList);
/*
* Drop old shards and delete related metadata. Have to do that before
@ -441,7 +451,9 @@ BlockingShardSplit(SplitOperation splitOperation,
/* Create ShardGroup split children on a list of corresponding workers. */
static void
CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
CreateSplitShardsForShardGroup(uint32_t sourceShardNodeId,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList,
List **splitOffShardList)
{
@ -471,28 +483,80 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
}
/* Perform Split Copy */
DoSplitCopy(sourceShardNodeId, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList);
// TODO(niupre) : Use Adaptive execution for creating multiple indexes parallely.
/* Create Indexes post copy */
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
ShardInterval *shardInterval = NULL;
WorkerNode *workerPlacementNode = NULL;
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
workersForPlacementList)
{
List *indexCommandList = GetPostLoadTableCreationCommands(
shardInterval->relationId,
true /* includeIndexes */,
true /* includeReplicaIdentity */);
indexCommandList = WorkerApplyShardDDLCommandList(
indexCommandList,
shardInterval->shardId);
}
CreateObjectOnPlacement(indexCommandList, workerPlacementNode);
}
static void
DoSplitCopy(uint32_t sourceShardNodeId, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList)
{
ShardInterval *sourceShardIntervalToCopy = NULL;
List *splitShardIntervalList = NULL;
WorkerNode *workerNodeSource = NULL;
WorkerNode *workerNodeDestination = NULL;
int taskId = 0;
List *splitCopyTaskList = NULL;
forthree_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList,
splitShardIntervalList, shardGroupSplitIntervalListList,
workerNodeDestination, workersForPlacementList)
{
StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy, splitShardIntervalList, workersForPlacementList);
Task *task = CreateBasicTask(
sourceShardIntervalToCopy->shardId, /* jobId */
taskId,
READ_TASK,
splitCopyUdfCommand->data);
ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
SetPlacementNodeMetadata(taskPlacement, workerNodeSource);
task->taskPlacementList = list_make1(taskPlacement);
splitCopyTaskList = lappend(splitCopyTaskList, task);
taskId++;
}
// TODO(niupre) : Pass appropriate MaxParallelShards value from GUC.
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList, 1, NULL /* jobIdList */);
}
static StringInfo
CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List* splitChildrenShardIntervalList, List* workersForPlacementList)
{
StringInfo splitCopyInfoArray = makeStringInfo();
appendStringInfo(splitCopyInfoArray, "ARRAY[");
ShardInterval *splitChildShardInterval = NULL;
bool addComma = false;
WorkerNode *destinationWorkerNode = NULL;
forboth_ptr(splitChildShardInterval, splitChildrenShardIntervalList, destinationWorkerNode, workersForPlacementList)
{
if(addComma)
{
appendStringInfo(splitCopyInfoArray, ",");
}
StringInfo splitCopyInfoRow = makeStringInfo();
appendStringInfo(splitCopyInfoRow, "ROW(%lu, %lu, %lu, %u)::citus.split_copy_info",
splitChildShardInterval->shardId,
splitChildShardInterval->minValue,
splitChildShardInterval->maxValue,
destinationWorkerNode->nodeId);
appendStringInfo(splitCopyInfoArray, "%s", splitCopyInfoRow->data);
addComma = true;
}
appendStringInfo(splitCopyInfoArray, "]");
StringInfo splitCopyUdf = makeStringInfo();
appendStringInfo(splitCopyUdf, "SELECT worker_split_copy(%lu, %s);",
sourceShardSplitInterval->shardId,
splitCopyInfoArray->data);
return splitCopyUdf;
}
/*

View File

@ -97,6 +97,27 @@ typedef struct ListCellAndListWrapper
var2 ## CellDoNotUse = lnext_compat(l2, var2 ## CellDoNotUse) \
)
/*
* forthree_ptr -
* a convenience macro which loops through three lists of pointers at the same
* time, without needing a ListCell. It only needs three declared pointer
* variables to store the pointer of each of the three cells in.
*/
#define forthree_ptr(var1, l1, var2, l2, var3, l3) \
for (ListCell *(var1 ## CellDoNotUse) = list_head(l1), \
*(var2 ## CellDoNotUse) = list_head(l2), \
*(var3 ## CellDoNotUse) = list_head(l3); \
(var1 ## CellDoNotUse) != NULL && \
(var2 ## CellDoNotUse) != NULL && \
(var3 ## CellDoNotUse) != NULL && \
(((var1) = lfirst(var1 ## CellDoNotUse)) || true) && \
(((var2) = lfirst(var2 ## CellDoNotUse)) || true) && \
(((var3) = lfirst(var3 ## CellDoNotUse)) || true); \
var1 ## CellDoNotUse = lnext_compat(l1, var1 ## CellDoNotUse), \
var2 ## CellDoNotUse = lnext_compat(l2, var2 ## CellDoNotUse), \
var3 ## CellDoNotUse = lnext_compat(l3, var3 ## CellDoNotUse) \
)
/*
* forboth_ptr_oid -
* a convenience macro which loops through two lists at the same time. The