From ae00795dab2a375eb24f39921a875e15a2aec764 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 24 Aug 2017 18:34:24 +0200 Subject: [PATCH] Allow default columns in multi-row INSERTs --- .../planner/multi_router_planner.c | 128 +++++++++++-- src/backend/distributed/utils/citus_clauses.c | 45 +++-- .../distributed/multi_router_planner.h | 1 + .../regress/expected/multi_modifications.out | 176 ++++++++++++++++++ src/test/regress/sql/multi_modifications.sql | 59 ++++++ 5 files changed, 377 insertions(+), 32 deletions(-) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 60c6d043c..c6b0b4a42 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -133,6 +133,7 @@ static List * WorkersContainingAllShards(List *prunedShardIntervalsList); static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError); static List * GroupInsertValuesByShardId(List *insertValuesList); static List * ExtractInsertValuesList(Query *query, Var *partitionColumn); +static int GetTargetListEntryIndexByResno(List *targetList, int resno); static bool MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext); static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree); @@ -1080,10 +1081,35 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann Job *job = NULL; bool requiresMasterEvaluation = false; bool deferredPruning = false; + bool isMultiRowInsert = false; - if (!CanShardPrune(distributedTableId, query)) + RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery); + if (valuesRTE != NULL) { - /* there is a non-constant in the partition column, cannot prune yet */ + /* + * We expand the values_lists to contain all default expressions + * from the target list. By doing this early on, in the original + * query, we can later evaluate default expressions for each + * individual row and then perform shard pruning. + */ + valuesRTE->values_lists = ExpandValuesLists(originalQuery->targetList, + valuesRTE->values_lists); + + isMultiRowInsert = true; + } + + if (isMultiRowInsert || !CanShardPrune(distributedTableId, query)) + { + /* + * If there is a non-constant (e.g. parameter, function call) in the partition + * column of the INSERT then we defer shard pruning until the executor where + * these values are known. + * + * XXX: We also defer pruning for multi-row INSERTs because of some current + * limitations with the way multi-row INSERTs are handled. Most notably, we + * don't evaluate functions in task->rowValuesList. Therefore we need to + * perform function evaluation before we can run RouterInsertTaskList. + */ taskList = NIL; deferredPruning = true; @@ -2030,6 +2056,50 @@ ExtractDistributedInsertValuesRTE(Query *query) } +/* + * ExpandValuesLists expands VALUES lists by building new lists + * that include expressions from the target list for columns + * with a default expression. + */ +List * +ExpandValuesLists(List *targetList, List *valuesLists) +{ + ListCell *valuesListCell = NULL; + List *expandedValuesLists = NIL; + ListCell *targetEntryCell = NULL; + + foreach(valuesListCell, valuesLists) + { + List *valuesList = (List *) lfirst(valuesListCell); + List *expandedValuesList = NIL; + + foreach(targetEntryCell, targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + Expr *targetExpr = targetEntry->expr; + + if (IsA(targetExpr, Var)) + { + /* expression from the VALUES section */ + Var *targetListVar = (Var *) targetExpr; + targetExpr = list_nth(valuesList, targetListVar->varattno - 1); + } + else + { + /* copy the column's default expression */ + targetExpr = copyObject(targetExpr); + } + + expandedValuesList = lappend(expandedValuesList, targetExpr); + } + + expandedValuesLists = lappend(expandedValuesLists, expandedValuesList); + } + + return expandedValuesLists; +} + + /* * IntersectPlacementList performs placement pruning based on matching on * nodeName:nodePort fields of shard placement data. We start pruning from all @@ -2138,10 +2208,11 @@ static List * ExtractInsertValuesList(Query *query, Var *partitionColumn) { List *insertValuesList = NIL; - TargetEntry *targetEntry = get_tle_by_resno(query->targetList, - partitionColumn->varattno); + RangeTblEntry *valuesRTE = NULL; - if (targetEntry == NULL) + int partitionColumnIndex = GetTargetListEntryIndexByResno(query->targetList, + partitionColumn->varattno); + if (partitionColumnIndex == -1) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg("cannot perform an INSERT without a partition column " @@ -2150,25 +2221,20 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn) /* * We've got a multi-row INSERT. PostgreSQL internally represents such - * commands by linking Vars in the target list to lists of values within - * a special VALUES range table entry. By extracting the right positional - * expression from each list within that RTE, we will extract the partition - * values for each row within the multi-row INSERT. + * commands with a special VALUES range table entry. */ - if (IsA(targetEntry->expr, Var)) + valuesRTE = ExtractDistributedInsertValuesRTE(query); + if (valuesRTE != NULL) { - Var *partitionVar = (Var *) targetEntry->expr; - RangeTblEntry *referencedRTE = NULL; ListCell *valuesListCell = NULL; Index ivIndex = 0; - referencedRTE = rt_fetch(partitionVar->varno, query->rtable); - foreach(valuesListCell, referencedRTE->values_lists) + foreach(valuesListCell, valuesRTE->values_lists) { InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues)); insertValues->rowValues = (List *) lfirst(valuesListCell); insertValues->partitionValueExpr = list_nth(insertValues->rowValues, - (partitionVar->varattno - 1)); + partitionColumnIndex); insertValues->shardId = INVALID_SHARD_ID; insertValues->listIndex = ivIndex; @@ -2180,9 +2246,12 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn) /* nothing's been found yet; this is a simple single-row INSERT */ if (insertValuesList == NIL) { + TargetEntry *partitionColumnTargetEntry = list_nth(query->targetList, + partitionColumnIndex); + InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues)); insertValues->rowValues = NIL; - insertValues->partitionValueExpr = targetEntry->expr; + insertValues->partitionValueExpr = partitionColumnTargetEntry->expr; insertValues->shardId = INVALID_SHARD_ID; insertValuesList = lappend(insertValuesList, insertValues); @@ -2192,6 +2261,33 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn) } +/* + * GetTargetListEntryIndexByResno is the equivalent of get_tle_by_resno + * but returns the index in the target list instead of the TargetEntry + * itself, or -1 if it cannot be found. + */ +static int +GetTargetListEntryIndexByResno(List *targetList, int resno) +{ + ListCell *targetEntryCell = NULL; + int targetEntryIndex = 0; + + foreach(targetEntryCell, targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(targetEntryCell); + + if (tle->resno == resno) + { + return targetEntryIndex; + } + + targetEntryIndex++; + } + + return -1; +} + + /* * MultiRouterPlannableQuery returns true if given query can be router plannable. * The query is router plannable if it is a modify query, or if its is a select diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 79e5268b1..5c14a8c43 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -110,6 +110,7 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) ListCell *rteCell = NULL; ListCell *cteCell = NULL; Node *modifiedNode = NULL; + bool isMultiRowInsert = false; if (query->jointree && query->jointree->quals) { @@ -117,22 +118,6 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) planState); } - foreach(targetEntryCell, query->targetList) - { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - - /* performance optimization for the most common cases */ - if (IsA(targetEntry->expr, Const) || IsA(targetEntry->expr, Var)) - { - continue; - } - - modifiedNode = PartiallyEvaluateExpression((Node *) targetEntry->expr, - planState); - - targetEntry->expr = (Expr *) modifiedNode; - } - foreach(rteCell, query->rtable) { RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell); @@ -144,6 +129,34 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) else if (rte->rtekind == RTE_VALUES) { EvaluateValuesListsItems(rte->values_lists, planState); + isMultiRowInsert = (query->commandType == CMD_INSERT); + } + } + + /* + * For multi-row INSERTs, functions are evaluated by expanding the + * values_lists with the default expressions in the target list and + * performing function evaluation on the values_lists. Expressions + * in the target list should not be evaluated since they serve only + * as templates and evaluating them would cause unexpected results + * (e.g. sequences being called one more time). + */ + if (!isMultiRowInsert) + { + foreach(targetEntryCell, query->targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + /* performance optimization for the most common cases */ + if (IsA(targetEntry->expr, Const) || IsA(targetEntry->expr, Var)) + { + continue; + } + + modifiedNode = PartiallyEvaluateExpression((Node *) targetEntry->expr, + planState); + + targetEntry->expr = (Expr *) modifiedNode; } } diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 3ff9f048a..c91aa6f97 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -49,6 +49,7 @@ extern Oid ExtractFirstDistributedTableId(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query); +extern List * ExpandValuesLists(List *targetList, List *valuesLists); extern void AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval); diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 0d235119a..a4a00ed67 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -690,6 +690,182 @@ INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING * 3 | 103 | Mynt (1 row) +-- Test multi-row insert with serial in the partition column +INSERT INTO app_analytics_events (app_id, name) +VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; + id | app_id | name +----+--------+------ + 5 | 105 | Mynt + 4 | 104 | Wayz +(2 rows) + +INSERT INTO app_analytics_events (id, name) +VALUES (DEFAULT, 'Foo'), (300, 'Wah') RETURNING *; + id | app_id | name +-----+--------+------ + 6 | | Foo + 300 | | Wah +(2 rows) + +PREPARE prep(varchar) AS +INSERT INTO app_analytics_events (id, name) +VALUES (DEFAULT, $1 || '.1'), (400 , $1 || '.2') RETURNING *; +EXECUTE prep('version-1'); + id | app_id | name +-----+--------+------------- + 7 | | version-1.1 + 400 | | version-1.2 +(2 rows) + +EXECUTE prep('version-2'); + id | app_id | name +-----+--------+------------- + 8 | | version-2.1 + 400 | | version-2.2 +(2 rows) + +EXECUTE prep('version-3'); + id | app_id | name +-----+--------+------------- + 400 | | version-3.2 + 9 | | version-3.1 +(2 rows) + +EXECUTE prep('version-4'); + id | app_id | name +-----+--------+------------- + 10 | | version-4.1 + 400 | | version-4.2 +(2 rows) + +EXECUTE prep('version-5'); + id | app_id | name +-----+--------+------------- + 400 | | version-5.2 + 11 | | version-5.1 +(2 rows) + +EXECUTE prep('version-6'); + id | app_id | name +-----+--------+------------- + 400 | | version-6.2 + 12 | | version-6.1 +(2 rows) + +SELECT * FROM app_analytics_events ORDER BY id, name; + id | app_id | name +-----+--------+----------------- + 1 | 101 | Fauxkemon Geaux + 2 | 102 | Wayz + 3 | 103 | Mynt + 4 | 104 | Wayz + 5 | 105 | Mynt + 6 | | Foo + 7 | | version-1.1 + 8 | | version-2.1 + 9 | | version-3.1 + 10 | | version-4.1 + 11 | | version-5.1 + 12 | | version-6.1 + 300 | | Wah + 400 | | version-1.2 + 400 | | version-2.2 + 400 | | version-3.2 + 400 | | version-4.2 + 400 | | version-5.2 + 400 | | version-6.2 +(19 rows) + +TRUNCATE app_analytics_events; +-- Test multi-row insert with a dropped column +ALTER TABLE app_analytics_events DROP COLUMN app_id; +INSERT INTO app_analytics_events (name) +VALUES ('Wayz'), ('Mynt') RETURNING *; + id | name +----+------ + 14 | Mynt + 13 | Wayz +(2 rows) + +SELECT * FROM app_analytics_events ORDER BY id; + id | name +----+------ + 13 | Wayz + 14 | Mynt +(2 rows) + +DROP TABLE app_analytics_events; +-- Test multi-row insert with a dropped column before the partition column +CREATE TABLE app_analytics_events (id int default 3, app_id integer, name text); +SELECT create_distributed_table('app_analytics_events', 'name'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE app_analytics_events DROP COLUMN app_id; +INSERT INTO app_analytics_events (name) +VALUES ('Wayz'), ('Mynt') RETURNING *; + id | name +----+------ + 3 | Mynt + 3 | Wayz +(2 rows) + +SELECT * FROM app_analytics_events WHERE name = 'Wayz'; + id | name +----+------ + 3 | Wayz +(1 row) + +DROP TABLE app_analytics_events; +-- Test multi-row insert with serial in a reference table +CREATE TABLE app_analytics_events (id serial, app_id integer, name text); +SELECT create_reference_table('app_analytics_events'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO app_analytics_events (app_id, name) +VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; + id | app_id | name +----+--------+------ + 1 | 104 | Wayz + 2 | 105 | Mynt +(2 rows) + +SELECT * FROM app_analytics_events ORDER BY id; + id | app_id | name +----+--------+------ + 1 | 104 | Wayz + 2 | 105 | Mynt +(2 rows) + +DROP TABLE app_analytics_events; +-- Test multi-row insert with serial in a non-partition column +CREATE TABLE app_analytics_events (id int, app_id serial, name text); +SELECT create_distributed_table('app_analytics_events', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO app_analytics_events (id, name) +VALUES (99, 'Wayz'), (98, 'Mynt') RETURNING name, app_id; + name | app_id +------+-------- + Mynt | 2 + Wayz | 1 +(2 rows) + +SELECT * FROM app_analytics_events ORDER BY id; + id | app_id | name +----+--------+------ + 98 | 2 | Mynt + 99 | 1 | Wayz +(2 rows) + DROP TABLE app_analytics_events; -- test UPDATE with subqueries CREATE TABLE raw_table (id bigint, value bigint); diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 9828c765a..42b5a36ae 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -447,6 +447,65 @@ INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURN INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id; INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *; +-- Test multi-row insert with serial in the partition column +INSERT INTO app_analytics_events (app_id, name) +VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; + +INSERT INTO app_analytics_events (id, name) +VALUES (DEFAULT, 'Foo'), (300, 'Wah') RETURNING *; + +PREPARE prep(varchar) AS +INSERT INTO app_analytics_events (id, name) +VALUES (DEFAULT, $1 || '.1'), (400 , $1 || '.2') RETURNING *; + +EXECUTE prep('version-1'); +EXECUTE prep('version-2'); +EXECUTE prep('version-3'); +EXECUTE prep('version-4'); +EXECUTE prep('version-5'); +EXECUTE prep('version-6'); + +SELECT * FROM app_analytics_events ORDER BY id, name; +TRUNCATE app_analytics_events; + +-- Test multi-row insert with a dropped column +ALTER TABLE app_analytics_events DROP COLUMN app_id; +INSERT INTO app_analytics_events (name) +VALUES ('Wayz'), ('Mynt') RETURNING *; + +SELECT * FROM app_analytics_events ORDER BY id; +DROP TABLE app_analytics_events; + +-- Test multi-row insert with a dropped column before the partition column +CREATE TABLE app_analytics_events (id int default 3, app_id integer, name text); +SELECT create_distributed_table('app_analytics_events', 'name'); + +ALTER TABLE app_analytics_events DROP COLUMN app_id; + +INSERT INTO app_analytics_events (name) +VALUES ('Wayz'), ('Mynt') RETURNING *; + +SELECT * FROM app_analytics_events WHERE name = 'Wayz'; +DROP TABLE app_analytics_events; + +-- Test multi-row insert with serial in a reference table +CREATE TABLE app_analytics_events (id serial, app_id integer, name text); +SELECT create_reference_table('app_analytics_events'); + +INSERT INTO app_analytics_events (app_id, name) +VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; + +SELECT * FROM app_analytics_events ORDER BY id; +DROP TABLE app_analytics_events; + +-- Test multi-row insert with serial in a non-partition column +CREATE TABLE app_analytics_events (id int, app_id serial, name text); +SELECT create_distributed_table('app_analytics_events', 'id'); + +INSERT INTO app_analytics_events (id, name) +VALUES (99, 'Wayz'), (98, 'Mynt') RETURNING name, app_id; + +SELECT * FROM app_analytics_events ORDER BY id; DROP TABLE app_analytics_events; -- test UPDATE with subqueries