mirror of https://github.com/citusdata/citus.git
Allow default columns in multi-row INSERTs
parent
a658f5ecda
commit
ae00795dab
|
@ -133,6 +133,7 @@ static List * WorkersContainingAllShards(List *prunedShardIntervalsList);
|
|||
static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError);
|
||||
static List * GroupInsertValuesByShardId(List *insertValuesList);
|
||||
static List * ExtractInsertValuesList(Query *query, Var *partitionColumn);
|
||||
static int GetTargetListEntryIndexByResno(List *targetList, int resno);
|
||||
static bool MultiRouterPlannableQuery(Query *query,
|
||||
RelationRestrictionContext *restrictionContext);
|
||||
static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree);
|
||||
|
@ -1080,10 +1081,35 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann
|
|||
Job *job = NULL;
|
||||
bool requiresMasterEvaluation = false;
|
||||
bool deferredPruning = false;
|
||||
bool isMultiRowInsert = false;
|
||||
|
||||
if (!CanShardPrune(distributedTableId, query))
|
||||
RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery);
|
||||
if (valuesRTE != NULL)
|
||||
{
|
||||
/* there is a non-constant in the partition column, cannot prune yet */
|
||||
/*
|
||||
* We expand the values_lists to contain all default expressions
|
||||
* from the target list. By doing this early on, in the original
|
||||
* query, we can later evaluate default expressions for each
|
||||
* individual row and then perform shard pruning.
|
||||
*/
|
||||
valuesRTE->values_lists = ExpandValuesLists(originalQuery->targetList,
|
||||
valuesRTE->values_lists);
|
||||
|
||||
isMultiRowInsert = true;
|
||||
}
|
||||
|
||||
if (isMultiRowInsert || !CanShardPrune(distributedTableId, query))
|
||||
{
|
||||
/*
|
||||
* If there is a non-constant (e.g. parameter, function call) in the partition
|
||||
* column of the INSERT then we defer shard pruning until the executor where
|
||||
* these values are known.
|
||||
*
|
||||
* XXX: We also defer pruning for multi-row INSERTs because of some current
|
||||
* limitations with the way multi-row INSERTs are handled. Most notably, we
|
||||
* don't evaluate functions in task->rowValuesList. Therefore we need to
|
||||
* perform function evaluation before we can run RouterInsertTaskList.
|
||||
*/
|
||||
taskList = NIL;
|
||||
deferredPruning = true;
|
||||
|
||||
|
@ -2030,6 +2056,50 @@ ExtractDistributedInsertValuesRTE(Query *query)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExpandValuesLists expands VALUES lists by building new lists
|
||||
* that include expressions from the target list for columns
|
||||
* with a default expression.
|
||||
*/
|
||||
List *
|
||||
ExpandValuesLists(List *targetList, List *valuesLists)
|
||||
{
|
||||
ListCell *valuesListCell = NULL;
|
||||
List *expandedValuesLists = NIL;
|
||||
ListCell *targetEntryCell = NULL;
|
||||
|
||||
foreach(valuesListCell, valuesLists)
|
||||
{
|
||||
List *valuesList = (List *) lfirst(valuesListCell);
|
||||
List *expandedValuesList = NIL;
|
||||
|
||||
foreach(targetEntryCell, targetList)
|
||||
{
|
||||
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||
Expr *targetExpr = targetEntry->expr;
|
||||
|
||||
if (IsA(targetExpr, Var))
|
||||
{
|
||||
/* expression from the VALUES section */
|
||||
Var *targetListVar = (Var *) targetExpr;
|
||||
targetExpr = list_nth(valuesList, targetListVar->varattno - 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* copy the column's default expression */
|
||||
targetExpr = copyObject(targetExpr);
|
||||
}
|
||||
|
||||
expandedValuesList = lappend(expandedValuesList, targetExpr);
|
||||
}
|
||||
|
||||
expandedValuesLists = lappend(expandedValuesLists, expandedValuesList);
|
||||
}
|
||||
|
||||
return expandedValuesLists;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IntersectPlacementList performs placement pruning based on matching on
|
||||
* nodeName:nodePort fields of shard placement data. We start pruning from all
|
||||
|
@ -2138,10 +2208,11 @@ static List *
|
|||
ExtractInsertValuesList(Query *query, Var *partitionColumn)
|
||||
{
|
||||
List *insertValuesList = NIL;
|
||||
TargetEntry *targetEntry = get_tle_by_resno(query->targetList,
|
||||
partitionColumn->varattno);
|
||||
RangeTblEntry *valuesRTE = NULL;
|
||||
|
||||
if (targetEntry == NULL)
|
||||
int partitionColumnIndex = GetTargetListEntryIndexByResno(query->targetList,
|
||||
partitionColumn->varattno);
|
||||
if (partitionColumnIndex == -1)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
|
||||
errmsg("cannot perform an INSERT without a partition column "
|
||||
|
@ -2150,25 +2221,20 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn)
|
|||
|
||||
/*
|
||||
* We've got a multi-row INSERT. PostgreSQL internally represents such
|
||||
* commands by linking Vars in the target list to lists of values within
|
||||
* a special VALUES range table entry. By extracting the right positional
|
||||
* expression from each list within that RTE, we will extract the partition
|
||||
* values for each row within the multi-row INSERT.
|
||||
* commands with a special VALUES range table entry.
|
||||
*/
|
||||
if (IsA(targetEntry->expr, Var))
|
||||
valuesRTE = ExtractDistributedInsertValuesRTE(query);
|
||||
if (valuesRTE != NULL)
|
||||
{
|
||||
Var *partitionVar = (Var *) targetEntry->expr;
|
||||
RangeTblEntry *referencedRTE = NULL;
|
||||
ListCell *valuesListCell = NULL;
|
||||
Index ivIndex = 0;
|
||||
|
||||
referencedRTE = rt_fetch(partitionVar->varno, query->rtable);
|
||||
foreach(valuesListCell, referencedRTE->values_lists)
|
||||
foreach(valuesListCell, valuesRTE->values_lists)
|
||||
{
|
||||
InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues));
|
||||
insertValues->rowValues = (List *) lfirst(valuesListCell);
|
||||
insertValues->partitionValueExpr = list_nth(insertValues->rowValues,
|
||||
(partitionVar->varattno - 1));
|
||||
partitionColumnIndex);
|
||||
insertValues->shardId = INVALID_SHARD_ID;
|
||||
insertValues->listIndex = ivIndex;
|
||||
|
||||
|
@ -2180,9 +2246,12 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn)
|
|||
/* nothing's been found yet; this is a simple single-row INSERT */
|
||||
if (insertValuesList == NIL)
|
||||
{
|
||||
TargetEntry *partitionColumnTargetEntry = list_nth(query->targetList,
|
||||
partitionColumnIndex);
|
||||
|
||||
InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues));
|
||||
insertValues->rowValues = NIL;
|
||||
insertValues->partitionValueExpr = targetEntry->expr;
|
||||
insertValues->partitionValueExpr = partitionColumnTargetEntry->expr;
|
||||
insertValues->shardId = INVALID_SHARD_ID;
|
||||
|
||||
insertValuesList = lappend(insertValuesList, insertValues);
|
||||
|
@ -2192,6 +2261,33 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetTargetListEntryIndexByResno is the equivalent of get_tle_by_resno
|
||||
* but returns the index in the target list instead of the TargetEntry
|
||||
* itself, or -1 if it cannot be found.
|
||||
*/
|
||||
static int
|
||||
GetTargetListEntryIndexByResno(List *targetList, int resno)
|
||||
{
|
||||
ListCell *targetEntryCell = NULL;
|
||||
int targetEntryIndex = 0;
|
||||
|
||||
foreach(targetEntryCell, targetList)
|
||||
{
|
||||
TargetEntry *tle = (TargetEntry *) lfirst(targetEntryCell);
|
||||
|
||||
if (tle->resno == resno)
|
||||
{
|
||||
return targetEntryIndex;
|
||||
}
|
||||
|
||||
targetEntryIndex++;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MultiRouterPlannableQuery returns true if given query can be router plannable.
|
||||
* The query is router plannable if it is a modify query, or if its is a select
|
||||
|
|
|
@ -110,6 +110,7 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState)
|
|||
ListCell *rteCell = NULL;
|
||||
ListCell *cteCell = NULL;
|
||||
Node *modifiedNode = NULL;
|
||||
bool isMultiRowInsert = false;
|
||||
|
||||
if (query->jointree && query->jointree->quals)
|
||||
{
|
||||
|
@ -117,22 +118,6 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState)
|
|||
planState);
|
||||
}
|
||||
|
||||
foreach(targetEntryCell, query->targetList)
|
||||
{
|
||||
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||
|
||||
/* performance optimization for the most common cases */
|
||||
if (IsA(targetEntry->expr, Const) || IsA(targetEntry->expr, Var))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
modifiedNode = PartiallyEvaluateExpression((Node *) targetEntry->expr,
|
||||
planState);
|
||||
|
||||
targetEntry->expr = (Expr *) modifiedNode;
|
||||
}
|
||||
|
||||
foreach(rteCell, query->rtable)
|
||||
{
|
||||
RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);
|
||||
|
@ -144,6 +129,34 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState)
|
|||
else if (rte->rtekind == RTE_VALUES)
|
||||
{
|
||||
EvaluateValuesListsItems(rte->values_lists, planState);
|
||||
isMultiRowInsert = (query->commandType == CMD_INSERT);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* For multi-row INSERTs, functions are evaluated by expanding the
|
||||
* values_lists with the default expressions in the target list and
|
||||
* performing function evaluation on the values_lists. Expressions
|
||||
* in the target list should not be evaluated since they serve only
|
||||
* as templates and evaluating them would cause unexpected results
|
||||
* (e.g. sequences being called one more time).
|
||||
*/
|
||||
if (!isMultiRowInsert)
|
||||
{
|
||||
foreach(targetEntryCell, query->targetList)
|
||||
{
|
||||
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||
|
||||
/* performance optimization for the most common cases */
|
||||
if (IsA(targetEntry->expr, Const) || IsA(targetEntry->expr, Var))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
modifiedNode = PartiallyEvaluateExpression((Node *) targetEntry->expr,
|
||||
planState);
|
||||
|
||||
targetEntry->expr = (Expr *) modifiedNode;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,6 +49,7 @@ extern Oid ExtractFirstDistributedTableId(Query *query);
|
|||
extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
|
||||
extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query);
|
||||
extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query);
|
||||
extern List * ExpandValuesLists(List *targetList, List *valuesLists);
|
||||
extern void AddShardIntervalRestrictionToSelect(Query *subqery,
|
||||
ShardInterval *shardInterval);
|
||||
|
||||
|
|
|
@ -690,6 +690,182 @@ INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *
|
|||
3 | 103 | Mynt
|
||||
(1 row)
|
||||
|
||||
-- Test multi-row insert with serial in the partition column
|
||||
INSERT INTO app_analytics_events (app_id, name)
|
||||
VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *;
|
||||
id | app_id | name
|
||||
----+--------+------
|
||||
5 | 105 | Mynt
|
||||
4 | 104 | Wayz
|
||||
(2 rows)
|
||||
|
||||
INSERT INTO app_analytics_events (id, name)
|
||||
VALUES (DEFAULT, 'Foo'), (300, 'Wah') RETURNING *;
|
||||
id | app_id | name
|
||||
-----+--------+------
|
||||
6 | | Foo
|
||||
300 | | Wah
|
||||
(2 rows)
|
||||
|
||||
PREPARE prep(varchar) AS
|
||||
INSERT INTO app_analytics_events (id, name)
|
||||
VALUES (DEFAULT, $1 || '.1'), (400 , $1 || '.2') RETURNING *;
|
||||
EXECUTE prep('version-1');
|
||||
id | app_id | name
|
||||
-----+--------+-------------
|
||||
7 | | version-1.1
|
||||
400 | | version-1.2
|
||||
(2 rows)
|
||||
|
||||
EXECUTE prep('version-2');
|
||||
id | app_id | name
|
||||
-----+--------+-------------
|
||||
8 | | version-2.1
|
||||
400 | | version-2.2
|
||||
(2 rows)
|
||||
|
||||
EXECUTE prep('version-3');
|
||||
id | app_id | name
|
||||
-----+--------+-------------
|
||||
400 | | version-3.2
|
||||
9 | | version-3.1
|
||||
(2 rows)
|
||||
|
||||
EXECUTE prep('version-4');
|
||||
id | app_id | name
|
||||
-----+--------+-------------
|
||||
10 | | version-4.1
|
||||
400 | | version-4.2
|
||||
(2 rows)
|
||||
|
||||
EXECUTE prep('version-5');
|
||||
id | app_id | name
|
||||
-----+--------+-------------
|
||||
400 | | version-5.2
|
||||
11 | | version-5.1
|
||||
(2 rows)
|
||||
|
||||
EXECUTE prep('version-6');
|
||||
id | app_id | name
|
||||
-----+--------+-------------
|
||||
400 | | version-6.2
|
||||
12 | | version-6.1
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id, name;
|
||||
id | app_id | name
|
||||
-----+--------+-----------------
|
||||
1 | 101 | Fauxkemon Geaux
|
||||
2 | 102 | Wayz
|
||||
3 | 103 | Mynt
|
||||
4 | 104 | Wayz
|
||||
5 | 105 | Mynt
|
||||
6 | | Foo
|
||||
7 | | version-1.1
|
||||
8 | | version-2.1
|
||||
9 | | version-3.1
|
||||
10 | | version-4.1
|
||||
11 | | version-5.1
|
||||
12 | | version-6.1
|
||||
300 | | Wah
|
||||
400 | | version-1.2
|
||||
400 | | version-2.2
|
||||
400 | | version-3.2
|
||||
400 | | version-4.2
|
||||
400 | | version-5.2
|
||||
400 | | version-6.2
|
||||
(19 rows)
|
||||
|
||||
TRUNCATE app_analytics_events;
|
||||
-- Test multi-row insert with a dropped column
|
||||
ALTER TABLE app_analytics_events DROP COLUMN app_id;
|
||||
INSERT INTO app_analytics_events (name)
|
||||
VALUES ('Wayz'), ('Mynt') RETURNING *;
|
||||
id | name
|
||||
----+------
|
||||
14 | Mynt
|
||||
13 | Wayz
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id;
|
||||
id | name
|
||||
----+------
|
||||
13 | Wayz
|
||||
14 | Mynt
|
||||
(2 rows)
|
||||
|
||||
DROP TABLE app_analytics_events;
|
||||
-- Test multi-row insert with a dropped column before the partition column
|
||||
CREATE TABLE app_analytics_events (id int default 3, app_id integer, name text);
|
||||
SELECT create_distributed_table('app_analytics_events', 'name');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE app_analytics_events DROP COLUMN app_id;
|
||||
INSERT INTO app_analytics_events (name)
|
||||
VALUES ('Wayz'), ('Mynt') RETURNING *;
|
||||
id | name
|
||||
----+------
|
||||
3 | Mynt
|
||||
3 | Wayz
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM app_analytics_events WHERE name = 'Wayz';
|
||||
id | name
|
||||
----+------
|
||||
3 | Wayz
|
||||
(1 row)
|
||||
|
||||
DROP TABLE app_analytics_events;
|
||||
-- Test multi-row insert with serial in a reference table
|
||||
CREATE TABLE app_analytics_events (id serial, app_id integer, name text);
|
||||
SELECT create_reference_table('app_analytics_events');
|
||||
create_reference_table
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO app_analytics_events (app_id, name)
|
||||
VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *;
|
||||
id | app_id | name
|
||||
----+--------+------
|
||||
1 | 104 | Wayz
|
||||
2 | 105 | Mynt
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id;
|
||||
id | app_id | name
|
||||
----+--------+------
|
||||
1 | 104 | Wayz
|
||||
2 | 105 | Mynt
|
||||
(2 rows)
|
||||
|
||||
DROP TABLE app_analytics_events;
|
||||
-- Test multi-row insert with serial in a non-partition column
|
||||
CREATE TABLE app_analytics_events (id int, app_id serial, name text);
|
||||
SELECT create_distributed_table('app_analytics_events', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO app_analytics_events (id, name)
|
||||
VALUES (99, 'Wayz'), (98, 'Mynt') RETURNING name, app_id;
|
||||
name | app_id
|
||||
------+--------
|
||||
Mynt | 2
|
||||
Wayz | 1
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id;
|
||||
id | app_id | name
|
||||
----+--------+------
|
||||
98 | 2 | Mynt
|
||||
99 | 1 | Wayz
|
||||
(2 rows)
|
||||
|
||||
DROP TABLE app_analytics_events;
|
||||
-- test UPDATE with subqueries
|
||||
CREATE TABLE raw_table (id bigint, value bigint);
|
||||
|
|
|
@ -447,6 +447,65 @@ INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURN
|
|||
INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id;
|
||||
INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *;
|
||||
|
||||
-- Test multi-row insert with serial in the partition column
|
||||
INSERT INTO app_analytics_events (app_id, name)
|
||||
VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *;
|
||||
|
||||
INSERT INTO app_analytics_events (id, name)
|
||||
VALUES (DEFAULT, 'Foo'), (300, 'Wah') RETURNING *;
|
||||
|
||||
PREPARE prep(varchar) AS
|
||||
INSERT INTO app_analytics_events (id, name)
|
||||
VALUES (DEFAULT, $1 || '.1'), (400 , $1 || '.2') RETURNING *;
|
||||
|
||||
EXECUTE prep('version-1');
|
||||
EXECUTE prep('version-2');
|
||||
EXECUTE prep('version-3');
|
||||
EXECUTE prep('version-4');
|
||||
EXECUTE prep('version-5');
|
||||
EXECUTE prep('version-6');
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id, name;
|
||||
TRUNCATE app_analytics_events;
|
||||
|
||||
-- Test multi-row insert with a dropped column
|
||||
ALTER TABLE app_analytics_events DROP COLUMN app_id;
|
||||
INSERT INTO app_analytics_events (name)
|
||||
VALUES ('Wayz'), ('Mynt') RETURNING *;
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id;
|
||||
DROP TABLE app_analytics_events;
|
||||
|
||||
-- Test multi-row insert with a dropped column before the partition column
|
||||
CREATE TABLE app_analytics_events (id int default 3, app_id integer, name text);
|
||||
SELECT create_distributed_table('app_analytics_events', 'name');
|
||||
|
||||
ALTER TABLE app_analytics_events DROP COLUMN app_id;
|
||||
|
||||
INSERT INTO app_analytics_events (name)
|
||||
VALUES ('Wayz'), ('Mynt') RETURNING *;
|
||||
|
||||
SELECT * FROM app_analytics_events WHERE name = 'Wayz';
|
||||
DROP TABLE app_analytics_events;
|
||||
|
||||
-- Test multi-row insert with serial in a reference table
|
||||
CREATE TABLE app_analytics_events (id serial, app_id integer, name text);
|
||||
SELECT create_reference_table('app_analytics_events');
|
||||
|
||||
INSERT INTO app_analytics_events (app_id, name)
|
||||
VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *;
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id;
|
||||
DROP TABLE app_analytics_events;
|
||||
|
||||
-- Test multi-row insert with serial in a non-partition column
|
||||
CREATE TABLE app_analytics_events (id int, app_id serial, name text);
|
||||
SELECT create_distributed_table('app_analytics_events', 'id');
|
||||
|
||||
INSERT INTO app_analytics_events (id, name)
|
||||
VALUES (99, 'Wayz'), (98, 'Mynt') RETURNING name, app_id;
|
||||
|
||||
SELECT * FROM app_analytics_events ORDER BY id;
|
||||
DROP TABLE app_analytics_events;
|
||||
|
||||
-- test UPDATE with subqueries
|
||||
|
|
Loading…
Reference in New Issue