From 00792831adbd82dd2272524155c7b55f626463b8 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 24 Mar 2021 10:46:01 +0100 Subject: [PATCH] Add execution memory contexts and free after local query execution --- .../distributed/executor/adaptive_executor.c | 8 ++++++ .../distributed/executor/citus_custom_scan.c | 7 +++++ .../distributed/executor/local_executor.c | 28 +++++++++++++++++++ 3 files changed, 43 insertions(+) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ca54b1b5d..66c486f47 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -746,6 +746,12 @@ AdaptiveExecutor(CitusScanState *scanState) /* we should only call this once before the scan finished */ Assert(!scanState->finishedRemoteScan); + MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, + "AdaptiveExecutor", + ALLOCSET_DEFAULT_SIZES); + MemoryContext oldContext = MemoryContextSwitchTo(localContext); + + /* Reset Task fields that are only valid for a single execution */ ResetExplainAnalyzeData(taskList); @@ -834,6 +840,8 @@ AdaptiveExecutor(CitusScanState *scanState) SortTupleStore(scanState); } + MemoryContextSwitchTo(oldContext); + return resultSlot; } diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 7c873b2d2..11113b9d6 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -315,6 +315,11 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags) PlanState *planState = &(scanState->customScanState.ss.ps); DistributedPlan *originalDistributedPlan = scanState->distributedPlan; + MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, + "CitusBeginModifyScan", + ALLOCSET_DEFAULT_SIZES); + MemoryContext oldContext = MemoryContextSwitchTo(localContext); + DistributedPlan *currentPlan = CopyDistributedPlanWithoutCache(originalDistributedPlan); scanState->distributedPlan = currentPlan; @@ -405,6 +410,8 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags) */ CacheLocalPlanForShardQuery(task, originalDistributedPlan); } + + MemoryContextSwitchTo(oldContext); } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 9de14db2c..a37e85fea 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -229,9 +229,19 @@ ExecuteLocalTaskListExtended(List *taskList, numParams = paramListInfo->numParams; } + /* + * Use a new memory context that gets reset after every task to free + * the deparsed query string and query plan. + */ + MemoryContext loopContext = AllocSetContextCreate(CurrentMemoryContext, + "ExecuteLocalTaskListExtended", + ALLOCSET_DEFAULT_SIZES); + Task *task = NULL; foreach_ptr(task, taskList) { + MemoryContext oldContext = MemoryContextSwitchTo(loopContext); + TupleDestination *tupleDest = task->tupleDest ? task->tupleDest : defaultTupleDest; @@ -261,6 +271,9 @@ ExecuteLocalTaskListExtended(List *taskList, if (isUtilityCommand) { ExecuteUtilityCommand(TaskQueryString(task)); + + MemoryContextSwitchTo(oldContext); + MemoryContextReset(loopContext); continue; } @@ -308,6 +321,9 @@ ExecuteLocalTaskListExtended(List *taskList, totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries(queryStringList, tupleDest, task); + + MemoryContextSwitchTo(oldContext); + MemoryContextReset(loopContext); continue; } @@ -343,6 +359,9 @@ ExecuteLocalTaskListExtended(List *taskList, totalRowsProcessed += ExecuteLocalTaskPlan(localPlan, shardQueryString, tupleDest, task, paramListInfo); + + MemoryContextSwitchTo(oldContext); + MemoryContextReset(loopContext); } return totalRowsProcessed; @@ -582,6 +601,12 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, RecordNonDistTableAccessesForTask(task); + MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, + "ExecuteLocalTaskPlan", + ALLOCSET_DEFAULT_SIZES); + + MemoryContext oldContext = MemoryContextSwitchTo(localContext); + /* * Some tuple destinations look at task->taskPlacementList to determine * where the result came from using the placement index. Since a local @@ -625,6 +650,9 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, FreeQueryDesc(queryDesc); + MemoryContextSwitchTo(oldContext); + MemoryContextDelete(localContext); + return totalRowsProcessed; }