diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 52f3b289f..c83892b3e 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -325,7 +325,6 @@ CitusBeginReadOnlyScan(CustomScanState *node, EState *estate, int eflags) /* job query no longer has parameters, so we should not send any */ workerJob->parametersInJobQueryResolved = true; - /* do the heavy lifting of deparsing unless we cannot find any cache */ SetTaskQueryString(task, DeparseTaskQuery(task, workerJob->jobQuery)); } @@ -372,7 +371,6 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags) { ExecuteCoordinatorEvaluableExpressions(jobQuery, planState); - /* job query no longer has parameters, so we should not send any */ workerJob->parametersInJobQueryResolved = true; } @@ -439,30 +437,38 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags) workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList); } - - /* - * Now that we have populated the task placements we can determine whether - * any of them are local to this node and cache a plan if needed. - */ if (IsFastPathPlanCachingSupported(workerJob, originalDistributedPlan)) { Task *task = linitial(workerJob->taskList); - /* - * We are going to execute this task locally. If it's not already in - * the cache, create a local plan now and add it to the cache. During - * execution, we will get the plan from the cache. - * - * WARNING: In this function we'll use the original plan with the original - * query tree, meaning parameters and function calls are back and we'll - * redo evaluation in the local (Postgres) executor. The reason we do this - * is that we only need to cache one generic plan per shard. - * - * The plan will be cached across executions when originalDistributedPlan - * represents a prepared statement. - */ - CacheFastPathPlanForShardQuery(task, workerJob, originalDistributedPlan, - estate->es_param_list_info); + FastPathPlanCache *fastPathPlanCache = + CacheFastPathPlanForShardQuery(task, workerJob, originalDistributedPlan, + estate->es_param_list_info); + if (fastPathPlanCache == NULL) + { + /* job query no longer has parameters, so we should not send any */ + + /* do the heavy lifting of deparsing unless we cannot find any cache */ + + ListCell *taskCell = NULL; + + foreach(taskCell, workerJob->taskList) + { + Task *taskI = (Task *) lfirst(taskCell); + + SetTaskQueryString(taskI, DeparseTaskQuery(taskI, workerJob->jobQuery)); + } + } + else if (fastPathPlanCache->queryString != NULL) + { + task->parametersInQueryStringResolved = false; + + SetTaskQueryString(task, fastPathPlanCache->queryString); + + /* TODO: we have this due to MarkUnreferencedExternParams. Can we find another way? */ + workerJob->jobQuery = copyObject( + originalDistributedPlan->workerJob->jobQuery); + } } MemoryContextSwitchTo(oldContext); diff --git a/src/backend/distributed/planner/fast_path_plan_cache.c b/src/backend/distributed/planner/fast_path_plan_cache.c index 2bf5637ac..1310fb914 100644 --- a/src/backend/distributed/planner/fast_path_plan_cache.c +++ b/src/backend/distributed/planner/fast_path_plan_cache.c @@ -345,6 +345,15 @@ IsFastPathPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistrib return false; } + if (currentJob->requiresCoordinatorEvaluation) + { + /* + * We want functions to be evaluated on the coordinator, and + * we do not want to put into the cache. + */ + return false; + } + if (!currentJob->deferredPruning) { /* diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index beb449526..ec70fe704 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1707,6 +1707,7 @@ RouterInsertTaskList(Query *query, bool parametersInQueryResolved, modifyTask->taskPlacementList = ActiveShardPlacementList( modifyRoute->shardId); modifyTask->parametersInQueryStringResolved = parametersInQueryResolved; + modifyTask->deferredPruning = true; insertTaskList = lappend(insertTaskList, modifyTask); } @@ -1810,6 +1811,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon { Job *job = CreateJob(originalQuery); job->deferredPruning = true; + job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation; ereport(DEBUG2, (errmsg("Deferred pruning for a fast-path router " "query")));