Planner: lift volatile target‑list items in `WrapSubquery` to coordinator (prevents sequence‑leap in distributed `INSERT … SELECT`) (#7976)

This PR fixes #7784 and refactors the `WrapSubquery(Query *subquery)`
function to improve clarity and correctness when handling volatile
expressions in subqueries during Citus insert-select rewriting.

### Background

The `WrapSubquery` function rewrites a query of the form:

```sql
INSERT INTO target_table SELECT ... FROM ...
```

...by wrapping the `SELECT` in a subquery:

```sql
SELECT <outer-TL>
  FROM ( <subquery with volatile expressions replaced with NULL> ) citus_insert_select_subquery
```

This transformation allows:

* **Volatile expressions** (e.g., `nextval`, `now`) **not used in `GROUP
BY` or `ORDER BY`** to be evaluated **exactly once on the coordinator**.
* **Stable/immutable or sort-relevant expressions** to remain in the
worker-executed subquery.
* Placeholder `NULL`s to maintain column alignment in the inner
subquery.

### Fix Details

* Restructured the code into labeled logical sections:

  1. Build wrapper query (`SELECT … FROM (subquery)`)
  2. Rewrite target lists with volatility analysis
  3. Assign and return updated query trees
  
* Preserved existing behavior, focusing on clarity and maintainability.

### How the new code handles volatile items

stage | what we look for | what we do | why
-- | -- | -- | --
scan target list once | 1. `expr_is_volatile(te->expr)` 2.
`te->ressortgroupref != 0` (is the column used in GROUP BY / ORDER BY?)
| decide whether to hoist or keep | we must not hoist an expression the
inner query still needs for sorting/grouping, otherwise its
`SortGroupClause` breaks
volatile & not used in sort/group | deep‑copy the expression into the
outer target list | executes once on the coordinator |  
  | leave a typed `NULL `placeholder (visible, not `resjunk`) in the
inner target list | keeps column numbering stable for helpers that
already ran (reorder, cast); the worker sends a cheap constant |  
stable / immutable, or volatile but used in sort/group | keep the
original expression in the inner list; outer list references it via a
`Var `| workers can evaluate it safely and, if needed, the inner
ORDER BY still works |  

###  Example

Given this query:

```sql
INSERT INTO t SELECT nextval('s'), 42 FROM generate_series(1, 2);
```

The planner rewrites it as:

```sql
SELECT nextval('s'), col2
  FROM (SELECT NULL::bigint AS col1, 42 AS col2 FROM generate_series(1, 2)) citus_insert_select_subquery;
```

This ensures `nextval('s')` is evaluated only once per row on the
**coordinator**, not on each worker node, preserving correct sequence
semantics.

#### **Outer‑Var guard (`FindReferencedTableColumn`)**

Because `WrapSubquery` adds an extra query level, lots of Vars that the
old code never expected become “outer” Vars; without teaching
`FindReferencedTableColumn` to climb that extra level reliably, Citus
would intermittently reject valid foreign keys and even hit asserts.

* Re‑implemented the outer‑Var guard so that the function:

* **Walks deterministically up the query stack** when `skipOuterVars =
false` (default for FK / UNION checks). A new while‑loop copies — rather
than truncates — `parentQueryList` on each hop, eliminating
list‑aliasing that made *issue 5248* fail intermittently in parallel
regressions.

* Handles multi‑level `varlevelsup` in a single loop; never mutates the
caller’s list in place.
pull/7973/merge
Mehmet YILMAZ 2025-05-06 17:45:49 +03:00 committed by GitHub
parent d4dd44e715
commit a4040ba5da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 415 additions and 66 deletions

View File

@ -1247,7 +1247,7 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
RangeTblEntry *subqueryPartitionColumnRelationIdRTE = NULL;
List *parentQueryList = list_make2(query, subquery);
bool skipOuterVars = true;
bool skipOuterVars = false;
FindReferencedTableColumn(selectTargetExpr,
parentQueryList, subquery,
&subqueryPartitionColumn,
@ -1543,71 +1543,147 @@ InsertSelectResultIdPrefix(uint64 planId)
/*
* WrapSubquery wraps the given query as a subquery in a newly constructed
* "SELECT * FROM (...subquery...) citus_insert_select_subquery" query.
* Return true if the expression tree can change value within a single scan
* (i.e. the planner must treat it as VOLATILE).
* We just delegate to PostgreSQLs helper.
*/
static inline bool
expr_is_volatile(Node *node)
{
/* contain_volatile_functions() also returns true for set-returning
* volatile functions and for nextval()/currval(). */
return contain_volatile_functions(node);
}
/*
* WrapSubquery
*
* Build a wrapper query:
*
* SELECT <outer-TL>
* FROM ( <subquery with any volatile items stripped> )
* citus_insert_select_subquery
*
* Purpose:
* - Preserve column numbering while lifting volatile expressions to the coordinator.
* - Volatile (non-deterministic) expressions not used in GROUP BY / ORDER BY
* are lifted to the outer SELECT to ensure they are evaluated only once.
* - Stable/immutable expressions or volatile ones required by GROUP BY / ORDER BY
* stay in the subquery and are accessed via Vars in the outer SELECT.
*/
Query *
WrapSubquery(Query *subquery)
{
/*
* 1. Build the wrapper skeleton: SELECT ... FROM (subquery) alias
*/
ParseState *pstate = make_parsestate(NULL);
List *newTargetList = NIL;
Query *outerQuery = makeNode(Query);
outerQuery->commandType = CMD_SELECT;
/* create range table entries */
Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL);
RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem(
addRangeTableEntryForSubquery(
pstate, subquery,
selectAlias, false, true));
outerQuery->rtable = list_make1(newRangeTableEntry);
Alias *alias = makeAlias("citus_insert_select_subquery", NIL);
RangeTblEntry *rte_subq =
RangeTableEntryFromNSItem(
addRangeTableEntryForSubquery(pstate,
subquery, /* still points to original subquery */
alias,
false, /* not LATERAL */
true)); /* in FROM clause */
outerQuery->rtable = list_make1(rte_subq);
#if PG_VERSION_NUM >= PG_VERSION_16
/*
* This part of the code is more of a sanity check for readability,
* it doesn't really do anything.
* addRangeTableEntryForSubquery doesn't add permission info
* because the range table is set to be RTE_SUBQUERY.
* Hence we should also have no perminfos here.
*/
Assert(newRangeTableEntry->rtekind == RTE_SUBQUERY &&
newRangeTableEntry->perminfoindex == 0);
/* Ensure RTE_SUBQUERY has proper permission handling */
Assert(rte_subq->rtekind == RTE_SUBQUERY &&
rte_subq->perminfoindex == 0);
outerQuery->rteperminfos = NIL;
#endif
/* set the FROM expression to the subquery */
RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
newRangeTableRef->rtindex = 1;
outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
RangeTblRef *rtref = makeNode(RangeTblRef);
rtref->rtindex = 1; /* Only one RTE, so index is 1 */
outerQuery->jointree = makeFromExpr(list_make1(rtref), NULL);
/* create a target list that matches the SELECT */
TargetEntry *selectTargetEntry = NULL;
foreach_declared_ptr(selectTargetEntry, subquery->targetList)
/*
* 2. Create new target lists for inner (worker) and outer (coordinator)
*/
List *newInnerTL = NIL;
List *newOuterTL = NIL;
int nextResno = 1;
TargetEntry *te = NULL;
foreach_declared_ptr(te, subquery->targetList)
{
/* exactly 1 entry in FROM */
int indexInRangeTable = 1;
if (selectTargetEntry->resjunk)
if (te->resjunk)
{
/* Keep resjunk entries only in subquery (not in outer query) */
newInnerTL = lappend(newInnerTL, te);
continue;
}
Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno,
exprType((Node *) selectTargetEntry->expr),
exprTypmod((Node *) selectTargetEntry->expr),
exprCollation((Node *) selectTargetEntry->expr), 0);
bool isVolatile = expr_is_volatile((Node *) te->expr);
bool usedInSort = (te->ressortgroupref != 0);
TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar,
selectTargetEntry->resno,
selectTargetEntry->resname,
selectTargetEntry->resjunk);
if (isVolatile && !usedInSort)
{
/*
* Lift volatile expression to outer query so it's evaluated once.
* In inner query, place a NULL of the same type to preserve column position.
*/
TargetEntry *outerTE =
makeTargetEntry(copyObject(te->expr),
list_length(newOuterTL) + 1,
te->resname,
false);
newOuterTL = lappend(newOuterTL, outerTE);
newTargetList = lappend(newTargetList, newSelectTargetEntry);
Const *nullConst = makeNullConst(exprType((Node *) te->expr),
exprTypmod((Node *) te->expr),
exprCollation((Node *) te->expr));
TargetEntry *placeholder =
makeTargetEntry((Expr *) nullConst,
nextResno++, /* preserve column position */
te->resname,
false); /* visible, not resjunk */
newInnerTL = lappend(newInnerTL, placeholder);
}
else
{
/*
* Either:
* - expression is stable or immutable, or
* - volatile but needed for sorting or grouping
*
* In both cases, keep it in subquery and reference it using a Var.
*/
TargetEntry *innerTE = te; /* reuse original node */
innerTE->resno = nextResno++;
newInnerTL = lappend(newInnerTL, innerTE);
Var *v = makeVar(/* subquery reference index is 1 */
rtref->rtindex, /* same as 1, but selfdocumenting */
innerTE->resno,
exprType((Node *) innerTE->expr),
exprTypmod((Node *) innerTE->expr),
exprCollation((Node *) innerTE->expr),
0);
TargetEntry *outerTE =
makeTargetEntry((Expr *) v,
list_length(newOuterTL) + 1,
innerTE->resname,
false);
newOuterTL = lappend(newOuterTL, outerTE);
}
}
outerQuery->targetList = newTargetList;
/*
* 3. Assign target lists and return the wrapper query
*/
subquery->targetList = newInnerTL;
outerQuery->targetList = newOuterTL;
return outerQuery;
}

View File

@ -4478,46 +4478,43 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
return;
}
if (candidateColumn->varlevelsup > 0)
/* Walk up varlevelsup as many times as needed */
while (candidateColumn->varlevelsup > 0)
{
/* Caller asked us to ignore any outer Vars → just bail out */
if (skipOuterVars)
{
/*
* we don't want to process outer vars, so we return early.
*/
return;
}
/*
* We currently don't support finding partition keys in the subqueries
* that reference outer subqueries. For example, in correlated
* subqueries in WHERE clause, we don't support use of partition keys
* in the subquery that is referred from the outer query.
*/
int parentQueryIndex = list_length(parentQueryList) -
candidateColumn->varlevelsup;
if (!(IsIndexInRange(parentQueryList, parentQueryIndex)))
/* Locate the parent query that owns this Var */
int parentIdx =
list_length(parentQueryList) - candidateColumn->varlevelsup;
if (!IsIndexInRange(parentQueryList, parentIdx))
{
return;
return; /* malformed tree */
}
/*
* Before we recurse into the query tree, we should update the candidateColumn and we use copy of it.
* As we get the query from varlevelsup up, we reset the varlevelsup.
*/
/* Work on a fresh copy of the Var with varlevelsup reset */
candidateColumn = copyObject(candidateColumn);
candidateColumn->varlevelsup = 0;
/*
* We should be careful about these fields because they need to
* be updated correctly based on ctelevelsup and varlevelsup.
* Make a *completely private* copy of parentQueryList for the
* next recursion step. We copy the whole list and then truncate
* so every recursive branch owns its own list cells.
*/
query = list_nth(parentQueryList, parentQueryIndex);
parentQueryList = list_truncate(parentQueryList, parentQueryIndex);
List *newParent =
list_copy(parentQueryList); /* duplicates every cell */
newParent = list_truncate(newParent, parentIdx);
query = list_nth(parentQueryList, parentIdx);
parentQueryList = newParent; /* hand private copy down */
/* Loop again if still pointing to an outer level */
}
if (candidateColumn->varattno == InvalidAttrNumber)
{
/*

View File

@ -3492,5 +3492,150 @@ $$);
Task Count: 1
(2 rows)
---------------------------------------------------------------------
-- Regression Test Script for Issue #7784
-- This script tests INSERT ... SELECT with a CTE for:
-- 1. Schema based sharding.
-- 2. A distributed table.
---------------------------------------------------------------------
-- Enable schema-based sharding
SET citus.enable_schema_based_sharding TO ON;
-- Create a table for schema based sharding
CREATE TABLE version_sch_based (
id bigserial NOT NULL,
description varchar(255),
PRIMARY KEY (id)
);
-- Insert an initial row.
INSERT INTO version_sch_based (description) VALUES ('Version 1');
-- Duplicate the row using a CTE and INSERT ... SELECT.
WITH v AS (
SELECT * FROM version_sch_based WHERE description = 'Version 1'
)
INSERT INTO version_sch_based (description)
SELECT description FROM v;
-- Expected output:
-- id | description
-- ----+-------------
-- 1 | Version 1
-- 2 | Version 1
-- Query the table and order by id for consistency.
SELECT * FROM version_sch_based ORDER BY id;
id | description
---------------------------------------------------------------------
1 | Version 1
2 | Version 1
(2 rows)
---------------------------------------------------------------------
-- Case 2: Distributed Table Scenario
---------------------------------------------------------------------
SET citus.enable_schema_based_sharding TO OFF;
-- Create a table for the distributed test.
CREATE TABLE version_dist (
id bigserial NOT NULL,
description varchar(255),
PRIMARY KEY (id)
);
-- Register the table as distributed using the 'id' column as the distribution key.
SELECT create_distributed_table('version_dist', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Insert an initial row.
INSERT INTO version_dist (description) VALUES ('Version 1');
-- Duplicate the row using a CTE and INSERT ... SELECT.
WITH v AS (
SELECT * FROM version_dist WHERE description = 'Version 1'
)
INSERT INTO version_dist (description)
SELECT description FROM v;
-- Expected output:
-- id | description
-- ----+-------------
-- 1 | Version 1
-- 2 | Version 1
-- Query the table and order by id for consistency.
SELECT * FROM version_dist ORDER BY id;
id | description
---------------------------------------------------------------------
1 | Version 1
2 | Version 1
(2 rows)
---------------------------------------------------------------------
-- Case 3: Distributed INSERT … SELECT with nextval()
-- Verifies that nextval() is evaluated on the coordinator only.
---------------------------------------------------------------------
-- A fresh sequence for clarity
CREATE SEQUENCE seq_nextval_test START 100;
-- Table with DEFAULT nextval()
CREATE TABLE version_dist_seq (
id bigint DEFAULT nextval('seq_nextval_test'),
description text,
PRIMARY KEY (id)
);
SELECT create_distributed_table('version_dist_seq', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Seed one row (id = 100)
INSERT INTO version_dist_seq (description) VALUES ('row0');
-- CTE duplication should produce **exactly one** new sequence value (id = 101)
WITH v AS (
SELECT * FROM version_dist_seq WHERE description = 'row0'
)
INSERT INTO version_dist_seq (description)
SELECT description FROM v;
-- Expected: ids are 100 and 101 (no gaps, no duplicates)
SELECT id, description FROM version_dist_seq ORDER BY id;
id | description
---------------------------------------------------------------------
100 | row0
101 | row0
(2 rows)
---------------------------------------------------------------------
-- Case 4: UNION ALL + nextval() in distributed INSERT … SELECT
---------------------------------------------------------------------
CREATE SEQUENCE seq_union_test START 200;
CREATE TABLE version_dist_union (
id bigint DEFAULT nextval('seq_union_test'),
val int,
PRIMARY KEY (id)
);
SELECT create_distributed_table('version_dist_union', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Seed rows
INSERT INTO version_dist_union (val) VALUES (1), (2);
-- UNION ALL duplication; each leg returns two rows -> four inserts total
WITH src AS (
SELECT val FROM version_dist_union
UNION ALL
SELECT val FROM version_dist_union
)
INSERT INTO version_dist_union(val)
SELECT val FROM src;
-- Expected IDs: 200,201,202,203,204,205
SELECT id, val FROM version_dist_union ORDER BY id;
id | val
---------------------------------------------------------------------
200 | 1
201 | 2
202 | 1
203 | 2
204 | 1
205 | 2
(6 rows)
-- End of Issue #7784
SET client_min_messages TO ERROR;
DROP SCHEMA multi_insert_select CASCADE;

View File

@ -2452,5 +2452,136 @@ SELECT coordinator_plan($$
SELECT id FROM dist_table_5 JOIN cte_1 USING(id);
$$);
-------------------------------
-- Regression Test Script for Issue #7784
-- This script tests INSERT ... SELECT with a CTE for:
-- 1. Schema based sharding.
-- 2. A distributed table.
-------------------------------
-- Enable schema-based sharding
SET citus.enable_schema_based_sharding TO ON;
-- Create a table for schema based sharding
CREATE TABLE version_sch_based (
id bigserial NOT NULL,
description varchar(255),
PRIMARY KEY (id)
);
-- Insert an initial row.
INSERT INTO version_sch_based (description) VALUES ('Version 1');
-- Duplicate the row using a CTE and INSERT ... SELECT.
WITH v AS (
SELECT * FROM version_sch_based WHERE description = 'Version 1'
)
INSERT INTO version_sch_based (description)
SELECT description FROM v;
-- Expected output:
-- id | description
-- ----+-------------
-- 1 | Version 1
-- 2 | Version 1
-- Query the table and order by id for consistency.
SELECT * FROM version_sch_based ORDER BY id;
--------------------------------------------------
-- Case 2: Distributed Table Scenario
--------------------------------------------------
SET citus.enable_schema_based_sharding TO OFF;
-- Create a table for the distributed test.
CREATE TABLE version_dist (
id bigserial NOT NULL,
description varchar(255),
PRIMARY KEY (id)
);
-- Register the table as distributed using the 'id' column as the distribution key.
SELECT create_distributed_table('version_dist', 'id');
-- Insert an initial row.
INSERT INTO version_dist (description) VALUES ('Version 1');
-- Duplicate the row using a CTE and INSERT ... SELECT.
WITH v AS (
SELECT * FROM version_dist WHERE description = 'Version 1'
)
INSERT INTO version_dist (description)
SELECT description FROM v;
-- Expected output:
-- id | description
-- ----+-------------
-- 1 | Version 1
-- 2 | Version 1
-- Query the table and order by id for consistency.
SELECT * FROM version_dist ORDER BY id;
-------------------------------
-- Case 3: Distributed INSERT … SELECT with nextval()
-- Verifies that nextval() is evaluated on the coordinator only.
-------------------------------
-- A fresh sequence for clarity
CREATE SEQUENCE seq_nextval_test START 100;
-- Table with DEFAULT nextval()
CREATE TABLE version_dist_seq (
id bigint DEFAULT nextval('seq_nextval_test'),
description text,
PRIMARY KEY (id)
);
SELECT create_distributed_table('version_dist_seq', 'id');
-- Seed one row (id = 100)
INSERT INTO version_dist_seq (description) VALUES ('row0');
-- CTE duplication should produce **exactly one** new sequence value (id = 101)
WITH v AS (
SELECT * FROM version_dist_seq WHERE description = 'row0'
)
INSERT INTO version_dist_seq (description)
SELECT description FROM v;
-- Expected: ids are 100 and 101 (no gaps, no duplicates)
SELECT id, description FROM version_dist_seq ORDER BY id;
-------------------------------
-- Case 4: UNION ALL + nextval() in distributed INSERT … SELECT
-------------------------------
CREATE SEQUENCE seq_union_test START 200;
CREATE TABLE version_dist_union (
id bigint DEFAULT nextval('seq_union_test'),
val int,
PRIMARY KEY (id)
);
SELECT create_distributed_table('version_dist_union', 'id');
-- Seed rows
INSERT INTO version_dist_union (val) VALUES (1), (2);
-- UNION ALL duplication; each leg returns two rows -> four inserts total
WITH src AS (
SELECT val FROM version_dist_union
UNION ALL
SELECT val FROM version_dist_union
)
INSERT INTO version_dist_union(val)
SELECT val FROM src;
-- Expected IDs: 200,201,202,203,204,205
SELECT id, val FROM version_dist_union ORDER BY id;
-- End of Issue #7784
SET client_min_messages TO ERROR;
DROP SCHEMA multi_insert_select CASCADE;