From d5ab34040b817bf64ed37ddef997b8b88cff5716 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Tue, 14 Jun 2022 15:58:12 -0700 Subject: [PATCH] Worker Split Copy DestReceiver skeleton --- .../operations/worker_split_copy.c | 150 ++++++++++++++++++ src/include/distributed/worker_split_copy.h | 31 ++++ 2 files changed, 181 insertions(+) create mode 100644 src/backend/distributed/operations/worker_split_copy.c create mode 100644 src/include/distributed/worker_split_copy.h diff --git a/src/backend/distributed/operations/worker_split_copy.c b/src/backend/distributed/operations/worker_split_copy.c new file mode 100644 index 000000000..f015eaf61 --- /dev/null +++ b/src/backend/distributed/operations/worker_split_copy.c @@ -0,0 +1,150 @@ +/*------------------------------------------------------------------------- + * + * worker_split_copy.c + * API implementation for worker shard split copy. + * + * Copyright (c) Citus Data, Inc. + * + * + *------------------------------------------------------------------------- + */ + +#include +#include "c.h" +#include "postgres.h" +#include "catalog/namespace.h" +#include "utils/lsyscache.h" +#include "distributed/metadata_cache.h" +#include "distributed/worker_split_copy.h" + +typedef struct SplitCopyDestReceiver +{ + /* public DestReceiver interface */ + DestReceiver pub; + + /* Underlying shard copy dest receivers */ + DestReceiver **shardCopyDestReceiverArray; + + /* Split Copy Info */ + SplitCopyInfo **splitCopyInfoArray; + + /* Split factor */ + uint splitFactor; + + /* Source shard name */ + FullRelationName *sourceShardName; + + /* Source shard Oid */ + Oid sourceShardOid; +} SplitCopyDestReceiver; + + +static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, + TupleDesc inputTupleDescriptor); +static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, + DestReceiver *dest); +static void SplitCopyDestReceiverShutdown(DestReceiver *dest); +static void SplitCopyDestReceiverDestroy(DestReceiver *copyDest); + +DestReceiver * CreateSplitCopyDestReceiver(FullRelationName *sourceShard, List* splitCopyInfoList) +{ + SplitCopyDestReceiver *resultDest = + palloc0(sizeof(SplitCopyDestReceiver)); + + /* set up the DestReceiver function pointers */ + resultDest->pub.receiveSlot = SplitCopyDestReceiverReceive; + resultDest->pub.rStartup = SplitCopyDestReceiverStartup; + resultDest->pub.rShutdown = SplitCopyDestReceiverShutdown; + resultDest->pub.rDestroy = SplitCopyDestReceiverDestroy; + + Oid sourceSchemaOid = get_namespace_oid(sourceShard->schemaName, false /* missing_ok */); + Oid sourceShardOid = get_relname_relid(sourceShard->relationName, sourceSchemaOid); + resultDest->sourceShardOid = sourceShardOid; + + // TODO(niupre): Create internal destination receivers for each shard. + for (int index = 0; index < splitCopyInfoList->length; index++) + { + + } + + resultDest->splitFactor = splitCopyInfoList->length; + + return (DestReceiver *) resultDest; +} + +static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor) +{ + SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest; + + for (int index = 0; index < self->splitFactor; index++) + { + DestReceiver *shardCopyDest = self->shardCopyDestReceiverArray[index]; + shardCopyDest->rStartup(shardCopyDest, operation, inputTupleDescriptor); + } +} + +static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) +{ + SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest; + + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(self->sourceShardOid); + if (cacheEntry == NULL) + { + ereport(ERROR, errmsg("Could not find shard %s for split copy.", + self->sourceShardName->relationName)); + } + + /* Partition Column Metadata on source shard */ + int partitionColumnIndex = cacheEntry->partitionColumn->varattno - 1; + FmgrInfo *hashFunction = cacheEntry->hashFunction; + + slot_getallattrs(slot); + Datum *columnValues = slot->tts_values; + bool *columnNulls = slot->tts_isnull; + + /* Partition Column Value cannot be null */ + if (columnNulls[partitionColumnIndex]) + { + ereport(ERROR, errmsg("Found null partition value for shard %s during split copy.", + self->sourceShardName->relationName)); + } + + Datum hashedValueDatum = FunctionCall1(hashFunction, columnValues[partitionColumnIndex]); + int32_t hashedValue = DatumGetInt32(hashedValueDatum); + + for(int index = 0 ; index < self->splitFactor; index++) + { + SplitCopyInfo *splitCopyInfo = self->splitCopyInfoArray[index]; + + if (splitCopyInfo->shardMinValue <= hashedValue && + splitCopyInfo->shardMaxValue >= hashedValue) + { + DestReceiver *shardCopyDestReceiver = self->shardCopyDestReceiverArray[index]; + shardCopyDestReceiver->receiveSlot(slot, shardCopyDestReceiver); + } + } + + return true; +} + +static void SplitCopyDestReceiverShutdown(DestReceiver *dest) +{ + SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest; + + for (int index = 0; index < self->splitFactor; index++) + { + DestReceiver *shardCopyDest = self->shardCopyDestReceiverArray[index]; + shardCopyDest->rShutdown(shardCopyDest); + } +} + +static void SplitCopyDestReceiverDestroy(DestReceiver *dest) +{ + SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest; + + for (int index = 0; index < self->splitFactor; index++) + { + DestReceiver *shardCopyDest = self->shardCopyDestReceiverArray[index]; + shardCopyDest->rDestroy(shardCopyDest); + } +} diff --git a/src/include/distributed/worker_split_copy.h b/src/include/distributed/worker_split_copy.h new file mode 100644 index 000000000..4af30229b --- /dev/null +++ b/src/include/distributed/worker_split_copy.h @@ -0,0 +1,31 @@ +/*------------------------------------------------------------------------- + * + * worker_split_copy.h + * + * API for worker shard split copy. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef WORKER_SPLIT_COPY_H_ +#define WORKER_SPLIT_COPY_H_ + +typedef struct FullRelationName +{ + char *schemaName; + char *relationName; +} FullRelationName; + +typedef struct SplitCopyInfo +{ + FullRelationName *destinationShard; /* destination shard name */ + int32 shardMinValue; /* min hash value of destination shard */ + int32 shardMaxValue; /* max hash value of destination shard */ + uint32_t nodeId; /* node where split child shard is to be placed */ +} SplitCopyInfo; + +extern DestReceiver* CreateSplitCopyDestReceiver(FullRelationName *sourceShard, List* splitCopyInfoList); + +#endif /* WORKER_SPLIT_COPY_H_ */