From 75ae1d02658413206cfdbc323700d4aac0c913e1 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Mon, 27 Jun 2022 13:45:58 -0700 Subject: [PATCH] Remove SplitCopyDestReceiver and use PartitionedResultDestReceiver --- .../distributed_intermediate_results.c | 4 +- .../partitioned_intermediate_results.c | 16 +- .../operations/worker_split_copy.c | 223 ------------------ .../operations/worker_split_copy_udf.c | 129 +++++++++- .../distributed/intermediate_results.h | 17 ++ src/include/distributed/worker_split_copy.h | 27 --- 6 files changed, 142 insertions(+), 274 deletions(-) delete mode 100644 src/backend/distributed/operations/worker_split_copy.c delete mode 100644 src/include/distributed/worker_split_copy.h diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index f7d62e157..e7a5830e6 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -74,8 +74,6 @@ static void PartitioningTupleDestPutTuple(TupleDestination *self, Task *task, HeapTuple heapTuple, uint64 tupleLibpqSize); static TupleDesc PartitioningTupleDestTupleDescForQuery(TupleDestination *self, int queryNumber); -static ArrayType * CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int - datumCount, Oid typeId); static char * SourceShardPrefix(const char *resultPrefix, uint64 shardId); static DistributedResultFragment * TupleToDistributedResultFragment(HeapTuple heapTuple, TupleDesc tupleDesc, @@ -372,7 +370,7 @@ ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount, /* * CreateArrayFromDatums creates an array consisting of given values and nulls. */ -static ArrayType * +ArrayType * CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int datumCount, Oid typeId) { bool typeByValue = false; diff --git a/src/backend/distributed/executor/partitioned_intermediate_results.c b/src/backend/distributed/executor/partitioned_intermediate_results.c index b59538888..e19829ca2 100644 --- a/src/backend/distributed/executor/partitioned_intermediate_results.c +++ b/src/backend/distributed/executor/partitioned_intermediate_results.c @@ -95,18 +95,6 @@ typedef struct PartitionedResultDestReceiver } PartitionedResultDestReceiver; static Portal StartPortalForQueryExecution(const char *queryString); -static CitusTableCacheEntry * QueryTupleShardSearchInfo(ArrayType *minValuesArray, - ArrayType *maxValuesArray, - char partitionMethod, - Var *partitionColumn); -static DestReceiver * CreatePartitionedResultDestReceiver(int partitionColumnIndex, - int partitionCount, - CitusTableCacheEntry * - shardSearchInfo, - DestReceiver ** - partitionedDestReceivers, - bool lazyStartup, - bool allowNullPartitionValues); static void PartitionedResultDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor); static bool PartitionedResultDestReceiverReceive(TupleTableSlot *slot, @@ -319,7 +307,7 @@ StartPortalForQueryExecution(const char *queryString) * information so that FindShardInterval() can find the shard corresponding * to a tuple. */ -static CitusTableCacheEntry * +CitusTableCacheEntry * QueryTupleShardSearchInfo(ArrayType *minValuesArray, ArrayType *maxValuesArray, char partitionMethod, Var *partitionColumn) { @@ -408,7 +396,7 @@ QueryTupleShardSearchInfo(ArrayType *minValuesArray, ArrayType *maxValuesArray, /* * CreatePartitionedResultDestReceiver sets up a partitioned dest receiver. */ -static DestReceiver * +DestReceiver * CreatePartitionedResultDestReceiver(int partitionColumnIndex, int partitionCount, CitusTableCacheEntry *shardSearchInfo, diff --git a/src/backend/distributed/operations/worker_split_copy.c b/src/backend/distributed/operations/worker_split_copy.c deleted file mode 100644 index cf85a4595..000000000 --- a/src/backend/distributed/operations/worker_split_copy.c +++ /dev/null @@ -1,223 +0,0 @@ -/*------------------------------------------------------------------------- - * - * worker_split_copy.c - * API implementation for worker shard split copy. - * - * Copyright (c) Citus Data, Inc. - * - * - *------------------------------------------------------------------------- - */ - -#include -#include "c.h" -#include "postgres.h" -#include "catalog/namespace.h" -#include "utils/lsyscache.h" -#include "utils/builtins.h" -#include "distributed/listutils.h" -#include "distributed/metadata_cache.h" -#include "distributed/relation_utils.h" -#include "distributed/worker_split_copy.h" -#include "distributed/worker_shard_copy.h" -#include "distributed/relay_utility.h" - -typedef struct SplitCopyDestReceiver -{ - /* public DestReceiver interface */ - DestReceiver pub; - - /* Underlying shard copy dest receivers */ - DestReceiver **shardCopyDestReceiverArray; - - /* Split Copy Info */ - SplitCopyInfo **splitCopyInfoArray; - - /* Split factor */ - uint splitFactor; - - /* EState for per-tuple memory allocation */ - EState *executorState; - - /* Source shard Oid */ - Oid sourceShardRelationOid; -} SplitCopyDestReceiver; - - -static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, - TupleDesc inputTupleDescriptor); -static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, - DestReceiver *dest); -static void SplitCopyDestReceiverShutdown(DestReceiver *dest); -static void SplitCopyDestReceiverDestroy(DestReceiver *copyDest); - -/* - * CreateSplitCopyDestReceiver creates a DestReceiver that performs - * split copy for sourceShardIdToCopy to destination split children - * based on splitCopyInfoList. - */ -DestReceiver * -CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy, - List *splitCopyInfoList) -{ - SplitCopyDestReceiver *splitCopyDest = - palloc0(sizeof(SplitCopyDestReceiver)); - - /* set up the DestReceiver function pointers */ - splitCopyDest->pub.receiveSlot = SplitCopyDestReceiverReceive; - splitCopyDest->pub.rStartup = SplitCopyDestReceiverStartup; - splitCopyDest->pub.rShutdown = SplitCopyDestReceiverShutdown; - splitCopyDest->pub.rDestroy = SplitCopyDestReceiverDestroy; - - splitCopyDest->executorState = executorState; - splitCopyDest->splitFactor = splitCopyInfoList->length; - ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(sourceShardIdToCopy); - splitCopyDest->sourceShardRelationOid = shardIntervalToSplitCopy->relationId; - - DestReceiver **shardCopyDests = palloc0(splitCopyDest->splitFactor * - sizeof(DestReceiver *)); - SplitCopyInfo **splitCopyInfos = palloc0(splitCopyDest->splitFactor * - sizeof(SplitCopyInfo *)); - - SplitCopyInfo *splitCopyInfo = NULL; - int index = 0; - char *sourceShardNamePrefix = get_rel_name(shardIntervalToSplitCopy->relationId); - foreach_ptr(splitCopyInfo, splitCopyInfoList) - { - char *destinationShardSchemaName = get_namespace_name(get_rel_namespace( - splitCopyDest-> - sourceShardRelationOid)); - char *destinationShardNameCopy = pstrdup(sourceShardNamePrefix); - AppendShardIdToName(&destinationShardNameCopy, splitCopyInfo->destinationShardId); - - DestReceiver *shardCopyDest = CreateShardCopyDestReceiver( - executorState, - list_make2(destinationShardSchemaName, destinationShardNameCopy), - splitCopyInfo->destinationShardNodeId); - - shardCopyDests[index] = shardCopyDest; - splitCopyInfos[index] = splitCopyInfo; - index++; - } - - splitCopyDest->shardCopyDestReceiverArray = shardCopyDests; - splitCopyDest->splitCopyInfoArray = splitCopyInfos; - - return (DestReceiver *) splitCopyDest; -} - - -/* - * SplitCopyDestReceiverStartup implements the rStartup interface of SplitCopyDestReceiver. - */ -static void -SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc - inputTupleDescriptor) -{ - SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest; - - for (int index = 0; index < copyDest->splitFactor; index++) - { - DestReceiver *shardCopyDest = copyDest->shardCopyDestReceiverArray[index]; - shardCopyDest->rStartup(shardCopyDest, operation, inputTupleDescriptor); - } -} - - -/* - * SplitCopyDestReceiverReceive implements the receiveSlot function of - * SplitCopyDestReceiver. It takes a TupleTableSlot and sends the contents to - * the appropriate destination shard. - */ -static bool -SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) -{ - SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest; - - /* Switch to a per-tuple memory memory context */ - EState *executorState = copyDest->executorState; - MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); - MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); - - CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry( - copyDest->sourceShardRelationOid); - if (cacheEntry == NULL) - { - ereport(ERROR, errmsg("Could not find shard %s for split copy.", - get_rel_name(copyDest->sourceShardRelationOid))); - } - - /* Partition Column Metadata on source shard */ - int partitionColumnIndex = cacheEntry->partitionColumn->varattno - 1; - FmgrInfo *hashFunction = cacheEntry->hashFunction; - - slot_getallattrs(slot); - Datum *columnValues = slot->tts_values; - bool *columnNulls = slot->tts_isnull; - - /* Partition Column Value cannot be null */ - if (columnNulls[partitionColumnIndex]) - { - ereport(ERROR, errmsg( - "Found null partition value for shard %s during split copy.", - get_rel_name(copyDest->sourceShardRelationOid))); - } - - Datum hashedValueDatum = FunctionCall1(hashFunction, - columnValues[partitionColumnIndex]); - int32_t hashedValue = DatumGetInt32(hashedValueDatum); - - for (int index = 0; index < copyDest->splitFactor; index++) - { - SplitCopyInfo *splitCopyInfo = copyDest->splitCopyInfoArray[index]; - - if (splitCopyInfo->destinationShardMinHashValue <= hashedValue && - splitCopyInfo->destinationShardMaxHashValue >= hashedValue) - { - DestReceiver *shardCopyDestReceiver = - copyDest->shardCopyDestReceiverArray[index]; - shardCopyDestReceiver->receiveSlot(slot, shardCopyDestReceiver); - } - } - - MemoryContextSwitchTo(oldContext); - ResetPerTupleExprContext(executorState); - - return true; -} - - -/* - * SplitCopyDestReceiverShutdown implements the rShutdown interface of - * SplitCopyDestReceiver. It ends all open COPY operations. - */ -static void -SplitCopyDestReceiverShutdown(DestReceiver *dest) -{ - SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest; - - for (int index = 0; index < copyDest->splitFactor; index++) - { - DestReceiver *shardCopyDest = copyDest->shardCopyDestReceiverArray[index]; - shardCopyDest->rShutdown(shardCopyDest); - } -} - - -/* - * SplitCopyDestReceiverDestroy frees the DestReceiver. - */ -static void -SplitCopyDestReceiverDestroy(DestReceiver *dest) -{ - SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest; - - for (int index = 0; index < copyDest->splitFactor; index++) - { - DestReceiver *shardCopyDest = copyDest->shardCopyDestReceiverArray[index]; - shardCopyDest->rDestroy(shardCopyDest); - - pfree(shardCopyDest); - pfree(copyDest->splitCopyInfoArray[index]); - } -} diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index 66da99845..0bea0f1c5 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -9,16 +9,34 @@ #include "postgres.h" #include "pg_version_compat.h" +#include "utils/lsyscache.h" #include "utils/array.h" #include "utils/builtins.h" +#include "distributed/listutils.h" #include "distributed/multi_executor.h" +#include "distributed/worker_shard_copy.h" #include "distributed/worker_split_copy.h" +#include "distributed/intermediate_results.h" #include "distributed/citus_ruleutils.h" PG_FUNCTION_INFO_V1(worker_split_copy); +typedef struct SplitCopyInfo +{ + uint64 destinationShardId; /* destination shard id */ + Datum destinationShardMinHashValue; /* min hash value of destination shard */ + Datum destinationShardMaxHashValue; /* max hash value of destination shard */ + uint32_t destinationShardNodeId; /* node where split child shard is to be placed */ +} SplitCopyInfo; + static void ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo); +static DestReceiver** CreateShardCopyDestReceivers(EState *estate, ShardInterval *shardIntervalToSplitCopy, List *splitCopyInfoList); +static DestReceiver* CreatePartitionedSplitCopyDestReceiver( + EState *executor, + ShardInterval *shardIntervalToSplitCopy, + List *splitCopyInfoList); +static void BuildMinMaxRangeArrays(List *splitCopyInfoList, ArrayType **minValueArray, ArrayType **maxValueArray); /* * worker_split_copy(source_shard_id bigint, splitCopyInfo citus.split_copy_info[]) @@ -54,9 +72,9 @@ worker_split_copy(PG_FUNCTION_ARGS) } EState *executor = CreateExecutorState(); - DestReceiver *splitCopyDestReceiver = CreateSplitCopyDestReceiver(executor, - shardIdToSplitCopy, - splitCopyInfoList); + DestReceiver *splitCopyDestReceiver = CreatePartitionedSplitCopyDestReceiver(executor, + shardIntervalToSplitCopy, + splitCopyInfoList); char *sourceShardToCopyName = generate_qualified_relation_name( shardIntervalToSplitCopy->relationId); @@ -101,8 +119,7 @@ ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo) ereport(ERROR, (errmsg( "destination_shard_min_value for split_copy_info cannot be null."))); } - char *destinationMinHash = text_to_cstring(DatumGetTextP(minValueDatum)); - copyInfo->destinationShardMinHashValue = pg_strtoint32(destinationMinHash); + copyInfo->destinationShardMinHashValue = minValueDatum; Datum maxValueDatum = GetAttributeByName(dataTuple, "destination_shard_max_value", &isnull); @@ -111,8 +128,7 @@ ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo) ereport(ERROR, (errmsg( "destination_shard_max_value for split_copy_info cannot be null."))); } - char *destinationMaxHash = text_to_cstring(DatumGetTextP(maxValueDatum)); - copyInfo->destinationShardMaxHashValue = pg_strtoint32(destinationMaxHash); + copyInfo->destinationShardMaxHashValue = maxValueDatum; Datum nodeIdDatum = GetAttributeByName(dataTuple, "destination_shard_node_id", &isnull); @@ -125,3 +141,102 @@ ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo) *splitCopyInfo = copyInfo; } + + +/* Build 'min/max' hash range arrays for PartitionedResultDestReceiver */ +static void +BuildMinMaxRangeArrays(List *splitCopyInfoList, ArrayType **minValueArray, ArrayType **maxValueArray) +{ + int partitionCount = list_length(splitCopyInfoList); + + Datum *minValues = palloc0(partitionCount * sizeof(Datum)); + bool *minValueNulls = palloc0(partitionCount * sizeof(bool)); + Datum *maxValues = palloc0(partitionCount * sizeof(Datum)); + bool *maxValueNulls = palloc0(partitionCount * sizeof(bool)); + + SplitCopyInfo *splitCopyInfo = NULL; + int index = 0; + foreach_ptr(splitCopyInfo, splitCopyInfoList) + { + minValues[index] = splitCopyInfo->destinationShardMinHashValue; + maxValues[index] = splitCopyInfo->destinationShardMaxHashValue; + + /* Caller enforces that min/max values will be not-null */ + minValueNulls[index] = false; + maxValueNulls[index] = false; + index++; + } + + *minValueArray = CreateArrayFromDatums(minValues, minValueNulls, partitionCount, TEXTOID); + *maxValueArray = CreateArrayFromDatums(maxValues, maxValueNulls, partitionCount, TEXTOID); +} + + +/* + * Create underlying ShardCopyDestReceivers for PartitionedResultDestReceiver + * Each ShardCopyDestReceivers will be responsible for copying tuples from source shard, + * that fall under its min/max range, to specified destination shard. + */ +static DestReceiver** +CreateShardCopyDestReceivers(EState *estate, ShardInterval *shardIntervalToSplitCopy, List *splitCopyInfoList) +{ + DestReceiver **shardCopyDests = palloc0(splitCopyInfoList->length * sizeof(DestReceiver *)); + + SplitCopyInfo *splitCopyInfo = NULL; + int index = 0; + char *sourceShardNamePrefix = get_rel_name(shardIntervalToSplitCopy->relationId); + foreach_ptr(splitCopyInfo, splitCopyInfoList) + { + char *destinationShardSchemaName = get_namespace_name(get_rel_namespace( + shardIntervalToSplitCopy->relationId)); + char *destinationShardNameCopy = pstrdup(sourceShardNamePrefix); + AppendShardIdToName(&destinationShardNameCopy, splitCopyInfo->destinationShardId); + + DestReceiver *shardCopyDest = CreateShardCopyDestReceiver( + estate, + list_make2(destinationShardSchemaName, destinationShardNameCopy), + splitCopyInfo->destinationShardNodeId); + + shardCopyDests[index] = shardCopyDest; + index++; + } + + return shardCopyDests; +} + + +/* Create PartitionedSplitCopyDestReceiver along with underlying ShardCopyDestReceivers */ +static DestReceiver* +CreatePartitionedSplitCopyDestReceiver(EState *estate, ShardInterval *shardIntervalToSplitCopy, List *splitCopyInfoList) +{ + /* Create underlying ShardCopyDestReceivers */ + DestReceiver** shardCopyDestReceivers = CreateShardCopyDestReceivers( + estate, + shardIntervalToSplitCopy, + splitCopyInfoList); + + /* construct an artificial CitusTableCacheEntry for routing tuples to appropriate ShardCopyReceiver */ + ArrayType *minValuesArray = NULL; + ArrayType *maxValuesArray = NULL; + BuildMinMaxRangeArrays(splitCopyInfoList, &minValuesArray, &maxValuesArray); + char partitionMethod = PartitionMethodViaCatalog(shardIntervalToSplitCopy->relationId); + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(shardIntervalToSplitCopy->relationId); + Var* partitionColumn = cacheEntry->partitionColumn; + + CitusTableCacheEntry *shardSearchInfo = + QueryTupleShardSearchInfo(minValuesArray, maxValuesArray, + partitionMethod, partitionColumn); + + /* Construct PartitionedResultDestReceiver from cache and underlying ShardCopyDestReceivers */ + int partitionColumnIndex = partitionColumn->varattno - 1; + int partitionCount = splitCopyInfoList->length; + DestReceiver *splitCopyDestReceiver = CreatePartitionedResultDestReceiver( + partitionColumnIndex, + partitionCount, + shardSearchInfo, + shardCopyDestReceivers, + true /* lazyStartup */, + false /* allowNullPartitionColumnValues */); + + return splitCopyDestReceiver; +} diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h index 791ebdbe7..63eca5ad1 100644 --- a/src/include/distributed/intermediate_results.h +++ b/src/include/distributed/intermediate_results.h @@ -69,12 +69,26 @@ typedef struct NodeToNodeFragmentsTransfer List *fragmentList; } NodeToNodeFragmentsTransfer; +/* Forward Declarations */ +struct CitusTableCacheEntry; /* intermediate_results.c */ extern DestReceiver * CreateRemoteFileDestReceiver(const char *resultId, EState *executorState, List *initialNodeList, bool writeLocalFile); +extern DestReceiver * CreatePartitionedResultDestReceiver(int partitionColumnIndex, + int partitionCount, + CitusTableCacheEntry * + shardSearchInfo, + DestReceiver ** + partitionedDestReceivers, + bool lazyStartup, + bool allowNullPartitionValues); +extern CitusTableCacheEntry * QueryTupleShardSearchInfo(ArrayType *minValuesArray, + ArrayType *maxValuesArray, + char partitionMethod, + Var *partitionColumn); extern void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat); extern uint64 RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver); extern void SendQueryResultViaCopy(const char *resultId); @@ -83,6 +97,9 @@ extern void RemoveIntermediateResultsDirectories(void); extern int64 IntermediateResultSize(const char *resultId); extern char * QueryResultFileName(const char *resultId); extern char * CreateIntermediateResultsDirectory(void); +extern ArrayType * CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int + datumCount, Oid typeId); + /* distributed_intermediate_results.c */ extern List ** RedistributeTaskListResults(const char *resultIdPrefix, diff --git a/src/include/distributed/worker_split_copy.h b/src/include/distributed/worker_split_copy.h deleted file mode 100644 index ac81a79ca..000000000 --- a/src/include/distributed/worker_split_copy.h +++ /dev/null @@ -1,27 +0,0 @@ -/*------------------------------------------------------------------------- - * - * worker_split_copy.h - * - * API for worker shard split copy. - * - * Copyright (c) Citus Data, Inc. - * - *------------------------------------------------------------------------- - */ - -#ifndef WORKER_SPLIT_COPY_H_ -#define WORKER_SPLIT_COPY_H_ - -typedef struct SplitCopyInfo -{ - uint64 destinationShardId; /* destination shard id */ - int32 destinationShardMinHashValue; /* min hash value of destination shard */ - int32 destinationShardMaxHashValue; /* max hash value of destination shard */ - uint32_t destinationShardNodeId; /* node where split child shard is to be placed */ -} SplitCopyInfo; - -extern DestReceiver * CreateSplitCopyDestReceiver(EState *executorState, uint64 - sourceShardIdToCopy, - List *splitCopyInfoList); - -#endif /* WORKER_SPLIT_COPY_H_ */