diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ca54b1b5d..66c486f47 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -746,6 +746,12 @@ AdaptiveExecutor(CitusScanState *scanState) /* we should only call this once before the scan finished */ Assert(!scanState->finishedRemoteScan); + MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, + "AdaptiveExecutor", + ALLOCSET_DEFAULT_SIZES); + MemoryContext oldContext = MemoryContextSwitchTo(localContext); + + /* Reset Task fields that are only valid for a single execution */ ResetExplainAnalyzeData(taskList); @@ -834,6 +840,8 @@ AdaptiveExecutor(CitusScanState *scanState) SortTupleStore(scanState); } + MemoryContextSwitchTo(oldContext); + return resultSlot; } diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 7c873b2d2..e1820a74e 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -189,6 +189,12 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags) { CitusBeginModifyScan(node, estate, eflags); } + + /* + * In case of a prepared statement, we will see this distributed plan again + * on the next execution with a higher usage counter. + */ + distributedPlan->numberOfTimesExecuted++; } @@ -315,6 +321,11 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags) PlanState *planState = &(scanState->customScanState.ss.ps); DistributedPlan *originalDistributedPlan = scanState->distributedPlan; + MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, + "CitusBeginModifyScan", + ALLOCSET_DEFAULT_SIZES); + MemoryContext oldContext = MemoryContextSwitchTo(localContext); + DistributedPlan *currentPlan = CopyDistributedPlanWithoutCache(originalDistributedPlan); scanState->distributedPlan = currentPlan; @@ -405,6 +416,8 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags) */ CacheLocalPlanForShardQuery(task, originalDistributedPlan); } + + MemoryContextSwitchTo(oldContext); } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 9de14db2c..a37e85fea 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -229,9 +229,19 @@ ExecuteLocalTaskListExtended(List *taskList, numParams = paramListInfo->numParams; } + /* + * Use a new memory context that gets reset after every task to free + * the deparsed query string and query plan. + */ + MemoryContext loopContext = AllocSetContextCreate(CurrentMemoryContext, + "ExecuteLocalTaskListExtended", + ALLOCSET_DEFAULT_SIZES); + Task *task = NULL; foreach_ptr(task, taskList) { + MemoryContext oldContext = MemoryContextSwitchTo(loopContext); + TupleDestination *tupleDest = task->tupleDest ? task->tupleDest : defaultTupleDest; @@ -261,6 +271,9 @@ ExecuteLocalTaskListExtended(List *taskList, if (isUtilityCommand) { ExecuteUtilityCommand(TaskQueryString(task)); + + MemoryContextSwitchTo(oldContext); + MemoryContextReset(loopContext); continue; } @@ -308,6 +321,9 @@ ExecuteLocalTaskListExtended(List *taskList, totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries(queryStringList, tupleDest, task); + + MemoryContextSwitchTo(oldContext); + MemoryContextReset(loopContext); continue; } @@ -343,6 +359,9 @@ ExecuteLocalTaskListExtended(List *taskList, totalRowsProcessed += ExecuteLocalTaskPlan(localPlan, shardQueryString, tupleDest, task, paramListInfo); + + MemoryContextSwitchTo(oldContext); + MemoryContextReset(loopContext); } return totalRowsProcessed; @@ -582,6 +601,12 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, RecordNonDistTableAccessesForTask(task); + MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, + "ExecuteLocalTaskPlan", + ALLOCSET_DEFAULT_SIZES); + + MemoryContext oldContext = MemoryContextSwitchTo(localContext); + /* * Some tuple destinations look at task->taskPlacementList to determine * where the result came from using the placement index. Since a local @@ -625,6 +650,9 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, FreeQueryDesc(queryDesc); + MemoryContextSwitchTo(oldContext); + MemoryContextDelete(localContext); + return totalRowsProcessed; } diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 73666935e..01998b029 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -59,6 +59,7 @@ RebuildQueryStrings(Job *workerJob) Query *originalQuery = workerJob->jobQuery; List *taskList = workerJob->taskList; Task *task = NULL; + bool isSingleTask = list_length(taskList) == 1; if (originalQuery->commandType == CMD_INSERT) { @@ -74,7 +75,7 @@ RebuildQueryStrings(Job *workerJob) * task, we scribble on the original query to avoid the copying * overhead. */ - if (list_length(taskList) > 1) + if (!isSingleTask) { query = copyObject(originalQuery); } @@ -119,6 +120,19 @@ RebuildQueryStrings(Job *workerJob) * deparse_shard_query when the string is needed */ task->anchorDistributedTableId = modifiedRelationRTE->relid; + + /* + * For multi-row inserts, we modify the VALUES before storing the + * query in the task. + */ + RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(query); + if (valuesRTE != NULL) + { + Assert(valuesRTE->rtekind == RTE_VALUES); + Assert(task->rowValuesLists != NULL); + + valuesRTE->values_lists = task->rowValuesLists; + } } bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT || @@ -180,39 +194,7 @@ AddInsertAliasIfNeeded(Query *query) static void UpdateTaskQueryString(Query *query, Task *task) { - List *oldValuesLists = NIL; - RangeTblEntry *valuesRTE = NULL; - - if (query->commandType == CMD_INSERT) - { - /* extract the VALUES from the INSERT */ - valuesRTE = ExtractDistributedInsertValuesRTE(query); - - if (valuesRTE != NULL) - { - Assert(valuesRTE->rtekind == RTE_VALUES); - Assert(task->rowValuesLists != NULL); - - oldValuesLists = valuesRTE->values_lists; - valuesRTE->values_lists = task->rowValuesLists; - } - - if (ShouldLazyDeparseQuery(task)) - { - /* - * not all insert queries are copied before calling this - * function, so we do it here - */ - query = copyObject(query); - } - } - SetTaskQueryIfShouldLazyDeparse(task, query); - - if (valuesRTE != NULL) - { - valuesRTE->values_lists = oldValuesLists; - } } diff --git a/src/backend/distributed/planner/local_plan_cache.c b/src/backend/distributed/planner/local_plan_cache.c index 77a80a892..3ae83d235 100644 --- a/src/backend/distributed/planner/local_plan_cache.c +++ b/src/backend/distributed/planner/local_plan_cache.c @@ -139,6 +139,14 @@ GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan) bool IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistributedPlan) { + if (originalDistributedPlan->numberOfTimesExecuted < 1) + { + /* + * Only cache if a plan is being reused (via a prepared statement). + */ + return false; + } + if (!currentJob->deferredPruning) { /* diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 8e725a7d0..fba73445f 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -135,6 +135,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) COPY_NODE_FIELD(subPlanList); COPY_NODE_FIELD(usedSubPlanNodeList); COPY_SCALAR_FIELD(fastPathRouterPlan); + COPY_SCALAR_FIELD(numberOfTimesExecuted); COPY_NODE_FIELD(planningError); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index f8bd9bbc7..a3743c281 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -198,6 +198,7 @@ OutDistributedPlan(OUTFUNC_ARGS) WRITE_NODE_FIELD(subPlanList); WRITE_NODE_FIELD(usedSubPlanNodeList); WRITE_BOOL_FIELD(fastPathRouterPlan); + WRITE_UINT_FIELD(numberOfTimesExecuted); WRITE_NODE_FIELD(planningError); } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 1fc75f8d2..740adfdd0 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -448,6 +448,9 @@ typedef struct DistributedPlan */ bool fastPathRouterPlan; + /* number of times this plan has been used (as a prepared statement) */ + uint32 numberOfTimesExecuted; + /* * NULL if this a valid plan, an error description otherwise. This will * e.g. be set if SQL features are present that a planner doesn't support,