mirror of https://github.com/citusdata/citus.git
Properly cache fast-path remote non-SELECT queries
Basically, apply what we do for SELECTs.

Branch: remote_prepared_txes
parent 3ece691b79
commit 8613ae173c
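For orientation, here is the gist of the change, condensed from the CitusBeginModifyScan hunk below into a self-contained sketch. Every type and helper in this sketch is a simplified stand-in, not the real Citus definition; only the shape of the decision mirrors the diff: fetch or build a fast-path plan cache entry for the task, deparse the job query on a miss, and on a hit that carries a cached query string reuse that string while leaving the parameters unresolved.

#include <stdio.h>

/*
 * Simplified stand-ins for illustration only; the real FastPathPlanCache and
 * Task are Citus structs, and DeparseTaskQuery() does real deparsing.
 */
typedef struct CacheEntryStub
{
    const char *queryString;    /* cached deparsed shard query, may be NULL */
} CacheEntryStub;

typedef struct TaskStub
{
    const char *queryString;
    int parametersInQueryStringResolved;
} TaskStub;

/* stand-in for the expensive deparse step */
static const char *
DeparseStub(const char *jobQuery)
{
    return jobQuery;
}

/* mirrors the branching the hunk adds to CitusBeginModifyScan */
static void
AssignTaskQueryString(TaskStub *task, const CacheEntryStub *cacheEntry,
                      const char *jobQuery)
{
    if (cacheEntry == NULL)
    {
        /* cache miss: do the heavy lifting of deparsing */
        task->queryString = DeparseStub(jobQuery);
        task->parametersInQueryStringResolved = 1;
    }
    else if (cacheEntry->queryString != NULL)
    {
        /* cache hit: reuse the cached string, keep parameters unresolved */
        task->queryString = cacheEntry->queryString;
        task->parametersInQueryStringResolved = 0;
    }
}

int
main(void)
{
    TaskStub task = { NULL, 0 };

    /* hypothetical shard-qualified query string sitting in the cache */
    CacheEntryStub cached = { "INSERT INTO items_102008 (key, value) VALUES ($1, $2)" };

    AssignTaskQueryString(&task, NULL, "INSERT INTO items (key, value) VALUES (1, 'x')");
    printf("miss -> %s (resolved=%d)\n", task.queryString, task.parametersInQueryStringResolved);

    AssignTaskQueryString(&task, &cached, "ignored on a cache hit");
    printf("hit  -> %s (resolved=%d)\n", task.queryString, task.parametersInQueryStringResolved);

    return 0;
}

On a hit the flag stays false, matching task->parametersInQueryStringResolved = false in the hunk, so parameter values travel with the prepared statement rather than being inlined into the query string.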
@@ -325,7 +325,6 @@ CitusBeginReadOnlyScan(CustomScanState *node, EState *estate, int eflags)
         /* job query no longer has parameters, so we should not send any */
         workerJob->parametersInJobQueryResolved = true;
 
-
         /* do the heavy lifting of deparsing unless we cannot find any cache */
         SetTaskQueryString(task, DeparseTaskQuery(task, workerJob->jobQuery));
     }
@@ -372,7 +371,6 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
     {
         ExecuteCoordinatorEvaluableExpressions(jobQuery, planState);
 
-        /* job query no longer has parameters, so we should not send any */
         workerJob->parametersInJobQueryResolved = true;
     }
 
@@ -439,30 +437,38 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
         workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList);
     }
 
 
-    /*
-     * Now that we have populated the task placements we can determine whether
-     * any of them are local to this node and cache a plan if needed.
-     */
     if (IsFastPathPlanCachingSupported(workerJob, originalDistributedPlan))
     {
         Task *task = linitial(workerJob->taskList);
 
-        /*
-         * We are going to execute this task locally. If it's not already in
-         * the cache, create a local plan now and add it to the cache. During
-         * execution, we will get the plan from the cache.
-         *
-         * WARNING: In this function we'll use the original plan with the original
-         * query tree, meaning parameters and function calls are back and we'll
-         * redo evaluation in the local (Postgres) executor. The reason we do this
-         * is that we only need to cache one generic plan per shard.
-         *
-         * The plan will be cached across executions when originalDistributedPlan
-         * represents a prepared statement.
-         */
-        CacheFastPathPlanForShardQuery(task, workerJob, originalDistributedPlan,
-                                       estate->es_param_list_info);
+        FastPathPlanCache *fastPathPlanCache =
+            CacheFastPathPlanForShardQuery(task, workerJob, originalDistributedPlan,
+                                           estate->es_param_list_info);
+        if (fastPathPlanCache == NULL)
+        {
+            /* job query no longer has parameters, so we should not send any */
+
+            /* do the heavy lifting of deparsing unless we cannot find any cache */
+            ListCell *taskCell = NULL;
+
+            foreach(taskCell, workerJob->taskList)
+            {
+                Task *taskI = (Task *) lfirst(taskCell);
+
+                SetTaskQueryString(taskI, DeparseTaskQuery(taskI, workerJob->jobQuery));
+            }
+        }
+        else if (fastPathPlanCache->queryString != NULL)
+        {
+            task->parametersInQueryStringResolved = false;
+
+            SetTaskQueryString(task, fastPathPlanCache->queryString);
+
+            /* TODO: we have this due to MarkUnreferencedExternParams. Can we find another way? */
+            workerJob->jobQuery = copyObject(
+                originalDistributedPlan->workerJob->jobQuery);
+        }
     }
 
     MemoryContextSwitchTo(oldContext);
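The cache-hit path above matters mostly when originalDistributedPlan comes from a prepared statement that is executed many times. A minimal libpq client for that workload is sketched below; the connection string and the items(key int, value text) distributed table are hypothetical, and the sketch simply assumes the single-row INSERT is the kind of fast-path router query this commit targets, so its deparsed shard query string can be reused across executions instead of being rebuilt every time.

/* sketch of the target workload: a prepared, single-shard INSERT executed
 * repeatedly; compile with: cc insert_bench.c -lpq */
#include <stdio.h>
#include <stdlib.h>
#include <libpq-fe.h>

int
main(void)
{
    /* hypothetical connection string; point it at a Citus coordinator */
    PGconn *conn = PQconnectdb("host=localhost port=5432 dbname=postgres");
    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
        PQfinish(conn);
        return EXIT_FAILURE;
    }

    /* prepare once; the parameterized single-row INSERT stays fast-path */
    PGresult *prep = PQprepare(conn, "ins",
                               "INSERT INTO items (key, value) VALUES ($1, $2)",
                               2, NULL);
    if (PQresultStatus(prep) != PGRES_COMMAND_OK)
    {
        fprintf(stderr, "prepare failed: %s", PQerrorMessage(conn));
        PQclear(prep);
        PQfinish(conn);
        return EXIT_FAILURE;
    }
    PQclear(prep);

    /* execute many times; once a cached entry with a query string exists,
     * the deparsed shard query can be reused on each execution */
    for (int i = 0; i < 1000; i++)
    {
        char key[12];
        snprintf(key, sizeof(key), "%d", i);
        const char *values[2] = { key, "hello" };

        PGresult *res = PQexecPrepared(conn, "ins", 2, values, NULL, NULL, 0);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
            fprintf(stderr, "insert failed: %s", PQerrorMessage(conn));
        }
        PQclear(res);
    }

    PQfinish(conn);
    return EXIT_SUCCESS;
}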
@@ -345,6 +345,15 @@ IsFastPathPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistrib
         return false;
     }
 
+    if (currentJob->requiresCoordinatorEvaluation)
+    {
+        /*
+         * We want functions to be evaluated on the coordinator, and
+         * we do not want to put into the cache.
+         */
+        return false;
+    }
+
     if (!currentJob->deferredPruning)
     {
         /*
@@ -1707,6 +1707,7 @@ RouterInsertTaskList(Query *query, bool parametersInQueryResolved,
         modifyTask->taskPlacementList = ActiveShardPlacementList(
             modifyRoute->shardId);
         modifyTask->parametersInQueryStringResolved = parametersInQueryResolved;
+        modifyTask->deferredPruning = true;
 
         insertTaskList = lappend(insertTaskList, modifyTask);
     }
@@ -1810,6 +1811,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
     {
         Job *job = CreateJob(originalQuery);
         job->deferredPruning = true;
+        job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation;
 
         ereport(DEBUG2, (errmsg("Deferred pruning for a fast-path router "
                                 "query")));