diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index d83b7a3e0..207c89748 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -100,7 +100,6 @@ static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = { static void PrepareMasterJobDirectory(Job *workerJob); static void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); static Relation StubRelation(TupleDesc tupleDescriptor); -static bool IsMultiRowInsert(Query *query); /* @@ -195,20 +194,6 @@ RouterCreateScan(CustomScan *scan) } -/* - * IsMultiRowInsert returns whether the given query is a multi-row INSERT. - * - * It does this by determining whether the query is an INSERT that has an - * RTE_VALUES. Single-row INSERTs will have their RTE_VALUES optimised away - * in transformInsertStmt, and instead use the target list. - */ -static bool -IsMultiRowInsert(Query *query) -{ - return ExtractDistributedInsertValuesRTE(query) != NULL; -} - - /* * CoordinatorInsertSelectCrateScan creates the scan state for executing * INSERT..SELECT into a distributed table via the coordinator. diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 60c6d043c..1013a214a 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -130,6 +130,7 @@ static List * TargetShardIntervalsForRouter(Query *query, RelationRestrictionContext *restrictionContext, bool *multiShardQuery); static List * WorkersContainingAllShards(List *prunedShardIntervalsList); +static void NormalizeMultiRowInsertTargetList(Query *query); static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError); static List * GroupInsertValuesByShardId(List *insertValuesList); static List * ExtractInsertValuesList(Query *query, Var *partitionColumn); @@ -1081,9 +1082,25 @@ RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **plann bool requiresMasterEvaluation = false; bool deferredPruning = false; - if (!CanShardPrune(distributedTableId, query)) + bool isMultiRowInsert = IsMultiRowInsert(query); + if (isMultiRowInsert) { - /* there is a non-constant in the partition column, cannot prune yet */ + /* add default expressions to RTE_VALUES in multi-row INSERTs */ + NormalizeMultiRowInsertTargetList(originalQuery); + } + + if (isMultiRowInsert || !CanShardPrune(distributedTableId, query)) + { + /* + * If there is a non-constant (e.g. parameter, function call) in the partition + * column of the INSERT then we defer shard pruning until the executor where + * these values are known. + * + * XXX: We also defer pruning for multi-row INSERTs because of some current + * limitations with the way multi-row INSERTs are handled. Most notably, we + * don't evaluate functions in task->rowValuesList. Therefore we need to + * perform function evaluation before we can run RouterInsertTaskList. + */ taskList = NIL; deferredPruning = true; @@ -1997,6 +2014,20 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError) } +/* + * IsMultiRowInsert returns whether the given query is a multi-row INSERT. + * + * It does this by determining whether the query is an INSERT that has an + * RTE_VALUES. Single-row INSERTs will have their RTE_VALUES optimised away + * in transformInsertStmt, and instead use the target list. + */ +bool +IsMultiRowInsert(Query *query) +{ + return ExtractDistributedInsertValuesRTE(query) != NULL; +} + + /* * ExtractDistributedInsertValuesRTE does precisely that. If the provided * query is not an INSERT, or if the INSERT does not have a VALUES RTE @@ -2030,6 +2061,104 @@ ExtractDistributedInsertValuesRTE(Query *query) } +/* + * NormalizeMultiRowInsertTargetList ensures all elements of multi-row INSERT target + * lists are Vars. In multi-row INSERTs, most target list entries contain a Var + * expression pointing to a position within the values_lists field of a VALUES + * RTE, but non-NULL default columns are handled differently. Instead of adding + * the default expression to each row, a single expression encoding the DEFAULT + * appears in the target list. For consistency, we move these expressions into + * values lists and replace them with an appropriately constructed Var. + */ +static void +NormalizeMultiRowInsertTargetList(Query *query) +{ + ListCell *valuesListCell = NULL; + ListCell *targetEntryCell = NULL; + int targetEntryNo = 0; + + RangeTblEntry *valuesRTE = ExtractDistributedInsertValuesRTE(query); + if (valuesRTE == NULL) + { + return; + } + + foreach(valuesListCell, valuesRTE->values_lists) + { + List *valuesList = (List *) lfirst(valuesListCell); + Expr **valuesArray = (Expr **) PointerArrayFromList(valuesList); + List *expandedValuesList = NIL; + + foreach(targetEntryCell, query->targetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + Expr *targetExpr = targetEntry->expr; + + if (IsA(targetExpr, Var)) + { + /* expression from the VALUES section */ + Var *targetListVar = (Var *) targetExpr; + targetExpr = valuesArray[targetListVar->varattno - 1]; + } + else + { + /* copy the column's default expression */ + targetExpr = copyObject(targetExpr); + } + + expandedValuesList = lappend(expandedValuesList, targetExpr); + } + + valuesListCell->data.ptr_value = (void *) expandedValuesList; + } + +#if (PG_VERSION_NUM >= 100000) + + /* reset coltypes, coltypmods, colcollations and rebuild them below */ + valuesRTE->coltypes = NIL; + valuesRTE->coltypmods = NIL; + valuesRTE->colcollations = NIL; +#endif + + foreach(targetEntryCell, query->targetList) + { + TargetEntry *targetEntry = lfirst(targetEntryCell); + Node *targetExprNode = (Node *) targetEntry->expr; + Oid targetType = InvalidOid; + int32 targetTypmod = -1; + Oid targetColl = InvalidOid; + Var *syntheticVar = NULL; + + /* RTE_VALUES comes 2nd, after destination table */ + Index valuesVarno = 2; + + targetEntryNo++; + + targetType = exprType(targetExprNode); + targetTypmod = exprTypmod(targetExprNode); + targetColl = exprCollation(targetExprNode); + +#if (PG_VERSION_NUM >= 100000) + valuesRTE->coltypes = lappend_oid(valuesRTE->coltypes, targetType); + valuesRTE->coltypmods = lappend_int(valuesRTE->coltypmods, targetTypmod); + valuesRTE->colcollations = lappend_oid(valuesRTE->colcollations, targetColl); +#endif + + if (IsA(targetExprNode, Var)) + { + Var *targetVar = (Var *) targetExprNode; + targetVar->varattno = targetEntryNo; + continue; + } + + /* replace the original expression with a Var referencing values_lists */ + syntheticVar = makeVar(valuesVarno, targetEntryNo, targetType, targetTypmod, + targetColl, 0); + targetEntry->expr = (Expr *) syntheticVar; + } +} + + /* * IntersectPlacementList performs placement pruning based on matching on * nodeName:nodePort fields of shard placement data. We start pruning from all diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 3ff9f048a..f9b7461d4 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -49,6 +49,7 @@ extern Oid ExtractFirstDistributedTableId(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query); +extern bool IsMultiRowInsert(Query *query); extern void AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval); diff --git a/src/test/regress/expected/isolation_insert_vs_all.out b/src/test/regress/expected/isolation_insert_vs_all.out index 109ff3ea5..cb3dbd6d6 100644 --- a/src/test/regress/expected/isolation_insert_vs_all.out +++ b/src/test/regress/expected/isolation_insert_vs_all.out @@ -687,15 +687,14 @@ create_distributed_table step s1-initialize: COPY insert_hash FROM PROGRAM 'echo 0, a\\n1, b\\n2, c\\n3, d\\n4, e' WITH CSV; step s1-ddl-add-column: ALTER TABLE insert_hash ADD new_column int DEFAULT 0; step s1-begin: BEGIN; -WARNING: INSERT has more target columns than expressions step s1-insert-multi-row: INSERT INTO insert_hash VALUES(7, 'k'), (8, 'l'), (9, 'm'); -ERROR: could not modify any active placements -step s2-ddl-drop-column: ALTER TABLE insert_hash DROP new_column; +step s2-ddl-drop-column: ALTER TABLE insert_hash DROP new_column; step s1-commit: COMMIT; +step s2-ddl-drop-column: <... completed> step s1-select-count: SELECT COUNT(*) FROM insert_hash; count -5 +8 step s1-show-columns: SELECT run_command_on_workers('SELECT column_name FROM information_schema.columns WHERE table_name LIKE ''insert_hash%'' AND column_name = ''new_column'' ORDER BY 1 LIMIT 1'); run_command_on_workers @@ -897,13 +896,11 @@ step s1-begin: BEGIN; step s1-ddl-add-column: ALTER TABLE insert_hash ADD new_column int DEFAULT 0; step s2-insert-multi-row: INSERT INTO insert_hash VALUES(7, 'k'), (8, 'l'), (9, 'm'); step s1-commit: COMMIT; -WARNING: INSERT has more target columns than expressions step s2-insert-multi-row: <... completed> -error in steps s1-commit s2-insert-multi-row: ERROR: could not modify any active placements step s1-select-count: SELECT COUNT(*) FROM insert_hash; count -5 +8 step s1-show-columns: SELECT run_command_on_workers('SELECT column_name FROM information_schema.columns WHERE table_name LIKE ''insert_hash%'' AND column_name = ''new_column'' ORDER BY 1 LIMIT 1'); run_command_on_workers diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index 0d235119a..a4a00ed67 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -690,6 +690,182 @@ INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING * 3 | 103 | Mynt (1 row) +-- Test multi-row insert with serial in the partition column +INSERT INTO app_analytics_events (app_id, name) +VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; + id | app_id | name +----+--------+------ + 5 | 105 | Mynt + 4 | 104 | Wayz +(2 rows) + +INSERT INTO app_analytics_events (id, name) +VALUES (DEFAULT, 'Foo'), (300, 'Wah') RETURNING *; + id | app_id | name +-----+--------+------ + 6 | | Foo + 300 | | Wah +(2 rows) + +PREPARE prep(varchar) AS +INSERT INTO app_analytics_events (id, name) +VALUES (DEFAULT, $1 || '.1'), (400 , $1 || '.2') RETURNING *; +EXECUTE prep('version-1'); + id | app_id | name +-----+--------+------------- + 7 | | version-1.1 + 400 | | version-1.2 +(2 rows) + +EXECUTE prep('version-2'); + id | app_id | name +-----+--------+------------- + 8 | | version-2.1 + 400 | | version-2.2 +(2 rows) + +EXECUTE prep('version-3'); + id | app_id | name +-----+--------+------------- + 400 | | version-3.2 + 9 | | version-3.1 +(2 rows) + +EXECUTE prep('version-4'); + id | app_id | name +-----+--------+------------- + 10 | | version-4.1 + 400 | | version-4.2 +(2 rows) + +EXECUTE prep('version-5'); + id | app_id | name +-----+--------+------------- + 400 | | version-5.2 + 11 | | version-5.1 +(2 rows) + +EXECUTE prep('version-6'); + id | app_id | name +-----+--------+------------- + 400 | | version-6.2 + 12 | | version-6.1 +(2 rows) + +SELECT * FROM app_analytics_events ORDER BY id, name; + id | app_id | name +-----+--------+----------------- + 1 | 101 | Fauxkemon Geaux + 2 | 102 | Wayz + 3 | 103 | Mynt + 4 | 104 | Wayz + 5 | 105 | Mynt + 6 | | Foo + 7 | | version-1.1 + 8 | | version-2.1 + 9 | | version-3.1 + 10 | | version-4.1 + 11 | | version-5.1 + 12 | | version-6.1 + 300 | | Wah + 400 | | version-1.2 + 400 | | version-2.2 + 400 | | version-3.2 + 400 | | version-4.2 + 400 | | version-5.2 + 400 | | version-6.2 +(19 rows) + +TRUNCATE app_analytics_events; +-- Test multi-row insert with a dropped column +ALTER TABLE app_analytics_events DROP COLUMN app_id; +INSERT INTO app_analytics_events (name) +VALUES ('Wayz'), ('Mynt') RETURNING *; + id | name +----+------ + 14 | Mynt + 13 | Wayz +(2 rows) + +SELECT * FROM app_analytics_events ORDER BY id; + id | name +----+------ + 13 | Wayz + 14 | Mynt +(2 rows) + +DROP TABLE app_analytics_events; +-- Test multi-row insert with a dropped column before the partition column +CREATE TABLE app_analytics_events (id int default 3, app_id integer, name text); +SELECT create_distributed_table('app_analytics_events', 'name'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE app_analytics_events DROP COLUMN app_id; +INSERT INTO app_analytics_events (name) +VALUES ('Wayz'), ('Mynt') RETURNING *; + id | name +----+------ + 3 | Mynt + 3 | Wayz +(2 rows) + +SELECT * FROM app_analytics_events WHERE name = 'Wayz'; + id | name +----+------ + 3 | Wayz +(1 row) + +DROP TABLE app_analytics_events; +-- Test multi-row insert with serial in a reference table +CREATE TABLE app_analytics_events (id serial, app_id integer, name text); +SELECT create_reference_table('app_analytics_events'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO app_analytics_events (app_id, name) +VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; + id | app_id | name +----+--------+------ + 1 | 104 | Wayz + 2 | 105 | Mynt +(2 rows) + +SELECT * FROM app_analytics_events ORDER BY id; + id | app_id | name +----+--------+------ + 1 | 104 | Wayz + 2 | 105 | Mynt +(2 rows) + +DROP TABLE app_analytics_events; +-- Test multi-row insert with serial in a non-partition column +CREATE TABLE app_analytics_events (id int, app_id serial, name text); +SELECT create_distributed_table('app_analytics_events', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO app_analytics_events (id, name) +VALUES (99, 'Wayz'), (98, 'Mynt') RETURNING name, app_id; + name | app_id +------+-------- + Mynt | 2 + Wayz | 1 +(2 rows) + +SELECT * FROM app_analytics_events ORDER BY id; + id | app_id | name +----+--------+------ + 98 | 2 | Mynt + 99 | 1 | Wayz +(2 rows) + DROP TABLE app_analytics_events; -- test UPDATE with subqueries CREATE TABLE raw_table (id bigint, value bigint); diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 9828c765a..42b5a36ae 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -447,6 +447,65 @@ INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURN INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id; INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *; +-- Test multi-row insert with serial in the partition column +INSERT INTO app_analytics_events (app_id, name) +VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; + +INSERT INTO app_analytics_events (id, name) +VALUES (DEFAULT, 'Foo'), (300, 'Wah') RETURNING *; + +PREPARE prep(varchar) AS +INSERT INTO app_analytics_events (id, name) +VALUES (DEFAULT, $1 || '.1'), (400 , $1 || '.2') RETURNING *; + +EXECUTE prep('version-1'); +EXECUTE prep('version-2'); +EXECUTE prep('version-3'); +EXECUTE prep('version-4'); +EXECUTE prep('version-5'); +EXECUTE prep('version-6'); + +SELECT * FROM app_analytics_events ORDER BY id, name; +TRUNCATE app_analytics_events; + +-- Test multi-row insert with a dropped column +ALTER TABLE app_analytics_events DROP COLUMN app_id; +INSERT INTO app_analytics_events (name) +VALUES ('Wayz'), ('Mynt') RETURNING *; + +SELECT * FROM app_analytics_events ORDER BY id; +DROP TABLE app_analytics_events; + +-- Test multi-row insert with a dropped column before the partition column +CREATE TABLE app_analytics_events (id int default 3, app_id integer, name text); +SELECT create_distributed_table('app_analytics_events', 'name'); + +ALTER TABLE app_analytics_events DROP COLUMN app_id; + +INSERT INTO app_analytics_events (name) +VALUES ('Wayz'), ('Mynt') RETURNING *; + +SELECT * FROM app_analytics_events WHERE name = 'Wayz'; +DROP TABLE app_analytics_events; + +-- Test multi-row insert with serial in a reference table +CREATE TABLE app_analytics_events (id serial, app_id integer, name text); +SELECT create_reference_table('app_analytics_events'); + +INSERT INTO app_analytics_events (app_id, name) +VALUES (104, 'Wayz'), (105, 'Mynt') RETURNING *; + +SELECT * FROM app_analytics_events ORDER BY id; +DROP TABLE app_analytics_events; + +-- Test multi-row insert with serial in a non-partition column +CREATE TABLE app_analytics_events (id int, app_id serial, name text); +SELECT create_distributed_table('app_analytics_events', 'id'); + +INSERT INTO app_analytics_events (id, name) +VALUES (99, 'Wayz'), (98, 'Mynt') RETURNING name, app_id; + +SELECT * FROM app_analytics_events ORDER BY id; DROP TABLE app_analytics_events; -- test UPDATE with subqueries