Remove SplitCopyDestReceiver and use PartitionedResultDestReceiver

pull/6029/head
Nitish Upreti 2022-06-27 13:45:58 -07:00
parent 0fde80c133
commit 75ae1d0265
6 changed files with 142 additions and 274 deletions

View File

@ -74,8 +74,6 @@ static void PartitioningTupleDestPutTuple(TupleDestination *self, Task *task,
HeapTuple heapTuple, uint64 tupleLibpqSize);
static TupleDesc PartitioningTupleDestTupleDescForQuery(TupleDestination *self, int
queryNumber);
static ArrayType * CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int
datumCount, Oid typeId);
static char * SourceShardPrefix(const char *resultPrefix, uint64 shardId);
static DistributedResultFragment * TupleToDistributedResultFragment(HeapTuple heapTuple,
TupleDesc tupleDesc,
@ -372,7 +370,7 @@ ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount,
/*
* CreateArrayFromDatums creates an array consisting of given values and nulls.
*/
static ArrayType *
ArrayType *
CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int datumCount, Oid typeId)
{
bool typeByValue = false;

View File

@ -95,18 +95,6 @@ typedef struct PartitionedResultDestReceiver
} PartitionedResultDestReceiver;
static Portal StartPortalForQueryExecution(const char *queryString);
static CitusTableCacheEntry * QueryTupleShardSearchInfo(ArrayType *minValuesArray,
ArrayType *maxValuesArray,
char partitionMethod,
Var *partitionColumn);
static DestReceiver * CreatePartitionedResultDestReceiver(int partitionColumnIndex,
int partitionCount,
CitusTableCacheEntry *
shardSearchInfo,
DestReceiver **
partitionedDestReceivers,
bool lazyStartup,
bool allowNullPartitionValues);
static void PartitionedResultDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor);
static bool PartitionedResultDestReceiverReceive(TupleTableSlot *slot,
@ -319,7 +307,7 @@ StartPortalForQueryExecution(const char *queryString)
* information so that FindShardInterval() can find the shard corresponding
* to a tuple.
*/
static CitusTableCacheEntry *
CitusTableCacheEntry *
QueryTupleShardSearchInfo(ArrayType *minValuesArray, ArrayType *maxValuesArray,
char partitionMethod, Var *partitionColumn)
{
@ -408,7 +396,7 @@ QueryTupleShardSearchInfo(ArrayType *minValuesArray, ArrayType *maxValuesArray,
/*
* CreatePartitionedResultDestReceiver sets up a partitioned dest receiver.
*/
static DestReceiver *
DestReceiver *
CreatePartitionedResultDestReceiver(int partitionColumnIndex,
int partitionCount,
CitusTableCacheEntry *shardSearchInfo,

View File

@ -1,223 +0,0 @@
/*-------------------------------------------------------------------------
*
* worker_split_copy.c
* API implementation for worker shard split copy.
*
* Copyright (c) Citus Data, Inc.
*
*
*-------------------------------------------------------------------------
*/
#include <unistd.h>
#include "c.h"
#include "postgres.h"
#include "catalog/namespace.h"
#include "utils/lsyscache.h"
#include "utils/builtins.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/relation_utils.h"
#include "distributed/worker_split_copy.h"
#include "distributed/worker_shard_copy.h"
#include "distributed/relay_utility.h"
/*
 * SplitCopyDestReceiver is a DestReceiver that fans tuples out to a set of
 * per-destination-shard copy receivers.
 *
 * shardCopyDestReceiverArray and splitCopyInfoArray are parallel arrays of
 * splitFactor entries: entry i of both describes the same destination shard.
 * Instances are cast to and from DestReceiver *, so "pub" has to stay the
 * first member.
 */
typedef struct SplitCopyDestReceiver
{
/* public DestReceiver interface */
DestReceiver pub;
/* Underlying shard copy dest receivers */
DestReceiver **shardCopyDestReceiverArray;
/* Split Copy Info */
SplitCopyInfo **splitCopyInfoArray;
/* Split factor */
uint splitFactor;
/* EState for per-tuple memory allocation */
EState *executorState;
/* Source shard Oid */
Oid sourceShardRelationOid;
} SplitCopyDestReceiver;
/*
 * DestReceiver callbacks of SplitCopyDestReceiver; they are installed into the
 * public DestReceiver function pointers by CreateSplitCopyDestReceiver.
 */
static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor);
static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot,
DestReceiver *dest);
static void SplitCopyDestReceiverShutdown(DestReceiver *dest);
static void SplitCopyDestReceiverDestroy(DestReceiver *copyDest);
/*
 * CreateSplitCopyDestReceiver creates a DestReceiver that performs
 * split copy for sourceShardIdToCopy to destination split children
 * based on splitCopyInfoList.
 *
 * One shard copy DestReceiver is created per SplitCopyInfo entry. The name of
 * each destination shard is the source relation name with the destination
 * shard id appended, in the source relation's schema. The returned receiver
 * keeps pointers to the SplitCopyInfo entries of the list (they are not
 * copied).
 *
 * An empty (NIL) splitCopyInfoList yields a receiver without any children.
 */
DestReceiver *
CreateSplitCopyDestReceiver(EState *executorState, uint64 sourceShardIdToCopy,
							List *splitCopyInfoList)
{
	SplitCopyDestReceiver *splitCopyDest =
		palloc0(sizeof(SplitCopyDestReceiver));

	/* set up the DestReceiver function pointers */
	splitCopyDest->pub.receiveSlot = SplitCopyDestReceiverReceive;
	splitCopyDest->pub.rStartup = SplitCopyDestReceiverStartup;
	splitCopyDest->pub.rShutdown = SplitCopyDestReceiverShutdown;
	splitCopyDest->pub.rDestroy = SplitCopyDestReceiverDestroy;

	splitCopyDest->executorState = executorState;

	/*
	 * An empty list is represented by NIL (a NULL pointer), so reading
	 * splitCopyInfoList->length directly would crash; list_length() is
	 * NIL-safe.
	 */
	splitCopyDest->splitFactor = list_length(splitCopyInfoList);

	ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(sourceShardIdToCopy);
	splitCopyDest->sourceShardRelationOid = shardIntervalToSplitCopy->relationId;

	DestReceiver **shardCopyDests = palloc0(splitCopyDest->splitFactor *
											sizeof(DestReceiver *));
	SplitCopyInfo **splitCopyInfos = palloc0(splitCopyDest->splitFactor *
											 sizeof(SplitCopyInfo *));

	SplitCopyInfo *splitCopyInfo = NULL;
	int index = 0;
	char *sourceShardNamePrefix = get_rel_name(shardIntervalToSplitCopy->relationId);
	foreach_ptr(splitCopyInfo, splitCopyInfoList)
	{
		char *destinationShardSchemaName = get_namespace_name(get_rel_namespace(
																   splitCopyDest->
																   sourceShardRelationOid));

		/* work on a copy: AppendShardIdToName rewrites the name it is given */
		char *destinationShardNameCopy = pstrdup(sourceShardNamePrefix);
		AppendShardIdToName(&destinationShardNameCopy, splitCopyInfo->destinationShardId);

		DestReceiver *shardCopyDest = CreateShardCopyDestReceiver(
			executorState,
			list_make2(destinationShardSchemaName, destinationShardNameCopy),
			splitCopyInfo->destinationShardNodeId);

		/* keep the two arrays index-aligned; the receive callback relies on it */
		shardCopyDests[index] = shardCopyDest;
		splitCopyInfos[index] = splitCopyInfo;
		index++;
	}

	splitCopyDest->shardCopyDestReceiverArray = shardCopyDests;
	splitCopyDest->splitCopyInfoArray = splitCopyInfos;

	return (DestReceiver *) splitCopyDest;
}
/*
 * SplitCopyDestReceiverStartup is the rStartup callback of
 * SplitCopyDestReceiver. It forwards the startup call, with the same
 * operation and tuple descriptor, to every underlying shard copy receiver.
 */
static void
SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
							 inputTupleDescriptor)
{
	SplitCopyDestReceiver *splitCopyDest = (SplitCopyDestReceiver *) dest;
	DestReceiver **childReceivers = splitCopyDest->shardCopyDestReceiverArray;

	for (int childIndex = 0; childIndex < splitCopyDest->splitFactor; childIndex++)
	{
		DestReceiver *childReceiver = childReceivers[childIndex];

		childReceiver->rStartup(childReceiver, operation, inputTupleDescriptor);
	}
}
/*
 * SplitCopyDestReceiverReceive implements the receiveSlot function of
 * SplitCopyDestReceiver. It takes a TupleTableSlot and sends the contents to
 * the appropriate destination shard.
 *
 * The partition column value of the tuple is hashed with the hash function of
 * the source relation, and the tuple is handed to every destination whose
 * inclusive [min, max] hash range contains the hashed value.
 *
 * Raises an ERROR when the source relation has no cache entry, when the
 * partition column value is NULL, or when no destination range contains the
 * hashed value (otherwise the row would silently be lost by the split).
 * Always returns true.
 */
static bool
SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{
	SplitCopyDestReceiver *copyDest = (SplitCopyDestReceiver *) dest;

	/* Switch to a per-tuple memory context */
	EState *executorState = copyDest->executorState;
	MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
	MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);

	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(
		copyDest->sourceShardRelationOid);
	if (cacheEntry == NULL)
	{
		ereport(ERROR, errmsg("Could not find shard %s for split copy.",
							  get_rel_name(copyDest->sourceShardRelationOid)));
	}

	/* Partition Column Metadata on source shard */
	int partitionColumnIndex = cacheEntry->partitionColumn->varattno - 1;
	FmgrInfo *hashFunction = cacheEntry->hashFunction;

	slot_getallattrs(slot);
	Datum *columnValues = slot->tts_values;
	bool *columnNulls = slot->tts_isnull;

	/* Partition Column Value cannot be null */
	if (columnNulls[partitionColumnIndex])
	{
		ereport(ERROR, errmsg(
					"Found null partition value for shard %s during split copy.",
					get_rel_name(copyDest->sourceShardRelationOid)));
	}

	Datum hashedValueDatum = FunctionCall1(hashFunction,
										   columnValues[partitionColumnIndex]);
	int32_t hashedValue = DatumGetInt32(hashedValueDatum);

	/* remember whether any destination accepted the tuple */
	bool tupleRouted = false;

	for (int index = 0; index < copyDest->splitFactor; index++)
	{
		SplitCopyInfo *splitCopyInfo = copyDest->splitCopyInfoArray[index];

		if (splitCopyInfo->destinationShardMinHashValue <= hashedValue &&
			splitCopyInfo->destinationShardMaxHashValue >= hashedValue)
		{
			DestReceiver *shardCopyDestReceiver =
				copyDest->shardCopyDestReceiverArray[index];
			shardCopyDestReceiver->receiveSlot(slot, shardCopyDestReceiver);
			tupleRouted = true;
		}
	}

	/*
	 * A tuple that matches no destination range would vanish from the split
	 * children without any trace; fail loudly instead of losing data.
	 */
	if (!tupleRouted)
	{
		ereport(ERROR, errmsg(
					"Could not find a destination shard for hash value %d of shard %s during split copy.",
					hashedValue,
					get_rel_name(copyDest->sourceShardRelationOid)));
	}

	MemoryContextSwitchTo(oldContext);
	ResetPerTupleExprContext(executorState);

	return true;
}
/*
 * SplitCopyDestReceiverShutdown is the rShutdown callback of
 * SplitCopyDestReceiver. It shuts down each underlying shard copy receiver
 * in turn, which ends all open COPY operations.
 */
static void
SplitCopyDestReceiverShutdown(DestReceiver *dest)
{
	SplitCopyDestReceiver *splitCopyDest = (SplitCopyDestReceiver *) dest;
	DestReceiver **childReceivers = splitCopyDest->shardCopyDestReceiverArray;

	for (int childIndex = 0; childIndex < splitCopyDest->splitFactor; childIndex++)
	{
		DestReceiver *childReceiver = childReceivers[childIndex];

		childReceiver->rShutdown(childReceiver);
	}
}
/*
 * SplitCopyDestReceiverDestroy is the rDestroy callback of
 * SplitCopyDestReceiver. For every destination it destroys and frees the
 * shard copy receiver and frees the matching SplitCopyInfo entry.
 *
 * The two arrays and the SplitCopyDestReceiver itself are not freed here.
 *
 * NOTE(review): the child receiver is pfree'd right after its own rDestroy
 * ran; if that callback already releases the receiver this is a double
 * free -- confirm against the shard copy receiver's rDestroy.
 */
static void
SplitCopyDestReceiverDestroy(DestReceiver *dest)
{
	SplitCopyDestReceiver *splitCopyDest = (SplitCopyDestReceiver *) dest;
	int childIndex = 0;

	while (childIndex < splitCopyDest->splitFactor)
	{
		DestReceiver *childReceiver =
			splitCopyDest->shardCopyDestReceiverArray[childIndex];

		childReceiver->rDestroy(childReceiver);
		pfree(childReceiver);
		pfree(splitCopyDest->splitCopyInfoArray[childIndex]);

		childIndex++;
	}
}

View File

@ -9,16 +9,34 @@
#include "postgres.h"
#include "pg_version_compat.h"
#include "utils/lsyscache.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "distributed/listutils.h"
#include "distributed/multi_executor.h"
#include "distributed/worker_shard_copy.h"
#include "distributed/worker_split_copy.h"
#include "distributed/intermediate_results.h"
#include "distributed/citus_ruleutils.h"
PG_FUNCTION_INFO_V1(worker_split_copy);
/*
 * SplitCopyInfo describes one destination (child) shard of a split copy.
 *
 * The min/max values are kept as the raw attribute datums read from the
 * split_copy_info tuple in ParseSplitCopyInfoDatum; they are not converted to
 * integers here. BuildMinMaxRangeArrays packs them into arrays with element
 * type TEXTOID.
 */
typedef struct SplitCopyInfo
{
uint64 destinationShardId; /* destination shard id */
Datum destinationShardMinHashValue; /* min hash value of destination shard */
Datum destinationShardMaxHashValue; /* max hash value of destination shard */
uint32_t destinationShardNodeId; /* node where split child shard is to be placed */
} SplitCopyInfo;
/* Local helpers, defined further down in this file */
static void ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum,
									SplitCopyInfo **splitCopyInfo);
static DestReceiver ** CreateShardCopyDestReceivers(EState *estate,
													ShardInterval *
													shardIntervalToSplitCopy,
													List *splitCopyInfoList);
static DestReceiver * CreatePartitionedSplitCopyDestReceiver(EState *estate,
															 ShardInterval *
															 shardIntervalToSplitCopy,
															 List *splitCopyInfoList);
static void BuildMinMaxRangeArrays(List *splitCopyInfoList,
								   ArrayType **minValueArray,
								   ArrayType **maxValueArray);
/*
* worker_split_copy(source_shard_id bigint, splitCopyInfo citus.split_copy_info[])
@ -54,9 +72,9 @@ worker_split_copy(PG_FUNCTION_ARGS)
}
EState *executor = CreateExecutorState();
DestReceiver *splitCopyDestReceiver = CreateSplitCopyDestReceiver(executor,
shardIdToSplitCopy,
splitCopyInfoList);
DestReceiver *splitCopyDestReceiver = CreatePartitionedSplitCopyDestReceiver(executor,
shardIntervalToSplitCopy,
splitCopyInfoList);
char *sourceShardToCopyName = generate_qualified_relation_name(
shardIntervalToSplitCopy->relationId);
@ -101,8 +119,7 @@ ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo)
ereport(ERROR, (errmsg(
"destination_shard_min_value for split_copy_info cannot be null.")));
}
char *destinationMinHash = text_to_cstring(DatumGetTextP(minValueDatum));
copyInfo->destinationShardMinHashValue = pg_strtoint32(destinationMinHash);
copyInfo->destinationShardMinHashValue = minValueDatum;
Datum maxValueDatum = GetAttributeByName(dataTuple, "destination_shard_max_value",
&isnull);
@ -111,8 +128,7 @@ ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo)
ereport(ERROR, (errmsg(
"destination_shard_max_value for split_copy_info cannot be null.")));
}
char *destinationMaxHash = text_to_cstring(DatumGetTextP(maxValueDatum));
copyInfo->destinationShardMaxHashValue = pg_strtoint32(destinationMaxHash);
copyInfo->destinationShardMaxHashValue = maxValueDatum;
Datum nodeIdDatum = GetAttributeByName(dataTuple, "destination_shard_node_id",
&isnull);
@ -125,3 +141,102 @@ ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo)
*splitCopyInfo = copyInfo;
}
/*
 * BuildMinMaxRangeArrays builds the 'min' and 'max' range arrays that are
 * handed to the PartitionedResultDestReceiver.
 *
 * Element i of *minValueArray / *maxValueArray is the min / max value of the
 * i-th SplitCopyInfo in splitCopyInfoList. Both arrays are created with
 * element type TEXTOID and contain no NULL elements.
 */
static void
BuildMinMaxRangeArrays(List *splitCopyInfoList, ArrayType **minValueArray,
					   ArrayType **maxValueArray)
{
	int rangeCount = list_length(splitCopyInfoList);

	/*
	 * palloc0 zero-fills its result, so every "is null" flag starts out as
	 * false. That is what we want: the caller enforces that min/max values
	 * are not null.
	 */
	Datum *lowerBounds = palloc0(rangeCount * sizeof(Datum));
	bool *lowerBoundNulls = palloc0(rangeCount * sizeof(bool));
	Datum *upperBounds = palloc0(rangeCount * sizeof(Datum));
	bool *upperBoundNulls = palloc0(rangeCount * sizeof(bool));

	int rangeIndex = 0;
	SplitCopyInfo *copyInfo = NULL;
	foreach_ptr(copyInfo, splitCopyInfoList)
	{
		lowerBounds[rangeIndex] = copyInfo->destinationShardMinHashValue;
		upperBounds[rangeIndex] = copyInfo->destinationShardMaxHashValue;
		rangeIndex++;
	}

	*minValueArray = CreateArrayFromDatums(lowerBounds, lowerBoundNulls,
										   rangeCount, TEXTOID);
	*maxValueArray = CreateArrayFromDatums(upperBounds, upperBoundNulls,
										   rangeCount, TEXTOID);
}
/*
 * Create underlying ShardCopyDestReceivers for PartitionedResultDestReceiver.
 * Each ShardCopyDestReceiver will be responsible for copying tuples from source shard,
 * that fall under its min/max range, to specified destination shard.
 *
 * Returns an array with one receiver per entry of splitCopyInfoList, in list
 * order. The name of each destination shard is the source relation name with
 * the destination shard id appended, in the source relation's schema.
 * An empty (NIL) list yields an empty array.
 */
static DestReceiver **
CreateShardCopyDestReceivers(EState *estate, ShardInterval *shardIntervalToSplitCopy,
							 List *splitCopyInfoList)
{
	/*
	 * An empty list is represented by NIL (a NULL pointer), so reading
	 * splitCopyInfoList->length directly would crash; list_length() is
	 * NIL-safe and matches what BuildMinMaxRangeArrays does.
	 */
	int destinationCount = list_length(splitCopyInfoList);
	DestReceiver **shardCopyDests = palloc0(destinationCount * sizeof(DestReceiver *));

	SplitCopyInfo *splitCopyInfo = NULL;
	int index = 0;
	char *sourceShardNamePrefix = get_rel_name(shardIntervalToSplitCopy->relationId);
	foreach_ptr(splitCopyInfo, splitCopyInfoList)
	{
		char *destinationShardSchemaName = get_namespace_name(get_rel_namespace(
																   shardIntervalToSplitCopy
																   ->relationId));

		/* work on a copy: AppendShardIdToName rewrites the name it is given */
		char *destinationShardNameCopy = pstrdup(sourceShardNamePrefix);
		AppendShardIdToName(&destinationShardNameCopy, splitCopyInfo->destinationShardId);

		DestReceiver *shardCopyDest = CreateShardCopyDestReceiver(
			estate,
			list_make2(destinationShardSchemaName, destinationShardNameCopy),
			splitCopyInfo->destinationShardNodeId);

		shardCopyDests[index] = shardCopyDest;
		index++;
	}

	return shardCopyDests;
}
/*
 * Create PartitionedSplitCopyDestReceiver along with underlying ShardCopyDestReceivers.
 *
 * The returned DestReceiver is a PartitionedResultDestReceiver that routes
 * each tuple, by the partition column of the source relation, to the
 * ShardCopyDestReceiver of the matching destination shard. The min/max ranges
 * of the destinations come from splitCopyInfoList; the receivers are created
 * in the same list order, so range i belongs to receiver i.
 */
static DestReceiver *
CreatePartitionedSplitCopyDestReceiver(EState *estate,
									   ShardInterval *shardIntervalToSplitCopy,
									   List *splitCopyInfoList)
{
	/* Create underlying ShardCopyDestReceivers */
	DestReceiver **shardCopyDestReceivers = CreateShardCopyDestReceivers(
		estate,
		shardIntervalToSplitCopy,
		splitCopyInfoList);

	/* construct an artificial CitusTableCacheEntry for routing tuples to appropriate ShardCopyReceiver */
	ArrayType *minValuesArray = NULL;
	ArrayType *maxValuesArray = NULL;
	BuildMinMaxRangeArrays(splitCopyInfoList, &minValuesArray, &maxValuesArray);

	char partitionMethod = PartitionMethodViaCatalog(
		shardIntervalToSplitCopy->relationId);
	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(
		shardIntervalToSplitCopy->relationId);
	Var *partitionColumn = cacheEntry->partitionColumn;

	CitusTableCacheEntry *shardSearchInfo =
		QueryTupleShardSearchInfo(minValuesArray, maxValuesArray,
								  partitionMethod, partitionColumn);

	/* Construct PartitionedResultDestReceiver from cache and underlying ShardCopyDestReceivers */

	/* varattno is 1-based, the receiver expects a 0-based column index */
	int partitionColumnIndex = partitionColumn->varattno - 1;

	/*
	 * An empty list is represented by NIL (a NULL pointer), so reading
	 * splitCopyInfoList->length directly would crash; list_length() is
	 * NIL-safe and matches what BuildMinMaxRangeArrays does.
	 */
	int partitionCount = list_length(splitCopyInfoList);

	DestReceiver *splitCopyDestReceiver = CreatePartitionedResultDestReceiver(
		partitionColumnIndex,
		partitionCount,
		shardSearchInfo,
		shardCopyDestReceivers,
		true /* lazyStartup */,
		false /* allowNullPartitionValues */);

	return splitCopyDestReceiver;
}

View File

@ -69,12 +69,26 @@ typedef struct NodeToNodeFragmentsTransfer
List *fragmentList;
} NodeToNodeFragmentsTransfer;
/* Forward Declarations */
struct CitusTableCacheEntry;
/* intermediate_results.c */
extern DestReceiver * CreateRemoteFileDestReceiver(const char *resultId,
EState *executorState,
List *initialNodeList, bool
writeLocalFile);
/*
 * Sets up a partitioned dest receiver on top of the given array of
 * per-partition receivers. Exported so that worker_split_copy can build its
 * split copy receiver with it.
 * NOTE(review): CitusTableCacheEntry is used here as a typedef name; the
 * "struct CitusTableCacheEntry" forward declaration alone does not introduce
 * it, so includers presumably need the cache header first -- confirm.
 */
extern DestReceiver * CreatePartitionedResultDestReceiver(int partitionColumnIndex,
int partitionCount,
CitusTableCacheEntry *
shardSearchInfo,
DestReceiver **
partitionedDestReceivers,
bool lazyStartup,
bool allowNullPartitionValues);
/*
 * Builds, from min/max value arrays, the partition method and the partition
 * column, the shard search information that is passed to
 * CreatePartitionedResultDestReceiver as shardSearchInfo.
 */
extern CitusTableCacheEntry * QueryTupleShardSearchInfo(ArrayType *minValuesArray,
ArrayType *maxValuesArray,
char partitionMethod,
Var *partitionColumn);
extern void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat);
extern uint64 RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver);
extern void SendQueryResultViaCopy(const char *resultId);
@ -83,6 +97,9 @@ extern void RemoveIntermediateResultsDirectories(void);
extern int64 IntermediateResultSize(const char *resultId);
extern char * QueryResultFileName(const char *resultId);
extern char * CreateIntermediateResultsDirectory(void);
extern ArrayType * CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int
datumCount, Oid typeId);
/* distributed_intermediate_results.c */
extern List ** RedistributeTaskListResults(const char *resultIdPrefix,

View File

@ -1,27 +0,0 @@
/*-------------------------------------------------------------------------
*
* worker_split_copy.h
*
* API for worker shard split copy.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef WORKER_SPLIT_COPY_H_
#define WORKER_SPLIT_COPY_H_
/*
 * SplitCopyInfo describes one destination (child) shard of a split copy: its
 * shard id, the hash range of the tuples it receives (both bounds are treated
 * as inclusive by the split copy receiver) and the node it is placed on.
 */
typedef struct SplitCopyInfo
{
uint64 destinationShardId; /* destination shard id */
int32 destinationShardMinHashValue; /* min hash value of destination shard */
int32 destinationShardMaxHashValue; /* max hash value of destination shard */
uint32_t destinationShardNodeId; /* node where split child shard is to be placed */
} SplitCopyInfo;
/*
 * Creates the DestReceiver that copies the tuples of sourceShardIdToCopy to
 * the destination shards described by splitCopyInfoList (a list of
 * SplitCopyInfo pointers).
 */
extern DestReceiver * CreateSplitCopyDestReceiver(EState *executorState, uint64
sourceShardIdToCopy,
List *splitCopyInfoList);
#endif /* WORKER_SPLIT_COPY_H_ */