mirror of https://github.com/citusdata/citus.git
Fix memory leak on ReceiveResults
It turns out that TupleDescGetAttInMetadata() allocates quite a lot of memory, and if the target list is long and too many rows are returned, the leak becomes apparent. You can reproduce the issue without the fix with the following commands:

```sql
CREATE TABLE users_table (user_id int, time timestamp, value_1 int,
                          value_2 int, value_3 float, value_4 bigint);
SELECT create_distributed_table('users_table', 'user_id');

INSERT INTO users_table SELECT i, now(), i, i, i, i FROM generate_series(0, 99999) i;

-- double the table on each INSERT to load faster
INSERT INTO users_table SELECT * FROM users_table; -- 200,000
INSERT INTO users_table SELECT * FROM users_table; -- 400,000
INSERT INTO users_table SELECT * FROM users_table; -- 800,000
INSERT INTO users_table SELECT * FROM users_table; -- 1,600,000
INSERT INTO users_table SELECT * FROM users_table; -- 3,200,000
INSERT INTO users_table SELECT * FROM users_table; -- 6,400,000
INSERT INTO users_table SELECT * FROM users_table; -- 12,800,000

-- making the target list wider makes the leak show up faster
SELECT *,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,* FROM users_table;
```

pull/3114/head
parent 78e495e030
commit a208f8b151
```diff
@@ -262,6 +262,17 @@ typedef struct DistributedExecution
 	/* statistics on distributed execution */
 	DistributedExecutionStats *executionStats;
+
+	/*
+	 * The following fields are used while receiving results from remote nodes.
+	 * We store this information here to avoid re-allocating it every time.
+	 *
+	 * columnArray field is reset/calculated per row, so might be useless for other
+	 * contexts. The benefit of keeping it here is to avoid allocating the array
+	 * over and over again.
+	 */
+	AttInMetadata *attributeInputMetadata;
+	char **columnArray;
 } DistributedExecution;
 
 /*
```
```diff
@@ -850,6 +861,19 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu
 	execution->connectionSetChanged = false;
 	execution->waitFlagsChanged = false;
 
+	/* allocate execution specific data once, on the ExecutorState memory context */
+	if (tupleDescriptor != NULL)
+	{
+		execution->attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
+		execution->columnArray =
+			(char **) palloc0(tupleDescriptor->natts * sizeof(char *));
+	}
+	else
+	{
+		execution->attributeInputMetadata = NULL;
+		execution->columnArray = NULL;
+	}
+
 	if (ShouldExecuteTasksLocally(taskList))
 	{
 		bool readOnlyPlan = !TaskListModifiesDatabase(modLevel, taskList);
```
```diff
@@ -3016,23 +3040,27 @@ ReceiveResults(WorkerSession *session, bool storeRows)
 	DistributedExecution *execution = workerPool->distributedExecution;
 	DistributedExecutionStats *executionStats = execution->executionStats;
 	TupleDesc tupleDescriptor = execution->tupleDescriptor;
-	AttInMetadata *attributeInputMetadata = NULL;
+	AttInMetadata *attributeInputMetadata = execution->attributeInputMetadata;
 	uint32 expectedColumnCount = 0;
-	char **columnArray = NULL;
+	char **columnArray = execution->columnArray;
 	Tuplestorestate *tupleStore = execution->tupleStore;
-	MemoryContext ioContext = NULL;
+
+	MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext,
+													"ReceiveResults",
+													ALLOCSET_DEFAULT_MINSIZE,
+													ALLOCSET_DEFAULT_INITSIZE,
+													ALLOCSET_DEFAULT_MAXSIZE);
 	if (tupleDescriptor != NULL)
 	{
-		attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
 		expectedColumnCount = tupleDescriptor->natts;
-		columnArray = (char **) palloc0(expectedColumnCount * sizeof(char *));
 	}
 
-	/*
-	 * We use this context while converting each row fetched from remote node
-	 * into tuple. The context is reseted on every row, thus we create it at the
-	 * start of the loop and reset on every iteration.
-	 */
-	ioContext = AllocSetContextCreate(CurrentMemoryContext,
-									  "IoContext",
-									  ALLOCSET_DEFAULT_MINSIZE,
-									  ALLOCSET_DEFAULT_INITSIZE,
-									  ALLOCSET_DEFAULT_MAXSIZE);
-
 	while (!PQisBusy(connection->pgConn))
 	{
```
```diff
@@ -3113,7 +3141,7 @@ ReceiveResults(WorkerSession *session, bool storeRows)
 		for (rowIndex = 0; rowIndex < rowsProcessed; rowIndex++)
 		{
 			HeapTuple heapTuple = NULL;
-			MemoryContext oldContext = NULL;
+			MemoryContext oldContextPerRow = NULL;
 			memset(columnArray, 0, columnCount * sizeof(char *));
 
 			for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
```
```diff
@@ -3139,11 +3167,11 @@ ReceiveResults(WorkerSession *session, bool storeRows)
 			 * protects us from any memory leaks that might be present in I/O functions
 			 * called by BuildTupleFromCStrings.
 			 */
-			oldContext = MemoryContextSwitchTo(ioContext);
+			oldContextPerRow = MemoryContextSwitchTo(ioContext);
 
 			heapTuple = BuildTupleFromCStrings(attributeInputMetadata, columnArray);
 
-			MemoryContextSwitchTo(oldContext);
+			MemoryContextSwitchTo(oldContextPerRow);
 
 			tuplestore_puttuple(tupleStore, heapTuple);
 			MemoryContextReset(ioContext);
```
```diff
@@ -3159,11 +3187,7 @@ ReceiveResults(WorkerSession *session, bool storeRows)
 		}
 	}
 
-	if (columnArray != NULL)
-	{
-		pfree(columnArray);
-	}
-
 	/* the context is local to the function, so not needed anymore */
 	MemoryContextDelete(ioContext);
 
 	return fetchDone;
```
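Taken together, the ReceiveResults() hunks implement the usual PostgreSQL defense against leaky datatype input functions: do the conversion inside a short-lived context, reset it per row, and delete it on exit. A condensed sketch of that lifecycle under the same APIs; StoreRowsViaIoContext and its simplified loop are illustrative (the real code iterates libpq results and handles NULL columns):

```c
#include "postgres.h"
#include "funcapi.h"
#include "utils/memutils.h"
#include "utils/tuplestore.h"

/* Illustrative sketch, not the actual Citus loop. */
static void
StoreRowsViaIoContext(AttInMetadata *attributeInputMetadata, char **columnArray,
					  Tuplestorestate *tupleStore, int rowCount)
{
	/* created once per call, exactly like ioContext in ReceiveResults() */
	MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext,
													"ReceiveResults",
													ALLOCSET_DEFAULT_MINSIZE,
													ALLOCSET_DEFAULT_INITSIZE,
													ALLOCSET_DEFAULT_MAXSIZE);

	for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
	{
		/* ... fill columnArray from the remote result for this row ... */

		/* any garbage the input functions palloc lands in ioContext */
		MemoryContext oldContextPerRow = MemoryContextSwitchTo(ioContext);
		HeapTuple heapTuple = BuildTupleFromCStrings(attributeInputMetadata,
													 columnArray);
		MemoryContextSwitchTo(oldContextPerRow);

		/* the tuplestore keeps its own copy, so the per-row garbage can go */
		tuplestore_puttuple(tupleStore, heapTuple);
		MemoryContextReset(ioContext);
	}

	/* the context is local to the function, so not needed anymore */
	MemoryContextDelete(ioContext);
}
```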