Merge pull request #3114 from citusdata/fix_leak_more_generic

Fix memory leak on ReceiveResults
pull/3121/head
Önder Kalacı 2019-10-22 17:29:52 +02:00 committed by GitHub
commit f0f93d9c45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 41 additions and 17 deletions

View File

@ -262,6 +262,17 @@ typedef struct DistributedExecution
/* statistics on distributed execution */
DistributedExecutionStats *executionStats;
/*
* The following fields are used while receiving results from remote nodes.
* We store this information here to avoid re-allocating it every time.
*
* columnArray field is reset/calculated per row, so might be useless for other
* contexts. The benefit of keeping it here is to avoid allocating the array
* over and over again.
*/
AttInMetadata *attributeInputMetadata;
char **columnArray;
} DistributedExecution;
/*
@ -850,6 +861,19 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu
execution->connectionSetChanged = false;
execution->waitFlagsChanged = false;
/* allocate execution specific data once, on the ExecutorState memory context */
if (tupleDescriptor != NULL)
{
execution->attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
execution->columnArray =
(char **) palloc0(tupleDescriptor->natts * sizeof(char *));
}
else
{
execution->attributeInputMetadata = NULL;
execution->columnArray = NULL;
}
if (ShouldExecuteTasksLocally(taskList))
{
bool readOnlyPlan = !TaskListModifiesDatabase(modLevel, taskList);
@ -3016,23 +3040,27 @@ ReceiveResults(WorkerSession *session, bool storeRows)
DistributedExecution *execution = workerPool->distributedExecution;
DistributedExecutionStats *executionStats = execution->executionStats;
TupleDesc tupleDescriptor = execution->tupleDescriptor;
AttInMetadata *attributeInputMetadata = NULL;
AttInMetadata *attributeInputMetadata = execution->attributeInputMetadata;
uint32 expectedColumnCount = 0;
char **columnArray = NULL;
char **columnArray = execution->columnArray;
Tuplestorestate *tupleStore = execution->tupleStore;
MemoryContext ioContext = NULL;
MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext,
"ReceiveResults",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
if (tupleDescriptor != NULL)
{
attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
expectedColumnCount = tupleDescriptor->natts;
columnArray = (char **) palloc0(expectedColumnCount * sizeof(char *));
}
/*
* We use this context while converting each row fetched from remote node
* into tuple. The context is reseted on every row, thus we create it at the
* start of the loop and reset on every iteration.
*/
ioContext = AllocSetContextCreate(CurrentMemoryContext,
"IoContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
while (!PQisBusy(connection->pgConn))
{
@ -3113,7 +3141,7 @@ ReceiveResults(WorkerSession *session, bool storeRows)
for (rowIndex = 0; rowIndex < rowsProcessed; rowIndex++)
{
HeapTuple heapTuple = NULL;
MemoryContext oldContext = NULL;
MemoryContext oldContextPerRow = NULL;
memset(columnArray, 0, columnCount * sizeof(char *));
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
@ -3139,11 +3167,11 @@ ReceiveResults(WorkerSession *session, bool storeRows)
* protects us from any memory leaks that might be present in I/O functions
* called by BuildTupleFromCStrings.
*/
oldContext = MemoryContextSwitchTo(ioContext);
oldContextPerRow = MemoryContextSwitchTo(ioContext);
heapTuple = BuildTupleFromCStrings(attributeInputMetadata, columnArray);
MemoryContextSwitchTo(oldContext);
MemoryContextSwitchTo(oldContextPerRow);
tuplestore_puttuple(tupleStore, heapTuple);
MemoryContextReset(ioContext);
@ -3159,11 +3187,7 @@ ReceiveResults(WorkerSession *session, bool storeRows)
}
}
if (columnArray != NULL)
{
pfree(columnArray);
}
/* the context is local to the function, so not needed anymore */
MemoryContextDelete(ioContext);
return fetchDone;