Simplify INSERT logic in router planner

pull/3584/head
Marco Slot 2020-03-06 10:15:56 +01:00
parent d0d51bb8c3
commit cb3d90bdc8
4 changed files with 52 additions and 107 deletions

View File

@ -52,6 +52,7 @@ static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags);
static void CitusBeginSelectScan(CustomScanState *node, EState *estate, int eflags);
static void CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags);
static void CitusPreExecScan(CitusScanState *scanState);
static bool ModifyJobNeedsEvaluation(Job *workerJob);
static void RegenerateTaskForFasthPathQuery(Job *workerJob);
static void RegenerateTaskListForInsert(Job *workerJob);
static void CacheLocalPlanForShardQuery(Task *task,
@ -334,10 +335,8 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
Job *workerJob = currentPlan->workerJob;
Query *jobQuery = workerJob->jobQuery;
bool evaluateAllExpressions = workerJob->requiresMasterEvaluation ||
workerJob->deferredPruning;
if (evaluateAllExpressions)
if (ModifyJobNeedsEvaluation(workerJob))
{
/* evaluate both functions and parameters */
ExecuteMasterEvaluableFunctionsAndParameters(jobQuery, planState);
@ -414,6 +413,30 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
}
/*
* ModifyJobNeedsEvaluation checks whether the functions and parameters in the job query
* need to be evaluated before we can build task query strings.
*/
static bool
ModifyJobNeedsEvaluation(Job *workerJob)
{
if (workerJob->requiresMasterEvaluation)
{
/* query contains functions that need to be evaluated on the coordinator */
return true;
}
if (workerJob->partitionKeyValue != NULL)
{
/* the value of the distribution column is already known */
return false;
}
/* pruning was deferred due to a parameter in the partition column */
return workerJob->deferredPruning;
}
/*
* CopyDistributedPlanWithoutCache is a helper function which copies the
* distributedPlan into the current memory context.
@ -630,14 +653,22 @@ RegenerateTaskListForInsert(Job *workerJob)
/* need to perform shard pruning, rebuild the task list from scratch */
List *taskList = RouterInsertTaskList(jobQuery, parametersInJobQueryResolved,
&planningError);
if (planningError != NULL)
{
RaiseDeferredError(planningError, ERROR);
}
workerJob->taskList = taskList;
if (workerJob->partitionKeyValue == NULL)
{
/*
* If we were not able to determine the partition key value in the planner,
* take another shot now. It may still be NULL in case of a multi-row
* insert.
*/
workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery);
}
RebuildQueryStrings(workerJob);
}

View File

@ -133,11 +133,9 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *sta
static bool MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context);
static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
FromExpr *joinTree);
static Job * RouterInsertJob(Query *originalQuery, Query *query,
DeferredErrorMessage **planningError);
static Job * RouterInsertJob(Query *originalQuery);
static void ErrorIfNoShardsExist(CitusTableCacheEntry *cacheEntry);
static DeferredErrorMessage * DeferErrorIfModifyView(Query *queryTree);
static bool CanShardPrune(Oid distributedTableId, Query *query);
static Job * CreateJob(Query *query);
static Task * CreateTask(TaskType taskType);
static Job * RouterJob(Query *originalQuery,
@ -225,7 +223,7 @@ CreateModifyPlan(Query *originalQuery, Query *query,
}
else
{
job = RouterInsertJob(originalQuery, query, &distributedPlan->planningError);
job = RouterInsertJob(originalQuery);
}
if (distributedPlan->planningError != NULL)
@ -1407,73 +1405,24 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
/*
* RouterInsertJob builds a Job to represent an insertion performed by
* the provided query against the provided shard interval. This task contains
* shard-extended deparsed SQL to be run during execution.
* RouterInsertJob builds a Job to represent an insertion performed by the provided
* query. For inserts we always defer shard pruning and generating the task list to
* the executor.
*/
static Job *
RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **planningError)
RouterInsertJob(Query *originalQuery)
{
Oid distributedTableId = ExtractFirstCitusTableId(query);
List *taskList = NIL;
bool requiresMasterEvaluation = false;
bool deferredPruning = false;
Const *partitionKeyValue = NULL;
bool isMultiRowInsert = IsMultiRowInsert(query);
bool isMultiRowInsert = IsMultiRowInsert(originalQuery);
if (isMultiRowInsert)
{
/* add default expressions to RTE_VALUES in multi-row INSERTs */
NormalizeMultiRowInsertTargetList(originalQuery);
}
if (isMultiRowInsert || !CanShardPrune(distributedTableId, query))
{
/*
* If there is a non-constant (e.g. parameter, function call) in the partition
* column of the INSERT then we defer shard pruning until the executor where
* these values are known.
*
* XXX: We also defer pruning for multi-row INSERTs because of some current
* limitations with the way multi-row INSERTs are handled. Most notably, we
* don't evaluate functions in task->rowValuesList. Therefore we need to
* perform function evaluation before we can run RouterInsertTaskList.
*/
taskList = NIL;
deferredPruning = true;
/* must evaluate the non-constant in the partition column */
requiresMasterEvaluation = true;
}
else
{
bool parametersInQueryResolved = false;
taskList = RouterInsertTaskList(query, parametersInQueryResolved, planningError);
if (*planningError)
{
return NULL;
}
/* determine whether there are function calls to evaluate */
requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
}
Job *job = CreateJob(originalQuery);
job->taskList = taskList;
job->requiresMasterEvaluation = requiresMasterEvaluation;
job->deferredPruning = deferredPruning;
if (!requiresMasterEvaluation)
{
/* no functions or parameters, build the query strings upfront */
RebuildQueryStrings(job);
/* remember the partition column value */
partitionKeyValue = ExtractInsertPartitionKeyValue(originalQuery);
}
job->partitionKeyValue = partitionKeyValue;
job->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
job->deferredPruning = true;
job->partitionKeyValue = ExtractInsertPartitionKeyValue(originalQuery);
return job;
}
@ -1498,45 +1447,6 @@ CreateJob(Query *query)
}
/*
* CanShardPrune determines whether a query is ready for shard pruning
* by checking whether there is a constant value in the partition column.
*/
static bool
CanShardPrune(Oid distributedTableId, Query *query)
{
uint32 rangeTableId = 1;
ListCell *insertValuesCell = NULL;
if (query->commandType != CMD_INSERT)
{
/* we assume UPDATE/DELETE is always prunable */
return true;
}
Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
if (partitionColumn == NULL)
{
/* can always do shard pruning for reference tables */
return true;
}
/* get full list of partition values and ensure they are all Consts */
List *insertValuesList = ExtractInsertValuesList(query, partitionColumn);
foreach(insertValuesCell, insertValuesList)
{
InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell);
if (!IsA(insertValues->partitionValueExpr, Const))
{
/* can't do shard pruning if the partition column is not constant */
return false;
}
}
return true;
}
/*
* ErrorIfNoShardsExist throws an error if the given table has no shards.
*/
@ -2709,15 +2619,17 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
{
InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell);
List *prunedShardIntervalList = NIL;
Expr *partitionValueExpr = (Expr *) strip_implicit_coercions(
(Node *) insertValues->partitionValueExpr);
if (!IsA(insertValues->partitionValueExpr, Const))
if (!IsA(partitionValueExpr, Const))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("failed to evaluate partition key in insert"),
errhint("try using constant values for partition column")));
}
Const *partitionValueConst = (Const *) insertValues->partitionValueExpr;
Const *partitionValueConst = (Const *) partitionValueExpr;
if (partitionValueConst->constisnull)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),

View File

@ -2362,6 +2362,7 @@ DETAIL: distribution column value: 1
INSERT INTO articles_hash VALUES (51, 1, 'amateus', 1814), (52, 1, 'second amateus', 2824);
DEBUG: Creating router plan
DEBUG: Plan is router executable
DETAIL: distribution column value: 1
-- verify insert is successfull (not router plannable and executable)
SELECT id
FROM articles_hash

View File

@ -2139,6 +2139,7 @@ DETAIL: distribution column value: 1
INSERT INTO articles_hash VALUES (51, 1, 'amateus', 1814), (52, 1, 'second amateus', 2824);
DEBUG: Creating router plan
DEBUG: Plan is router executable
DETAIL: distribution column value: 1
-- verify insert is successfull (not router plannable and executable)
SELECT id
FROM articles_hash