From 3fcb456381b27a889b9824138d12eda1d1a13d50 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Fri, 5 Nov 2021 13:31:18 +0100 Subject: [PATCH] Refactor/partitioned result destreceiver (#5432) This change creates a slightly higher abstraction of the `PartitionedResultDestReceiver` where it decouples the partitioning from writing it to a file. This allows for easier reuse for other `DestReceiver`'s that would like to route different tuples to different `DestReceiver`'s. Originally there was a lot of state kept in `PartitionedResultDestReceiver` to be able to lazily create `FileDestReceivers` when the first tuple arrived for that target. This convoluted the implementation of the processing of tuples with where they should go. This refactor changes that where it makes the `PartitionedResultDestReceiver` completely agnostic of what kind of Receivers it is writing to. When constructed you pass it a list of `DestReceiver` compatible pointers with the length of `partitionCount`. Internally the `PartitionedResultDestReceiver` keeps track of which `DestReceiver`'s have been started or not, and start them when they first receive a tuple. Alternatively, if the instantiating code of the `PartitionedResultDestReceiver` wants, the startup can be turned from lazily to eagerly. When the startup is eager (not lazy) all `rStartup` functions on the list of `DestReceiver`'s are called during the startup of the `PartitionedResultDestReceiver` and marked as such. A downside of this approach is the following. On highly partitioned destinations we now need to allocate a `FileDestReceiver` for every target, _always_. When the data passed into the `PartitionedResultDestReceiver` is highly skewed to a small set of `FileDestReceiver`'s this will waste some memory. Given the small size of a `FileDestReceiver`, and the fact that actual file handles are only created during the processing of the startup of the `FileDestReceiver` I think this memory waste is not a problem. If this would become a problem we could refactor the source list into some kind of generator object which can generate the `DestReceiver`'s on the fly. --- .../partitioned_intermediate_results.c | 244 ++++++++++-------- .../worker/worker_sql_task_protocol.c | 9 +- 2 files changed, 138 insertions(+), 115 deletions(-) diff --git a/src/backend/distributed/executor/partitioned_intermediate_results.c b/src/backend/distributed/executor/partitioned_intermediate_results.c index c0f6e9d65..21949feed 100644 --- a/src/backend/distributed/executor/partitioned_intermediate_results.c +++ b/src/backend/distributed/executor/partitioned_intermediate_results.c @@ -44,31 +44,48 @@ typedef struct PartitionedResultDestReceiver /* public DestReceiver interface */ DestReceiver pub; - /* partition file $i is stored at file named $resultIdPrefix_$i. */ - char *resultIdPrefix; + /* on lazy startup we only startup the DestReceiver once they receive a tuple */ + bool lazyStartup; - /* use binary copy or just text copy format? */ - bool binaryCopy; + /* + * Stores the arguments passed to the PartidionedResultDestReceiver's rStarup + * function. These arguments are reused when lazyStartup has been set to true. + * On the processing of a first tuple for a partitionDestReceiver since rStartup it + * will pass the arguments here to the rStartup function of partitionDestReceiver to + * prepare it for receiving tuples. + * + * Even though not used without lazyStartup we just always populate these with the + * last invoked arguments for our rStartup. + */ + struct + { + /* + * operation as passed to rStartup, mostly the CmdType of the command being + * streamed into this DestReceiver + */ + int operation; + + /* + * TupleDesc describing the layout of the tuples being streamed into the + * DestReceiver. + */ + TupleDesc tupleDescriptor; + } startupArguments; + + /* which column of streamed tuples to use as partition column */ + int partitionColumnIndex; + + /* The number of partitions being partitioned into */ + int partitionCount; /* used for deciding which partition a shard belongs to. */ CitusTableCacheEntry *shardSearchInfo; - MemoryContext perTupleContext; - - /* how does stream tuples look like? */ - TupleDesc tupleDescriptor; - - /* which column of streamed tuples to use as partition column? */ - int partitionColumnIndex; - - /* how many partitions do we have? */ - int partitionCount; - - /* - * Tuples for partition[i] are sent to partitionDestReceivers[i], which - * writes it to a result file. - */ + /* Tuples matching shardSearchInfo[i] are sent to partitionDestReceivers[i]. */ DestReceiver **partitionDestReceivers; + + /* keeping track of which partitionDestReceivers have been started */ + Bitmapset *startedDestReceivers; } PartitionedResultDestReceiver; static Portal StartPortalForQueryExecution(const char *queryString); @@ -76,25 +93,19 @@ static CitusTableCacheEntry * QueryTupleShardSearchInfo(ArrayType *minValuesArra ArrayType *maxValuesArray, char partitionMethod, Var *partitionColumn); -static PartitionedResultDestReceiver * CreatePartitionedResultDestReceiver(char *resultId, - int - partitionColumnIndex, - int - partitionCount, - TupleDesc - tupleDescriptor, - bool binaryCopy, - CitusTableCacheEntry - * - shardSearchInfo, - MemoryContext - perTupleContext); +static DestReceiver * CreatePartitionedResultDestReceiver(int partitionColumnIndex, + int partitionCount, + CitusTableCacheEntry * + shardSearchInfo, + DestReceiver ** + partitionedDestReceivers, + bool lazyStartup); static void PartitionedResultDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor); static bool PartitionedResultDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); -static void PartitionedResultDestReceiverShutdown(DestReceiver *destReceiver); -static void PartitionedResultDestReceiverDestroy(DestReceiver *destReceiver); +static void PartitionedResultDestReceiverShutdown(DestReceiver *dest); +static void PartitionedResultDestReceiverDestroy(DestReceiver *copyDest); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(worker_partition_query_result); @@ -202,14 +213,29 @@ worker_partition_query_result(PG_FUNCTION_ARGS) /* prepare the output destination */ EState *estate = CreateExecutorState(); MemoryContext tupleContext = GetPerTupleMemoryContext(estate); - PartitionedResultDestReceiver *dest = - CreatePartitionedResultDestReceiver(resultIdPrefixString, partitionColumnIndex, - partitionCount, tupleDescriptor, binaryCopy, - shardSearchInfo, tupleContext); + + /* create all dest receivers */ + DestReceiver **dests = palloc0(partitionCount * sizeof(DestReceiver *)); + for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) + { + StringInfo resultId = makeStringInfo(); + appendStringInfo(resultId, "%s_%d", resultIdPrefixString, partitionIndex); + char *filePath = QueryResultFileName(resultId->data); + DestReceiver *partitionDest = CreateFileDestReceiver(filePath, tupleContext, + binaryCopy); + dests[partitionIndex] = partitionDest; + } + + const bool lazyStartup = true; + DestReceiver *dest = CreatePartitionedResultDestReceiver( + partitionColumnIndex, + partitionCount, + shardSearchInfo, + dests, + lazyStartup); /* execute the query */ - PortalRun(portal, FETCH_ALL, false, true, (DestReceiver *) dest, - (DestReceiver *) dest, NULL); + PortalRun(portal, FETCH_ALL, false, true, dest, dest, NULL); /* construct the output result */ TupleDesc returnTupleDesc = NULL; @@ -225,11 +251,7 @@ worker_partition_query_result(PG_FUNCTION_ARGS) Datum values[3]; bool nulls[3]; - if (dest->partitionDestReceivers[partitionIndex] != NULL) - { - FileDestReceiverStats(dest->partitionDestReceivers[partitionIndex], - &recordsWritten, &bytesWritten); - } + FileDestReceiverStats(dests[partitionIndex], &recordsWritten, &bytesWritten); memset(values, 0, sizeof(values)); memset(nulls, 0, sizeof(nulls)); @@ -245,6 +267,8 @@ worker_partition_query_result(PG_FUNCTION_ARGS) PortalDrop(portal, false); FreeExecutorState(estate); + dest->rDestroy(dest); + PG_RETURN_INT64(1); } @@ -363,12 +387,12 @@ QueryTupleShardSearchInfo(ArrayType *minValuesArray, ArrayType *maxValuesArray, /* * CreatePartitionedResultDestReceiver sets up a partitioned dest receiver. */ -static PartitionedResultDestReceiver * -CreatePartitionedResultDestReceiver(char *resultIdPrefix, int partitionColumnIndex, - int partitionCount, TupleDesc tupleDescriptor, - bool binaryCopy, +static DestReceiver * +CreatePartitionedResultDestReceiver(int partitionColumnIndex, + int partitionCount, CitusTableCacheEntry *shardSearchInfo, - MemoryContext perTupleContext) + DestReceiver **partitionedDestReceivers, + bool lazyStartup) { PartitionedResultDestReceiver *resultDest = palloc0(sizeof(PartitionedResultDestReceiver)); @@ -380,18 +404,15 @@ CreatePartitionedResultDestReceiver(char *resultIdPrefix, int partitionColumnInd resultDest->pub.rDestroy = PartitionedResultDestReceiverDestroy; resultDest->pub.mydest = DestCopyOut; - /* set up output parameters */ - resultDest->resultIdPrefix = resultIdPrefix; - resultDest->perTupleContext = perTupleContext; + /* setup routing parameters */ resultDest->partitionColumnIndex = partitionColumnIndex; resultDest->partitionCount = partitionCount; resultDest->shardSearchInfo = shardSearchInfo; - resultDest->tupleDescriptor = tupleDescriptor; - resultDest->binaryCopy = binaryCopy; - resultDest->partitionDestReceivers = - (DestReceiver **) palloc0(partitionCount * sizeof(DestReceiver *)); + resultDest->partitionDestReceivers = partitionedDestReceivers; + resultDest->startedDestReceivers = NULL; + resultDest->lazyStartup = lazyStartup; - return resultDest; + return (DestReceiver *) resultDest; } @@ -400,24 +421,27 @@ CreatePartitionedResultDestReceiver(char *resultIdPrefix, int partitionColumnInd * PartitionedResultDestReceiver. */ static void -PartitionedResultDestReceiverStartup(DestReceiver *copyDest, int operation, +PartitionedResultDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor) { - /* - * We don't expect this to be called multiple times, but if it happens, - * we will just overwrite previous files. - */ - PartitionedResultDestReceiver *partitionedDest = - (PartitionedResultDestReceiver *) copyDest; - int partitionCount = partitionedDest->partitionCount; - for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) + PartitionedResultDestReceiver *self = (PartitionedResultDestReceiver *) dest; + + self->startupArguments.tupleDescriptor = CreateTupleDescCopy(inputTupleDescriptor); + self->startupArguments.operation = operation; + + if (self->lazyStartup) { - DestReceiver *partitionDest = - partitionedDest->partitionDestReceivers[partitionIndex]; - if (partitionDest != NULL) - { - partitionDest->rStartup(partitionDest, operation, inputTupleDescriptor); - } + /* we are initialized, rest happens when needed*/ + return; + } + + /* no lazy startup, lets startup our partitionedDestReceivers */ + for (int partitionIndex = 0; partitionIndex < self->partitionCount; partitionIndex++) + { + DestReceiver *partitionDest = self->partitionDestReceivers[partitionIndex]; + partitionDest->rStartup(partitionDest, operation, inputTupleDescriptor); + self->startedDestReceivers = bms_add_member(self->startedDestReceivers, + partitionIndex); } } @@ -427,25 +451,24 @@ PartitionedResultDestReceiverStartup(DestReceiver *copyDest, int operation, * PartitionedResultDestReceiver. */ static bool -PartitionedResultDestReceiverReceive(TupleTableSlot *slot, DestReceiver *copyDest) +PartitionedResultDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { - PartitionedResultDestReceiver *partitionedDest = - (PartitionedResultDestReceiver *) copyDest; + PartitionedResultDestReceiver *self = (PartitionedResultDestReceiver *) dest; slot_getallattrs(slot); Datum *columnValues = slot->tts_values; bool *columnNulls = slot->tts_isnull; - if (columnNulls[partitionedDest->partitionColumnIndex]) + if (columnNulls[self->partitionColumnIndex]) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg("the partition column value cannot be NULL"))); } - Datum partitionColumnValue = columnValues[partitionedDest->partitionColumnIndex]; + Datum partitionColumnValue = columnValues[self->partitionColumnIndex]; ShardInterval *shardInterval = FindShardInterval(partitionColumnValue, - partitionedDest->shardSearchInfo); + self->shardSearchInfo); if (shardInterval == NULL) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -454,20 +477,19 @@ PartitionedResultDestReceiverReceive(TupleTableSlot *slot, DestReceiver *copyDes } int partitionIndex = shardInterval->shardIndex; - DestReceiver *partitionDest = partitionedDest->partitionDestReceivers[partitionIndex]; - if (partitionDest == NULL) - { - StringInfo resultId = makeStringInfo(); - appendStringInfo(resultId, "%s_%d", partitionedDest->resultIdPrefix, - partitionIndex); - char *filePath = QueryResultFileName(resultId->data); + DestReceiver *partitionDest = self->partitionDestReceivers[partitionIndex]; - partitionDest = CreateFileDestReceiver(filePath, partitionedDest->perTupleContext, - partitionedDest->binaryCopy); - partitionedDest->partitionDestReceivers[partitionIndex] = partitionDest; - partitionDest->rStartup(partitionDest, 0, partitionedDest->tupleDescriptor); + /* check if this partitionDestReceiver has been started before, start if not */ + if (!bms_is_member(partitionIndex, self->startedDestReceivers)) + { + partitionDest->rStartup(partitionDest, + self->startupArguments.operation, + self->startupArguments.tupleDescriptor); + self->startedDestReceivers = bms_add_member(self->startedDestReceivers, + partitionIndex); } + /* forward the tuple to the appropriate dest receiver */ partitionDest->receiveSlot(slot, partitionDest); return true; @@ -476,23 +498,24 @@ PartitionedResultDestReceiverReceive(TupleTableSlot *slot, DestReceiver *copyDes /* * PartitionedResultDestReceiverShutdown implements the rShutdown interface of - * PartitionedResultDestReceiver. + * PartitionedResultDestReceiver by calling rShutdown on all started + * partitionedDestReceivers. */ static void -PartitionedResultDestReceiverShutdown(DestReceiver *copyDest) +PartitionedResultDestReceiverShutdown(DestReceiver *dest) { - PartitionedResultDestReceiver *partitionedDest = - (PartitionedResultDestReceiver *) copyDest; - int partitionCount = partitionedDest->partitionCount; - for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) + PartitionedResultDestReceiver *self = (PartitionedResultDestReceiver *) dest; + + int i = -1; + while ((i = bms_next_member(self->startedDestReceivers, i)) >= 0) { - DestReceiver *partitionDest = - partitionedDest->partitionDestReceivers[partitionIndex]; - if (partitionDest != NULL) - { - partitionDest->rShutdown(partitionDest); - } + DestReceiver *partitionDest = self->partitionDestReceivers[i]; + partitionDest->rShutdown(partitionDest); } + + /* empty the set of started receivers which allows them to be restarted again */ + bms_free(self->startedDestReceivers); + self->startedDestReceivers = NULL; } @@ -501,22 +524,17 @@ PartitionedResultDestReceiverShutdown(DestReceiver *copyDest) * PartitionedResultDestReceiver. */ static void -PartitionedResultDestReceiverDestroy(DestReceiver *copyDest) +PartitionedResultDestReceiverDestroy(DestReceiver *dest) { - PartitionedResultDestReceiver *partitionedDest = - (PartitionedResultDestReceiver *) copyDest; - int partitionCount = partitionedDest->partitionCount; - for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) + PartitionedResultDestReceiver *self = (PartitionedResultDestReceiver *) dest; + + /* we destroy all dest receivers, irregardless if they have been started or not */ + for (int partitionIndex = 0; partitionIndex < self->partitionCount; partitionIndex++) { - DestReceiver *partitionDest = - partitionedDest->partitionDestReceivers[partitionIndex]; + DestReceiver *partitionDest = self->partitionDestReceivers[partitionIndex]; if (partitionDest != NULL) { - /* this call should also free partitionDest, so no need to free it after */ partitionDest->rDestroy(partitionDest); } } - - pfree(partitionedDest->partitionDestReceivers); - pfree(partitionedDest); } diff --git a/src/backend/distributed/worker/worker_sql_task_protocol.c b/src/backend/distributed/worker/worker_sql_task_protocol.c index 28f8cbb26..38dba5e35 100644 --- a/src/backend/distributed/worker/worker_sql_task_protocol.c +++ b/src/backend/distributed/worker/worker_sql_task_protocol.c @@ -262,15 +262,20 @@ TaskFileDestReceiverDestroy(DestReceiver *destReceiver) if (taskFileDest->copyOutState) { pfree(taskFileDest->copyOutState); + taskFileDest->copyOutState = NULL; } if (taskFileDest->columnOutputFunctions) { pfree(taskFileDest->columnOutputFunctions); + taskFileDest->columnOutputFunctions = NULL; } - pfree(taskFileDest->filePath); - pfree(taskFileDest); + if (taskFileDest->filePath) + { + pfree(taskFileDest->filePath); + taskFileDest->filePath = NULL; + } }