Worker Split Copy DestReceiver skeleton

pull/6029/head
Nitish Upreti 2022-06-14 15:58:12 -07:00
parent f61a053f00
commit d5ab34040b
2 changed files with 181 additions and 0 deletions

View File

@ -0,0 +1,150 @@
/*-------------------------------------------------------------------------
*
* worker_split_copy.c
* API implementation for worker shard split copy.
*
* Copyright (c) Citus Data, Inc.
*
*
*-------------------------------------------------------------------------
*/
#include <unistd.h>
#include "c.h"
#include "postgres.h"
#include "catalog/namespace.h"
#include "utils/lsyscache.h"
#include "distributed/metadata_cache.h"
#include "distributed/worker_split_copy.h"
typedef struct SplitCopyDestReceiver
{
/* public DestReceiver interface */
DestReceiver pub;
/* Underlying shard copy dest receivers */
DestReceiver **shardCopyDestReceiverArray;
/* Split Copy Info */
SplitCopyInfo **splitCopyInfoArray;
/* Split factor */
uint splitFactor;
/* Source shard name */
FullRelationName *sourceShardName;
/* Source shard Oid */
Oid sourceShardOid;
} SplitCopyDestReceiver;
static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor);
static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot,
DestReceiver *dest);
static void SplitCopyDestReceiverShutdown(DestReceiver *dest);
static void SplitCopyDestReceiverDestroy(DestReceiver *copyDest);
DestReceiver * CreateSplitCopyDestReceiver(FullRelationName *sourceShard, List* splitCopyInfoList)
{
SplitCopyDestReceiver *resultDest =
palloc0(sizeof(SplitCopyDestReceiver));
/* set up the DestReceiver function pointers */
resultDest->pub.receiveSlot = SplitCopyDestReceiverReceive;
resultDest->pub.rStartup = SplitCopyDestReceiverStartup;
resultDest->pub.rShutdown = SplitCopyDestReceiverShutdown;
resultDest->pub.rDestroy = SplitCopyDestReceiverDestroy;
Oid sourceSchemaOid = get_namespace_oid(sourceShard->schemaName, false /* missing_ok */);
Oid sourceShardOid = get_relname_relid(sourceShard->relationName, sourceSchemaOid);
resultDest->sourceShardOid = sourceShardOid;
// TODO(niupre): Create internal destination receivers for each shard.
for (int index = 0; index < splitCopyInfoList->length; index++)
{
}
resultDest->splitFactor = splitCopyInfoList->length;
return (DestReceiver *) resultDest;
}
static void SplitCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor)
{
SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest;
for (int index = 0; index < self->splitFactor; index++)
{
DestReceiver *shardCopyDest = self->shardCopyDestReceiverArray[index];
shardCopyDest->rStartup(shardCopyDest, operation, inputTupleDescriptor);
}
}
static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{
SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest;
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(self->sourceShardOid);
if (cacheEntry == NULL)
{
ereport(ERROR, errmsg("Could not find shard %s for split copy.",
self->sourceShardName->relationName));
}
/* Partition Column Metadata on source shard */
int partitionColumnIndex = cacheEntry->partitionColumn->varattno - 1;
FmgrInfo *hashFunction = cacheEntry->hashFunction;
slot_getallattrs(slot);
Datum *columnValues = slot->tts_values;
bool *columnNulls = slot->tts_isnull;
/* Partition Column Value cannot be null */
if (columnNulls[partitionColumnIndex])
{
ereport(ERROR, errmsg("Found null partition value for shard %s during split copy.",
self->sourceShardName->relationName));
}
Datum hashedValueDatum = FunctionCall1(hashFunction, columnValues[partitionColumnIndex]);
int32_t hashedValue = DatumGetInt32(hashedValueDatum);
for(int index = 0 ; index < self->splitFactor; index++)
{
SplitCopyInfo *splitCopyInfo = self->splitCopyInfoArray[index];
if (splitCopyInfo->shardMinValue <= hashedValue &&
splitCopyInfo->shardMaxValue >= hashedValue)
{
DestReceiver *shardCopyDestReceiver = self->shardCopyDestReceiverArray[index];
shardCopyDestReceiver->receiveSlot(slot, shardCopyDestReceiver);
}
}
return true;
}
static void SplitCopyDestReceiverShutdown(DestReceiver *dest)
{
SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest;
for (int index = 0; index < self->splitFactor; index++)
{
DestReceiver *shardCopyDest = self->shardCopyDestReceiverArray[index];
shardCopyDest->rShutdown(shardCopyDest);
}
}
static void SplitCopyDestReceiverDestroy(DestReceiver *dest)
{
SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest;
for (int index = 0; index < self->splitFactor; index++)
{
DestReceiver *shardCopyDest = self->shardCopyDestReceiverArray[index];
shardCopyDest->rDestroy(shardCopyDest);
}
}

View File

@ -0,0 +1,31 @@
/*-------------------------------------------------------------------------
*
* worker_split_copy.h
*
* API for worker shard split copy.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef WORKER_SPLIT_COPY_H_
#define WORKER_SPLIT_COPY_H_
typedef struct FullRelationName
{
char *schemaName;
char *relationName;
} FullRelationName;
typedef struct SplitCopyInfo
{
FullRelationName *destinationShard; /* destination shard name */
int32 shardMinValue; /* min hash value of destination shard */
int32 shardMaxValue; /* max hash value of destination shard */
uint32_t nodeId; /* node where split child shard is to be placed */
} SplitCopyInfo;
extern DestReceiver* CreateSplitCopyDestReceiver(FullRelationName *sourceShard, List* splitCopyInfoList);
#endif /* WORKER_SPLIT_COPY_H_ */