diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index ca4662e58..e11277e72 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -3185,7 +3185,7 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
 	session->currentTask = placementExecution;
 	placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING;
 
-	if (paramListInfo != NULL)
+	if (paramListInfo != NULL && !task->parametersInQueryStringResolved)
 	{
 		int parameterCount = paramListInfo->numParams;
 		Oid *parameterTypes = NULL;
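The hunk above only makes sense together with the new `parametersInQueryStringResolved` flag that is introduced further down in `multi_physical_planner.h`: once parameters have been folded into the shard query text as constants, the executor must not also ship the original parameter list. As a reading aid, here is a minimal standalone sketch of that rule; the `Toy*` types and `send_query` helper are hypothetical stand-ins for illustration, not Citus structures.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Toy stand-ins for the real Citus structures (hypothetical, illustration only). */
typedef struct ToyTask
{
	const char *queryString;              /* SQL sent to the shard */
	bool parametersInQueryStringResolved; /* params already inlined as constants? */
} ToyTask;

typedef struct ToyParamList
{
	int numParams;
} ToyParamList;

/*
 * Mirrors the guard added to StartPlacementExecutionOnSession: only ship the
 * parameter list when the query string still contains $n placeholders.
 */
static void
send_query(const ToyTask *task, const ToyParamList *params)
{
	if (params != NULL && !task->parametersInQueryStringResolved)
	{
		printf("SEND %s with %d bound parameters\n",
			   task->queryString, params->numParams);
	}
	else
	{
		printf("SEND %s without parameters\n", task->queryString);
	}
}

int
main(void)
{
	ToyParamList params = { .numParams = 1 };
	ToyTask unresolved = { "SELECT * FROM t_102008 WHERE a = $1", false };
	ToyTask resolved = { "SELECT * FROM t_102008 WHERE a = 5", true };

	send_query(&unresolved, &params); /* placeholders remain: parameters are required */
	send_query(&resolved, &params);   /* constants inlined: sending params would be wrong */
	return 0;
}
```

Compiled on its own, the second call prints the query without parameters, which mirrors what the adaptive executor now does for resolved query strings.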
diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c
index 8b7ea274a..05c7bbd13 100644
--- a/src/backend/distributed/executor/citus_custom_scan.c
+++ b/src/backend/distributed/executor/citus_custom_scan.c
@@ -16,6 +16,7 @@
 #include "distributed/citus_clauses.h"
 #include "distributed/citus_custom_scan.h"
 #include "distributed/citus_nodefuncs.h"
+#include "distributed/citus_ruleutils.h"
 #include "distributed/deparse_shard_query.h"
 #include "distributed/distributed_execution_locks.h"
 #include "distributed/insert_select_executor.h"
@@ -48,17 +49,17 @@ static Node * DelayedErrorCreateScan(CustomScan *scan);
 
 /* functions that are common to different scans */
 static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags);
-static void CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate,
-													int eflags);
+static void CitusBeginSelectScan(CustomScanState *node, EState *estate, int eflags);
+static void CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags);
 static void CitusPreExecScan(CitusScanState *scanState);
-static void HandleDeferredShardPruningForFastPathQueries(
-	DistributedPlan *distributedPlan);
-static void HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan);
-static void CacheLocalPlanForTask(Task *task, DistributedPlan *originalDistributedPlan);
-static DistributedPlan * CopyDistributedPlanWithoutCache(CitusScanState *scanState);
-static void ResetExecutionParameters(EState *executorState);
-static void CitusBeginScanWithoutCoordinatorProcessing(CustomScanState *node,
-														EState *estate, int eflags);
+static void RegenerateTaskForFasthPathQuery(Job *workerJob);
+static void RegenerateTaskListForInsert(Job *workerJob);
+static void CacheLocalPlanForShardQuery(Task *task,
+										DistributedPlan *originalDistributedPlan);
+static bool IsLocalPlanCachingSupported(Job *workerJob,
+										DistributedPlan *originalDistributedPlan);
+static DistributedPlan * CopyDistributedPlanWithoutCache(
+	DistributedPlan *originalDistributedPlan);
 static void CitusEndScan(CustomScanState *node);
 static void CitusReScan(CustomScanState *node);
 
@@ -187,16 +188,22 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
 #endif
 
 	DistributedPlan *distributedPlan = scanState->distributedPlan;
-	Job *workerJob = distributedPlan->workerJob;
-	if (workerJob &&
-		(workerJob->requiresMasterEvaluation || workerJob->deferredPruning))
+	if (distributedPlan->insertSelectQuery != NULL)
 	{
-		CitusBeginScanWithCoordinatorProcessing(node, estate, eflags);
-
+		/*
+		 * INSERT..SELECT via coordinator or re-partitioning are special because
+		 * the SELECT part is planned separately.
+		 */
 		return;
 	}
-
-	CitusBeginScanWithoutCoordinatorProcessing(node, estate, eflags);
+	else if (distributedPlan->modLevel == ROW_MODIFY_READONLY)
+	{
+		CitusBeginSelectScan(node, estate, eflags);
+	}
+	else
+	{
+		CitusBeginModifyScan(node, estate, eflags);
+	}
 }
@@ -233,179 +240,176 @@ CitusExecScan(CustomScanState *node)
 
 
 /*
- * CitusBeginScanWithoutCoordinatorProcessing is intended to work on all executions
- * that do not require any coordinator processing. The function simply acquires the
- * necessary locks on the shards involved in the task list of the distributed plan
- * and does the placement assignements. This implies that the function is a no-op for
- * SELECT queries as they do not require any locking and placement assignements.
+ * CitusBeginSelectScan handles deferred pruning and plan caching for SELECTs.
 */
 static void
-CitusBeginScanWithoutCoordinatorProcessing(CustomScanState *node, EState *estate, int
-										   eflags)
+CitusBeginSelectScan(CustomScanState *node, EState *estate, int eflags)
 {
 	CitusScanState *scanState = (CitusScanState *) node;
-	DistributedPlan *distributedPlan = scanState->distributedPlan;
+	DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
 
-	if (distributedPlan->modLevel == ROW_MODIFY_READONLY ||
-		distributedPlan->insertSelectQuery != NULL)
+	if (!originalDistributedPlan->workerJob->deferredPruning)
 	{
+		/*
+		 * For SELECT queries that have already been pruned we can proceed straight
+		 * to execution, since none of the prepared statement logic applies.
+		 */
 		return;
 	}
 
-	/* we'll be modifying the distributed plan by assigning taskList, do it on a copy */
-	distributedPlan = copyObject(distributedPlan);
-	scanState->distributedPlan = distributedPlan;
+	/*
+	 * Create a copy of the generic plan for the current execution, but make a shallow
+	 * copy of the plan cache. That means we'll be able to access the plan cache via
+	 * currentPlan->workerJob->localPlannedStatements, but it will be preserved across
+	 * executions by the prepared statement logic.
+	 */
+	DistributedPlan *currentPlan =
+		CopyDistributedPlanWithoutCache(originalDistributedPlan);
+	scanState->distributedPlan = currentPlan;
 
-	Job *workerJob = distributedPlan->workerJob;
-	List *taskList = workerJob->taskList;
+	Job *workerJob = currentPlan->workerJob;
+	Query *jobQuery = workerJob->jobQuery;
+	PlanState *planState = &(scanState->customScanState.ss.ps);
 
 	/*
-	 * These more complex jobs should have been evaluated in
-	 * CitusBeginScanWithCoordinatorProcessing.
+	 * We only do deferred pruning for fast path queries, which have a single
+	 * partition column value.
 	 */
-	Assert(!(workerJob->requiresMasterEvaluation || workerJob->deferredPruning));
+	Assert(currentPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner);
 
-	/* prevent concurrent placement changes */
-	AcquireMetadataLocks(taskList);
+	/*
+	 * Evaluate parameters, because the parameters are only available on the
+	 * coordinator and are required for pruning.
+	 *
+	 * We don't evaluate functions for read-only queries on the coordinator
+	 * at the moment. Most function calls would be in a context where they
+	 * should be re-evaluated for every row in case of volatile functions.
+	 *
+	 * TODO: evaluate stable functions
+	 */
+	ExecuteMasterEvaluableParameters(jobQuery, planState);
 
-	/* modify tasks are always assigned using first-replica policy */
-	workerJob->taskList = FirstReplicaAssignTaskList(taskList);
+	/* job query no longer has parameters, so we should not send any */
+	workerJob->parametersInJobQueryResolved = true;
+
+	/* parameters are filled in, so we can generate a task for this execution */
+	RegenerateTaskForFasthPathQuery(workerJob);
+
+	if (IsLocalPlanCachingSupported(workerJob, originalDistributedPlan))
+	{
+		Task *task = linitial(workerJob->taskList);
+
+		/*
+		 * We are going to execute this task locally. If it's not already in
+		 * the cache, create a local plan now and add it to the cache. During
+		 * execution, we will get the plan from the cache.
+		 *
+		 * The plan will be cached across executions when originalDistributedPlan
+		 * represents a prepared statement.
+		 */
+		CacheLocalPlanForShardQuery(task, originalDistributedPlan);
+	}
 }
 
 
 /*
- * CitusBeginScanWithCoordinatorProcessing generates query strings at the start of the execution
- * in two cases: when the query requires master evaluation and/or deferred shard pruning.
+ * CitusBeginModifyScan prepares the scan state for a modification.
  *
- * The function is also smart about caching plans if the plan is local to this node.
+ * Modifications are special because:
+ *  a) we evaluate function calls (e.g. nextval) here and the outcome may
+ *     determine which shards are affected by this query.
+ *  b) we need to take metadata locks to make sure no write is left behind
+ *     when finalizing a shard move.
 */
 static void
-CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate, int eflags)
+CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
 {
 	CitusScanState *scanState = (CitusScanState *) node;
-	DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
-	DistributedPlan *distributedPlan = CopyDistributedPlanWithoutCache(scanState);
-	Job *workerJob = distributedPlan->workerJob;
-	Query *jobQuery = workerJob->jobQuery;
-
-	/* we'd only get to this function with the following conditions */
-	Assert(workerJob->requiresMasterEvaluation || workerJob->deferredPruning);
-
 	PlanState *planState = &(scanState->customScanState.ss.ps);
+	DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
 
-	/* citus only evaluates functions for modification queries */
-	bool modifyQueryRequiresMasterEvaluation =
-		jobQuery->commandType != CMD_SELECT &&
-		(workerJob->requiresMasterEvaluation || workerJob->deferredPruning);
+	DistributedPlan *currentPlan =
+		CopyDistributedPlanWithoutCache(originalDistributedPlan);
+	scanState->distributedPlan = currentPlan;
 
-	/*
-	 * ExecuteMasterEvaluableFunctions handles both function evalation
-	 * and parameter evaluation. Pruning is most likely deferred because
-	 * there is a parameter on the distribution key. So, evaluate in both
-	 * cases.
-	 */
-	if (modifyQueryRequiresMasterEvaluation)
+	Job *workerJob = currentPlan->workerJob;
+	Query *jobQuery = workerJob->jobQuery;
+	bool evaluateAllExpressions = workerJob->requiresMasterEvaluation ||
+								  workerJob->deferredPruning;
+
+	if (evaluateAllExpressions)
 	{
-		/* evaluate functions and parameters for modification queries */
+		/* evaluate both functions and parameters */
 		ExecuteMasterEvaluableFunctionsAndParameters(jobQuery, planState);
-	}
-	else if (jobQuery->commandType == CMD_SELECT && !workerJob->deferredPruning)
-	{
-		/* we'll use generated strings, no need to have the parameters anymore */
-		EState *executorState = planState->state;
-		ResetExecutionParameters(executorState);
-
-		/* we're done, we don't want to evaluate functions for SELECT queries */
-		return;
+
+		/* job query no longer has parameters, so we should not send any */
+		workerJob->parametersInJobQueryResolved = true;
 	}
-	else if (jobQuery->commandType == CMD_SELECT && workerJob->deferredPruning)
+
+	if (workerJob->deferredPruning)
 	{
 		/*
-		 * Evaluate parameters, because the parameters are only avaliable on the
-		 * coordinator and are required for pruning.
+		 * At this point, we're about to do the shard pruning for fast-path queries.
+		 * Given that pruning is always deferred for INSERTs, we can get here with
+		 * !EnableFastPathRouterPlanner as well.
+		 */
+		Assert(currentPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner);
+
+		/*
+		 * We can only now decide which shard to use, so we need to build a new task
+		 * list.
+		 */
+		if (jobQuery->commandType == CMD_INSERT)
+		{
+			RegenerateTaskListForInsert(workerJob);
+		}
+		else
+		{
+			RegenerateTaskForFasthPathQuery(workerJob);
+		}
+	}
+	else if (workerJob->requiresMasterEvaluation)
+	{
+		/*
+		 * When there is no deferred pruning, but we did evaluate functions, then
+		 * we only rebuild the query strings in the existing tasks.
+		 */
+		RebuildQueryStrings(workerJob);
+	}
+
+	/*
+	 * Now that we know the shard ID(s) we can acquire the necessary shard metadata
+	 * locks. Once we have the locks it's safe to load the placement metadata.
+	 */
+
+	/* prevent concurrent placement changes */
+	AcquireMetadataLocks(workerJob->taskList);
+
+	/* modify tasks are always assigned using first-replica policy */
+	workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList);
+
+	/*
+	 * Now that we have populated the task placements we can determine whether
+	 * any of them are local to this node and cache a plan if needed.
+	 */
+	if (IsLocalPlanCachingSupported(workerJob, originalDistributedPlan))
+	{
+		Task *task = linitial(workerJob->taskList);
+
+		/*
+		 * We are going to execute this task locally. If it's not already in
+		 * the cache, create a local plan now and add it to the cache. During
+		 * execution, we will get the plan from the cache.
 		 *
-		 * But, we don't want to evaluate functions for read-only queries on the
-		 * coordinator as the volatile functions could yield different
-		 * results per shard (also per row) and could have side-effects.
+		 * WARNING: In this function we'll use the original plan with the original
+		 * query tree, meaning parameters and function calls are back and we'll
+		 * redo evaluation in the local (Postgres) executor. The reason we do this
+		 * is that we only need to cache one generic plan per shard.
 		 *
-		 * Note that Citus already errors out for modification queries during
-		 * planning when the query involve any volatile function that might
-		 * diverge the shards as such functions are expected to yield different
-		 * results per shard (also per row).
+		 * The plan will be cached across executions when originalDistributedPlan
+		 * represents a prepared statement.
 		 */
-		ExecuteMasterEvaluableParameters(jobQuery, planState);
-	}
-
-	/*
-	 * After evaluating the function/parameters, we're done unless shard pruning
-	 * is also deferred.
-	 */
-	if (workerJob->requiresMasterEvaluation && !workerJob->deferredPruning)
-	{
-		RebuildQueryStrings(workerJob->jobQuery, workerJob->taskList);
-
-		/* we'll use generated strings, no need to have the parameters anymore */
-		EState *executorState = planState->state;
-		ResetExecutionParameters(executorState);
-
-		return;
-	}
-
-	/*
-	 * At this point, we're about to do the shard pruning for fast-path queries.
-	 * Given that pruning is deferred always for INSERTs, we get here
-	 * !EnableFastPathRouterPlanner as well.
-	 */
-	Assert(workerJob->deferredPruning &&
-		   (distributedPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner));
-
-	if (jobQuery->commandType == CMD_INSERT)
-	{
-		HandleDeferredShardPruningForInserts(distributedPlan);
-	}
-	else
-	{
-		HandleDeferredShardPruningForFastPathQueries(distributedPlan);
-	}
-
-	if (jobQuery->commandType != CMD_SELECT)
-	{
-		/* prevent concurrent placement changes */
-		AcquireMetadataLocks(workerJob->taskList);
-
-		/* modify tasks are always assigned using first-replica policy */
-		workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList);
-	}
-
-	if (list_length(distributedPlan->workerJob->taskList) != 1)
-	{
-		/*
-		 * We might have zero shard queries or multi-row INSERTs at this point,
-		 * we only want to cache single task queries.
-		 */
-		return;
-	}
-
-	/*
-	 * As long as the task accesses local node and the query doesn't have
-	 * any volatile functions, we cache the local Postgres plan on the
-	 * shard for re-use.
-	 */
-	Task *task = linitial(distributedPlan->workerJob->taskList);
-	if (EnableLocalExecution && TaskAccessesLocalNode(task) &&
-		!contain_volatile_functions(
-			(Node *) originalDistributedPlan->workerJob->jobQuery))
-	{
-		CacheLocalPlanForTask(task, originalDistributedPlan);
-	}
-	else
-	{
-		/*
-		 * If we're not going to use a cached plan, we'll use the query string that is
-		 * already generated where the parameters are replaced, so we should not have
-		 * the parameters anymore.
-		 */
-		EState *executorState = planState->state;
-		ResetExecutionParameters(executorState);
+		CacheLocalPlanForShardQuery(task, originalDistributedPlan);
 	}
 }
 
@@ -422,15 +426,13 @@ CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate, i
  * reasons, as they are immutable, so no need to have a deep copy.
 */
 static DistributedPlan *
-CopyDistributedPlanWithoutCache(CitusScanState *scanState)
+CopyDistributedPlanWithoutCache(DistributedPlan *originalDistributedPlan)
 {
-	DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
 	List *localPlannedStatements =
 		originalDistributedPlan->workerJob->localPlannedStatements;
 	originalDistributedPlan->workerJob->localPlannedStatements = NIL;
 
 	DistributedPlan *distributedPlan = copyObject(originalDistributedPlan);
-	scanState->distributedPlan = distributedPlan;
 
 	/* set back the immutable field */
 	originalDistributedPlan->workerJob->localPlannedStatements = localPlannedStatements;
 
@@ -441,30 +443,12 @@
 
 
 /*
- * ResetExecutionParameters set the parameter list to NULL. See the function
- * for details.
+ * CacheLocalPlanForShardQuery replaces the relation OIDs in the job query
+ * with shard relation OIDs and then plans the query and caches the result
+ * in the originalDistributedPlan (which may be preserved across executions).
 */
 static void
-ResetExecutionParameters(EState *executorState)
-{
-	/*
-	 * We've processed parameters in ExecuteMasterEvaluableFunctions and
-	 * don't need to send their values to workers, since they will be
-	 * represented as constants in the deparsed query. To avoid sending
-	 * parameter values, we set the parameter list to NULL.
-	 */
-	executorState->es_param_list_info = NULL;
-}
-
-
-/*
- * CacheLocalPlanForTask caches a plan that is local to this node in the
- * originalDistributedPlan.
- *
- * The basic idea is to be able to skip planning on the shards when possible.
- */
-static void
-CacheLocalPlanForTask(Task *task, DistributedPlan *originalDistributedPlan)
+CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan)
 {
 	PlannedStmt *localPlan = GetCachedLocalPlan(task, originalDistributedPlan);
 	if (localPlan != NULL)
@@ -512,6 +496,15 @@ CacheLocalPlanForTask(Task *task, DistributedPlan *originalDistributedPlan)
 		return;
 	}
 
+	if (IsLoggableLevel(DEBUG5))
+	{
+		StringInfo queryString = makeStringInfo();
+		pg_get_query_def(shardQuery, queryString);
+
+		ereport(DEBUG5, (errmsg("caching plan for query: %s",
+								queryString->data)));
+	}
+
 	LockRelationOid(rangeTableEntry->relid, lockMode);
 
 	LocalPlannedStatement *localPlannedStatement = CitusMakeNode(LocalPlannedStatement);
@@ -554,18 +547,86 @@ GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
 
 
 /*
- * HandleDeferredShardPruningForInserts does the shard pruning for INSERT
+ * IsLocalPlanCachingSupported returns whether (part of) the task can be planned
+ * and executed locally and whether caching is supported (single shard, no volatile
+ * functions).
+ */
+static bool
+IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistributedPlan)
+{
+	if (!currentJob->deferredPruning)
+	{
+		/*
+		 * When not using deferred pruning we may have already replaced distributed
+		 * table RTEs with citus_extradata_container RTEs to pass the shard ID to the
+		 * deparser. In that case, we cannot pass the query tree directly to the
+		 * planner.
+		 *
+		 * If desired, we can relax this check by improving the implementation of
+		 * CacheLocalPlanForShardQuery to translate citus_extradata_container
+		 * to a shard relation OID.
+		 */
+		return false;
+	}
+
+	List *taskList = currentJob->taskList;
+	if (list_length(taskList) != 1)
+	{
+		/* we only support plan caching for single shard queries */
+		return false;
+	}
+
+	Task *task = linitial(taskList);
+	if (!TaskAccessesLocalNode(task))
+	{
+		/* not a local task */
+		return false;
+	}
+
+	if (!EnableLocalExecution)
+	{
+		/* user requested not to use local execution */
+		return false;
+	}
+
+	if (TransactionConnectedToLocalGroup)
+	{
+		/* transaction already connected to localhost */
+		return false;
+	}
+
+	Query *originalJobQuery = originalDistributedPlan->workerJob->jobQuery;
+	if (contain_volatile_functions((Node *) originalJobQuery))
+	{
+		/*
+		 * We do not cache plans with volatile functions in the query.
+		 *
+		 * The reason we care about volatile functions is primarily that we
+		 * already executed them in ExecuteMasterEvaluableFunctionsAndParameters
+		 * and since we're falling back to the original query tree here we would
+		 * execute them again if we execute the plan.
+		 */
+		return false;
+	}
+
+	return true;
+}
+
+
+/*
+ * RegenerateTaskListForInsert does the shard pruning for INSERT
  * queries and rebuilds the query strings.
 */
 static void
-HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan)
+RegenerateTaskListForInsert(Job *workerJob)
 {
-	Job *workerJob = distributedPlan->workerJob;
 	Query *jobQuery = workerJob->jobQuery;
+	bool parametersInJobQueryResolved = workerJob->parametersInJobQueryResolved;
 	DeferredErrorMessage *planningError = NULL;
 
 	/* need to perform shard pruning, rebuild the task list from scratch */
-	List *taskList = RouterInsertTaskList(jobQuery, &planningError);
+	List *taskList = RouterInsertTaskList(jobQuery, parametersInJobQueryResolved,
+										  &planningError);
 
 	if (planningError != NULL)
 	{
@@ -575,21 +636,17 @@
 	workerJob->taskList = taskList;
 	workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery);
 
-	RebuildQueryStrings(jobQuery, workerJob->taskList);
+	RebuildQueryStrings(workerJob);
 }
 
 
 /*
- * HandleDeferredShardPruningForFastPathQueries does the shard pruning for
+ * RegenerateTaskForFasthPathQuery does the shard pruning for
  * UPDATE/DELETE/SELECT fast path router queries and rebuilds the query strings.
 */
 static void
-HandleDeferredShardPruningForFastPathQueries(DistributedPlan *distributedPlan)
+RegenerateTaskForFasthPathQuery(Job *workerJob)
 {
-	Assert(distributedPlan->fastPathRouterPlan);
-
-	Job *workerJob = distributedPlan->workerJob;
-
 	bool isMultiShardQuery = false;
 	List *shardIntervalList =
 		TargetShardIntervalForFastPathQuery(workerJob->jobQuery,
@@ -756,10 +813,11 @@ static void
 CitusReScan(CustomScanState *node)
 {
 	CitusScanState *scanState = (CitusScanState *) node;
+	Job *workerJob = scanState->distributedPlan->workerJob;
 	EState *executorState = ScanStateGetExecutorState(scanState);
 	ParamListInfo paramListInfo = executorState->es_param_list_info;
 
-	if (paramListInfo != NULL)
+	if (paramListInfo != NULL && !workerJob->parametersInJobQueryResolved)
 	{
 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 						errmsg("Cursors for queries on distributed tables with "
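Before moving on to the executor and planner files, a short aside on the caching pattern used above: `CopyDistributedPlanWithoutCache` detaches `localPlannedStatements`, deep-copies the rest of the plan, and then reattaches the list to the original so the cache survives across executions. The hunk is cut off before showing how the per-execution copy regains access to the cache, but the comment in `CitusBeginSelectScan` implies it is shared rather than copied. The sketch below reproduces that shape with hypothetical toy types (`ToyPlan`, `copy_plan_without_cache` are made up for illustration; error handling and frees are omitted).

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Toy plan: a payload that must be deep-copied per execution, plus a cache that
 * must be preserved across executions (illustration only, not Citus types). */
typedef struct ToyPlan
{
	char *payload;
	char **cachedStatements;
	int cachedCount;
} ToyPlan;

static ToyPlan *
deep_copy(const ToyPlan *plan)
{
	/* stand-in for copyObject(): duplicates everything it can see */
	ToyPlan *copy = malloc(sizeof(ToyPlan));
	copy->payload = strdup(plan->payload);
	copy->cachedCount = plan->cachedCount;
	copy->cachedStatements = NULL;
	if (plan->cachedCount > 0)
	{
		copy->cachedStatements = malloc(sizeof(char *) * plan->cachedCount);
		for (int i = 0; i < plan->cachedCount; i++)
		{
			copy->cachedStatements[i] = strdup(plan->cachedStatements[i]);
		}
	}
	return copy;
}

/*
 * Same shape as CopyDistributedPlanWithoutCache: detach the cache, deep-copy
 * the rest, then reattach the cache to the original and share it with the copy.
 */
static ToyPlan *
copy_plan_without_cache(ToyPlan *original)
{
	char **cache = original->cachedStatements;
	int cacheCount = original->cachedCount;
	original->cachedStatements = NULL;
	original->cachedCount = 0;

	ToyPlan *copy = deep_copy(original);

	/* set back the "immutable" field; the copy points at the same cache */
	original->cachedStatements = cache;
	original->cachedCount = cacheCount;
	copy->cachedStatements = cache;
	copy->cachedCount = cacheCount;

	return copy;
}

int
main(void)
{
	char *cached[] = { "plan-for-shard-102008" };
	ToyPlan original = { strdup("generic plan"), cached, 1 };

	ToyPlan *perExecution = copy_plan_without_cache(&original);
	printf("copy payload: %s, shared cache entry: %s\n",
		   perExecution->payload, perExecution->cachedStatements[0]);
	return 0;
}
```

The point of the detach/reattach dance is that the per-execution copy stays cheap while the cache keeps living in the long-lived plan owned by the prepared statement.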
diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c
index 89e5c9e7c..f460b96aa 100644
--- a/src/backend/distributed/executor/local_executor.c
+++ b/src/backend/distributed/executor/local_executor.c
@@ -184,15 +184,25 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
 		}
 		else
 		{
-			Query *shardQuery = ParseQueryString(TaskQueryString(task), parameterTypes,
-												 numParams);
+			int taskNumParams = numParams;
+			Oid *taskParameterTypes = parameterTypes;
+
+			if (task->parametersInQueryStringResolved)
+			{
+				/*
+				 * Parameters were removed from the query string so do not pass them
+				 * here. Otherwise, we might see errors when passing custom types,
+				 * since their OIDs were set to 0 and their type is normally
+				 * inferred from the parameter types passed to the parser.
+				 */
+				taskNumParams = 0;
+				taskParameterTypes = NULL;
+			}
+
+			Query *shardQuery = ParseQueryString(TaskQueryString(task),
+												 taskParameterTypes,
+												 taskNumParams);
 
-			/*
-			 * We should not consider using CURSOR_OPT_FORCE_DISTRIBUTED in case of
-			 * intermediate results in the query. That'd trigger ExecuteLocalTaskPlan()
-			 * go through the distributed executor, which we do not want since the
-			 * query is already known to be local.
-			 */
 			int cursorOptions = 0;
 
 			/*
diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c
index 4c88dc167..f536e0105 100644
--- a/src/backend/distributed/planner/deparse_shard_query.c
+++ b/src/backend/distributed/planner/deparse_shard_query.c
@@ -17,6 +17,7 @@
 #include "distributed/citus_ruleutils.h"
 #include "distributed/deparse_shard_query.h"
 #include "distributed/insert_select_planner.h"
+#include "distributed/listutils.h"
 #include "distributed/local_executor.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_physical_planner.h"
@@ -46,18 +47,20 @@ static char * DeparseTaskQuery(Task *task, Query *query);
  * include execution-time changes such as function evaluation.
 */
 void
-RebuildQueryStrings(Query *originalQuery, List *taskList)
+RebuildQueryStrings(Job *workerJob)
 {
-	ListCell *taskCell = NULL;
+	Query *originalQuery = workerJob->jobQuery;
+	List *taskList = workerJob->taskList;
 	Oid relationId = ((RangeTblEntry *) linitial(originalQuery->rtable))->relid;
 	RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery);
 
-	foreach(taskCell, taskList)
+	Task *task = NULL;
+
+	foreach_ptr(task, taskList)
 	{
-		Task *task = (Task *) lfirst(taskCell);
 		Query *query = originalQuery;
 
-		if (UpdateOrDeleteQuery(query) && list_length(taskList))
+		if (UpdateOrDeleteQuery(query) && list_length(taskList) > 1)
 		{
 			query = copyObject(originalQuery);
 		}
@@ -115,6 +118,12 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
 
 		UpdateTaskQueryString(query, relationId, valuesRTE, task);
 
+		/*
+		 * If parameters were resolved in the job query, then they are now also
+		 * resolved in the query string.
+		 */
+		task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved;
+
 		ereport(DEBUG4, (errmsg("query after rebuilding: %s",
 								ApplyLogRedaction(TaskQueryString(task)))));
 	}
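The `deparse_shard_query.c` change above and the `local_executor.c` change earlier are two halves of the same contract: once the job query has been resolved, every rebuilt task query string is marked as resolved, and a resolved string is then parsed with zero parameter types. Here is a compact, hedged sketch of that flow using hypothetical `ToyJob`/`ToyTask` types rather than the real nodes.

```c
#include <stdbool.h>
#include <stdio.h>

/* Toy stand-ins (hypothetical; the real types live in multi_physical_planner.h). */
typedef struct ToyJob
{
	bool parametersInJobQueryResolved;
} ToyJob;

typedef struct ToyTask
{
	const char *queryString;
	bool parametersInQueryStringResolved;
} ToyTask;

/* Mirrors the tail of RebuildQueryStrings: whatever is true for the job query
 * is, after deparsing, also true for every task's query string. */
static void
rebuild_query_strings(ToyJob *job, ToyTask *tasks, int taskCount)
{
	for (int i = 0; i < taskCount; i++)
	{
		/* ...deparse the query string for task i here... */
		tasks[i].parametersInQueryStringResolved = job->parametersInJobQueryResolved;
	}
}

/* Mirrors the ExecuteLocalTaskList change: a resolved query string is parsed
 * with zero parameter types, since its placeholders are gone. */
static int
parameter_count_for_parse(const ToyTask *task, int numParams)
{
	return task->parametersInQueryStringResolved ? 0 : numParams;
}

int
main(void)
{
	ToyJob job = { .parametersInJobQueryResolved = true };
	ToyTask tasks[1] = { { "UPDATE t_102008 SET b = 2 WHERE a = 5", false } };

	rebuild_query_strings(&job, tasks, 1);
	printf("parse with %d params\n", parameter_count_for_parse(&tasks[0], 3));
	return 0;
}
```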
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index e10c4dd66..8438f2880 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -158,11 +158,11 @@ static int CompareInsertValuesByShardId(const void *leftElement,
 										const void *rightElement);
 static List * SingleShardSelectTaskList(Query *query, uint64 jobId,
 										List *relationShardList, List *placementList,
-										uint64 shardId);
+										uint64 shardId, bool parametersInQueryResolved);
 static bool RowLocksOnRelations(Node *node, List **rtiLockList);
 static List * SingleShardModifyTaskList(Query *query, uint64 jobId,
 										List *relationShardList, List *placementList,
-										uint64 shardId);
+										uint64 shardId, bool parametersInQueryResolved);
 static List * RemoveCoordinatorPlacement(List *placementList);
 static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
 														TaskAssignmentPolicyType
@@ -1447,7 +1447,9 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann
 	}
 	else
 	{
-		taskList = RouterInsertTaskList(query, planningError);
+		bool parametersInQueryResolved = false;
+
+		taskList = RouterInsertTaskList(query, parametersInQueryResolved, planningError);
 		if (*planningError)
 		{
 			return NULL;
@@ -1457,19 +1459,20 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann
 		requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
 	}
 
+	Job *job = CreateJob(originalQuery);
+	job->taskList = taskList;
+	job->requiresMasterEvaluation = requiresMasterEvaluation;
+	job->deferredPruning = deferredPruning;
+
 	if (!requiresMasterEvaluation)
 	{
 		/* no functions or parameters, build the query strings upfront */
-		RebuildQueryStrings(originalQuery, taskList);
+		RebuildQueryStrings(job);
 
 		/* remember the partition column value */
 		partitionKeyValue = ExtractInsertPartitionKeyValue(originalQuery);
 	}
 
-	Job *job = CreateJob(originalQuery);
-	job->taskList = taskList;
-	job->requiresMasterEvaluation = requiresMasterEvaluation;
-	job->deferredPruning = deferredPruning;
 	job->partitionKeyValue = partitionKeyValue;
 
 	return job;
@@ -1561,7 +1564,8 @@ ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry)
 * a distributed table via the router executor.
 */
 List *
-RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError)
+RouterInsertTaskList(Query *query, bool parametersInQueryResolved,
+					 DeferredErrorMessage **planningError)
 {
 	List *insertTaskList = NIL;
 	ListCell *modifyRouteCell = NULL;
@@ -1593,8 +1597,8 @@ RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError)
 		relationShard->relationId = distributedTableId;
 
 		modifyTask->relationShardList = list_make1(relationShard);
-
 		modifyTask->taskPlacementList = ShardPlacementList(modifyRoute->shardId);
+		modifyTask->parametersInQueryStringResolved = parametersInQueryResolved;
 
 		insertTaskList = lappend(insertTaskList, modifyTask);
 	}
@@ -1772,7 +1776,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
 	{
 		job->taskList = SingleShardSelectTaskList(originalQuery, job->jobId,
 												  relationShardList, placementList,
-												  shardId);
+												  shardId,
+												  job->parametersInJobQueryResolved);
 
 		/*
 		 * Queries to reference tables, or distributed tables with multiple replica's have
@@ -1798,7 +1803,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
 	{
 		job->taskList = SingleShardModifyTaskList(originalQuery, job->jobId,
 												  relationShardList, placementList,
-												  shardId);
+												  shardId,
+												  job->parametersInJobQueryResolved);
 	}
 }
@@ -1890,7 +1896,8 @@ RemoveCoordinatorPlacement(List *placementList)
 */
static List *
SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList,
-						  List *placementList, uint64 shardId)
+						  List *placementList, uint64 shardId,
+						  bool parametersInQueryResolved)
{
	Task *task = CreateTask(SELECT_TASK);
	List *relationRowLockList = NIL;
@@ -1908,6 +1915,7 @@ SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList,
 	task->jobId = jobId;
 	task->relationShardList = relationShardList;
 	task->relationRowLockList = relationRowLockList;
+	task->parametersInQueryStringResolved = parametersInQueryResolved;
 
 	return list_make1(task);
 }
@@ -1960,7 +1968,8 @@ RowLocksOnRelations(Node *node, List **relationRowLockList)
 */
static List *
SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
-						  List *placementList, uint64 shardId)
+						  List *placementList, uint64 shardId,
+						  bool parametersInQueryResolved)
{
	Task *task = CreateTask(MODIFY_TASK);
	List *rangeTableList = NIL;
@@ -1987,6 +1996,7 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
 	task->jobId = jobId;
 	task->relationShardList = relationShardList;
 	task->replicationModel = modificationTableCacheEntry->replicationModel;
+	task->parametersInQueryStringResolved = parametersInQueryResolved;
 
 	return list_make1(task);
 }
diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c
index 8f1efdda2..b5ea6562b 100644
--- a/src/backend/distributed/utils/citus_clauses.c
+++ b/src/backend/distributed/utils/citus_clauses.c
@@ -45,6 +45,11 @@ static bool ShouldEvaluateFunctionWithMasterContext(MasterEvaluationContext *
 
 bool
 RequiresMasterEvaluation(Query *query)
 {
+	if (query->commandType == CMD_SELECT)
+	{
+		return false;
+	}
+
 	return FindNodeCheck((Node *) query, CitusIsMutableFunction);
 }
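The `RequiresMasterEvaluation` change above is easy to misread as "SELECTs never need evaluation"; the intent, as the executor changes earlier show, is that SELECT parameters are handled later by the deferred-pruning path in `CitusBeginSelectScan` rather than up front. A small hypothetical model of the check (names such as `ToyQuery` and `requires_master_evaluation` are made up for illustration, and `containsMutableFunctions` loosely stands in for the `FindNodeCheck`/`CitusIsMutableFunction` walk):

```c
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical miniature of the planner-side check; the enum mirrors Postgres' CmdType. */
typedef enum ToyCmdType
{
	TOY_CMD_SELECT,
	TOY_CMD_INSERT,
	TOY_CMD_UPDATE,
	TOY_CMD_DELETE
} ToyCmdType;

typedef struct ToyQuery
{
	ToyCmdType commandType;
	bool containsMutableFunctions;
} ToyQuery;

/*
 * Same shape as the new RequiresMasterEvaluation: SELECTs never force
 * coordinator evaluation up front; only modifications with mutable
 * expressions do.
 */
static bool
requires_master_evaluation(const ToyQuery *query)
{
	if (query->commandType == TOY_CMD_SELECT)
	{
		return false;
	}
	return query->containsMutableFunctions;
}

int
main(void)
{
	ToyQuery select = { TOY_CMD_SELECT, true };
	ToyQuery insert = { TOY_CMD_INSERT, true };

	printf("select: %d, insert: %d\n",
		   requires_master_evaluation(&select),
		   requires_master_evaluation(&insert));
	return 0;
}
```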
diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c
index dcac8b8c7..26ede196e 100644
--- a/src/backend/distributed/utils/citus_copyfuncs.c
+++ b/src/backend/distributed/utils/citus_copyfuncs.c
@@ -86,6 +86,7 @@ copyJobInfo(Job *newnode, Job *from)
 	COPY_SCALAR_FIELD(deferredPruning);
 	COPY_NODE_FIELD(partitionKeyValue);
 	COPY_NODE_FIELD(localPlannedStatements);
+	COPY_SCALAR_FIELD(parametersInJobQueryResolved);
 }
 
 
@@ -274,6 +275,7 @@ CopyNodeTask(COPYFUNC_ARGS)
 	COPY_NODE_FIELD(relationRowLockList);
 	COPY_NODE_FIELD(rowValuesLists);
 	COPY_SCALAR_FIELD(partiallyLocalOrRemote);
+	COPY_SCALAR_FIELD(parametersInQueryStringResolved);
 }
diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c
index ead94ef78..73df7fa23 100644
--- a/src/backend/distributed/utils/citus_outfuncs.c
+++ b/src/backend/distributed/utils/citus_outfuncs.c
@@ -339,6 +339,7 @@ OutJobFields(StringInfo str, const Job *node)
 	WRITE_BOOL_FIELD(deferredPruning);
 	WRITE_NODE_FIELD(partitionKeyValue);
 	WRITE_NODE_FIELD(localPlannedStatements);
+	WRITE_BOOL_FIELD(parametersInJobQueryResolved);
 }
 
 
@@ -492,6 +493,7 @@ OutTask(OUTFUNC_ARGS)
 	WRITE_NODE_FIELD(relationRowLockList);
 	WRITE_NODE_FIELD(rowValuesLists);
 	WRITE_BOOL_FIELD(partiallyLocalOrRemote);
+	WRITE_BOOL_FIELD(parametersInQueryStringResolved);
 }
diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h
index 54c176ca0..331903fa6 100644
--- a/src/include/distributed/deparse_shard_query.h
+++ b/src/include/distributed/deparse_shard_query.h
@@ -21,7 +21,7 @@
 #include "distributed/citus_custom_scan.h"
 
 
-extern void RebuildQueryStrings(Query *originalQuery, List *taskList);
+extern void RebuildQueryStrings(Job *workerJob);
 extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
 extern void SetTaskQuery(Task *task, Query *query);
 extern void SetTaskQueryString(Task *task, char *queryString);
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index 0773ed5d4..e8db507d9 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -157,6 +157,13 @@ typedef struct Job
 
 	/* for local shard queries, we may save the local plan here */
 	List *localPlannedStatements;
+
+	/*
+	 * When we evaluate functions and parameters in jobQuery then we
+	 * should no longer send the list of parameters along with the
+	 * query.
+	 */
+	bool parametersInJobQueryResolved;
 } Job;
 
 
@@ -274,6 +281,13 @@ typedef struct Task
 	 * the task splitted into local and remote tasks.
 	 */
 	bool partiallyLocalOrRemote;
+
+	/*
+	 * When we evaluate functions and parameters in the query string then
+	 * we should no longer send the list of parameters along with the
+	 * query.
+	 */
+	bool parametersInQueryStringResolved;
 } Task;
diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h
index 80a625f54..641a8dcaf 100644
--- a/src/include/distributed/multi_router_planner.h
+++ b/src/include/distributed/multi_router_planner.h
@@ -47,7 +47,8 @@ extern List * RelationShardListForShardIntervalList(List *shardIntervalList,
 													bool *shardsPresent);
 extern List * FindRouterWorkerList(List *shardIntervalList, bool shardsPresent,
 								   bool replacePrunedQueryWithDummy);
-extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError);
+extern List * RouterInsertTaskList(Query *query, bool parametersInQueryResolved,
+								   DeferredErrorMessage **planningError);
 extern Const * ExtractInsertPartitionKeyValue(Query *query);
 extern List * TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *
												   restrictionContext,
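Finally, the `citus_copyfuncs.c` and `citus_outfuncs.c` hunks are easy to overlook but necessary: any new field on a Citus node has to be handled by both the copy and the out functions, or the flag would be silently dropped whenever a `Task` is copied or serialized. A toy illustration of that pairing (the `ToyTask` type and helpers are hypothetical, standing in for the `COPY_SCALAR_FIELD`/`WRITE_BOOL_FIELD` macros):

```c
#include <stdbool.h>
#include <stdio.h>

/* Toy node with the newly added flag (illustration; not the real Task). */
typedef struct ToyTask
{
	bool partiallyLocalOrRemote;
	bool parametersInQueryStringResolved; /* new field */
} ToyTask;

/* Analogue of CopyNodeTask: every field, including the new one, must be copied... */
static void
copy_toy_task(ToyTask *newnode, const ToyTask *from)
{
	newnode->partiallyLocalOrRemote = from->partiallyLocalOrRemote;
	newnode->parametersInQueryStringResolved = from->parametersInQueryStringResolved;
}

/* ...and the analogue of OutTask must serialize it as well, so the value
 * survives a round trip through the node output format. */
static void
out_toy_task(const ToyTask *node)
{
	printf(":partiallyLocalOrRemote %s :parametersInQueryStringResolved %s\n",
		   node->partiallyLocalOrRemote ? "true" : "false",
		   node->parametersInQueryStringResolved ? "true" : "false");
}

int
main(void)
{
	ToyTask original = { false, true };
	ToyTask copy;

	copy_toy_task(&copy, &original);
	out_toy_task(&copy);
	return 0;
}
```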