From cb3d90bdc827233c9b2eaf69db1dfb0b7e0be3b1 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 6 Mar 2020 10:15:56 +0100 Subject: [PATCH] Simplify INSERT logic in router planner --- .../distributed/executor/citus_custom_scan.c | 41 ++++++- .../planner/multi_router_planner.c | 116 +++--------------- .../regress/expected/multi_router_planner.out | 1 + .../multi_router_planner_fast_path.out | 1 + 4 files changed, 52 insertions(+), 107 deletions(-) diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 155795f8b..c0de40978 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -52,6 +52,7 @@ static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags); static void CitusBeginSelectScan(CustomScanState *node, EState *estate, int eflags); static void CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags); static void CitusPreExecScan(CitusScanState *scanState); +static bool ModifyJobNeedsEvaluation(Job *workerJob); static void RegenerateTaskForFasthPathQuery(Job *workerJob); static void RegenerateTaskListForInsert(Job *workerJob); static void CacheLocalPlanForShardQuery(Task *task, @@ -334,10 +335,8 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags) Job *workerJob = currentPlan->workerJob; Query *jobQuery = workerJob->jobQuery; - bool evaluateAllExpressions = workerJob->requiresMasterEvaluation || - workerJob->deferredPruning; - if (evaluateAllExpressions) + if (ModifyJobNeedsEvaluation(workerJob)) { /* evaluate both functions and parameters */ ExecuteMasterEvaluableFunctionsAndParameters(jobQuery, planState); @@ -414,6 +413,30 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags) } +/* + * ModifyJobNeedsEvaluation checks whether the functions and parameters in the job query + * need to be evaluated before we can build task query strings. + */ +static bool +ModifyJobNeedsEvaluation(Job *workerJob) +{ + if (workerJob->requiresMasterEvaluation) + { + /* query contains functions that need to be evaluated on the coordinator */ + return true; + } + + if (workerJob->partitionKeyValue != NULL) + { + /* the value of the distribution column is already known */ + return false; + } + + /* pruning was deferred due to a parameter in the partition column */ + return workerJob->deferredPruning; +} + + /* * CopyDistributedPlanWithoutCache is a helper function which copies the * distributedPlan into the current memory context. @@ -630,14 +653,22 @@ RegenerateTaskListForInsert(Job *workerJob) /* need to perform shard pruning, rebuild the task list from scratch */ List *taskList = RouterInsertTaskList(jobQuery, parametersInJobQueryResolved, &planningError); - if (planningError != NULL) { RaiseDeferredError(planningError, ERROR); } workerJob->taskList = taskList; - workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery); + + if (workerJob->partitionKeyValue == NULL) + { + /* + * If we were not able to determine the partition key value in the planner, + * take another shot now. It may still be NULL in case of a multi-row + * insert. + */ + workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery); + } RebuildQueryStrings(workerJob); } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index fde919af1..588bbd3db 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -133,11 +133,9 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *sta static bool MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context); static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTree); -static Job * RouterInsertJob(Query *originalQuery, Query *query, - DeferredErrorMessage **planningError); +static Job * RouterInsertJob(Query *originalQuery); static void ErrorIfNoShardsExist(CitusTableCacheEntry *cacheEntry); static DeferredErrorMessage * DeferErrorIfModifyView(Query *queryTree); -static bool CanShardPrune(Oid distributedTableId, Query *query); static Job * CreateJob(Query *query); static Task * CreateTask(TaskType taskType); static Job * RouterJob(Query *originalQuery, @@ -225,7 +223,7 @@ CreateModifyPlan(Query *originalQuery, Query *query, } else { - job = RouterInsertJob(originalQuery, query, &distributedPlan->planningError); + job = RouterInsertJob(originalQuery); } if (distributedPlan->planningError != NULL) @@ -1407,73 +1405,24 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre /* - * RouterInsertJob builds a Job to represent an insertion performed by - * the provided query against the provided shard interval. This task contains - * shard-extended deparsed SQL to be run during execution. + * RouterInsertJob builds a Job to represent an insertion performed by the provided + * query. For inserts we always defer shard pruning and generating the task list to + * the executor. */ static Job * -RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **planningError) +RouterInsertJob(Query *originalQuery) { - Oid distributedTableId = ExtractFirstCitusTableId(query); - List *taskList = NIL; - bool requiresMasterEvaluation = false; - bool deferredPruning = false; - Const *partitionKeyValue = NULL; - - bool isMultiRowInsert = IsMultiRowInsert(query); + bool isMultiRowInsert = IsMultiRowInsert(originalQuery); if (isMultiRowInsert) { /* add default expressions to RTE_VALUES in multi-row INSERTs */ NormalizeMultiRowInsertTargetList(originalQuery); } - if (isMultiRowInsert || !CanShardPrune(distributedTableId, query)) - { - /* - * If there is a non-constant (e.g. parameter, function call) in the partition - * column of the INSERT then we defer shard pruning until the executor where - * these values are known. - * - * XXX: We also defer pruning for multi-row INSERTs because of some current - * limitations with the way multi-row INSERTs are handled. Most notably, we - * don't evaluate functions in task->rowValuesList. Therefore we need to - * perform function evaluation before we can run RouterInsertTaskList. - */ - taskList = NIL; - deferredPruning = true; - - /* must evaluate the non-constant in the partition column */ - requiresMasterEvaluation = true; - } - else - { - bool parametersInQueryResolved = false; - - taskList = RouterInsertTaskList(query, parametersInQueryResolved, planningError); - if (*planningError) - { - return NULL; - } - - /* determine whether there are function calls to evaluate */ - requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); - } - Job *job = CreateJob(originalQuery); - job->taskList = taskList; - job->requiresMasterEvaluation = requiresMasterEvaluation; - job->deferredPruning = deferredPruning; - - if (!requiresMasterEvaluation) - { - /* no functions or parameters, build the query strings upfront */ - RebuildQueryStrings(job); - - /* remember the partition column value */ - partitionKeyValue = ExtractInsertPartitionKeyValue(originalQuery); - } - - job->partitionKeyValue = partitionKeyValue; + job->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); + job->deferredPruning = true; + job->partitionKeyValue = ExtractInsertPartitionKeyValue(originalQuery); return job; } @@ -1498,45 +1447,6 @@ CreateJob(Query *query) } -/* - * CanShardPrune determines whether a query is ready for shard pruning - * by checking whether there is a constant value in the partition column. - */ -static bool -CanShardPrune(Oid distributedTableId, Query *query) -{ - uint32 rangeTableId = 1; - ListCell *insertValuesCell = NULL; - - if (query->commandType != CMD_INSERT) - { - /* we assume UPDATE/DELETE is always prunable */ - return true; - } - - Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId); - if (partitionColumn == NULL) - { - /* can always do shard pruning for reference tables */ - return true; - } - - /* get full list of partition values and ensure they are all Consts */ - List *insertValuesList = ExtractInsertValuesList(query, partitionColumn); - foreach(insertValuesCell, insertValuesList) - { - InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell); - if (!IsA(insertValues->partitionValueExpr, Const)) - { - /* can't do shard pruning if the partition column is not constant */ - return false; - } - } - - return true; -} - - /* * ErrorIfNoShardsExist throws an error if the given table has no shards. */ @@ -2709,15 +2619,17 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError) { InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell); List *prunedShardIntervalList = NIL; + Expr *partitionValueExpr = (Expr *) strip_implicit_coercions( + (Node *) insertValues->partitionValueExpr); - if (!IsA(insertValues->partitionValueExpr, Const)) + if (!IsA(partitionValueExpr, Const)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("failed to evaluate partition key in insert"), errhint("try using constant values for partition column"))); } - Const *partitionValueConst = (Const *) insertValues->partitionValueExpr; + Const *partitionValueConst = (Const *) partitionValueExpr; if (partitionValueConst->constisnull) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index b982e583e..b6a68e849 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -2362,6 +2362,7 @@ DETAIL: distribution column value: 1 INSERT INTO articles_hash VALUES (51, 1, 'amateus', 1814), (52, 1, 'second amateus', 2824); DEBUG: Creating router plan DEBUG: Plan is router executable +DETAIL: distribution column value: 1 -- verify insert is successfull (not router plannable and executable) SELECT id FROM articles_hash diff --git a/src/test/regress/expected/multi_router_planner_fast_path.out b/src/test/regress/expected/multi_router_planner_fast_path.out index 20392ad78..974761913 100644 --- a/src/test/regress/expected/multi_router_planner_fast_path.out +++ b/src/test/regress/expected/multi_router_planner_fast_path.out @@ -2139,6 +2139,7 @@ DETAIL: distribution column value: 1 INSERT INTO articles_hash VALUES (51, 1, 'amateus', 1814), (52, 1, 'second amateus', 2824); DEBUG: Creating router plan DEBUG: Plan is router executable +DETAIL: distribution column value: 1 -- verify insert is successfull (not router plannable and executable) SELECT id FROM articles_hash