diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index d2b29d9e9..79df46344 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -262,6 +262,17 @@ typedef struct DistributedExecution /* statistics on distributed execution */ DistributedExecutionStats *executionStats; + + /* + * The following fields are used while receiving results from remote nodes. + * We store this information here to avoid re-allocating it every time. + * + * columnArray field is reset/calculated per row, so might be useless for other + * contexts. The benefit of keeping it here is to avoid allocating the array + * over and over again. + */ + AttInMetadata *attributeInputMetadata; + char **columnArray; } DistributedExecution; /* @@ -850,6 +861,19 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu execution->connectionSetChanged = false; execution->waitFlagsChanged = false; + /* allocate execution specific data once, on the ExecutorState memory context */ + if (tupleDescriptor != NULL) + { + execution->attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); + execution->columnArray = + (char **) palloc0(tupleDescriptor->natts * sizeof(char *)); + } + else + { + execution->attributeInputMetadata = NULL; + execution->columnArray = NULL; + } + if (ShouldExecuteTasksLocally(taskList)) { bool readOnlyPlan = !TaskListModifiesDatabase(modLevel, taskList); @@ -3016,23 +3040,27 @@ ReceiveResults(WorkerSession *session, bool storeRows) DistributedExecution *execution = workerPool->distributedExecution; DistributedExecutionStats *executionStats = execution->executionStats; TupleDesc tupleDescriptor = execution->tupleDescriptor; - AttInMetadata *attributeInputMetadata = NULL; + AttInMetadata *attributeInputMetadata = execution->attributeInputMetadata; uint32 expectedColumnCount = 0; - char **columnArray = NULL; + char 
**columnArray = execution->columnArray; Tuplestorestate *tupleStore = execution->tupleStore; + MemoryContext ioContext = NULL; - MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext, - "ReceiveResults", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); if (tupleDescriptor != NULL) { - attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); expectedColumnCount = tupleDescriptor->natts; - columnArray = (char **) palloc0(expectedColumnCount * sizeof(char *)); } + /* + * We use this context while converting each row fetched from remote node + * into tuple. The context is reset on every row, thus we create it once + * before the loop and reset it on every iteration. + */ + ioContext = AllocSetContextCreate(CurrentMemoryContext, + "IoContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); while (!PQisBusy(connection->pgConn)) { @@ -3113,7 +3141,7 @@ ReceiveResults(WorkerSession *session, bool storeRows) for (rowIndex = 0; rowIndex < rowsProcessed; rowIndex++) { HeapTuple heapTuple = NULL; - MemoryContext oldContext = NULL; + MemoryContext oldContextPerRow = NULL; memset(columnArray, 0, columnCount * sizeof(char *)); for (columnIndex = 0; columnIndex < columnCount; columnIndex++) @@ -3139,11 +3167,11 @@ ReceiveResults(WorkerSession *session, bool storeRows) * protects us from any memory leaks that might be present in I/O functions * called by BuildTupleFromCStrings. 
*/ - oldContext = MemoryContextSwitchTo(ioContext); + oldContextPerRow = MemoryContextSwitchTo(ioContext); heapTuple = BuildTupleFromCStrings(attributeInputMetadata, columnArray); - MemoryContextSwitchTo(oldContext); + MemoryContextSwitchTo(oldContextPerRow); tuplestore_puttuple(tupleStore, heapTuple); MemoryContextReset(ioContext); @@ -3159,11 +3187,7 @@ ReceiveResults(WorkerSession *session, bool storeRows) } } - if (columnArray != NULL) - { - pfree(columnArray); - } - + /* the context is local to the function, so not needed anymore */ MemoryContextDelete(ioContext); return fetchDone;