From ae00795dab2a375eb24f39921a875e15a2aec764 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 24 Aug 2017 18:34:24 +0200 Subject: [PATCH 1/3] Allow default columns in multi-row INSERTs --- .../planner/multi_router_planner.c | 128 +++++++++++-- src/backend/distributed/utils/citus_clauses.c | 45 +++-- .../distributed/multi_router_planner.h | 1 + .../regress/expected/multi_modifications.out | 176 ++++++++++++++++++ src/test/regress/sql/multi_modifications.sql | 59 ++++++ 5 files changed, 377 insertions(+), 32 deletions(-) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 60c6d043c..c6b0b4a42 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -133,6 +133,7 @@ static List * WorkersContainingAllShards(List *prunedShardIntervalsList); static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError); static List * GroupInsertValuesByShardId(List *insertValuesList); static List * ExtractInsertValuesList(Query *query, Var *partitionColumn); +static int GetTargetListEntryIndexByResno(List *targetList, int resno); static bool MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext); static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree); @@ -1080,10 +1081,35 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann Job *job = NULL; bool requiresMasterEvaluation = false; bool deferredPruning = false; + bool isMultiRowInsert = false; - if (!CanShardPrune(distributedTableId, query)) + RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery); + if (valuesRTE != NULL) { - /* there is a non-constant in the partition column, cannot prune yet */ + /* + * We expand the values_lists to contain all default expressions + * from the target list. By doing this early on, in the original + * query, we can later evaluate default expressions for each + * individual row and then perform shard pruning. + */ + valuesRTE->values_lists = ExpandValuesLists(originalQuery->targetList, + valuesRTE->values_lists); + + isMultiRowInsert = true; + } + + if (isMultiRowInsert || !CanShardPrune(distributedTableId, query)) + { + /* + * If there is a non-constant (e.g. parameter, function call) in the partition + * column of the INSERT then we defer shard pruning until the executor where + * these values are known. + * + * XXX: We also defer pruning for multi-row INSERTs because of some current + * limitations with the way multi-row INSERTs are handled. Most notably, we + * don't evaluate functions in task->rowValuesList. Therefore we need to + * perform function evaluation before we can run RouterInsertTaskList. + */ taskList = NIL; deferredPruning = true; @@ -2030,6 +2056,50 @@ ExtractDistributedInsertValuesRTE(Query *query) } +/* + * ExpandValuesLists expands VALUES lists by building new lists + * that include expressions from the target list for columns + * with a default expression. + */ +List * +ExpandValuesLists(List *targetList, List *valuesLists) +{ + ListCell *valuesListCell = NULL; + List *expandedValuesLists = NIL; + ListCell *targetEntryCell = NULL; + + foreach(valuesListCell, valuesLists) + { + List *valuesList = (List *) lfirst(valuesListCell); + List *expandedValuesList = NIL; + + foreach(targetEntryCell, targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + Expr *targetExpr = targetEntry->expr; + + if (IsA(targetExpr, Var)) + { + /* expression from the VALUES section */ + Var *targetListVar = (Var *) targetExpr; + targetExpr = list_nth(valuesList, targetListVar->varattno - 1); + } + else + { + /* copy the column's default expression */ + targetExpr = copyObject(targetExpr); + } + + expandedValuesList = lappend(expandedValuesList, targetExpr); + } + + expandedValuesLists = lappend(expandedValuesLists, expandedValuesList); + } + + return expandedValuesLists; +} + + /* * IntersectPlacementList performs placement pruning based on matching on * nodeName:nodePort fields of shard placement data. We start pruning from all @@ -2138,10 +2208,11 @@ static List * ExtractInsertValuesList(Query *query, Var *partitionColumn) { List *insertValuesList = NIL; - TargetEntry *targetEntry = get_tle_by_resno(query->targetList, - partitionColumn->varattno); + RangeTblEntry *valuesRTE = NULL; - if (targetEntry == NULL) + int partitionColumnIndex = GetTargetListEntryIndexByResno(query->targetList, + partitionColumn->varattno); + if (partitionColumnIndex == -1) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg("cannot perform an INSERT without a partition column " @@ -2150,25 +2221,20 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn) /* * We've got a multi-row INSERT. PostgreSQL internally represents such - * commands by linking Vars in the target list to lists of values within - * a special VALUES range table entry. By extracting the right positional - * expression from each list within that RTE, we will extract the partition - * values for each row within the multi-row INSERT. + * commands with a special VALUES range table entry. */ - if (IsA(targetEntry->expr, Var)) + valuesRTE = ExtractDistributedInsertValuesRTE(query); + if (valuesRTE != NULL) { - Var *partitionVar = (Var *) targetEntry->expr; - RangeTblEntry *referencedRTE = NULL; ListCell *valuesListCell = NULL; Index ivIndex = 0; - referencedRTE = rt_fetch(partitionVar->varno, query->rtable); - foreach(valuesListCell, referencedRTE->values_lists) + foreach(valuesListCell, valuesRTE->values_lists) { InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues)); insertValues->rowValues = (List *) lfirst(valuesListCell); insertValues->partitionValueExpr = list_nth(insertValues->rowValues, - (partitionVar->varattno - 1)); + partitionColumnIndex); insertValues->shardId = INVALID_SHARD_ID; insertValues->listIndex = ivIndex; @@ -2180,9 +2246,12 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn) /* nothing's been found yet; this is a simple single-row INSERT */ if (insertValuesList == NIL) { + TargetEntry *partitionColumnTargetEntry = list_nth(query->targetList, + partitionColumnIndex); + InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues)); insertValues->rowValues = NIL; - insertValues->partitionValueExpr = targetEntry->expr; + insertValues->partitionValueExpr = partitionColumnTargetEntry->expr; insertValues->shardId = INVALID_SHARD_ID; insertValuesList = lappend(insertValuesList, insertValues); @@ -2192,6 +2261,33 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn) } +/* + * GetTargetListEntryIndexByResno is the equivalent of get_tle_by_resno + * but returns the index in the target list instead of the TargetEntry + * itself, or -1 if it cannot be found. + */ +static int +GetTargetListEntryIndexByResno(List *targetList, int resno) +{ + ListCell *targetEntryCell = NULL; + int targetEntryIndex = 0; + + foreach(targetEntryCell, targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(targetEntryCell); + + if (tle->resno == resno) + { + return targetEntryIndex; + } + + targetEntryIndex++; + } + + return -1; +} + + /* * MultiRouterPlannableQuery returns true if given query can be router plannable. * The query is router plannable if it is a modify query, or if its is a select diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 79e5268b1..5c14a8c43 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -110,6 +110,7 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) ListCell *rteCell = NULL; ListCell *cteCell = NULL; Node *modifiedNode = NULL; + bool isMultiRowInsert = false; if (query->jointree && query->jointree->quals) { @@ -117,22 +118,6 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) planState); } - foreach(targetEntryCell, query->targetList) - { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - - /* performance optimization for the most common cases */ - if (IsA(targetEntry->expr, Const) || IsA(targetEntry->expr, Var)) - { - continue; - } - - modifiedNode = PartiallyEvaluateExpression((Node *) targetEntry->expr, - planState); - - targetEntry->expr = (Expr *) modifiedNode; - } - foreach(rteCell, query->rtable) { RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell); @@ -144,6 +129,34 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) else if (rte->rtekind == RTE_VALUES) { EvaluateValuesListsItems(rte->values_lists, planState); + isMultiRowInsert = (query->commandType == CMD_INSERT); + } + } + + /* + * For multi-row INSERTs, functions are evaluated by expanding the + * values_lists with the default expressions in the target list and + * performing function evaluation on the values_lists. Expressions + * in the target list should not be evaluated since they serve only + * as templates and evaluating them would cause unexpected results + * (e.g. sequences being called one more time). + */ + if (!isMultiRowInsert) + { + foreach(targetEntryCell, query->targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + /* performance optimization for the most common cases */ + if (IsA(targetEntry->expr, Const) || IsA(targetEntry->expr, Var)) + { + continue; + } + + modifiedNode = PartiallyEvaluateExpression((Node *) targetEntry->expr, + planState); + + targetEntry->expr = (Expr *) modifiedNode; } } diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 3ff9f048a..c91aa6f97 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -49,6 +49,7 @@ extern Oid ExtractFirstDistributedTableId(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query); +extern List * ExpandValuesLists(List *targetList, List *valuesLists); extern void AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval); diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 0d235119a..a4a00ed67 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -690,6 +690,182 @@ INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING * 3 | 103 | Mynt (1 row) +-- Test multi-row insert with serial in the partition column +INSERT INTO app_analytics_events (app_id, name) +VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; + id | app_id | name +----+--------+------ + 5 | 105 | Mynt + 4 | 104 | Wayz +(2 rows) + +INSERT INTO app_analytics_events (id, name) +VALUES (DEFAULT, 'Foo'), (300, 'Wah') RETURNING *; + id | app_id | name +-----+--------+------ + 6 | | Foo + 300 | | Wah +(2 rows) + +PREPARE prep(varchar) AS +INSERT INTO app_analytics_events (id, name) +VALUES (DEFAULT, $1 || '.1'), (400 , $1 || '.2') RETURNING *; +EXECUTE prep('version-1'); + id | app_id | name +-----+--------+------------- + 7 | | version-1.1 + 400 | | version-1.2 +(2 rows) + +EXECUTE prep('version-2'); + id | app_id | name +-----+--------+------------- + 8 | | version-2.1 + 400 | | version-2.2 +(2 rows) + +EXECUTE prep('version-3'); + id | app_id | name +-----+--------+------------- + 400 | | version-3.2 + 9 | | version-3.1 +(2 rows) + +EXECUTE prep('version-4'); + id | app_id | name +-----+--------+------------- + 10 | | version-4.1 + 400 | | version-4.2 +(2 rows) + +EXECUTE prep('version-5'); + id | app_id | name +-----+--------+------------- + 400 | | version-5.2 + 11 | | version-5.1 +(2 rows) + +EXECUTE prep('version-6'); + id | app_id | name +-----+--------+------------- + 400 | | version-6.2 + 12 | | version-6.1 +(2 rows) + +SELECT * FROM app_analytics_events ORDER BY id, name; + id | app_id | name +-----+--------+----------------- + 1 | 101 | Fauxkemon Geaux + 2 | 102 | Wayz + 3 | 103 | Mynt + 4 | 104 | Wayz + 5 | 105 | Mynt + 6 | | Foo + 7 | | version-1.1 + 8 | | version-2.1 + 9 | | version-3.1 + 10 | | version-4.1 + 11 | | version-5.1 + 12 | | version-6.1 + 300 | | Wah + 400 | | version-1.2 + 400 | | version-2.2 + 400 | | version-3.2 + 400 | | version-4.2 + 400 | | version-5.2 + 400 | | version-6.2 +(19 rows) + +TRUNCATE app_analytics_events; +-- Test multi-row insert with a dropped column +ALTER TABLE app_analytics_events DROP COLUMN app_id; +INSERT INTO app_analytics_events (name) +VALUES ('Wayz'), ('Mynt') RETURNING *; + id | name +----+------ + 14 | Mynt + 13 | Wayz +(2 rows) + +SELECT * FROM app_analytics_events ORDER BY id; + id | name +----+------ + 13 | Wayz + 14 | Mynt +(2 rows) + +DROP TABLE app_analytics_events; +-- Test multi-row insert with a dropped column before the partition column +CREATE TABLE app_analytics_events (id int default 3, app_id integer, name text); +SELECT create_distributed_table('app_analytics_events', 'name'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE app_analytics_events DROP COLUMN app_id; +INSERT INTO app_analytics_events (name) +VALUES ('Wayz'), ('Mynt') RETURNING *; + id | name +----+------ + 3 | Mynt + 3 | Wayz +(2 rows) + +SELECT * FROM app_analytics_events WHERE name = 'Wayz'; + id | name +----+------ + 3 | Wayz +(1 row) + +DROP TABLE app_analytics_events; +-- Test multi-row insert with serial in a reference table +CREATE TABLE app_analytics_events (id serial, app_id integer, name text); +SELECT create_reference_table('app_analytics_events'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO app_analytics_events (app_id, name) +VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; + id | app_id | name +----+--------+------ + 1 | 104 | Wayz + 2 | 105 | Mynt +(2 rows) + +SELECT * FROM app_analytics_events ORDER BY id; + id | app_id | name +----+--------+------ + 1 | 104 | Wayz + 2 | 105 | Mynt +(2 rows) + +DROP TABLE app_analytics_events; +-- Test multi-row insert with serial in a non-partition column +CREATE TABLE app_analytics_events (id int, app_id serial, name text); +SELECT create_distributed_table('app_analytics_events', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO app_analytics_events (id, name) +VALUES (99, 'Wayz'), (98, 'Mynt') RETURNING name, app_id; + name | app_id +------+-------- + Mynt | 2 + Wayz | 1 +(2 rows) + +SELECT * FROM app_analytics_events ORDER BY id; + id | app_id | name +----+--------+------ + 98 | 2 | Mynt + 99 | 1 | Wayz +(2 rows) + DROP TABLE app_analytics_events; -- test UPDATE with subqueries CREATE TABLE raw_table (id bigint, value bigint); diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 9828c765a..42b5a36ae 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -447,6 +447,65 @@ INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURN INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id; INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *; +-- Test multi-row insert with serial in the partition column +INSERT INTO app_analytics_events (app_id, name) +VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; + +INSERT INTO app_analytics_events (id, name) +VALUES (DEFAULT, 'Foo'), (300, 'Wah') RETURNING *; + +PREPARE prep(varchar) AS +INSERT INTO app_analytics_events (id, name) +VALUES (DEFAULT, $1 || '.1'), (400 , $1 || '.2') RETURNING *; + +EXECUTE prep('version-1'); +EXECUTE prep('version-2'); +EXECUTE prep('version-3'); +EXECUTE prep('version-4'); +EXECUTE prep('version-5'); +EXECUTE prep('version-6'); + +SELECT * FROM app_analytics_events ORDER BY id, name; +TRUNCATE app_analytics_events; + +-- Test multi-row insert with a dropped column +ALTER TABLE app_analytics_events DROP COLUMN app_id; +INSERT INTO app_analytics_events (name) +VALUES ('Wayz'), ('Mynt') RETURNING *; + +SELECT * FROM app_analytics_events ORDER BY id; +DROP TABLE app_analytics_events; + +-- Test multi-row insert with a dropped column before the partition column +CREATE TABLE app_analytics_events (id int default 3, app_id integer, name text); +SELECT create_distributed_table('app_analytics_events', 'name'); + +ALTER TABLE app_analytics_events DROP COLUMN app_id; + +INSERT INTO app_analytics_events (name) +VALUES ('Wayz'), ('Mynt') RETURNING *; + +SELECT * FROM app_analytics_events WHERE name = 'Wayz'; +DROP TABLE app_analytics_events; + +-- Test multi-row insert with serial in a reference table +CREATE TABLE app_analytics_events (id serial, app_id integer, name text); +SELECT create_reference_table('app_analytics_events'); + +INSERT INTO app_analytics_events (app_id, name) +VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; + +SELECT * FROM app_analytics_events ORDER BY id; +DROP TABLE app_analytics_events; + +-- Test multi-row insert with serial in a non-partition column +CREATE TABLE app_analytics_events (id int, app_id serial, name text); +SELECT create_distributed_table('app_analytics_events', 'id'); + +INSERT INTO app_analytics_events (id, name) +VALUES (99, 'Wayz'), (98, 'Mynt') RETURNING name, app_id; + +SELECT * FROM app_analytics_events ORDER BY id; DROP TABLE app_analytics_events; -- test UPDATE with subqueries From 1920390688943a32222e084a5024521431a1ea15 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 25 Aug 2017 10:55:38 +0200 Subject: [PATCH 2/3] Multi-row INSERTs no longer throw errors in isolation tests --- src/test/regress/expected/isolation_insert_vs_all.out | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/test/regress/expected/isolation_insert_vs_all.out b/src/test/regress/expected/isolation_insert_vs_all.out index 109ff3ea5..cb3dbd6d6 100644 --- a/src/test/regress/expected/isolation_insert_vs_all.out +++ b/src/test/regress/expected/isolation_insert_vs_all.out @@ -687,15 +687,14 @@ create_distributed_table step s1-initialize: COPY insert_hash FROM PROGRAM 'echo 0, a\\n1, b\\n2, c\\n3, d\\n4, e' WITH CSV; step s1-ddl-add-column: ALTER TABLE insert_hash ADD new_column int DEFAULT 0; step s1-begin: BEGIN; -WARNING: INSERT has more target columns than expressions step s1-insert-multi-row: INSERT INTO insert_hash VALUES(7, 'k'), (8, 'l'), (9, 'm'); -ERROR: could not modify any active placements -step s2-ddl-drop-column: ALTER TABLE insert_hash DROP new_column; +step s2-ddl-drop-column: ALTER TABLE insert_hash DROP new_column; step s1-commit: COMMIT; +step s2-ddl-drop-column: <... completed> step s1-select-count: SELECT COUNT(*) FROM insert_hash; count -5 +8 step s1-show-columns: SELECT run_command_on_workers('SELECT column_name FROM information_schema.columns WHERE table_name LIKE ''insert_hash%'' AND column_name = ''new_column'' ORDER BY 1 LIMIT 1'); run_command_on_workers @@ -897,13 +896,11 @@ step s1-begin: BEGIN; step s1-ddl-add-column: ALTER TABLE insert_hash ADD new_column int DEFAULT 0; step s2-insert-multi-row: INSERT INTO insert_hash VALUES(7, 'k'), (8, 'l'), (9, 'm'); step s1-commit: COMMIT; -WARNING: INSERT has more target columns than expressions step s2-insert-multi-row: <... completed> -error in steps s1-commit s2-insert-multi-row: ERROR: could not modify any active placements step s1-select-count: SELECT COUNT(*) FROM insert_hash; count -5 +8 step s1-show-columns: SELECT run_command_on_workers('SELECT column_name FROM information_schema.columns WHERE table_name LIKE ''insert_hash%'' AND column_name = ''new_column'' ORDER BY 1 LIMIT 1'); run_command_on_workers From 0aadbb17605ef9d3523f16152bfcdcc73039d25a Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 25 Aug 2017 10:41:28 +0200 Subject: [PATCH 3/3] Convert multi-row INSERT target list to Vars --- .../distributed/executor/multi_executor.c | 15 -- .../planner/multi_router_planner.c | 163 +++++++++++------- src/backend/distributed/utils/citus_clauses.c | 45 ++--- .../distributed/multi_router_planner.h | 2 +- 4 files changed, 115 insertions(+), 110 deletions(-) diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index d83b7a3e0..207c89748 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -100,7 +100,6 @@ static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = { static void PrepareMasterJobDirectory(Job *workerJob); static void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); static Relation StubRelation(TupleDesc tupleDescriptor); -static bool IsMultiRowInsert(Query *query); /* @@ -195,20 +194,6 @@ RouterCreateScan(CustomScan *scan) } -/* - * IsMultiRowInsert returns whether the given query is a multi-row INSERT. - * - * It does this by determining whether the query is an INSERT that has an - * RTE_VALUES. Single-row INSERTs will have their RTE_VALUES optimised away - * in transformInsertStmt, and instead use the target list. - */ -static bool -IsMultiRowInsert(Query *query) -{ - return ExtractDistributedInsertValuesRTE(query) != NULL; -} - - /* * CoordinatorInsertSelectCrateScan creates the scan state for executing * INSERT..SELECT into a distributed table via the coordinator. diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index c6b0b4a42..1013a214a 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -130,10 +130,10 @@ static List * TargetShardIntervalsForRouter(Query *query, RelationRestrictionContext *restrictionContext, bool *multiShardQuery); static List * WorkersContainingAllShards(List *prunedShardIntervalsList); +static void NormalizeMultiRowInsertTargetList(Query *query); static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError); static List * GroupInsertValuesByShardId(List *insertValuesList); static List * ExtractInsertValuesList(Query *query, Var *partitionColumn); -static int GetTargetListEntryIndexByResno(List *targetList, int resno); static bool MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionContext); static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree); @@ -1081,21 +1081,12 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann Job *job = NULL; bool requiresMasterEvaluation = false; bool deferredPruning = false; - bool isMultiRowInsert = false; - RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(originalQuery); - if (valuesRTE != NULL) + bool isMultiRowInsert = IsMultiRowInsert(query); + if (isMultiRowInsert) { - /* - * We expand the values_lists to contain all default expressions - * from the target list. By doing this early on, in the original - * query, we can later evaluate default expressions for each - * individual row and then perform shard pruning. - */ - valuesRTE->values_lists = ExpandValuesLists(originalQuery->targetList, - valuesRTE->values_lists); - - isMultiRowInsert = true; + /* add default expressions to RTE_VALUES in multi-row INSERTs */ + NormalizeMultiRowInsertTargetList(originalQuery); } if (isMultiRowInsert || !CanShardPrune(distributedTableId, query)) @@ -2023,6 +2014,20 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError) } +/* + * IsMultiRowInsert returns whether the given query is a multi-row INSERT. + * + * It does this by determining whether the query is an INSERT that has an + * RTE_VALUES. Single-row INSERTs will have their RTE_VALUES optimised away + * in transformInsertStmt, and instead use the target list. + */ +bool +IsMultiRowInsert(Query *query) +{ + return ExtractDistributedInsertValuesRTE(query) != NULL; +} + + /* * ExtractDistributedInsertValuesRTE does precisely that. If the provided * query is not an INSERT, or if the INSERT does not have a VALUES RTE @@ -2057,23 +2062,34 @@ ExtractDistributedInsertValuesRTE(Query *query) /* - * ExpandValuesLists expands VALUES lists by building new lists - * that include expressions from the target list for columns - * with a default expression. + * NormalizeMultiRowInsertTargetList ensures all elements of multi-row INSERT target + * lists are Vars. In multi-row INSERTs, most target list entries contain a Var + * expression pointing to a position within the values_lists field of a VALUES + * RTE, but non-NULL default columns are handled differently. Instead of adding + * the default expression to each row, a single expression encoding the DEFAULT + * appears in the target list. For consistency, we move these expressions into + * values lists and replace them with an appropriately constructed Var. */ -List * -ExpandValuesLists(List *targetList, List *valuesLists) +static void +NormalizeMultiRowInsertTargetList(Query *query) { ListCell *valuesListCell = NULL; - List *expandedValuesLists = NIL; ListCell *targetEntryCell = NULL; + int targetEntryNo = 0; - foreach(valuesListCell, valuesLists) + RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(query); + if (valuesRTE == NULL) + { + return; + } + + foreach(valuesListCell, valuesRTE->values_lists) { List *valuesList = (List *) lfirst(valuesListCell); + Expr **valuesArray = (Expr **) PointerArrayFromList(valuesList); List *expandedValuesList = NIL; - foreach(targetEntryCell, targetList) + foreach(targetEntryCell, query->targetList) { TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); Expr *targetExpr = targetEntry->expr; @@ -2082,7 +2098,7 @@ ExpandValuesLists(List *targetList, List *valuesLists) { /* expression from the VALUES section */ Var *targetListVar = (Var *) targetExpr; - targetExpr = list_nth(valuesList, targetListVar->varattno - 1); + targetExpr = valuesArray[targetListVar->varattno - 1]; } else { @@ -2093,10 +2109,53 @@ ExpandValuesLists(List *targetList, List *valuesLists) expandedValuesList = lappend(expandedValuesList, targetExpr); } - expandedValuesLists = lappend(expandedValuesLists, expandedValuesList); + valuesListCell->data.ptr_value = (void *) expandedValuesList; } - return expandedValuesLists; +#if (PG_VERSION_NUM >= 100000) + + /* reset coltypes, coltypmods, colcollations and rebuild them below */ + valuesRTE->coltypes = NIL; + valuesRTE->coltypmods = NIL; + valuesRTE->colcollations = NIL; +#endif + + foreach(targetEntryCell, query->targetList) + { + TargetEntry *targetEntry = lfirst(targetEntryCell); + Node *targetExprNode = (Node *) targetEntry->expr; + Oid targetType = InvalidOid; + int32 targetTypmod = -1; + Oid targetColl = InvalidOid; + Var *syntheticVar = NULL; + + /* RTE_VALUES comes 2nd, after destination table */ + Index valuesVarno = 2; + + targetEntryNo++; + + targetType = exprType(targetExprNode); + targetTypmod = exprTypmod(targetExprNode); + targetColl = exprCollation(targetExprNode); + +#if (PG_VERSION_NUM >= 100000) + valuesRTE->coltypes = lappend_oid(valuesRTE->coltypes, targetType); + valuesRTE->coltypmods = lappend_int(valuesRTE->coltypmods, targetTypmod); + valuesRTE->colcollations = lappend_oid(valuesRTE->colcollations, targetColl); +#endif + + if (IsA(targetExprNode, Var)) + { + Var *targetVar = (Var *) targetExprNode; + targetVar->varattno = targetEntryNo; + continue; + } + + /* replace the original expression with a Var referencing values_lists */ + syntheticVar = makeVar(valuesVarno, targetEntryNo, targetType, targetTypmod, + targetColl, 0); + targetEntry->expr = (Expr *) syntheticVar; + } } @@ -2208,11 +2267,10 @@ static List * ExtractInsertValuesList(Query *query, Var *partitionColumn) { List *insertValuesList = NIL; - RangeTblEntry *valuesRTE = NULL; + TargetEntry *targetEntry = get_tle_by_resno(query->targetList, + partitionColumn->varattno); - int partitionColumnIndex = GetTargetListEntryIndexByResno(query->targetList, - partitionColumn->varattno); - if (partitionColumnIndex == -1) + if (targetEntry == NULL) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg("cannot perform an INSERT without a partition column " @@ -2221,20 +2279,25 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn) /* * We've got a multi-row INSERT. PostgreSQL internally represents such - * commands with a special VALUES range table entry. + * commands by linking Vars in the target list to lists of values within + * a special VALUES range table entry. By extracting the right positional + * expression from each list within that RTE, we will extract the partition + * values for each row within the multi-row INSERT. */ - valuesRTE = ExtractDistributedInsertValuesRTE(query); - if (valuesRTE != NULL) + if (IsA(targetEntry->expr, Var)) { + Var *partitionVar = (Var *) targetEntry->expr; + RangeTblEntry *referencedRTE = NULL; ListCell *valuesListCell = NULL; Index ivIndex = 0; - foreach(valuesListCell, valuesRTE->values_lists) + referencedRTE = rt_fetch(partitionVar->varno, query->rtable); + foreach(valuesListCell, referencedRTE->values_lists) { InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues)); insertValues->rowValues = (List *) lfirst(valuesListCell); insertValues->partitionValueExpr = list_nth(insertValues->rowValues, - partitionColumnIndex); + (partitionVar->varattno - 1)); insertValues->shardId = INVALID_SHARD_ID; insertValues->listIndex = ivIndex; @@ -2246,12 +2309,9 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn) /* nothing's been found yet; this is a simple single-row INSERT */ if (insertValuesList == NIL) { - TargetEntry *partitionColumnTargetEntry = list_nth(query->targetList, - partitionColumnIndex); - InsertValues *insertValues = (InsertValues *) palloc(sizeof(InsertValues)); insertValues->rowValues = NIL; - insertValues->partitionValueExpr = partitionColumnTargetEntry->expr; + insertValues->partitionValueExpr = targetEntry->expr; insertValues->shardId = INVALID_SHARD_ID; insertValuesList = lappend(insertValuesList, insertValues); @@ -2261,33 +2321,6 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn) } -/* - * GetTargetListEntryIndexByResno is the equivalent of get_tle_by_resno - * but returns the index in the target list instead of the TargetEntry - * itself, or -1 if it cannot be found. - */ -static int -GetTargetListEntryIndexByResno(List *targetList, int resno) -{ - ListCell *targetEntryCell = NULL; - int targetEntryIndex = 0; - - foreach(targetEntryCell, targetList) - { - TargetEntry *tle = (TargetEntry *) lfirst(targetEntryCell); - - if (tle->resno == resno) - { - return targetEntryIndex; - } - - targetEntryIndex++; - } - - return -1; -} - - /* * MultiRouterPlannableQuery returns true if given query can be router plannable. * The query is router plannable if it is a modify query, or if its is a select diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 5c14a8c43..79e5268b1 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -110,7 +110,6 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) ListCell *rteCell = NULL; ListCell *cteCell = NULL; Node *modifiedNode = NULL; - bool isMultiRowInsert = false; if (query->jointree && query->jointree->quals) { @@ -118,6 +117,22 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) planState); } + foreach(targetEntryCell, query->targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + /* performance optimization for the most common cases */ + if (IsA(targetEntry->expr, Const) || IsA(targetEntry->expr, Var)) + { + continue; + } + + modifiedNode = PartiallyEvaluateExpression((Node *) targetEntry->expr, + planState); + + targetEntry->expr = (Expr *) modifiedNode; + } + foreach(rteCell, query->rtable) { RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell); @@ -129,34 +144,6 @@ ExecuteMasterEvaluableFunctions(Query *query, PlanState *planState) else if (rte->rtekind == RTE_VALUES) { EvaluateValuesListsItems(rte->values_lists, planState); - isMultiRowInsert = (query->commandType == CMD_INSERT); - } - } - - /* - * For multi-row INSERTs, functions are evaluated by expanding the - * values_lists with the default expressions in the target list and - * performing function evaluation on the values_lists. Expressions - * in the target list should not be evaluated since they serve only - * as templates and evaluating them would cause unexpected results - * (e.g. sequences being called one more time). - */ - if (!isMultiRowInsert) - { - foreach(targetEntryCell, query->targetList) - { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - - /* performance optimization for the most common cases */ - if (IsA(targetEntry->expr, Const) || IsA(targetEntry->expr, Var)) - { - continue; - } - - modifiedNode = PartiallyEvaluateExpression((Node *) targetEntry->expr, - planState); - - targetEntry->expr = (Expr *) modifiedNode; } } diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index c91aa6f97..f9b7461d4 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -49,7 +49,7 @@ extern Oid ExtractFirstDistributedTableId(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query); -extern List * ExpandValuesLists(List *targetList, List *valuesLists); +extern bool IsMultiRowInsert(Query *query); extern void AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval);