Refactor/partitioned result destreceiver (#5432)

This change creates a slightly higher-level abstraction of the `PartitionedResultDestReceiver` by decoupling the partitioning of tuples from writing them to a file. This allows for easier reuse by other `DestReceiver`s that would like to route different tuples to different `DestReceiver`s.

Originally, a lot of state was kept in `PartitionedResultDestReceiver` to be able to lazily create `FileDestReceiver`s when the first tuple arrived for that target. This conflated the processing of tuples with the decision of where they should go.

This refactor changes that by making the `PartitionedResultDestReceiver` completely agnostic of what kind of receivers it is writing to. When constructing it, you pass a list of `DestReceiver`-compatible pointers of length `partitionCount`. Internally, the `PartitionedResultDestReceiver` keeps track of which `DestReceiver`s have been started, and starts each one when it first receives a tuple.

Alternatively, if the code instantiating the `PartitionedResultDestReceiver` wants, the startup can be switched from lazy to eager. When the startup is eager (not lazy), the `rStartup` functions of all `DestReceiver`s in the list are called during the startup of the `PartitionedResultDestReceiver`, and the receivers are marked as started.

A downside of this approach is the following: for highly partitioned destinations we now need to allocate a `FileDestReceiver` for every target, _always_. When the data passed into the `PartitionedResultDestReceiver` is highly skewed towards a small set of `FileDestReceiver`s, this will waste some memory. Given the small size of a `FileDestReceiver`, and the fact that actual file handles are only created during the startup of the `FileDestReceiver`, I think this memory waste is not a problem. If this does become a problem, we could refactor the source list into some kind of generator object which can generate the `DestReceiver`s on the fly.
pull/5399/head
Nils Dijk 2021-11-05 13:31:18 +01:00 committed by GitHub
parent 0e7cf9f0ca
commit 3fcb456381
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 138 additions and 115 deletions

View File

@ -44,31 +44,48 @@ typedef struct PartitionedResultDestReceiver
/* public DestReceiver interface */
DestReceiver pub;
/* partition file $i is stored at file named $resultIdPrefix_$i. */
char *resultIdPrefix;
/* on lazy startup we only startup the DestReceiver once they receive a tuple */
bool lazyStartup;
/* use binary copy or just text copy format? */
bool binaryCopy;
/*
* Stores the arguments passed to the PartitionedResultDestReceiver's rStartup
* function. These arguments are reused when lazyStartup has been set to true.
* On the processing of a first tuple for a partitionDestReceiver since rStartup it
* will pass the arguments here to the rStartup function of partitionDestReceiver to
* prepare it for receiving tuples.
*
* Even though not used without lazyStartup we just always populate these with the
* last invoked arguments for our rStartup.
*/
struct
{
/*
* operation as passed to rStartup, mostly the CmdType of the command being
* streamed into this DestReceiver
*/
int operation;
/*
* TupleDesc describing the layout of the tuples being streamed into the
* DestReceiver.
*/
TupleDesc tupleDescriptor;
} startupArguments;
/* which column of streamed tuples to use as partition column */
int partitionColumnIndex;
/* The number of partitions being partitioned into */
int partitionCount;
/* used for deciding which partition a shard belongs to. */
CitusTableCacheEntry *shardSearchInfo;
MemoryContext perTupleContext;
/* how does stream tuples look like? */
TupleDesc tupleDescriptor;
/* which column of streamed tuples to use as partition column? */
int partitionColumnIndex;
/* how many partitions do we have? */
int partitionCount;
/*
* Tuples for partition[i] are sent to partitionDestReceivers[i], which
* writes it to a result file.
*/
/* Tuples matching shardSearchInfo[i] are sent to partitionDestReceivers[i]. */
DestReceiver **partitionDestReceivers;
/* keeping track of which partitionDestReceivers have been started */
Bitmapset *startedDestReceivers;
} PartitionedResultDestReceiver;
static Portal StartPortalForQueryExecution(const char *queryString);
@ -76,25 +93,19 @@ static CitusTableCacheEntry * QueryTupleShardSearchInfo(ArrayType *minValuesArra
ArrayType *maxValuesArray,
char partitionMethod,
Var *partitionColumn);
static PartitionedResultDestReceiver * CreatePartitionedResultDestReceiver(char *resultId,
int
partitionColumnIndex,
int
partitionCount,
TupleDesc
tupleDescriptor,
bool binaryCopy,
CitusTableCacheEntry
*
shardSearchInfo,
MemoryContext
perTupleContext);
static DestReceiver * CreatePartitionedResultDestReceiver(int partitionColumnIndex,
int partitionCount,
CitusTableCacheEntry *
shardSearchInfo,
DestReceiver **
partitionedDestReceivers,
bool lazyStartup);
static void PartitionedResultDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor);
static bool PartitionedResultDestReceiverReceive(TupleTableSlot *slot,
DestReceiver *dest);
static void PartitionedResultDestReceiverShutdown(DestReceiver *destReceiver);
static void PartitionedResultDestReceiverDestroy(DestReceiver *destReceiver);
static void PartitionedResultDestReceiverShutdown(DestReceiver *dest);
static void PartitionedResultDestReceiverDestroy(DestReceiver *copyDest);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(worker_partition_query_result);
@ -202,14 +213,29 @@ worker_partition_query_result(PG_FUNCTION_ARGS)
/* prepare the output destination */
EState *estate = CreateExecutorState();
MemoryContext tupleContext = GetPerTupleMemoryContext(estate);
PartitionedResultDestReceiver *dest =
CreatePartitionedResultDestReceiver(resultIdPrefixString, partitionColumnIndex,
partitionCount, tupleDescriptor, binaryCopy,
shardSearchInfo, tupleContext);
/* create all dest receivers */
DestReceiver **dests = palloc0(partitionCount * sizeof(DestReceiver *));
for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++)
{
StringInfo resultId = makeStringInfo();
appendStringInfo(resultId, "%s_%d", resultIdPrefixString, partitionIndex);
char *filePath = QueryResultFileName(resultId->data);
DestReceiver *partitionDest = CreateFileDestReceiver(filePath, tupleContext,
binaryCopy);
dests[partitionIndex] = partitionDest;
}
const bool lazyStartup = true;
DestReceiver *dest = CreatePartitionedResultDestReceiver(
partitionColumnIndex,
partitionCount,
shardSearchInfo,
dests,
lazyStartup);
/* execute the query */
PortalRun(portal, FETCH_ALL, false, true, (DestReceiver *) dest,
(DestReceiver *) dest, NULL);
PortalRun(portal, FETCH_ALL, false, true, dest, dest, NULL);
/* construct the output result */
TupleDesc returnTupleDesc = NULL;
@ -225,11 +251,7 @@ worker_partition_query_result(PG_FUNCTION_ARGS)
Datum values[3];
bool nulls[3];
if (dest->partitionDestReceivers[partitionIndex] != NULL)
{
FileDestReceiverStats(dest->partitionDestReceivers[partitionIndex],
&recordsWritten, &bytesWritten);
}
FileDestReceiverStats(dests[partitionIndex], &recordsWritten, &bytesWritten);
memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));
@ -245,6 +267,8 @@ worker_partition_query_result(PG_FUNCTION_ARGS)
PortalDrop(portal, false);
FreeExecutorState(estate);
dest->rDestroy(dest);
PG_RETURN_INT64(1);
}
@ -363,12 +387,12 @@ QueryTupleShardSearchInfo(ArrayType *minValuesArray, ArrayType *maxValuesArray,
/*
* CreatePartitionedResultDestReceiver sets up a partitioned dest receiver.
*/
static PartitionedResultDestReceiver *
CreatePartitionedResultDestReceiver(char *resultIdPrefix, int partitionColumnIndex,
int partitionCount, TupleDesc tupleDescriptor,
bool binaryCopy,
static DestReceiver *
CreatePartitionedResultDestReceiver(int partitionColumnIndex,
int partitionCount,
CitusTableCacheEntry *shardSearchInfo,
MemoryContext perTupleContext)
DestReceiver **partitionedDestReceivers,
bool lazyStartup)
{
PartitionedResultDestReceiver *resultDest =
palloc0(sizeof(PartitionedResultDestReceiver));
@ -380,18 +404,15 @@ CreatePartitionedResultDestReceiver(char *resultIdPrefix, int partitionColumnInd
resultDest->pub.rDestroy = PartitionedResultDestReceiverDestroy;
resultDest->pub.mydest = DestCopyOut;
/* set up output parameters */
resultDest->resultIdPrefix = resultIdPrefix;
resultDest->perTupleContext = perTupleContext;
/* setup routing parameters */
resultDest->partitionColumnIndex = partitionColumnIndex;
resultDest->partitionCount = partitionCount;
resultDest->shardSearchInfo = shardSearchInfo;
resultDest->tupleDescriptor = tupleDescriptor;
resultDest->binaryCopy = binaryCopy;
resultDest->partitionDestReceivers =
(DestReceiver **) palloc0(partitionCount * sizeof(DestReceiver *));
resultDest->partitionDestReceivers = partitionedDestReceivers;
resultDest->startedDestReceivers = NULL;
resultDest->lazyStartup = lazyStartup;
return resultDest;
return (DestReceiver *) resultDest;
}
@ -400,24 +421,27 @@ CreatePartitionedResultDestReceiver(char *resultIdPrefix, int partitionColumnInd
* PartitionedResultDestReceiver.
*/
static void
PartitionedResultDestReceiverStartup(DestReceiver *copyDest, int operation,
PartitionedResultDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor)
{
/*
* We don't expect this to be called multiple times, but if it happens,
* we will just overwrite previous files.
*/
PartitionedResultDestReceiver *partitionedDest =
(PartitionedResultDestReceiver *) copyDest;
int partitionCount = partitionedDest->partitionCount;
for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++)
PartitionedResultDestReceiver *self = (PartitionedResultDestReceiver *) dest;
self->startupArguments.tupleDescriptor = CreateTupleDescCopy(inputTupleDescriptor);
self->startupArguments.operation = operation;
if (self->lazyStartup)
{
DestReceiver *partitionDest =
partitionedDest->partitionDestReceivers[partitionIndex];
if (partitionDest != NULL)
{
partitionDest->rStartup(partitionDest, operation, inputTupleDescriptor);
}
/* we are initialized, rest happens when needed*/
return;
}
/* no lazy startup, lets startup our partitionedDestReceivers */
for (int partitionIndex = 0; partitionIndex < self->partitionCount; partitionIndex++)
{
DestReceiver *partitionDest = self->partitionDestReceivers[partitionIndex];
partitionDest->rStartup(partitionDest, operation, inputTupleDescriptor);
self->startedDestReceivers = bms_add_member(self->startedDestReceivers,
partitionIndex);
}
}
@ -427,25 +451,24 @@ PartitionedResultDestReceiverStartup(DestReceiver *copyDest, int operation,
* PartitionedResultDestReceiver.
*/
static bool
PartitionedResultDestReceiverReceive(TupleTableSlot *slot, DestReceiver *copyDest)
PartitionedResultDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{
PartitionedResultDestReceiver *partitionedDest =
(PartitionedResultDestReceiver *) copyDest;
PartitionedResultDestReceiver *self = (PartitionedResultDestReceiver *) dest;
slot_getallattrs(slot);
Datum *columnValues = slot->tts_values;
bool *columnNulls = slot->tts_isnull;
if (columnNulls[partitionedDest->partitionColumnIndex])
if (columnNulls[self->partitionColumnIndex])
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("the partition column value cannot be NULL")));
}
Datum partitionColumnValue = columnValues[partitionedDest->partitionColumnIndex];
Datum partitionColumnValue = columnValues[self->partitionColumnIndex];
ShardInterval *shardInterval = FindShardInterval(partitionColumnValue,
partitionedDest->shardSearchInfo);
self->shardSearchInfo);
if (shardInterval == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@ -454,20 +477,19 @@ PartitionedResultDestReceiverReceive(TupleTableSlot *slot, DestReceiver *copyDes
}
int partitionIndex = shardInterval->shardIndex;
DestReceiver *partitionDest = partitionedDest->partitionDestReceivers[partitionIndex];
if (partitionDest == NULL)
{
StringInfo resultId = makeStringInfo();
appendStringInfo(resultId, "%s_%d", partitionedDest->resultIdPrefix,
partitionIndex);
char *filePath = QueryResultFileName(resultId->data);
DestReceiver *partitionDest = self->partitionDestReceivers[partitionIndex];
partitionDest = CreateFileDestReceiver(filePath, partitionedDest->perTupleContext,
partitionedDest->binaryCopy);
partitionedDest->partitionDestReceivers[partitionIndex] = partitionDest;
partitionDest->rStartup(partitionDest, 0, partitionedDest->tupleDescriptor);
/* check if this partitionDestReceiver has been started before, start if not */
if (!bms_is_member(partitionIndex, self->startedDestReceivers))
{
partitionDest->rStartup(partitionDest,
self->startupArguments.operation,
self->startupArguments.tupleDescriptor);
self->startedDestReceivers = bms_add_member(self->startedDestReceivers,
partitionIndex);
}
/* forward the tuple to the appropriate dest receiver */
partitionDest->receiveSlot(slot, partitionDest);
return true;
@ -476,23 +498,24 @@ PartitionedResultDestReceiverReceive(TupleTableSlot *slot, DestReceiver *copyDes
/*
* PartitionedResultDestReceiverShutdown implements the rShutdown interface of
* PartitionedResultDestReceiver.
* PartitionedResultDestReceiver by calling rShutdown on all started
* partitionedDestReceivers.
*/
static void
PartitionedResultDestReceiverShutdown(DestReceiver *copyDest)
PartitionedResultDestReceiverShutdown(DestReceiver *dest)
{
PartitionedResultDestReceiver *partitionedDest =
(PartitionedResultDestReceiver *) copyDest;
int partitionCount = partitionedDest->partitionCount;
for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++)
PartitionedResultDestReceiver *self = (PartitionedResultDestReceiver *) dest;
int i = -1;
while ((i = bms_next_member(self->startedDestReceivers, i)) >= 0)
{
DestReceiver *partitionDest =
partitionedDest->partitionDestReceivers[partitionIndex];
if (partitionDest != NULL)
{
partitionDest->rShutdown(partitionDest);
}
DestReceiver *partitionDest = self->partitionDestReceivers[i];
partitionDest->rShutdown(partitionDest);
}
/* empty the set of started receivers which allows them to be restarted again */
bms_free(self->startedDestReceivers);
self->startedDestReceivers = NULL;
}
@ -501,22 +524,17 @@ PartitionedResultDestReceiverShutdown(DestReceiver *copyDest)
* PartitionedResultDestReceiver.
*/
static void
PartitionedResultDestReceiverDestroy(DestReceiver *copyDest)
PartitionedResultDestReceiverDestroy(DestReceiver *dest)
{
PartitionedResultDestReceiver *partitionedDest =
(PartitionedResultDestReceiver *) copyDest;
int partitionCount = partitionedDest->partitionCount;
for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++)
PartitionedResultDestReceiver *self = (PartitionedResultDestReceiver *) dest;
/* we destroy all dest receivers, regardless of whether they have been started or not */
for (int partitionIndex = 0; partitionIndex < self->partitionCount; partitionIndex++)
{
DestReceiver *partitionDest =
partitionedDest->partitionDestReceivers[partitionIndex];
DestReceiver *partitionDest = self->partitionDestReceivers[partitionIndex];
if (partitionDest != NULL)
{
/* this call should also free partitionDest, so no need to free it after */
partitionDest->rDestroy(partitionDest);
}
}
pfree(partitionedDest->partitionDestReceivers);
pfree(partitionedDest);
}

View File

@ -262,15 +262,20 @@ TaskFileDestReceiverDestroy(DestReceiver *destReceiver)
if (taskFileDest->copyOutState)
{
pfree(taskFileDest->copyOutState);
taskFileDest->copyOutState = NULL;
}
if (taskFileDest->columnOutputFunctions)
{
pfree(taskFileDest->columnOutputFunctions);
taskFileDest->columnOutputFunctions = NULL;
}
pfree(taskFileDest->filePath);
pfree(taskFileDest);
if (taskFileDest->filePath)
{
pfree(taskFileDest->filePath);
taskFileDest->filePath = NULL;
}
}