Merge pull request #4849 from citusdata/marcocitus/fix-insert-mem

Marco Slot 2021-05-19 14:19:02 +02:00 committed by GitHub
commit 54c9bf8342
8 changed files with 77 additions and 33 deletions

@@ -746,6 +746,12 @@ AdaptiveExecutor(CitusScanState *scanState)
 	/* we should only call this once before the scan finished */
 	Assert(!scanState->finishedRemoteScan);
 
+	MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
+													   "AdaptiveExecutor",
+													   ALLOCSET_DEFAULT_SIZES);
+	MemoryContext oldContext = MemoryContextSwitchTo(localContext);
+
 	/* Reset Task fields that are only valid for a single execution */
 	ResetExplainAnalyzeData(taskList);
 
@@ -834,6 +840,8 @@ AdaptiveExecutor(CitusScanState *scanState)
 		SortTupleStore(scanState);
 	}
 
+	MemoryContextSwitchTo(oldContext);
+
 	return resultSlot;
 }
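
This hunk is the heart of the executor-side fix: AdaptiveExecutor now does its per-execution work in a private memory context, so repeated executions of the same scan no longer accumulate allocations in a long-lived context. A minimal sketch of the underlying PostgreSQL idiom (the wrapper function is hypothetical; the memutils API is standard):

    #include "postgres.h"
    #include "utils/memutils.h"

    /* hypothetical wrapper illustrating the per-call context pattern */
    static void
    RunInPrivateContext(void (*body) (void))
    {
        MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
                                                           "RunInPrivateContext",
                                                           ALLOCSET_DEFAULT_SIZES);
        MemoryContext oldContext = MemoryContextSwitchTo(localContext);

        body();                 /* every palloc() in here lands in localContext */

        MemoryContextSwitchTo(oldContext);
        MemoryContextDelete(localContext);  /* frees the whole batch at once */
    }

Unlike the sketch, the hunk above only switches back without deleting localContext, presumably because it is a child of the per-query context and is reclaimed automatically when the query ends.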

@@ -189,6 +189,12 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
 	{
 		CitusBeginModifyScan(node, estate, eflags);
 	}
 
+	/*
+	 * In case of a prepared statement, we will see this distributed plan again
+	 * on the next execution with a higher usage counter.
+	 */
+	distributedPlan->numberOfTimesExecuted++;
 }
 
@@ -315,6 +321,11 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
 	PlanState *planState = &(scanState->customScanState.ss.ps);
 	DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
 
+	MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
+													   "CitusBeginModifyScan",
+													   ALLOCSET_DEFAULT_SIZES);
+	MemoryContext oldContext = MemoryContextSwitchTo(localContext);
+
 	DistributedPlan *currentPlan =
 		CopyDistributedPlanWithoutCache(originalDistributedPlan);
 	scanState->distributedPlan = currentPlan;
 
@@ -405,6 +416,8 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
 		 */
 		CacheLocalPlanForShardQuery(task, originalDistributedPlan);
 	}
 
+	MemoryContextSwitchTo(oldContext);
 }
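
Two related changes land in this file: CitusBeginScan now counts executions (consumed by the plan-cache gate further down), and CitusBeginModifyScan performs its plan copying and re-planning in a scratch context so per-execution scribbles are reclaimed. A hedged sketch of the scratch-copy idea, with a hypothetical helper standing in for CopyDistributedPlanWithoutCache:

    #include "postgres.h"
    #include "nodes/nodes.h"
    #include "utils/memutils.h"

    /* hypothetical: mutate a working copy, keep the cached original pristine */
    static Node *
    ScratchCopy(Node *cachedPlan, MemoryContext scratchContext)
    {
        MemoryContext oldContext = MemoryContextSwitchTo(scratchContext);
        Node *workingCopy = copyObject(cachedPlan);

        MemoryContextSwitchTo(oldContext);
        return workingCopy;
    }

The cached original must stay untouched because, for a prepared statement, the same DistributedPlan is the starting point of every subsequent execution.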

@@ -229,9 +229,19 @@ ExecuteLocalTaskListExtended(List *taskList,
 		numParams = paramListInfo->numParams;
 	}
 
+	/*
+	 * Use a new memory context that gets reset after every task to free
+	 * the deparsed query string and query plan.
+	 */
+	MemoryContext loopContext = AllocSetContextCreate(CurrentMemoryContext,
+													  "ExecuteLocalTaskListExtended",
+													  ALLOCSET_DEFAULT_SIZES);
+
 	Task *task = NULL;
 	foreach_ptr(task, taskList)
 	{
+		MemoryContext oldContext = MemoryContextSwitchTo(loopContext);
+
 		TupleDestination *tupleDest = task->tupleDest ?
 									  task->tupleDest :
 									  defaultTupleDest;
 
@@ -261,6 +271,9 @@ ExecuteLocalTaskListExtended(List *taskList,
 		if (isUtilityCommand)
 		{
 			ExecuteUtilityCommand(TaskQueryString(task));
+
+			MemoryContextSwitchTo(oldContext);
+			MemoryContextReset(loopContext);
 			continue;
 		}
 
@@ -308,6 +321,9 @@ ExecuteLocalTaskListExtended(List *taskList,
 			totalRowsProcessed +=
 				LocallyPlanAndExecuteMultipleQueries(queryStringList, tupleDest,
 													 task);
+
+			MemoryContextSwitchTo(oldContext);
+			MemoryContextReset(loopContext);
 			continue;
 		}
 
@@ -343,6 +359,9 @@ ExecuteLocalTaskListExtended(List *taskList,
 		totalRowsProcessed +=
 			ExecuteLocalTaskPlan(localPlan, shardQueryString,
 								 tupleDest, task, paramListInfo);
+
+		MemoryContextSwitchTo(oldContext);
+		MemoryContextReset(loopContext);
 	}
 
 	return totalRowsProcessed;
 
@@ -582,6 +601,12 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
 	RecordNonDistTableAccessesForTask(task);
 
+	MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
+													   "ExecuteLocalTaskPlan",
+													   ALLOCSET_DEFAULT_SIZES);
+	MemoryContext oldContext = MemoryContextSwitchTo(localContext);
+
 	/*
 	 * Some tuple destinations look at task->taskPlacementList to determine
 	 * where the result came from using the placement index. Since a local
 
@@ -625,6 +650,9 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
 	FreeQueryDesc(queryDesc);
 
+	MemoryContextSwitchTo(oldContext);
+	MemoryContextDelete(localContext);
+
 	return totalRowsProcessed;
 }
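
The hunks in this file apply the same idiom twice over: ExecuteLocalTaskListExtended resets a loop-scoped context after every task, and ExecuteLocalTaskPlan wraps a single local execution in a context that is deleted afterwards. A condensed, hypothetical sketch of the loop-scoped variant:

    #include "postgres.h"
    #include "nodes/pg_list.h"
    #include "utils/memutils.h"

    /* hypothetical per-item worker; stands in for planning and executing one task */
    extern void ProcessOneItem(void *item);

    void
    ProcessItems(List *items)
    {
        MemoryContext loopContext = AllocSetContextCreate(CurrentMemoryContext,
                                                          "ProcessItems",
                                                          ALLOCSET_DEFAULT_SIZES);
        ListCell *cell = NULL;

        foreach(cell, items)
        {
            MemoryContext oldContext = MemoryContextSwitchTo(loopContext);

            ProcessOneItem(lfirst(cell));

            MemoryContextSwitchTo(oldContext);
            MemoryContextReset(loopContext);  /* free this iteration's garbage */
        }

        MemoryContextDelete(loopContext);
    }

The switch-back/reset pair has to run on every path out of the loop body, which is why each continue in the diff is preceded by it.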

@@ -59,6 +59,7 @@ RebuildQueryStrings(Job *workerJob)
 	Query *originalQuery = workerJob->jobQuery;
 	List *taskList = workerJob->taskList;
 	Task *task = NULL;
+	bool isSingleTask = list_length(taskList) == 1;
 
 	if (originalQuery->commandType == CMD_INSERT)
 	{
 
@@ -74,7 +75,7 @@ RebuildQueryStrings(Job *workerJob)
 		 * task, we scribble on the original query to avoid the copying
 		 * overhead.
 		 */
-		if (list_length(taskList) > 1)
+		if (!isSingleTask)
 		{
 			query = copyObject(originalQuery);
 		}
 
@@ -119,6 +120,19 @@ RebuildQueryStrings(Job *workerJob)
 			 * deparse_shard_query when the string is needed
 			 */
 			task->anchorDistributedTableId = modifiedRelationRTE->relid;
+
+			/*
+			 * For multi-row inserts, we modify the VALUES before storing the
+			 * query in the task.
+			 */
+			RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(query);
+			if (valuesRTE != NULL)
+			{
+				Assert(valuesRTE->rtekind == RTE_VALUES);
+				Assert(task->rowValuesLists != NULL);
+
+				valuesRTE->values_lists = task->rowValuesLists;
+			}
 		}
 
 		bool isQueryObjectOrText = GetTaskQueryType(task) == TASK_QUERY_TEXT ||
 
@@ -180,39 +194,7 @@ AddInsertAliasIfNeeded(Query *query)
 static void
 UpdateTaskQueryString(Query *query, Task *task)
 {
-	List *oldValuesLists = NIL;
-	RangeTblEntry *valuesRTE = NULL;
-
-	if (query->commandType == CMD_INSERT)
-	{
-		/* extract the VALUES from the INSERT */
-		valuesRTE = ExtractDistributedInsertValuesRTE(query);
-		if (valuesRTE != NULL)
-		{
-			Assert(valuesRTE->rtekind == RTE_VALUES);
-			Assert(task->rowValuesLists != NULL);
-
-			oldValuesLists = valuesRTE->values_lists;
-			valuesRTE->values_lists = task->rowValuesLists;
-		}
-
-		if (ShouldLazyDeparseQuery(task))
-		{
-			/*
-			 * not all insert queries are copied before calling this
-			 * function, so we do it here
-			 */
-			query = copyObject(query);
-		}
-	}
-
 	SetTaskQueryIfShouldLazyDeparse(task, query);
-
-	if (valuesRTE != NULL)
-	{
-		valuesRTE->values_lists = oldValuesLists;
-	}
 }
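
This is the deparser side of the memory fix: the per-task VALUES list is now installed once, while the task's query is built in RebuildQueryStrings, instead of being swapped into a shared Query and restored around every deparse in UpdateTaskQueryString. Since multi-task jobs already work on a copy of the original query, no restore is needed. A hypothetical condensation of the new flow, reusing the Citus types and helpers named in the hunks above:

    /* sketch: each task's stored query carries only that task's rows */
    Task *task = NULL;
    foreach_ptr(task, taskList)
    {
        Query *taskQuery = isSingleTask ? originalQuery : copyObject(originalQuery);
        RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(taskQuery);

        if (valuesRTE != NULL)
        {
            /* the rows were already pruned to this task's shard during planning */
            valuesRTE->values_lists = task->rowValuesLists;
        }

        SetTaskQueryIfShouldLazyDeparse(task, taskQuery);
    }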

@@ -139,6 +139,14 @@ GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
 bool
 IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistributedPlan)
 {
+	if (originalDistributedPlan->numberOfTimesExecuted < 1)
+	{
+		/*
+		 * Only cache if a plan is being reused (via a prepared statement).
+		 */
+		return false;
+	}
+
 	if (!currentJob->deferredPruning)
 	{
 		/*
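
This is where the new counter pays off: local plan caching is skipped outright on the first execution, since only a statement that runs again can amortize the cost of building and storing a cached plan. Condensed into a hypothetical helper:

    #include "postgres.h"

    /* hypothetical condensation of the gate added above */
    static bool
    PlanWorthCaching(uint32 numberOfTimesExecuted)
    {
        /* first execution: caching would only cost memory and cycles */
        if (numberOfTimesExecuted < 1)
        {
            return false;
        }

        return true;            /* the existing deferred-pruning checks follow */
    }

Note the ordering in CitusBeginScan above: the counter is incremented after CitusBeginModifyScan runs, so the first execution sees 0 here and caching effectively starts from the second execution of a prepared statement.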

@@ -135,6 +135,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS)
 	COPY_NODE_FIELD(subPlanList);
 	COPY_NODE_FIELD(usedSubPlanNodeList);
 	COPY_SCALAR_FIELD(fastPathRouterPlan);
+	COPY_SCALAR_FIELD(numberOfTimesExecuted);
 	COPY_NODE_FIELD(planningError);
 }

@@ -198,6 +198,7 @@ OutDistributedPlan(OUTFUNC_ARGS)
 	WRITE_NODE_FIELD(subPlanList);
 	WRITE_NODE_FIELD(usedSubPlanNodeList);
 	WRITE_BOOL_FIELD(fastPathRouterPlan);
+	WRITE_UINT_FIELD(numberOfTimesExecuted);
 	WRITE_NODE_FIELD(planningError);
 }
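
These node-function updates are mechanical but necessary: a field added to DistributedPlan must also be handled by its copy and serialization functions, or it would be silently dropped whenever the plan is copied, and CitusBeginModifyScan copies the plan on every execution. Going by the usual PostgreSQL copyfuncs/outfuncs conventions, the two macros expand to roughly:

    /* COPY_SCALAR_FIELD(numberOfTimesExecuted), approximately: */
    newnode->numberOfTimesExecuted = from->numberOfTimesExecuted;

    /* WRITE_UINT_FIELD(numberOfTimesExecuted), approximately: */
    appendStringInfo(str, " :numberOfTimesExecuted %u",
                     node->numberOfTimesExecuted);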

@@ -448,6 +448,9 @@ typedef struct DistributedPlan
 	 */
 	bool fastPathRouterPlan;
 
+	/* number of times this plan has been used (as a prepared statement) */
+	uint32 numberOfTimesExecuted;
+
 	/*
 	 * NULL if this a valid plan, an error description otherwise. This will
 	 * e.g. be set if SQL features are present that a planner doesn't support,