diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c
index db5c3d4ff..3bf0bb327 100644
--- a/src/backend/distributed/planner/insert_select_planner.c
+++ b/src/backend/distributed/planner/insert_select_planner.c
@@ -1247,7 +1247,7 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
     RangeTblEntry *subqueryPartitionColumnRelationIdRTE = NULL;
     List *parentQueryList = list_make2(query, subquery);
-    bool skipOuterVars = true;
+    bool skipOuterVars = false;
 
     FindReferencedTableColumn(selectTargetExpr,
                               parentQueryList, subquery,
                               &subqueryPartitionColumn,
@@ -1543,71 +1543,147 @@ InsertSelectResultIdPrefix(uint64 planId)
 
 
 /*
- * WrapSubquery wraps the given query as a subquery in a newly constructed
- * "SELECT * FROM (...subquery...) citus_insert_select_subquery" query.
+ * Return true if the expression tree can change value within a single scan
+ * (i.e. the planner must treat it as VOLATILE).
+ * We simply delegate to PostgreSQL's helper.
+ */
+static inline bool
+expr_is_volatile(Node *node)
+{
+    /*
+     * contain_volatile_functions() also returns true for volatile
+     * set-returning functions and for nextval()/currval().
+     */
+    return contain_volatile_functions(node);
+}
+
+
+/*
+ * WrapSubquery
+ *
+ * Build a wrapper query:
+ *
+ *   SELECT <outer target list>
+ *   FROM (<rewritten subquery>)
+ *        citus_insert_select_subquery
+ *
+ * Purpose:
+ * - Preserve column numbering while lifting volatile expressions to the
+ *   coordinator.
+ * - Volatile (non-deterministic) expressions that are not used in GROUP BY /
+ *   ORDER BY are lifted to the outer SELECT so they are evaluated only once.
+ * - Stable/immutable expressions, and volatile ones required by GROUP BY /
+ *   ORDER BY, stay in the subquery and are accessed via Vars in the outer
+ *   SELECT.
  */
 Query *
 WrapSubquery(Query *subquery)
 {
+    /*
+     * 1. Build the wrapper skeleton: SELECT ... FROM (subquery) alias
+     */
     ParseState *pstate = make_parsestate(NULL);
-    List *newTargetList = NIL;
-
     Query *outerQuery = makeNode(Query);
     outerQuery->commandType = CMD_SELECT;
 
-    /* create range table entries */
-    Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL);
-    RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem(
-        addRangeTableEntryForSubquery(
-            pstate, subquery,
-            selectAlias, false, true));
-    outerQuery->rtable = list_make1(newRangeTableEntry);
+    Alias *alias = makeAlias("citus_insert_select_subquery", NIL);
+    RangeTblEntry *rte_subq =
+        RangeTableEntryFromNSItem(
+            addRangeTableEntryForSubquery(pstate,
+                                          subquery, /* still points to the original subquery */
+                                          alias,
+                                          false,    /* not LATERAL */
+                                          true));   /* in FROM clause */
+
+    outerQuery->rtable = list_make1(rte_subq);
 
 #if PG_VERSION_NUM >= PG_VERSION_16
 
-    /*
-     * This part of the code is more of a sanity check for readability,
-     * it doesn't really do anything.
-     * addRangeTableEntryForSubquery doesn't add permission info
-     * because the range table is set to be RTE_SUBQUERY.
-     * Hence we should also have no perminfos here.
-     */
-    Assert(newRangeTableEntry->rtekind == RTE_SUBQUERY &&
-           newRangeTableEntry->perminfoindex == 0);
+    /* ensure the RTE_SUBQUERY entry carries no permission info */
+    Assert(rte_subq->rtekind == RTE_SUBQUERY &&
+           rte_subq->perminfoindex == 0);
     outerQuery->rteperminfos = NIL;
 #endif
 
-    /* set the FROM expression to the subquery */
-    RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
-    newRangeTableRef->rtindex = 1;
-    outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
+    RangeTblRef *rtref = makeNode(RangeTblRef);
+    rtref->rtindex = 1;         /* the subquery is the only RTE */
+    outerQuery->jointree = makeFromExpr(list_make1(rtref), NULL);
 
-    /* create a target list that matches the SELECT */
-    TargetEntry *selectTargetEntry = NULL;
-    foreach_declared_ptr(selectTargetEntry, subquery->targetList)
+    /*
+     * 2. Create new target lists for the inner (worker) and outer
+     *    (coordinator) queries.
+     */
+    List *newInnerTL = NIL;
+    List *newOuterTL = NIL;
+    int nextResno = 1;
+
+    TargetEntry *te = NULL;
+    foreach_declared_ptr(te, subquery->targetList)
     {
-        /* exactly 1 entry in FROM */
-        int indexInRangeTable = 1;
-
-        if (selectTargetEntry->resjunk)
+        if (te->resjunk)
         {
+            /*
+             * Keep resjunk entries only in the subquery (not in the outer
+             * query), renumbering them so resnos stay consistent with their
+             * position in the rebuilt target list.
+             */
+            te->resno = nextResno++;
+            newInnerTL = lappend(newInnerTL, te);
             continue;
         }
 
-        Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno,
-                                    exprType((Node *) selectTargetEntry->expr),
-                                    exprTypmod((Node *) selectTargetEntry->expr),
-                                    exprCollation((Node *) selectTargetEntry->expr), 0);
+        bool isVolatile = expr_is_volatile((Node *) te->expr);
+        bool usedInSort = (te->ressortgroupref != 0);
 
-        TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar,
-                                                            selectTargetEntry->resno,
-                                                            selectTargetEntry->resname,
-                                                            selectTargetEntry->resjunk);
+        if (isVolatile && !usedInSort)
+        {
+            /*
+             * Lift the volatile expression to the outer query so it is
+             * evaluated only once, on the coordinator. In the inner query,
+             * place a NULL of the same type to preserve the column position.
+             */
+            TargetEntry *outerTE =
+                makeTargetEntry(copyObject(te->expr),
+                                list_length(newOuterTL) + 1,
+                                te->resname,
+                                false);
+            newOuterTL = lappend(newOuterTL, outerTE);
 
-        newTargetList = lappend(newTargetList, newSelectTargetEntry);
+            Const *nullConst = makeNullConst(exprType((Node *) te->expr),
+                                             exprTypmod((Node *) te->expr),
+                                             exprCollation((Node *) te->expr));
+
+            TargetEntry *placeholder =
+                makeTargetEntry((Expr *) nullConst,
+                                nextResno++,    /* preserve column position */
+                                te->resname,
+                                false);         /* visible, not resjunk */
+            newInnerTL = lappend(newInnerTL, placeholder);
+        }
+        else
+        {
+            /*
+             * Either:
+             *  - the expression is stable or immutable, or
+             *  - it is volatile but needed for sorting or grouping.
+             *
+             * In both cases, keep it in the subquery and reference it from
+             * the outer query through a Var.
+             */
+            TargetEntry *innerTE = te;  /* reuse the original node */
+            innerTE->resno = nextResno++;
+            newInnerTL = lappend(newInnerTL, innerTE);
 
+            Var *v = makeVar(rtref->rtindex,    /* the subquery RTE, i.e. 1 */
+                             innerTE->resno,
+                             exprType((Node *) innerTE->expr),
+                             exprTypmod((Node *) innerTE->expr),
+                             exprCollation((Node *) innerTE->expr),
+                             0);
+
+            TargetEntry *outerTE =
+                makeTargetEntry((Expr *) v,
+                                list_length(newOuterTL) + 1,
+                                innerTE->resname,
+                                false);
+            newOuterTL = lappend(newOuterTL, outerTE);
+        }
     }
 
-    outerQuery->targetList = newTargetList;
+    /*
+     * 3. Assign the target lists and return the wrapper query.
+     */
+    subquery->targetList = newInnerTL;
+    outerQuery->targetList = newOuterTL;
 
     return outerQuery;
 }
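
For intuition, the following is a minimal hand-written sketch of the wrapper shape the new WrapSubquery aims to produce when the SELECT target list carries a volatile expression such as nextval(). The object names (demo_seq, demo_source) are invented for illustration and this is not planner output; it runs on plain PostgreSQL. The volatile call sits in the outer query, so it is evaluated once per result row on the coordinator, while a typed NULL keeps the column position in the inner, pushed-down subquery:

    -- invented names; hand-written equivalent of the wrapper, not generated by the planner
    CREATE SEQUENCE demo_seq;
    CREATE TABLE demo_source (description text);
    INSERT INTO demo_source VALUES ('a'), ('b');

    SELECT nextval('demo_seq') AS id, description
    FROM (
        SELECT NULL::bigint AS id, description
        FROM demo_source
    ) citus_insert_select_subquery;
    -- each output row draws one value from demo_seq; the inner query never calls nextval()
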
diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c
index 029de7707..c4c11f4eb 100644
--- a/src/backend/distributed/planner/multi_logical_optimizer.c
+++ b/src/backend/distributed/planner/multi_logical_optimizer.c
@@ -4478,46 +4478,43 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
         return;
     }
 
-
-    if (candidateColumn->varlevelsup > 0)
+    /* resolve the Var through its outer query level, if there is one */
+    while (candidateColumn->varlevelsup > 0)
     {
+        /* the caller asked us to ignore outer Vars, so bail out */
         if (skipOuterVars)
         {
-            /*
-             * we don't want to process outer vars, so we return early.
-             */
             return;
         }
 
-        /*
-         * We currently don't support finding partition keys in the subqueries
-         * that reference outer subqueries. For example, in correlated
-         * subqueries in WHERE clause, we don't support use of partition keys
-         * in the subquery that is referred from the outer query.
-         */
-
-        int parentQueryIndex = list_length(parentQueryList) -
-                               candidateColumn->varlevelsup;
-        if (!(IsIndexInRange(parentQueryList, parentQueryIndex)))
+        /* locate the parent query that owns this Var */
+        int parentIdx =
+            list_length(parentQueryList) - candidateColumn->varlevelsup;
+        if (!IsIndexInRange(parentQueryList, parentIdx))
         {
-            return;
+            return;     /* malformed tree */
         }
 
-        /*
-         * Before we recurse into the query tree, we should update the candidateColumn and we use copy of it.
-         * As we get the query from varlevelsup up, we reset the varlevelsup.
-         */
+        /* work on a fresh copy of the Var with varlevelsup reset */
         candidateColumn = copyObject(candidateColumn);
         candidateColumn->varlevelsup = 0;
 
         /*
-         * We should be careful about these fields because they need to
-         * be updated correctly based on ctelevelsup and varlevelsup.
+         * Make a completely private copy of parentQueryList for the next
+         * recursion step. We copy the whole list and then truncate it, so
+         * every recursive branch owns its own list cells.
         */
-        query = list_nth(parentQueryList, parentQueryIndex);
-        parentQueryList = list_truncate(parentQueryList, parentQueryIndex);
+        List *newParent =
+            list_copy(parentQueryList);     /* duplicates every cell */
+        newParent = list_truncate(newParent, parentIdx);
+
+        query = list_nth(parentQueryList, parentIdx);
+        parentQueryList = newParent;        /* hand the private copy down */
+
+        /* varlevelsup was reset above, so the loop exits after one pass */
     }
+
     if (candidateColumn->varattno == InvalidAttrNumber)
     {
         /*
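
The regression tests added below exercise the query shape from issue #7784: an INSERT ... SELECT whose source is a CTE over the same table, where the SELECT side's distribution column may only be reachable by following an outer-level Var, which is what the skipOuterVars change above addresses. As a point of reference, a stripped-down version of the pattern (with invented names, independent of Citus) looks like this on plain PostgreSQL; the committed tests run the same pattern against schema-sharded and distributed tables:

    -- invented names; the committed tests below cover the distributed case
    CREATE TABLE cte_insert_demo (id bigserial PRIMARY KEY, payload text);
    INSERT INTO cte_insert_demo (payload) VALUES ('seed');

    WITH src AS (
        SELECT payload FROM cte_insert_demo
    )
    INSERT INTO cte_insert_demo (payload)
    SELECT payload FROM src;

    SELECT count(*) FROM cte_insert_demo;  -- expect 2 rows
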
diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out
index 58d22583e..00e335a82 100644
--- a/src/test/regress/expected/multi_insert_select.out
+++ b/src/test/regress/expected/multi_insert_select.out
@@ -3492,5 +3492,150 @@ $$);
          Task Count: 1
 (2 rows)
 
+---------------------------------------------------------------------
+-- Regression test for issue #7784
+-- Tests INSERT ... SELECT with a CTE for:
+-- 1. Schema-based sharding.
+-- 2. A distributed table.
+---------------------------------------------------------------------
+-- Enable schema-based sharding
+SET citus.enable_schema_based_sharding TO ON;
+-- Create a table for schema-based sharding
+CREATE TABLE version_sch_based (
+    id bigserial NOT NULL,
+    description varchar(255),
+    PRIMARY KEY (id)
+);
+-- Insert an initial row.
+INSERT INTO version_sch_based (description) VALUES ('Version 1');
+-- Duplicate the row using a CTE and INSERT ... SELECT.
+WITH v AS (
+    SELECT * FROM version_sch_based WHERE description = 'Version 1'
+)
+INSERT INTO version_sch_based (description)
+SELECT description FROM v;
+-- Expected output:
+--  id | description
+-- ----+-------------
+--   1 | Version 1
+--   2 | Version 1
+-- Query the table and order by id for consistency.
+SELECT * FROM version_sch_based ORDER BY id;
+ id | description
+---------------------------------------------------------------------
+  1 | Version 1
+  2 | Version 1
+(2 rows)
+
+---------------------------------------------------------------------
+-- Case 2: Distributed Table Scenario
+---------------------------------------------------------------------
+SET citus.enable_schema_based_sharding TO OFF;
+-- Create a table for the distributed test.
+CREATE TABLE version_dist (
+    id bigserial NOT NULL,
+    description varchar(255),
+    PRIMARY KEY (id)
+);
+-- Register the table as distributed using the 'id' column as the distribution key.
+SELECT create_distributed_table('version_dist', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+ 
+(1 row)
+
+-- Insert an initial row.
+INSERT INTO version_dist (description) VALUES ('Version 1');
+-- Duplicate the row using a CTE and INSERT ... SELECT.
+WITH v AS (
+    SELECT * FROM version_dist WHERE description = 'Version 1'
+)
+INSERT INTO version_dist (description)
+SELECT description FROM v;
+-- Expected output:
+--  id | description
+-- ----+-------------
+--   1 | Version 1
+--   2 | Version 1
+-- Query the table and order by id for consistency.
+SELECT * FROM version_dist ORDER BY id;
+ id | description
+---------------------------------------------------------------------
+  1 | Version 1
+  2 | Version 1
+(2 rows)
+
+---------------------------------------------------------------------
+-- Case 3: Distributed INSERT ... SELECT with nextval()
+-- Verifies that nextval() is evaluated on the coordinator only.
+---------------------------------------------------------------------
+-- A fresh sequence for clarity
+CREATE SEQUENCE seq_nextval_test START 100;
+-- Table with DEFAULT nextval()
+CREATE TABLE version_dist_seq (
+    id bigint DEFAULT nextval('seq_nextval_test'),
+    description text,
+    PRIMARY KEY (id)
+);
+SELECT create_distributed_table('version_dist_seq', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+ 
+(1 row)
+
+-- Seed one row (id = 100)
+INSERT INTO version_dist_seq (description) VALUES ('row-0');
+-- The CTE duplication should produce exactly one new sequence value (id = 101)
+WITH v AS (
+    SELECT * FROM version_dist_seq WHERE description = 'row-0'
+)
+INSERT INTO version_dist_seq (description)
+SELECT description FROM v;
+-- Expected: ids are 100 and 101 (no gaps, no duplicates)
+SELECT id, description FROM version_dist_seq ORDER BY id;
+ id  | description
+---------------------------------------------------------------------
+ 100 | row-0
+ 101 | row-0
+(2 rows)
+
+---------------------------------------------------------------------
+-- Case 4: UNION ALL + nextval() in distributed INSERT ... SELECT
+---------------------------------------------------------------------
+CREATE SEQUENCE seq_union_test START 200;
+CREATE TABLE version_dist_union (
+    id bigint DEFAULT nextval('seq_union_test'),
+    val int,
+    PRIMARY KEY (id)
+);
+SELECT create_distributed_table('version_dist_union', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+ 
+(1 row)
+
+-- Seed rows
+INSERT INTO version_dist_union (val) VALUES (1), (2);
+-- UNION ALL duplication; each leg returns two rows, so four new rows are inserted
+WITH src AS (
+    SELECT val FROM version_dist_union
+    UNION ALL
+    SELECT val FROM version_dist_union
+)
+INSERT INTO version_dist_union (val)
+SELECT val FROM src;
+-- Expected ids: 200 through 205 (no gaps, no duplicates)
+SELECT id, val FROM version_dist_union ORDER BY id;
+ id  | val
+---------------------------------------------------------------------
+ 200 |   1
+ 201 |   2
+ 202 |   1
+ 203 |   2
+ 204 |   1
+ 205 |   2
+(6 rows)
+
+-- End of issue #7784 tests
 SET client_min_messages TO ERROR;
 DROP SCHEMA multi_insert_select CASCADE;
diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql
index b773ce906..eabadda7c 100644
--- a/src/test/regress/sql/multi_insert_select.sql
+++ b/src/test/regress/sql/multi_insert_select.sql
@@ -2452,5 +2452,136 @@ SELECT coordinator_plan($$
   SELECT id FROM dist_table_5 JOIN cte_1 USING(id);
 $$);
 
+-------------------------------
+-- Regression test for issue #7784
+-- Tests INSERT ... SELECT with a CTE for:
+-- 1. Schema-based sharding.
+-- 2. A distributed table.
+-------------------------------
+
+-- Enable schema-based sharding
+SET citus.enable_schema_based_sharding TO ON;
+
+
+-- Create a table for schema-based sharding
+CREATE TABLE version_sch_based (
+    id bigserial NOT NULL,
+    description varchar(255),
+    PRIMARY KEY (id)
+);
+
+-- Insert an initial row.
+INSERT INTO version_sch_based (description) VALUES ('Version 1');
+
+-- Duplicate the row using a CTE and INSERT ... SELECT.
+WITH v AS (
+    SELECT * FROM version_sch_based WHERE description = 'Version 1'
+)
+INSERT INTO version_sch_based (description)
+SELECT description FROM v;
+
+-- Expected output:
+--  id | description
+-- ----+-------------
+--   1 | Version 1
+--   2 | Version 1
+
+-- Query the table and order by id for consistency.
+SELECT * FROM version_sch_based ORDER BY id;
+
+--------------------------------------------------
+-- Case 2: Distributed Table Scenario
+--------------------------------------------------
+SET citus.enable_schema_based_sharding TO OFF;
+
+-- Create a table for the distributed test.
+CREATE TABLE version_dist (
+    id bigserial NOT NULL,
+    description varchar(255),
+    PRIMARY KEY (id)
+);
+
+-- Register the table as distributed using the 'id' column as the distribution key.
+SELECT create_distributed_table('version_dist', 'id');
+
+-- Insert an initial row.
+INSERT INTO version_dist (description) VALUES ('Version 1');
+
+-- Duplicate the row using a CTE and INSERT ... SELECT.
+WITH v AS (
+    SELECT * FROM version_dist WHERE description = 'Version 1'
+)
+INSERT INTO version_dist (description)
+SELECT description FROM v;
+
+-- Expected output:
+--  id | description
+-- ----+-------------
+--   1 | Version 1
+--   2 | Version 1
+
+-- Query the table and order by id for consistency.
+SELECT * FROM version_dist ORDER BY id;
+
+-------------------------------
+-- Case 3: Distributed INSERT ... SELECT with nextval()
+-- Verifies that nextval() is evaluated on the coordinator only.
+-------------------------------
+
+-- A fresh sequence for clarity
+CREATE SEQUENCE seq_nextval_test START 100;
+
+-- Table with DEFAULT nextval()
+CREATE TABLE version_dist_seq (
+    id bigint DEFAULT nextval('seq_nextval_test'),
+    description text,
+    PRIMARY KEY (id)
+);
+SELECT create_distributed_table('version_dist_seq', 'id');
+
+-- Seed one row (id = 100)
+INSERT INTO version_dist_seq (description) VALUES ('row-0');
+
+-- The CTE duplication should produce exactly one new sequence value (id = 101)
+WITH v AS (
+    SELECT * FROM version_dist_seq WHERE description = 'row-0'
+)
+INSERT INTO version_dist_seq (description)
+SELECT description FROM v;
+
+-- Expected: ids are 100 and 101 (no gaps, no duplicates)
+SELECT id, description FROM version_dist_seq ORDER BY id;
+
+
+-------------------------------
+-- Case 4: UNION ALL + nextval() in distributed INSERT ... SELECT
+-------------------------------
+
+CREATE SEQUENCE seq_union_test START 200;
+
+CREATE TABLE version_dist_union (
+    id bigint DEFAULT nextval('seq_union_test'),
+    val int,
+    PRIMARY KEY (id)
+);
+SELECT create_distributed_table('version_dist_union', 'id');
+
+-- Seed rows
+INSERT INTO version_dist_union (val) VALUES (1), (2);
+
+-- UNION ALL duplication; each leg returns two rows, so four new rows are inserted
+WITH src AS (
+    SELECT val FROM version_dist_union
+    UNION ALL
+    SELECT val FROM version_dist_union
+)
+INSERT INTO version_dist_union (val)
+SELECT val FROM src;
+
+-- Expected ids: 200 through 205 (no gaps, no duplicates)
+SELECT id, val FROM version_dist_union ORDER BY id;
+
+-- End of issue #7784 tests
+
 SET client_min_messages TO ERROR;
 DROP SCHEMA multi_insert_select CASCADE;
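
A complementary manual check, not part of the committed tests (and one that would need a matching expected-output entry if it were ever added), is to confirm after the Case 3 statements that the coordinator-side sequence advanced exactly once for the duplicated row:

    -- illustrative only; run immediately after the Case 3 statements above
    SELECT last_value FROM seq_nextval_test;  -- 101 expected if nextval() fired once per inserted row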