Add execution memory contexts and free after local query execution

pull/4849/head
Marco Slot 2021-03-24 10:46:01 +01:00
parent 924959fdb1
commit 00792831ad
3 changed files with 43 additions and 0 deletions

View File

@ -746,6 +746,12 @@ AdaptiveExecutor(CitusScanState *scanState)
/* we should only call this once before the scan finished */ /* we should only call this once before the scan finished */
Assert(!scanState->finishedRemoteScan); Assert(!scanState->finishedRemoteScan);
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"AdaptiveExecutor",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
/* Reset Task fields that are only valid for a single execution */ /* Reset Task fields that are only valid for a single execution */
ResetExplainAnalyzeData(taskList); ResetExplainAnalyzeData(taskList);
@ -834,6 +840,8 @@ AdaptiveExecutor(CitusScanState *scanState)
SortTupleStore(scanState); SortTupleStore(scanState);
} }
MemoryContextSwitchTo(oldContext);
return resultSlot; return resultSlot;
} }

View File

@ -315,6 +315,11 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
PlanState *planState = &(scanState->customScanState.ss.ps); PlanState *planState = &(scanState->customScanState.ss.ps);
DistributedPlan *originalDistributedPlan = scanState->distributedPlan; DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CitusBeginModifyScan",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
DistributedPlan *currentPlan = DistributedPlan *currentPlan =
CopyDistributedPlanWithoutCache(originalDistributedPlan); CopyDistributedPlanWithoutCache(originalDistributedPlan);
scanState->distributedPlan = currentPlan; scanState->distributedPlan = currentPlan;
@ -405,6 +410,8 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
*/ */
CacheLocalPlanForShardQuery(task, originalDistributedPlan); CacheLocalPlanForShardQuery(task, originalDistributedPlan);
} }
MemoryContextSwitchTo(oldContext);
} }

View File

@ -229,9 +229,19 @@ ExecuteLocalTaskListExtended(List *taskList,
numParams = paramListInfo->numParams; numParams = paramListInfo->numParams;
} }
/*
* Use a new memory context that gets reset after every task to free
* the deparsed query string and query plan.
*/
MemoryContext loopContext = AllocSetContextCreate(CurrentMemoryContext,
"ExecuteLocalTaskListExtended",
ALLOCSET_DEFAULT_SIZES);
Task *task = NULL; Task *task = NULL;
foreach_ptr(task, taskList) foreach_ptr(task, taskList)
{ {
MemoryContext oldContext = MemoryContextSwitchTo(loopContext);
TupleDestination *tupleDest = task->tupleDest ? TupleDestination *tupleDest = task->tupleDest ?
task->tupleDest : task->tupleDest :
defaultTupleDest; defaultTupleDest;
@ -261,6 +271,9 @@ ExecuteLocalTaskListExtended(List *taskList,
if (isUtilityCommand) if (isUtilityCommand)
{ {
ExecuteUtilityCommand(TaskQueryString(task)); ExecuteUtilityCommand(TaskQueryString(task));
MemoryContextSwitchTo(oldContext);
MemoryContextReset(loopContext);
continue; continue;
} }
@ -308,6 +321,9 @@ ExecuteLocalTaskListExtended(List *taskList,
totalRowsProcessed += totalRowsProcessed +=
LocallyPlanAndExecuteMultipleQueries(queryStringList, tupleDest, LocallyPlanAndExecuteMultipleQueries(queryStringList, tupleDest,
task); task);
MemoryContextSwitchTo(oldContext);
MemoryContextReset(loopContext);
continue; continue;
} }
@ -343,6 +359,9 @@ ExecuteLocalTaskListExtended(List *taskList,
totalRowsProcessed += totalRowsProcessed +=
ExecuteLocalTaskPlan(localPlan, shardQueryString, ExecuteLocalTaskPlan(localPlan, shardQueryString,
tupleDest, task, paramListInfo); tupleDest, task, paramListInfo);
MemoryContextSwitchTo(oldContext);
MemoryContextReset(loopContext);
} }
return totalRowsProcessed; return totalRowsProcessed;
@ -582,6 +601,12 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
RecordNonDistTableAccessesForTask(task); RecordNonDistTableAccessesForTask(task);
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"ExecuteLocalTaskPlan",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
/* /*
* Some tuple destinations look at task->taskPlacementList to determine * Some tuple destinations look at task->taskPlacementList to determine
* where the result came from using the placement index. Since a local * where the result came from using the placement index. Since a local
@ -625,6 +650,9 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
FreeQueryDesc(queryDesc); FreeQueryDesc(queryDesc);
MemoryContextSwitchTo(oldContext);
MemoryContextDelete(localContext);
return totalRowsProcessed; return totalRowsProcessed;
} }