Refactor CitusBeginScan into separate DML / SELECT paths

pull/3553/head
Marco Slot 2020-03-02 17:57:40 +01:00
parent 268ad741a9
commit dc4c0c032e
11 changed files with 332 additions and 221 deletions
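In short, CitusBeginScan now dispatches INSERT..SELECT, plain SELECT, and DML to separate code paths instead of splitting on "requires coordinator processing". A minimal sketch of the resulting dispatch shape, using simplified stand-in types rather than the real Citus structs:

#include <stdbool.h>
#include <stdio.h>

/* simplified stand-ins for RowModifyLevel / DistributedPlan, not the Citus structs */
typedef enum { ROW_MODIFY_READONLY, ROW_MODIFY_COMMUTATIVE } RowModifyLevel;

typedef struct DistributedPlanModel
{
	bool hasInsertSelectQuery;      /* models distributedPlan->insertSelectQuery != NULL */
	RowModifyLevel modLevel;
} DistributedPlanModel;

static void BeginSelectScan(void) { puts("SELECT path: deferred pruning + local plan caching"); }
static void BeginModifyScan(void) { puts("DML path: evaluate, prune, lock, assign, cache"); }

/* models the new three-way dispatch in CitusBeginScan */
static void BeginScan(const DistributedPlanModel *plan)
{
	if (plan->hasInsertSelectQuery)
	{
		/* INSERT..SELECT via coordinator/repartitioning: the SELECT part is planned separately */
		return;
	}
	else if (plan->modLevel == ROW_MODIFY_READONLY)
	{
		BeginSelectScan();
	}
	else
	{
		BeginModifyScan();
	}
}

int main(void)
{
	DistributedPlanModel selectPlan = { false, ROW_MODIFY_READONLY };
	DistributedPlanModel updatePlan = { false, ROW_MODIFY_COMMUTATIVE };

	BeginScan(&selectPlan);
	BeginScan(&updatePlan);
	return 0;
}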


@ -3185,7 +3185,7 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
session->currentTask = placementExecution;
placementExecution->executionState = PLACEMENT_EXECUTION_RUNNING;
if (paramListInfo != NULL)
if (paramListInfo != NULL && !task->parametersInQueryStringResolved)
{
int parameterCount = paramListInfo->numParams;
Oid *parameterTypes = NULL;


@ -16,6 +16,7 @@
#include "distributed/citus_clauses.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/insert_select_executor.h"
@ -48,17 +49,17 @@ static Node * DelayedErrorCreateScan(CustomScan *scan);
/* functions that are common to different scans */
static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags);
static void CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate,
int eflags);
static void CitusBeginSelectScan(CustomScanState *node, EState *estate, int eflags);
static void CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags);
static void CitusPreExecScan(CitusScanState *scanState);
static void HandleDeferredShardPruningForFastPathQueries(
DistributedPlan *distributedPlan);
static void HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan);
static void CacheLocalPlanForTask(Task *task, DistributedPlan *originalDistributedPlan);
static DistributedPlan * CopyDistributedPlanWithoutCache(CitusScanState *scanState);
static void ResetExecutionParameters(EState *executorState);
static void CitusBeginScanWithoutCoordinatorProcessing(CustomScanState *node,
EState *estate, int eflags);
static void RegenerateTaskForFasthPathQuery(Job *workerJob);
static void RegenerateTaskListForInsert(Job *workerJob);
static void CacheLocalPlanForShardQuery(Task *task,
DistributedPlan *originalDistributedPlan);
static bool IsLocalPlanCachingSupported(Job *workerJob,
DistributedPlan *originalDistributedPlan);
static DistributedPlan * CopyDistributedPlanWithoutCache(
DistributedPlan *originalDistributedPlan);
static void CitusEndScan(CustomScanState *node);
static void CitusReScan(CustomScanState *node);
@ -187,16 +188,22 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
#endif
DistributedPlan *distributedPlan = scanState->distributedPlan;
Job *workerJob = distributedPlan->workerJob;
if (workerJob &&
(workerJob->requiresMasterEvaluation || workerJob->deferredPruning))
if (distributedPlan->insertSelectQuery != NULL)
{
CitusBeginScanWithCoordinatorProcessing(node, estate, eflags);
/*
* INSERT..SELECT via coordinator or re-partitioning are special because
* the SELECT part is planned separately.
*/
return;
}
CitusBeginScanWithoutCoordinatorProcessing(node, estate, eflags);
else if (distributedPlan->modLevel == ROW_MODIFY_READONLY)
{
CitusBeginSelectScan(node, estate, eflags);
}
else
{
CitusBeginModifyScan(node, estate, eflags);
}
}
@ -233,179 +240,176 @@ CitusExecScan(CustomScanState *node)
/*
* CitusBeginScanWithoutCoordinatorProcessing is intended to work on all executions
* that do not require any coordinator processing. The function simply acquires the
* necessary locks on the shards involved in the task list of the distributed plan
* and does the placement assignments. This implies that the function is a no-op for
* SELECT queries as they do not require any locking and placement assignments.
* CitusBeginSelectScan handles deferred pruning and plan caching for SELECTs.
*/
static void
CitusBeginScanWithoutCoordinatorProcessing(CustomScanState *node, EState *estate, int
eflags)
CitusBeginSelectScan(CustomScanState *node, EState *estate, int eflags)
{
CitusScanState *scanState = (CitusScanState *) node;
DistributedPlan *distributedPlan = scanState->distributedPlan;
DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
if (distributedPlan->modLevel == ROW_MODIFY_READONLY ||
distributedPlan->insertSelectQuery != NULL)
if (!originalDistributedPlan->workerJob->deferredPruning)
{
/*
* For SELECT queries that have already been pruned we can proceed straight
* to execution, since none of the prepared statement logic applies.
*/
return;
}
/* we'll be modifying the distributed plan by assigning taskList, do it on a copy */
distributedPlan = copyObject(distributedPlan);
scanState->distributedPlan = distributedPlan;
/*
* Create a copy of the generic plan for the current execution, but make a shallow
* copy of the plan cache. That means we'll be able to access the plan cache via
* currentPlan->workerJob->localPlannedStatements, but it will be preserved across
* executions by the prepared statement logic.
*/
DistributedPlan *currentPlan =
CopyDistributedPlanWithoutCache(originalDistributedPlan);
scanState->distributedPlan = currentPlan;
Job *workerJob = distributedPlan->workerJob;
List *taskList = workerJob->taskList;
Job *workerJob = currentPlan->workerJob;
Query *jobQuery = workerJob->jobQuery;
PlanState *planState = &(scanState->customScanState.ss.ps);
/*
* These more complex jobs should have been evaluated in
* CitusBeginScanWithCoordinatorProcessing.
* We only do deferred pruning for fast path queries, which have a single
* partition column value.
*/
Assert(!(workerJob->requiresMasterEvaluation || workerJob->deferredPruning));
Assert(currentPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner);
/* prevent concurrent placement changes */
AcquireMetadataLocks(taskList);
/*
* Evaluate parameters, because the parameters are only available on the
* coordinator and are required for pruning.
*
* We don't evaluate functions for read-only queries on the coordinator
* at the moment. Most function calls would be in a context where they
* should be re-evaluated for every row in case of volatile functions.
*
* TODO: evaluate stable functions
*/
ExecuteMasterEvaluableParameters(jobQuery, planState);
/* modify tasks are always assigned using first-replica policy */
workerJob->taskList = FirstReplicaAssignTaskList(taskList);
/* job query no longer has parameters, so we should not send any */
workerJob->parametersInJobQueryResolved = true;
/* parameters are filled in, so we can generate a task for this execution */
RegenerateTaskForFasthPathQuery(workerJob);
if (IsLocalPlanCachingSupported(workerJob, originalDistributedPlan))
{
Task *task = linitial(workerJob->taskList);
/*
* We are going to execute this task locally. If it's not already in
* the cache, create a local plan now and add it to the cache. During
* execution, we will get the plan from the cache.
*
* The plan will be cached across executions when originalDistributedPlan
* represents a prepared statement.
*/
CacheLocalPlanForShardQuery(task, originalDistributedPlan);
}
}
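The SELECT path mainly matters for prepared statements that filter on the distribution column: the parameter value only becomes known at execution time, so shard pruning is deferred to CitusBeginSelectScan and, for a single local shard, the shard plan can be cached. A hedged libpq sketch of that scenario (the connection string and the events table are hypothetical):

#include <stdio.h>
#include <libpq-fe.h>

int main(void)
{
	/* hypothetical connection string and distributed table events(event_id bigint, payload text) */
	PGconn *conn = PQconnectdb("host=localhost port=5432 dbname=postgres");
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	/* a fast-path router query: one table, equality filter on the distribution column */
	PGresult *prep = PQprepare(conn, "get_event",
							   "SELECT payload FROM events WHERE event_id = $1", 1, NULL);
	PQclear(prep);

	/*
	 * The shard for event_id = 42 can only be chosen once the parameter is bound,
	 * which is the case CitusBeginSelectScan handles via deferred pruning.
	 */
	const char *values[1] = { "42" };
	PGresult *res = PQexecPrepared(conn, "get_event", 1, values, NULL, NULL, 0);
	printf("rows: %d\n", PQntuples(res));

	PQclear(res);
	PQfinish(conn);
	return 0;
}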
/*
* CitusBeginScanWithCoordinatorProcessing generates query strings at the start of the execution
* in two cases: when the query requires master evaluation and/or deferred shard pruning.
* CitusBeginModifyScan prepares the scan state for a modification.
*
* The function is also smart about caching plans if the plan is local to this node.
* Modifications are special because:
* a) we evaluate function calls (e.g. nextval) here and the outcome may
* determine which shards are affected by this query.
* b) we need to take metadata locks to make sure no write is left behind
* when finalizing a shard move.
*/
static void
CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate, int eflags)
CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
{
CitusScanState *scanState = (CitusScanState *) node;
DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
DistributedPlan *distributedPlan = CopyDistributedPlanWithoutCache(scanState);
Job *workerJob = distributedPlan->workerJob;
Query *jobQuery = workerJob->jobQuery;
/* we'd only get to this function with the following conditions */
Assert(workerJob->requiresMasterEvaluation || workerJob->deferredPruning);
PlanState *planState = &(scanState->customScanState.ss.ps);
DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
/* citus only evaluates functions for modification queries */
bool modifyQueryRequiresMasterEvaluation =
jobQuery->commandType != CMD_SELECT &&
(workerJob->requiresMasterEvaluation || workerJob->deferredPruning);
DistributedPlan *currentPlan =
CopyDistributedPlanWithoutCache(originalDistributedPlan);
scanState->distributedPlan = currentPlan;
/*
* ExecuteMasterEvaluableFunctions handles both function evaluation
* and parameter evaluation. Pruning is most likely deferred because
* there is a parameter on the distribution key. So, evaluate in both
* cases.
*/
if (modifyQueryRequiresMasterEvaluation)
Job *workerJob = currentPlan->workerJob;
Query *jobQuery = workerJob->jobQuery;
bool evaluateAllExpressions = workerJob->requiresMasterEvaluation ||
workerJob->deferredPruning;
if (evaluateAllExpressions)
{
/* evaluate functions and parameters for modification queries */
/* evaluate both functions and parameters */
ExecuteMasterEvaluableFunctionsAndParameters(jobQuery, planState);
}
else if (jobQuery->commandType == CMD_SELECT && !workerJob->deferredPruning)
{
/* we'll use generated strings, no need to have the parameters anymore */
EState *executorState = planState->state;
ResetExecutionParameters(executorState);
/* we're done, we don't want to evaluate functions for SELECT queries */
return;
/* job query no longer has parameters, so we should not send any */
workerJob->parametersInJobQueryResolved = true;
}
else if (jobQuery->commandType == CMD_SELECT && workerJob->deferredPruning)
if (workerJob->deferredPruning)
{
/*
* Evaluate parameters, because the parameters are only available on the
* coordinator and are required for pruning.
* At this point, we're about to do the shard pruning for fast-path queries.
* Given that pruning is always deferred for INSERTs, we also get here
* when !EnableFastPathRouterPlanner.
*/
Assert(currentPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner);
/*
* We can only now decide which shard to use, so we need to build a new task
* list.
*/
if (jobQuery->commandType == CMD_INSERT)
{
RegenerateTaskListForInsert(workerJob);
}
else
{
RegenerateTaskForFasthPathQuery(workerJob);
}
}
else if (workerJob->requiresMasterEvaluation)
{
/*
* When there is no deferred pruning, but we did evaluate functions, then
* we only rebuild the query strings in the existing tasks.
*/
RebuildQueryStrings(workerJob);
}
/*
* Now that we know the shard ID(s) we can acquire the necessary shard metadata
* locks. Once we have the locks it's safe to load the placement metadata.
*/
/* prevent concurrent placement changes */
AcquireMetadataLocks(workerJob->taskList);
/* modify tasks are always assigned using first-replica policy */
workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList);
/*
* Now that we have populated the task placements we can determine whether
* any of them are local to this node and cache a plan if needed.
*/
if (IsLocalPlanCachingSupported(workerJob, originalDistributedPlan))
{
Task *task = linitial(workerJob->taskList);
/*
* We are going to execute this task locally. If it's not already in
* the cache, create a local plan now and add it to the cache. During
* execution, we will get the plan from the cache.
*
* But, we don't want to evaluate functions for read-only queries on the
* coordinator as the volatile functions could yield different
* results per shard (also per row) and could have side-effects.
* WARNING: In this function we'll use the original plan with the original
* query tree, meaning parameters and function calls are back and we'll
* redo evaluation in the local (Postgres) executor. The reason we do this
* is that we only need to cache one generic plan per shard.
*
* Note that Citus already errors out for modification queries during
* planning when the query involves any volatile function that might
* make the shards diverge, as such functions are expected to yield different
* results per shard (also per row).
* The plan will be cached across executions when originalDistributedPlan
* represents a prepared statement.
*/
ExecuteMasterEvaluableParameters(jobQuery, planState);
}
/*
* After evaluating the function/parameters, we're done unless shard pruning
* is also deferred.
*/
if (workerJob->requiresMasterEvaluation && !workerJob->deferredPruning)
{
RebuildQueryStrings(workerJob->jobQuery, workerJob->taskList);
/* we'll use generated strings, no need to have the parameters anymore */
EState *executorState = planState->state;
ResetExecutionParameters(executorState);
return;
}
/*
* At this point, we're about to do the shard pruning for fast-path queries.
* Given that pruning is always deferred for INSERTs, we also get here
* when !EnableFastPathRouterPlanner.
*/
Assert(workerJob->deferredPruning &&
(distributedPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner));
if (jobQuery->commandType == CMD_INSERT)
{
HandleDeferredShardPruningForInserts(distributedPlan);
}
else
{
HandleDeferredShardPruningForFastPathQueries(distributedPlan);
}
if (jobQuery->commandType != CMD_SELECT)
{
/* prevent concurrent placement changes */
AcquireMetadataLocks(workerJob->taskList);
/* modify tasks are always assigned using first-replica policy */
workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList);
}
if (list_length(distributedPlan->workerJob->taskList) != 1)
{
/*
* We might have zero-shard queries or multi-row INSERTs at this point;
* we only want to cache single-task queries.
*/
return;
}
/*
* As long as the task accesses local node and the query doesn't have
* any volatile functions, we cache the local Postgres plan on the
* shard for re-use.
*/
Task *task = linitial(distributedPlan->workerJob->taskList);
if (EnableLocalExecution && TaskAccessesLocalNode(task) &&
!contain_volatile_functions(
(Node *) originalDistributedPlan->workerJob->jobQuery))
{
CacheLocalPlanForTask(task, originalDistributedPlan);
}
else
{
/*
* If we're not going to use a cached plan, we'll use the already-generated
* query string in which the parameters have been replaced, so we should not
* keep the parameters around anymore.
*/
EState *executorState = planState->state;
ResetExecutionParameters(executorState);
CacheLocalPlanForShardQuery(task, originalDistributedPlan);
}
}
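Condensed, the modify path follows a fixed order: evaluate functions and parameters, regenerate (or just rebuild) the tasks, take metadata locks, assign placements, then optionally cache a local shard plan. A rough model of that sequence with stand-in stubs, not the actual Citus API:

#include <stdbool.h>
#include <stdio.h>

/* stand-in stubs modelling the steps of CitusBeginModifyScan, not the real Citus API */
static void EvaluateFunctionsAndParameters(void) { puts("1. evaluate nextval()/params on the coordinator"); }
static void RegenerateTasks(bool isInsert)
{
	printf("2. %s\n", isInsert ? "RegenerateTaskListForInsert" : "RegenerateTaskForFasthPathQuery");
}
static void RebuildStrings(void)      { puts("2. RebuildQueryStrings (shards already known)"); }
static void TakeMetadataLocks(void)   { puts("3. AcquireMetadataLocks: block concurrent shard moves"); }
static void AssignPlacements(void)    { puts("4. FirstReplicaAssignTaskList"); }
static void CacheLocalShardPlan(void) { puts("5. CacheLocalPlanForShardQuery (single local shard only)"); }

static void BeginModifyScanModel(bool needsEvaluation, bool deferredPruning,
								 bool isInsert, bool localCachingSupported)
{
	if (needsEvaluation)
	{
		EvaluateFunctionsAndParameters();
	}

	if (deferredPruning)
	{
		/* only now do we know the shard(s), so the task list is built from scratch */
		RegenerateTasks(isInsert);
	}
	else if (needsEvaluation)
	{
		/* shards were known at plan time; only the query strings change */
		RebuildStrings();
	}

	TakeMetadataLocks();
	AssignPlacements();

	if (localCachingSupported)
	{
		CacheLocalShardPlan();
	}
}

int main(void)
{
	/* e.g. a prepared INSERT whose partition column value comes from a parameter */
	BeginModifyScanModel(true, true, true, true);
	return 0;
}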
@ -422,15 +426,13 @@ CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate, i
* reasons, as they are immutable, so no need to have a deep copy.
*/
static DistributedPlan *
CopyDistributedPlanWithoutCache(CitusScanState *scanState)
CopyDistributedPlanWithoutCache(DistributedPlan *originalDistributedPlan)
{
DistributedPlan *originalDistributedPlan = scanState->distributedPlan;
List *localPlannedStatements =
originalDistributedPlan->workerJob->localPlannedStatements;
originalDistributedPlan->workerJob->localPlannedStatements = NIL;
DistributedPlan *distributedPlan = copyObject(originalDistributedPlan);
scanState->distributedPlan = distributedPlan;
/* set back the immutable field */
originalDistributedPlan->workerJob->localPlannedStatements = localPlannedStatements;
@ -441,30 +443,12 @@ CopyDistributedPlanWithoutCache(CitusScanState *scanState)
/*
* ResetExecutionParameters sets the parameter list to NULL. See the function
* for details.
* CacheLocalPlanForShardQuery replaces the relation OIDs in the job query
* with shard relation OIDs and then plans the query and caches the result
* in the originalDistributedPlan (which may be preserved across executions).
*/
static void
ResetExecutionParameters(EState *executorState)
{
/*
* We've processed parameters in ExecuteMasterEvaluableFunctions and
* don't need to send their values to workers, since they will be
* represented as constants in the deparsed query. To avoid sending
* parameter values, we set the parameter list to NULL.
*/
executorState->es_param_list_info = NULL;
}
/*
* CacheLocalPlanForTask caches a plan that is local to this node in the
* originalDistributedPlan.
*
* The basic idea is to be able to skip planning on the shards when possible.
*/
static void
CacheLocalPlanForTask(Task *task, DistributedPlan *originalDistributedPlan)
CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan)
{
PlannedStmt *localPlan = GetCachedLocalPlan(task, originalDistributedPlan);
if (localPlan != NULL)
@ -512,6 +496,15 @@ CacheLocalPlanForTask(Task *task, DistributedPlan *originalDistributedPlan)
return;
}
if (IsLoggableLevel(DEBUG5))
{
StringInfo queryString = makeStringInfo();
pg_get_query_def(shardQuery, queryString);
ereport(DEBUG5, (errmsg("caching plan for query: %s",
queryString->data)));
}
LockRelationOid(rangeTableEntry->relid, lockMode);
LocalPlannedStatement *localPlannedStatement = CitusMakeNode(LocalPlannedStatement);
@ -554,18 +547,86 @@ GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
/*
* HandleDeferredShardPruningForInserts does the shard pruning for INSERT
* IsLocalPlanCachingSupported returns whether (part of) the task can be planned
* and executed locally and whether caching is supported (single shard, no volatile
* functions).
*/
static bool
IsLocalPlanCachingSupported(Job *currentJob, DistributedPlan *originalDistributedPlan)
{
if (!currentJob->deferredPruning)
{
/*
* When not using deferred pruning we may have already replaced distributed
* table RTEs with citus_extradata_container RTEs to pass the shard ID to the
* deparser. In that case, we cannot pass the query tree directly to the
* planner.
*
* If desired, we can relax this check by improving the implementation of
* CacheLocalPlanForShardQuery to translate citus_extradata_container
* to a shard relation OID.
*/
return false;
}
List *taskList = currentJob->taskList;
if (list_length(taskList) != 1)
{
/* we only support plan caching for single shard queries */
return false;
}
Task *task = linitial(taskList);
if (!TaskAccessesLocalNode(task))
{
/* not a local task */
return false;
}
if (!EnableLocalExecution)
{
/* user requested not to use local execution */
return false;
}
if (TransactionConnectedToLocalGroup)
{
/* transaction already connected to localhost */
return false;
}
Query *originalJobQuery = originalDistributedPlan->workerJob->jobQuery;
if (contain_volatile_functions((Node *) originalJobQuery))
{
/*
* We do not cache plans with volatile functions in the query.
*
* The reason we care about volatile functions is primarily that we
* already executed them in ExecuteMasterEvaluableFunctionsAndParameters
* and since we're falling back to the original query tree here we would
* execute them again if we execute the plan.
*/
return false;
}
return true;
}
/*
* RegenerateTaskListForInsert does the shard pruning for INSERT queries
* and rebuilds the query strings.
*/
static void
HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan)
RegenerateTaskListForInsert(Job *workerJob)
{
Job *workerJob = distributedPlan->workerJob;
Query *jobQuery = workerJob->jobQuery;
bool parametersInJobQueryResolved = workerJob->parametersInJobQueryResolved;
DeferredErrorMessage *planningError = NULL;
/* need to perform shard pruning, rebuild the task list from scratch */
List *taskList = RouterInsertTaskList(jobQuery, &planningError);
List *taskList = RouterInsertTaskList(jobQuery, parametersInJobQueryResolved,
&planningError);
if (planningError != NULL)
{
@ -575,21 +636,17 @@ HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan)
workerJob->taskList = taskList;
workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery);
RebuildQueryStrings(jobQuery, workerJob->taskList);
RebuildQueryStrings(workerJob);
}
/*
* HandleDeferredShardPruningForFastPathQueries does the shard pruning for
* RegenerateTaskForFasthPathQuery does the shard pruning for
* UPDATE/DELETE/SELECT fast path router queries and rebuilds the query strings.
*/
static void
HandleDeferredShardPruningForFastPathQueries(DistributedPlan *distributedPlan)
RegenerateTaskForFasthPathQuery(Job *workerJob)
{
Assert(distributedPlan->fastPathRouterPlan);
Job *workerJob = distributedPlan->workerJob;
bool isMultiShardQuery = false;
List *shardIntervalList =
TargetShardIntervalForFastPathQuery(workerJob->jobQuery,
@ -756,10 +813,11 @@ static void
CitusReScan(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;
Job *workerJob = scanState->distributedPlan->workerJob;
EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info;
if (paramListInfo != NULL)
if (paramListInfo != NULL && !workerJob->parametersInJobQueryResolved)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Cursors for queries on distributed tables with "


@ -184,15 +184,25 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
}
else
{
Query *shardQuery = ParseQueryString(TaskQueryString(task), parameterTypes,
numParams);
int taskNumParams = numParams;
Oid *taskParameterTypes = parameterTypes;
if (task->parametersInQueryStringResolved)
{
/*
* Parameters were removed from the query string so do not pass them
* here. Otherwise, we might see errors when passing custom types,
* since their OIDs were set to 0 and their type is normally
* inferred from
*/
taskNumParams = 0;
taskParameterTypes = NULL;
}
Query *shardQuery = ParseQueryString(TaskQueryString(task),
taskParameterTypes,
taskNumParams);
/*
* We should not consider using CURSOR_OPT_FORCE_DISTRIBUTED in case of
* intermediate results in the query. That'd trigger ExecuteLocalTaskPlan()
* to go through the distributed executor, which we do not want since the
* query is already known to be local.
*/
int cursorOptions = 0;
/*


@ -17,6 +17,7 @@
#include "distributed/citus_ruleutils.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/insert_select_planner.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_physical_planner.h"
@ -46,18 +47,20 @@ static char * DeparseTaskQuery(Task *task, Query *query);
* include execution-time changes such as function evaluation.
*/
void
RebuildQueryStrings(Query *originalQuery, List *taskList)
RebuildQueryStrings(Job *workerJob)
{
ListCell *taskCell = NULL;
Query *originalQuery = workerJob->jobQuery;
List *taskList = workerJob->taskList;
Oid relationId = ((RangeTblEntry *) linitial(originalQuery->rtable))->relid;
RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery);
foreach(taskCell, taskList)
Task *task = NULL;
foreach_ptr(task, taskList)
{
Task *task = (Task *) lfirst(taskCell);
Query *query = originalQuery;
if (UpdateOrDeleteQuery(query) && list_length(taskList))
if (UpdateOrDeleteQuery(query) && list_length(taskList) > 1)
{
query = copyObject(originalQuery);
}
@ -115,6 +118,12 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
UpdateTaskQueryString(query, relationId, valuesRTE, task);
/*
* If parameters were resolved in the job query, then they are now also
* resolved in the query string.
*/
task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved;
ereport(DEBUG4, (errmsg("query after rebuilding: %s",
ApplyLogRedaction(TaskQueryString(task)))));
}


@ -158,11 +158,11 @@ static int CompareInsertValuesByShardId(const void *leftElement,
const void *rightElement);
static List * SingleShardSelectTaskList(Query *query, uint64 jobId,
List *relationShardList, List *placementList,
uint64 shardId);
uint64 shardId, bool parametersInQueryResolved);
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
static List * SingleShardModifyTaskList(Query *query, uint64 jobId,
List *relationShardList, List *placementList,
uint64 shardId);
uint64 shardId, bool parametersInQueryResolved);
static List * RemoveCoordinatorPlacement(List *placementList);
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
TaskAssignmentPolicyType
@ -1447,7 +1447,9 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann
}
else
{
taskList = RouterInsertTaskList(query, planningError);
bool parametersInQueryResolved = false;
taskList = RouterInsertTaskList(query, parametersInQueryResolved, planningError);
if (*planningError)
{
return NULL;
@ -1457,19 +1459,20 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann
requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
}
Job *job = CreateJob(originalQuery);
job->taskList = taskList;
job->requiresMasterEvaluation = requiresMasterEvaluation;
job->deferredPruning = deferredPruning;
if (!requiresMasterEvaluation)
{
/* no functions or parameters, build the query strings upfront */
RebuildQueryStrings(originalQuery, taskList);
RebuildQueryStrings(job);
/* remember the partition column value */
partitionKeyValue = ExtractInsertPartitionKeyValue(originalQuery);
}
Job *job = CreateJob(originalQuery);
job->taskList = taskList;
job->requiresMasterEvaluation = requiresMasterEvaluation;
job->deferredPruning = deferredPruning;
job->partitionKeyValue = partitionKeyValue;
return job;
@ -1561,7 +1564,8 @@ ErrorIfNoShardsExist(DistTableCacheEntry *cacheEntry)
* a distributed table via the router executor.
*/
List *
RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError)
RouterInsertTaskList(Query *query, bool parametersInQueryResolved,
DeferredErrorMessage **planningError)
{
List *insertTaskList = NIL;
ListCell *modifyRouteCell = NULL;
@ -1593,8 +1597,8 @@ RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError)
relationShard->relationId = distributedTableId;
modifyTask->relationShardList = list_make1(relationShard);
modifyTask->taskPlacementList = ShardPlacementList(modifyRoute->shardId);
modifyTask->parametersInQueryStringResolved = parametersInQueryResolved;
insertTaskList = lappend(insertTaskList, modifyTask);
}
@ -1772,7 +1776,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
{
job->taskList = SingleShardSelectTaskList(originalQuery, job->jobId,
relationShardList, placementList,
shardId);
shardId,
job->parametersInJobQueryResolved);
/*
* Queries to reference tables, or distributed tables with multiple replica's have
@ -1798,7 +1803,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
{
job->taskList = SingleShardModifyTaskList(originalQuery, job->jobId,
relationShardList, placementList,
shardId);
shardId,
job->parametersInJobQueryResolved);
}
}
@ -1890,7 +1896,8 @@ RemoveCoordinatorPlacement(List *placementList)
*/
static List *
SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList,
List *placementList, uint64 shardId)
List *placementList, uint64 shardId,
bool parametersInQueryResolved)
{
Task *task = CreateTask(SELECT_TASK);
List *relationRowLockList = NIL;
@ -1908,6 +1915,7 @@ SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList,
task->jobId = jobId;
task->relationShardList = relationShardList;
task->relationRowLockList = relationRowLockList;
task->parametersInQueryStringResolved = parametersInQueryResolved;
return list_make1(task);
}
@ -1960,7 +1968,8 @@ RowLocksOnRelations(Node *node, List **relationRowLockList)
*/
static List *
SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
List *placementList, uint64 shardId)
List *placementList, uint64 shardId,
bool parametersInQueryResolved)
{
Task *task = CreateTask(MODIFY_TASK);
List *rangeTableList = NIL;
@ -1987,6 +1996,7 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
task->jobId = jobId;
task->relationShardList = relationShardList;
task->replicationModel = modificationTableCacheEntry->replicationModel;
task->parametersInQueryStringResolved = parametersInQueryResolved;
return list_make1(task);
}


@ -45,6 +45,11 @@ static bool ShouldEvaluateFunctionWithMasterContext(MasterEvaluationContext *
bool
RequiresMasterEvaluation(Query *query)
{
if (query->commandType == CMD_SELECT)
{
return false;
}
return FindNodeCheck((Node *) query, CitusIsMutableFunction);
}


@ -86,6 +86,7 @@ copyJobInfo(Job *newnode, Job *from)
COPY_SCALAR_FIELD(deferredPruning);
COPY_NODE_FIELD(partitionKeyValue);
COPY_NODE_FIELD(localPlannedStatements);
COPY_SCALAR_FIELD(parametersInJobQueryResolved);
}
@ -274,6 +275,7 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_NODE_FIELD(relationRowLockList);
COPY_NODE_FIELD(rowValuesLists);
COPY_SCALAR_FIELD(partiallyLocalOrRemote);
COPY_SCALAR_FIELD(parametersInQueryStringResolved);
}


@ -339,6 +339,7 @@ OutJobFields(StringInfo str, const Job *node)
WRITE_BOOL_FIELD(deferredPruning);
WRITE_NODE_FIELD(partitionKeyValue);
WRITE_NODE_FIELD(localPlannedStatements);
WRITE_BOOL_FIELD(parametersInJobQueryResolved);
}
@ -492,6 +493,7 @@ OutTask(OUTFUNC_ARGS)
WRITE_NODE_FIELD(relationRowLockList);
WRITE_NODE_FIELD(rowValuesLists);
WRITE_BOOL_FIELD(partiallyLocalOrRemote);
WRITE_BOOL_FIELD(parametersInQueryStringResolved);
}


@ -21,7 +21,7 @@
#include "distributed/citus_custom_scan.h"
extern void RebuildQueryStrings(Query *originalQuery, List *taskList);
extern void RebuildQueryStrings(Job *workerJob);
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern void SetTaskQuery(Task *task, Query *query);
extern void SetTaskQueryString(Task *task, char *queryString);


@ -157,6 +157,13 @@ typedef struct Job
/* for local shard queries, we may save the local plan here */
List *localPlannedStatements;
/*
* When we evaluate functions and parameters in jobQuery then we
* should no longer send the list of parameters along with the
* query.
*/
bool parametersInJobQueryResolved;
} Job;
@ -274,6 +281,13 @@ typedef struct Task
* the task split into local and remote tasks.
*/
bool partiallyLocalOrRemote;
/*
* When we evaluate functions and parameters in the query string then
* we should no longer send the list of parameters along with the
* query.
*/
bool parametersInQueryStringResolved;
} Task;
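The two new flags travel together: parametersInJobQueryResolved is set on the Job once expressions have been evaluated, RebuildQueryStrings copies it into each Task as parametersInQueryStringResolved, and the executors then skip binding or forwarding the parameter list. A rough sketch of that flow with simplified stand-in structs:

#include <stdbool.h>
#include <stdio.h>

/* simplified stand-ins for Job and Task, not the real Citus structs */
typedef struct TaskModel { bool parametersInQueryStringResolved; } TaskModel;
typedef struct JobModel  { bool parametersInJobQueryResolved; TaskModel task; } JobModel;

static void EvaluateJobQuery(JobModel *job)
{
	/* after evaluation, parameter values are inlined as constants in the job query */
	job->parametersInJobQueryResolved = true;
}

static void RebuildQueryStringsModel(JobModel *job)
{
	/* the deparsed task query string inherits the job-level resolution state */
	job->task.parametersInQueryStringResolved = job->parametersInJobQueryResolved;
}

static void SendTaskToWorker(const TaskModel *task, int numParams)
{
	/* mirrors the executor checks: only bind parameters if they are still unresolved */
	if (numParams > 0 && !task->parametersInQueryStringResolved)
	{
		printf("sending query with %d bound parameters\n", numParams);
	}
	else
	{
		puts("sending query string only, parameters already inlined");
	}
}

int main(void)
{
	JobModel job = { false, { false } };

	EvaluateJobQuery(&job);
	RebuildQueryStringsModel(&job);
	SendTaskToWorker(&job.task, 2);
	return 0;
}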


@ -47,7 +47,8 @@ extern List * RelationShardListForShardIntervalList(List *shardIntervalList,
bool *shardsPresent);
extern List * FindRouterWorkerList(List *shardIntervalList, bool shardsPresent,
bool replacePrunedQueryWithDummy);
extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError);
extern List * RouterInsertTaskList(Query *query, bool parametersInQueryResolved,
DeferredErrorMessage **planningError);
extern Const * ExtractInsertPartitionKeyValue(Query *query);
extern List * TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *
restrictionContext,