mirror of https://github.com/citusdata/citus.git
Merge pull request #3584 from citusdata/simplify_insert_logic
Simplify INSERT logic in router planner
pull/3603/head
commit
c26f99ea82
|
@ -52,6 +52,7 @@ static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags);
|
||||||
static void CitusBeginSelectScan(CustomScanState *node, EState *estate, int eflags);
|
static void CitusBeginSelectScan(CustomScanState *node, EState *estate, int eflags);
|
||||||
static void CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags);
|
static void CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags);
|
||||||
static void CitusPreExecScan(CitusScanState *scanState);
|
static void CitusPreExecScan(CitusScanState *scanState);
|
||||||
|
static bool ModifyJobNeedsEvaluation(Job *workerJob);
|
||||||
static void RegenerateTaskForFasthPathQuery(Job *workerJob);
|
static void RegenerateTaskForFasthPathQuery(Job *workerJob);
|
||||||
static void RegenerateTaskListForInsert(Job *workerJob);
|
static void RegenerateTaskListForInsert(Job *workerJob);
|
||||||
static void CacheLocalPlanForShardQuery(Task *task,
|
static void CacheLocalPlanForShardQuery(Task *task,
|
||||||
|
@ -334,10 +335,8 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
|
||||||
|
|
||||||
Job *workerJob = currentPlan->workerJob;
|
Job *workerJob = currentPlan->workerJob;
|
||||||
Query *jobQuery = workerJob->jobQuery;
|
Query *jobQuery = workerJob->jobQuery;
|
||||||
bool evaluateAllExpressions = workerJob->requiresMasterEvaluation ||
|
|
||||||
workerJob->deferredPruning;
|
|
||||||
|
|
||||||
if (evaluateAllExpressions)
|
if (ModifyJobNeedsEvaluation(workerJob))
|
||||||
{
|
{
|
||||||
/* evaluate both functions and parameters */
|
/* evaluate both functions and parameters */
|
||||||
ExecuteMasterEvaluableFunctionsAndParameters(jobQuery, planState);
|
ExecuteMasterEvaluableFunctionsAndParameters(jobQuery, planState);
|
||||||
|
@ -414,6 +413,30 @@ CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ModifyJobNeedsEvaluation checks whether the functions and parameters in the job query
|
||||||
|
* need to be evaluated before we can build task query strings.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
ModifyJobNeedsEvaluation(Job *workerJob)
|
||||||
|
{
|
||||||
|
if (workerJob->requiresMasterEvaluation)
|
||||||
|
{
|
||||||
|
/* query contains functions that need to be evaluated on the coordinator */
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (workerJob->partitionKeyValue != NULL)
|
||||||
|
{
|
||||||
|
/* the value of the distribution column is already known */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* pruning was deferred due to a parameter in the partition column */
|
||||||
|
return workerJob->deferredPruning;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CopyDistributedPlanWithoutCache is a helper function which copies the
|
* CopyDistributedPlanWithoutCache is a helper function which copies the
|
||||||
* distributedPlan into the current memory context.
|
* distributedPlan into the current memory context.
|
||||||
|
@ -630,14 +653,22 @@ RegenerateTaskListForInsert(Job *workerJob)
|
||||||
/* need to perform shard pruning, rebuild the task list from scratch */
|
/* need to perform shard pruning, rebuild the task list from scratch */
|
||||||
List *taskList = RouterInsertTaskList(jobQuery, parametersInJobQueryResolved,
|
List *taskList = RouterInsertTaskList(jobQuery, parametersInJobQueryResolved,
|
||||||
&planningError);
|
&planningError);
|
||||||
|
|
||||||
if (planningError != NULL)
|
if (planningError != NULL)
|
||||||
{
|
{
|
||||||
RaiseDeferredError(planningError, ERROR);
|
RaiseDeferredError(planningError, ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
workerJob->taskList = taskList;
|
workerJob->taskList = taskList;
|
||||||
|
|
||||||
|
if (workerJob->partitionKeyValue == NULL)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* If we were not able to determine the partition key value in the planner,
|
||||||
|
* take another shot now. It may still be NULL in case of a multi-row
|
||||||
|
* insert.
|
||||||
|
*/
|
||||||
workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery);
|
workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery);
|
||||||
|
}
|
||||||
|
|
||||||
RebuildQueryStrings(workerJob);
|
RebuildQueryStrings(workerJob);
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,11 +133,9 @@ static bool MasterIrreducibleExpressionWalker(Node *expression, WalkerState *sta
|
||||||
static bool MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context);
|
static bool MasterIrreducibleExpressionFunctionChecker(Oid func_id, void *context);
|
||||||
static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
|
static bool TargetEntryChangesValue(TargetEntry *targetEntry, Var *column,
|
||||||
FromExpr *joinTree);
|
FromExpr *joinTree);
|
||||||
static Job * RouterInsertJob(Query *originalQuery, Query *query,
|
static Job * RouterInsertJob(Query *originalQuery);
|
||||||
DeferredErrorMessage **planningError);
|
|
||||||
static void ErrorIfNoShardsExist(CitusTableCacheEntry *cacheEntry);
|
static void ErrorIfNoShardsExist(CitusTableCacheEntry *cacheEntry);
|
||||||
static DeferredErrorMessage * DeferErrorIfModifyView(Query *queryTree);
|
static DeferredErrorMessage * DeferErrorIfModifyView(Query *queryTree);
|
||||||
static bool CanShardPrune(Oid distributedTableId, Query *query);
|
|
||||||
static Job * CreateJob(Query *query);
|
static Job * CreateJob(Query *query);
|
||||||
static Task * CreateTask(TaskType taskType);
|
static Task * CreateTask(TaskType taskType);
|
||||||
static Job * RouterJob(Query *originalQuery,
|
static Job * RouterJob(Query *originalQuery,
|
||||||
|
@ -225,7 +223,7 @@ CreateModifyPlan(Query *originalQuery, Query *query,
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
job = RouterInsertJob(originalQuery, query, &distributedPlan->planningError);
|
job = RouterInsertJob(originalQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (distributedPlan->planningError != NULL)
|
if (distributedPlan->planningError != NULL)
|
||||||
|
@ -1407,73 +1405,24 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RouterInsertJob builds a Job to represent an insertion performed by
|
* RouterInsertJob builds a Job to represent an insertion performed by the provided
|
||||||
* the provided query against the provided shard interval. This task contains
|
* query. For inserts we always defer shard pruning and generating the task list to
|
||||||
* shard-extended deparsed SQL to be run during execution.
|
* the executor.
|
||||||
*/
|
*/
|
||||||
static Job *
|
static Job *
|
||||||
RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **planningError)
|
RouterInsertJob(Query *originalQuery)
|
||||||
{
|
{
|
||||||
Oid distributedTableId = ExtractFirstCitusTableId(query);
|
bool isMultiRowInsert = IsMultiRowInsert(originalQuery);
|
||||||
List *taskList = NIL;
|
|
||||||
bool requiresMasterEvaluation = false;
|
|
||||||
bool deferredPruning = false;
|
|
||||||
Const *partitionKeyValue = NULL;
|
|
||||||
|
|
||||||
bool isMultiRowInsert = IsMultiRowInsert(query);
|
|
||||||
if (isMultiRowInsert)
|
if (isMultiRowInsert)
|
||||||
{
|
{
|
||||||
/* add default expressions to RTE_VALUES in multi-row INSERTs */
|
/* add default expressions to RTE_VALUES in multi-row INSERTs */
|
||||||
NormalizeMultiRowInsertTargetList(originalQuery);
|
NormalizeMultiRowInsertTargetList(originalQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isMultiRowInsert || !CanShardPrune(distributedTableId, query))
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* If there is a non-constant (e.g. parameter, function call) in the partition
|
|
||||||
* column of the INSERT then we defer shard pruning until the executor where
|
|
||||||
* these values are known.
|
|
||||||
*
|
|
||||||
* XXX: We also defer pruning for multi-row INSERTs because of some current
|
|
||||||
* limitations with the way multi-row INSERTs are handled. Most notably, we
|
|
||||||
* don't evaluate functions in task->rowValuesList. Therefore we need to
|
|
||||||
* perform function evaluation before we can run RouterInsertTaskList.
|
|
||||||
*/
|
|
||||||
taskList = NIL;
|
|
||||||
deferredPruning = true;
|
|
||||||
|
|
||||||
/* must evaluate the non-constant in the partition column */
|
|
||||||
requiresMasterEvaluation = true;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
bool parametersInQueryResolved = false;
|
|
||||||
|
|
||||||
taskList = RouterInsertTaskList(query, parametersInQueryResolved, planningError);
|
|
||||||
if (*planningError)
|
|
||||||
{
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* determine whether there are function calls to evaluate */
|
|
||||||
requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
|
|
||||||
}
|
|
||||||
|
|
||||||
Job *job = CreateJob(originalQuery);
|
Job *job = CreateJob(originalQuery);
|
||||||
job->taskList = taskList;
|
job->requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
|
||||||
job->requiresMasterEvaluation = requiresMasterEvaluation;
|
job->deferredPruning = true;
|
||||||
job->deferredPruning = deferredPruning;
|
job->partitionKeyValue = ExtractInsertPartitionKeyValue(originalQuery);
|
||||||
|
|
||||||
if (!requiresMasterEvaluation)
|
|
||||||
{
|
|
||||||
/* no functions or parameters, build the query strings upfront */
|
|
||||||
RebuildQueryStrings(job);
|
|
||||||
|
|
||||||
/* remember the partition column value */
|
|
||||||
partitionKeyValue = ExtractInsertPartitionKeyValue(originalQuery);
|
|
||||||
}
|
|
||||||
|
|
||||||
job->partitionKeyValue = partitionKeyValue;
|
|
||||||
|
|
||||||
return job;
|
return job;
|
||||||
}
|
}
|
||||||
|
@ -1498,45 +1447,6 @@ CreateJob(Query *query)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CanShardPrune determines whether a query is ready for shard pruning
|
|
||||||
* by checking whether there is a constant value in the partition column.
|
|
||||||
*/
|
|
||||||
static bool
|
|
||||||
CanShardPrune(Oid distributedTableId, Query *query)
|
|
||||||
{
|
|
||||||
uint32 rangeTableId = 1;
|
|
||||||
ListCell *insertValuesCell = NULL;
|
|
||||||
|
|
||||||
if (query->commandType != CMD_INSERT)
|
|
||||||
{
|
|
||||||
/* we assume UPDATE/DELETE is always prunable */
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
|
|
||||||
if (partitionColumn == NULL)
|
|
||||||
{
|
|
||||||
/* can always do shard pruning for reference tables */
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* get full list of partition values and ensure they are all Consts */
|
|
||||||
List *insertValuesList = ExtractInsertValuesList(query, partitionColumn);
|
|
||||||
foreach(insertValuesCell, insertValuesList)
|
|
||||||
{
|
|
||||||
InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell);
|
|
||||||
if (!IsA(insertValues->partitionValueExpr, Const))
|
|
||||||
{
|
|
||||||
/* can't do shard pruning if the partition column is not constant */
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ErrorIfNoShardsExist throws an error if the given table has no shards.
|
* ErrorIfNoShardsExist throws an error if the given table has no shards.
|
||||||
*/
|
*/
|
||||||
|
@ -2709,15 +2619,17 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
||||||
{
|
{
|
||||||
InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell);
|
InsertValues *insertValues = (InsertValues *) lfirst(insertValuesCell);
|
||||||
List *prunedShardIntervalList = NIL;
|
List *prunedShardIntervalList = NIL;
|
||||||
|
Expr *partitionValueExpr = (Expr *) strip_implicit_coercions(
|
||||||
|
(Node *) insertValues->partitionValueExpr);
|
||||||
|
|
||||||
if (!IsA(insertValues->partitionValueExpr, Const))
|
if (!IsA(partitionValueExpr, Const))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("failed to evaluate partition key in insert"),
|
errmsg("failed to evaluate partition key in insert"),
|
||||||
errhint("try using constant values for partition column")));
|
errhint("try using constant values for partition column")));
|
||||||
}
|
}
|
||||||
|
|
||||||
Const *partitionValueConst = (Const *) insertValues->partitionValueExpr;
|
Const *partitionValueConst = (Const *) partitionValueExpr;
|
||||||
if (partitionValueConst->constisnull)
|
if (partitionValueConst->constisnull)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
|
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
|
||||||
|
|
|
@ -2362,6 +2362,7 @@ DETAIL: distribution column value: 1
|
||||||
INSERT INTO articles_hash VALUES (51, 1, 'amateus', 1814), (52, 1, 'second amateus', 2824);
|
INSERT INTO articles_hash VALUES (51, 1, 'amateus', 1814), (52, 1, 'second amateus', 2824);
|
||||||
DEBUG: Creating router plan
|
DEBUG: Creating router plan
|
||||||
DEBUG: Plan is router executable
|
DEBUG: Plan is router executable
|
||||||
|
DETAIL: distribution column value: 1
|
||||||
-- verify insert is successfull (not router plannable and executable)
|
-- verify insert is successfull (not router plannable and executable)
|
||||||
SELECT id
|
SELECT id
|
||||||
FROM articles_hash
|
FROM articles_hash
|
||||||
|
|
|
@ -2139,6 +2139,7 @@ DETAIL: distribution column value: 1
|
||||||
INSERT INTO articles_hash VALUES (51, 1, 'amateus', 1814), (52, 1, 'second amateus', 2824);
|
INSERT INTO articles_hash VALUES (51, 1, 'amateus', 1814), (52, 1, 'second amateus', 2824);
|
||||||
DEBUG: Creating router plan
|
DEBUG: Creating router plan
|
||||||
DEBUG: Plan is router executable
|
DEBUG: Plan is router executable
|
||||||
|
DETAIL: distribution column value: 1
|
||||||
-- verify insert is successfull (not router plannable and executable)
|
-- verify insert is successfull (not router plannable and executable)
|
||||||
SELECT id
|
SELECT id
|
||||||
FROM articles_hash
|
FROM articles_hash
|
||||||
|
|
Loading…
Reference in New Issue