mirror of https://github.com/citusdata/citus.git
Convert multi-row INSERT target list to Vars
parent
1920390688
commit
0aadbb1760
|
@ -100,7 +100,6 @@ static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = {
|
||||||
static void PrepareMasterJobDirectory(Job *workerJob);
|
static void PrepareMasterJobDirectory(Job *workerJob);
|
||||||
static void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
|
static void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
|
||||||
static Relation StubRelation(TupleDesc tupleDescriptor);
|
static Relation StubRelation(TupleDesc tupleDescriptor);
|
||||||
static bool IsMultiRowInsert(Query *query);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -195,20 +194,6 @@ RouterCreateScan(CustomScan *scan)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* IsMultiRowInsert returns whether the given query is a multi-row INSERT.
|
|
||||||
*
|
|
||||||
* It does this by determining whether the query is an INSERT that has an
|
|
||||||
* RTE_VALUES. Single-row INSERTs will have their RTE_VALUES optimised away
|
|
||||||
* in transformInsertStmt, and instead use the target list.
|
|
||||||
*/
|
|
||||||
static bool
|
|
||||||
IsMultiRowInsert(Query *query)
|
|
||||||
{
|
|
||||||
return ExtractDistributedInsertValuesRTE(query) != NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CoordinatorInsertSelectCrateScan creates the scan state for executing
|
* CoordinatorInsertSelectCrateScan creates the scan state for executing
|
||||||
* INSERT..SELECT into a distributed table via the coordinator.
|
* INSERT..SELECT into a distributed table via the coordinator.
|
||||||
|
|
|
@ -130,10 +130,10 @@ static List * TargetShardIntervalsForRouter(Query *query,
|
||||||
RelationRestrictionContext *restrictionContext,
|
RelationRestrictionContext *restrictionContext,
|
||||||
bool *multiShardQuery);
|
bool *multiShardQuery);
|
||||||
static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
|
static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
|
||||||
|
static void NormalizeMultiRowInsertTargetList(Query *query);
|
||||||
static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError);
|
static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError);
|
||||||
static List * GroupInsertValuesByShardId(List *insertValuesList);
|
static List * GroupInsertValuesByShardId(List *insertValuesList);
|
||||||
static List * ExtractInsertValuesList(Query *query, Var *partitionColumn);
|
static List * ExtractInsertValuesList(Query *query, Var *partitionColumn);
|
||||||
static int GetTargetListEntryIndexByResno(List *targetList, int resno);
|
|
||||||
static bool MultiRouterPlannableQuery(Query *query,
|
static bool MultiRouterPlannableQuery(Query *query,
|
||||||
RelationRestrictionContext *restrictionContext);
|
RelationRestrictionContext *restrictionContext);
|
||||||
static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree);
|
static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree);
|
||||||
|
@ -1081,21 +1081,12 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann
|
||||||
Job *job = NULL;
|
Job *job = NULL;
|
||||||
bool requiresMasterEvaluation = false;
|
bool requiresMasterEvaluation = false;
|
||||||
bool deferredPruning = false;
|
bool deferredPruning = false;
|
||||||
bool isMultiRowInsert = false;
|
|
||||||
|
|
||||||
RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery);
|
bool isMultiRowInsert = IsMultiRowInsert(query);
|
||||||
if (valuesRTE != NULL)
|
if (isMultiRowInsert)
|
||||||
{
|
{
|
||||||
/*
|
/* add default expressions to RTE_VALUES in multi-row INSERTs */
|
||||||
* We expand the values_lists to contain all default expressions
|
NormalizeMultiRowInsertTargetList(originalQuery);
|
||||||
* from the target list. By doing this early on, in the original
|
|
||||||
* query, we can later evaluate default expressions for each
|
|
||||||
* individual row and then perform shard pruning.
|
|
||||||
*/
|
|
||||||
valuesRTE->values_lists = ExpandValuesLists(originalQuery->targetList,
|
|
||||||
valuesRTE->values_lists);
|
|
||||||
|
|
||||||
isMultiRowInsert = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isMultiRowInsert || !CanShardPrune(distributedTableId, query))
|
if (isMultiRowInsert || !CanShardPrune(distributedTableId, query))
|
||||||
|
@ -2023,6 +2014,20 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsMultiRowInsert returns whether the given query is a multi-row INSERT.
|
||||||
|
*
|
||||||
|
* It does this by determining whether the query is an INSERT that has an
|
||||||
|
* RTE_VALUES. Single-row INSERTs will have their RTE_VALUES optimised away
|
||||||
|
* in transformInsertStmt, and instead use the target list.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
IsMultiRowInsert(Query *query)
|
||||||
|
{
|
||||||
|
return ExtractDistributedInsertValuesRTE(query) != NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExtractDistributedInsertValuesRTE does precisely that. If the provided
|
* ExtractDistributedInsertValuesRTE does precisely that. If the provided
|
||||||
* query is not an INSERT, or if the INSERT does not have a VALUES RTE
|
* query is not an INSERT, or if the INSERT does not have a VALUES RTE
|
||||||
|
@ -2057,23 +2062,34 @@ ExtractDistributedInsertValuesRTE(Query *query)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExpandValuesLists expands VALUES lists by building new lists
|
* NormalizeMultiRowInsertTargetList ensures all elements of multi-row INSERT target
|
||||||
* that include expressions from the target list for columns
|
* lists are Vars. In multi-row INSERTs, most target list entries contain a Var
|
||||||
* with a default expression.
|
* expression pointing to a position within the values_lists field of a VALUES
|
||||||
|
* RTE, but non-NULL default columns are handled differently. Instead of adding
|
||||||
|
* the default expression to each row, a single expression encoding the DEFAULT
|
||||||
|
* appears in the target list. For consistency, we move these expressions into
|
||||||
|
* values lists and replace them with an appropriately constructed Var.
|
||||||
*/
|
*/
|
||||||
List *
|
static void
|
||||||
ExpandValuesLists(List *targetList, List *valuesLists)
|
NormalizeMultiRowInsertTargetList(Query *query)
|
||||||
{
|
{
|
||||||
ListCell *valuesListCell = NULL;
|
ListCell *valuesListCell = NULL;
|
||||||
List *expandedValuesLists = NIL;
|
|
||||||
ListCell *targetEntryCell = NULL;
|
ListCell *targetEntryCell = NULL;
|
||||||
|
int targetEntryNo = 0;
|
||||||
|
|
||||||
foreach(valuesListCell, valuesLists)
|
RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(query);
|
||||||
|
if (valuesRTE == NULL)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach(valuesListCell, valuesRTE->values_lists)
|
||||||
{
|
{
|
||||||
List *valuesList = (List *) lfirst(valuesListCell);
|
List *valuesList = (List *) lfirst(valuesListCell);
|
||||||
|
Expr **valuesArray = (Expr **) PointerArrayFromList(valuesList);
|
||||||
List *expandedValuesList = NIL;
|
List *expandedValuesList = NIL;
|
||||||
|
|
||||||
foreach(targetEntryCell, targetList)
|
foreach(targetEntryCell, query->targetList)
|
||||||
{
|
{
|
||||||
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||||
Expr *targetExpr = targetEntry->expr;
|
Expr *targetExpr = targetEntry->expr;
|
||||||
|
@ -2082,7 +2098,7 @@ ExpandValuesLists(List *targetList, List *valuesLists)
|
||||||
{
|
{
|
||||||
/* expression from the VALUES section */
|
/* expression from the VALUES section */
|
||||||
Var *targetListVar = (Var *) targetExpr;
|
Var *targetListVar = (Var *) targetExpr;
|
||||||
targetExpr = list_nth(valuesList, targetListVar->varattno - 1);
|
targetExpr = valuesArray[targetListVar->varattno - 1];
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -2093,10 +2109,53 @@ ExpandValuesLists(List *targetList, List *valuesLists)
|
||||||
expandedValuesList = lappend(expandedValuesList, targetExpr);
|
expandedValuesList = lappend(expandedValuesList, targetExpr);
|
||||||
}
|
}
|
||||||
|
|
||||||
expandedValuesLists = lappend(expandedValuesLists, expandedValuesList);
|
valuesListCell->data.ptr_value = (void *) expandedValuesList;
|
||||||
}
|
}
|
||||||
|
|
||||||
return expandedValuesLists;
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
|
||||||
|
/* reset coltypes, coltypmods, colcollations and rebuild them below */
|
||||||
|
valuesRTE->coltypes = NIL;
|
||||||
|
valuesRTE->coltypmods = NIL;
|
||||||
|
valuesRTE->colcollations = NIL;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
foreach(targetEntryCell, query->targetList)
|
||||||
|
{
|
||||||
|
TargetEntry *targetEntry = lfirst(targetEntryCell);
|
||||||
|
Node *targetExprNode = (Node *) targetEntry->expr;
|
||||||
|
Oid targetType = InvalidOid;
|
||||||
|
int32 targetTypmod = -1;
|
||||||
|
Oid targetColl = InvalidOid;
|
||||||
|
Var *syntheticVar = NULL;
|
||||||
|
|
||||||
|
/* RTE_VALUES comes 2nd, after destination table */
|
||||||
|
Index valuesVarno = 2;
|
||||||
|
|
||||||
|
targetEntryNo++;
|
||||||
|
|
||||||
|
targetType = exprType(targetExprNode);
|
||||||
|
targetTypmod = exprTypmod(targetExprNode);
|
||||||
|
targetColl = exprCollation(targetExprNode);
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
valuesRTE->coltypes = lappend_oid(valuesRTE->coltypes, targetType);
|
||||||
|
valuesRTE->coltypmods = lappend_int(valuesRTE->coltypmods, targetTypmod);
|
||||||
|
valuesRTE->colcollations = lappend_oid(valuesRTE->colcollations, targetColl);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if (IsA(targetExprNode, Var))
|
||||||
|
{
|
||||||
|
Var *targetVar = (Var *) targetExprNode;
|
||||||
|
targetVar->varattno = targetEntryNo;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* replace the original expression with a Var referencing values_lists */
|
||||||
|
syntheticVar = makeVar(valuesVarno, targetEntryNo, targetType, targetTypmod,
|
||||||
|
targetColl, 0);
|
||||||
|
targetEntry->expr = (Expr *) syntheticVar;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2208,11 +2267,10 @@ static List *
|
||||||
ExtractInsertValuesList(Query *query, Var *partitionColumn)
|
ExtractInsertValuesList(Query *query, Var *partitionColumn)
|
||||||
{
|
{
|
||||||
List *insertValuesList = NIL;
|
List *insertValuesList = NIL;
|
||||||
RangeTblEntry *valuesRTE = NULL;
|
TargetEntry *targetEntry = get_tle_by_resno(query->targetList,
|
||||||
|
|
||||||
int partitionColumnIndex = GetTargetListEntryIndexByResno(query->targetList,
|
|
||||||
partitionColumn->varattno);
|
partitionColumn->varattno);
|
||||||
if (partitionColumnIndex == -1)
|
|
||||||
|
if (targetEntry == NULL)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
|
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
|
||||||
errmsg("cannot perform an INSERT without a partition column "
|
errmsg("cannot perform an INSERT without a partition column "
|
||||||
|
@ -2221,20 +2279,25 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We've got a multi-row INSERT. PostgreSQL internally represents such
|
* We've got a multi-row INSERT. PostgreSQL internally represents such
|
||||||
* commands with a special VALUES range table entry.
|
* commands by linking Vars in the target list to lists of values within
|
||||||
|
* a special VALUES range table entry. By extracting the right positional
|
||||||
|
* expression from each list within that RTE, we will extract the partition
|
||||||
|
* values for each row within the multi-row INSERT.
|
||||||
*/
|
*/
|
||||||
valuesRTE = ExtractDistributedInsertValuesRTE(query);
|
if (IsA(targetEntry->expr, Var))
|
||||||
if (valuesRTE != NULL)
|
|
||||||
{
|
{
|
||||||
|
Var *partitionVar = (Var *) targetEntry->expr;
|
||||||
|
RangeTblEntry *referencedRTE = NULL;
|
||||||
ListCell *valuesListCell = NULL;
|
ListCell *valuesListCell = NULL;
|
||||||
Index ivIndex = 0;
|
Index ivIndex = 0;
|
||||||
|
|
||||||
foreach(valuesListCell, valuesRTE->values_lists)
|
referencedRTE = rt_fetch(partitionVar->varno, query->rtable);
|
||||||
|
foreach(valuesListCell, referencedRTE->values_lists)
|
||||||
{
|
{
|
||||||
InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues));
|
InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues));
|
||||||
insertValues->rowValues = (List *) lfirst(valuesListCell);
|
insertValues->rowValues = (List *) lfirst(valuesListCell);
|
||||||
insertValues->partitionValueExpr = list_nth(insertValues->rowValues,
|
insertValues->partitionValueExpr = list_nth(insertValues->rowValues,
|
||||||
partitionColumnIndex);
|
(partitionVar->varattno - 1));
|
||||||
insertValues->shardId = INVALID_SHARD_ID;
|
insertValues->shardId = INVALID_SHARD_ID;
|
||||||
insertValues->listIndex = ivIndex;
|
insertValues->listIndex = ivIndex;
|
||||||
|
|
||||||
|
@ -2246,12 +2309,9 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn)
|
||||||
/* nothing's been found yet; this is a simple single-row INSERT */
|
/* nothing's been found yet; this is a simple single-row INSERT */
|
||||||
if (insertValuesList == NIL)
|
if (insertValuesList == NIL)
|
||||||
{
|
{
|
||||||
TargetEntry *partitionColumnTargetEntry = list_nth(query->targetList,
|
|
||||||
partitionColumnIndex);
|
|
||||||
|
|
||||||
InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues));
|
InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues));
|
||||||
insertValues->rowValues = NIL;
|
insertValues->rowValues = NIL;
|
||||||
insertValues->partitionValueExpr = partitionColumnTargetEntry->expr;
|
insertValues->partitionValueExpr = targetEntry->expr;
|
||||||
insertValues->shardId = INVALID_SHARD_ID;
|
insertValues->shardId = INVALID_SHARD_ID;
|
||||||
|
|
||||||
insertValuesList = lappend(insertValuesList, insertValues);
|
insertValuesList = lappend(insertValuesList, insertValues);
|
||||||
|
@ -2261,33 +2321,6 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* GetTargetListEntryIndexByResno is the equivalent of get_tle_by_resno
|
|
||||||
* but returns the index in the target list instead of the TargetEntry
|
|
||||||
* itself, or -1 if it cannot be found.
|
|
||||||
*/
|
|
||||||
static int
|
|
||||||
GetTargetListEntryIndexByResno(List *targetList, int resno)
|
|
||||||
{
|
|
||||||
ListCell *targetEntryCell = NULL;
|
|
||||||
int targetEntryIndex = 0;
|
|
||||||
|
|
||||||
foreach(targetEntryCell, targetList)
|
|
||||||
{
|
|
||||||
TargetEntry *tle = (TargetEntry *) lfirst(targetEntryCell);
|
|
||||||
|
|
||||||
if (tle->resno == resno)
|
|
||||||
{
|
|
||||||
return targetEntryIndex;
|
|
||||||
}
|
|
||||||
|
|
||||||
targetEntryIndex++;
|
|
||||||
}
|
|
||||||
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* MultiRouterPlannableQuery returns true if given query can be router plannable.
|
* MultiRouterPlannableQuery returns true if given query can be router plannable.
|
||||||
* The query is router plannable if it is a modify query, or if its is a select
|
* The query is router plannable if it is a modify query, or if its is a select
|
||||||
|
|
|
@ -110,7 +110,6 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState)
|
||||||
ListCell *rteCell = NULL;
|
ListCell *rteCell = NULL;
|
||||||
ListCell *cteCell = NULL;
|
ListCell *cteCell = NULL;
|
||||||
Node *modifiedNode = NULL;
|
Node *modifiedNode = NULL;
|
||||||
bool isMultiRowInsert = false;
|
|
||||||
|
|
||||||
if (query->jointree && query->jointree->quals)
|
if (query->jointree && query->jointree->quals)
|
||||||
{
|
{
|
||||||
|
@ -118,31 +117,6 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState)
|
||||||
planState);
|
planState);
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach(rteCell, query->rtable)
|
|
||||||
{
|
|
||||||
RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);
|
|
||||||
|
|
||||||
if (rte->rtekind == RTE_SUBQUERY)
|
|
||||||
{
|
|
||||||
ExecuteMasterEvaluableFunctions(rte->subquery, planState);
|
|
||||||
}
|
|
||||||
else if (rte->rtekind == RTE_VALUES)
|
|
||||||
{
|
|
||||||
EvaluateValuesListsItems(rte->values_lists, planState);
|
|
||||||
isMultiRowInsert = (query->commandType == CMD_INSERT);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* For multi-row INSERTs, functions are evaluated by expanding the
|
|
||||||
* values_lists with the default expressions in the target list and
|
|
||||||
* performing function evaluation on the values_lists. Expressions
|
|
||||||
* in the target list should not be evaluated since they serve only
|
|
||||||
* as templates and evaluating them would cause unexpected results
|
|
||||||
* (e.g. sequences being called one more time).
|
|
||||||
*/
|
|
||||||
if (!isMultiRowInsert)
|
|
||||||
{
|
|
||||||
foreach(targetEntryCell, query->targetList)
|
foreach(targetEntryCell, query->targetList)
|
||||||
{
|
{
|
||||||
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||||
|
@ -158,6 +132,19 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState)
|
||||||
|
|
||||||
targetEntry->expr = (Expr *) modifiedNode;
|
targetEntry->expr = (Expr *) modifiedNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
foreach(rteCell, query->rtable)
|
||||||
|
{
|
||||||
|
RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);
|
||||||
|
|
||||||
|
if (rte->rtekind == RTE_SUBQUERY)
|
||||||
|
{
|
||||||
|
ExecuteMasterEvaluableFunctions(rte->subquery, planState);
|
||||||
|
}
|
||||||
|
else if (rte->rtekind == RTE_VALUES)
|
||||||
|
{
|
||||||
|
EvaluateValuesListsItems(rte->values_lists, planState);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach(cteCell, query->cteList)
|
foreach(cteCell, query->cteList)
|
||||||
|
|
|
@ -49,7 +49,7 @@ extern Oid ExtractFirstDistributedTableId(Query *query);
|
||||||
extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
|
extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
|
||||||
extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
|
extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
|
||||||
extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query);
|
extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query);
|
||||||
extern List * ExpandValuesLists(List *targetList, List *valuesLists);
|
extern bool IsMultiRowInsert(Query *query);
|
||||||
extern void AddShardIntervalRestrictionToSelect(Query *subqery,
|
extern void AddShardIntervalRestrictionToSelect(Query *subqery,
|
||||||
ShardInterval *shardInterval);
|
ShardInterval *shardInterval);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue