INSERT ... SELECT with CTE for schema-based and distributed tables

Add test cases for distributed INSERT ... SELECT with nextval() and UNION ALL

Refactor comments to improve clarity on lifting volatile expressions in the insert-select planner
pull/7976/head
Mehmet Yilmaz 2025-04-29 13:15:32 +00:00
parent d4dd44e715
commit 742745e6e6
4 changed files with 416 additions and 67 deletions

View File

@ -1247,7 +1247,7 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
RangeTblEntry *subqueryPartitionColumnRelationIdRTE = NULL;
List *parentQueryList = list_make2(query, subquery);
bool skipOuterVars = true;
bool skipOuterVars = false;
FindReferencedTableColumn(selectTargetExpr,
parentQueryList, subquery,
&subqueryPartitionColumn,
@ -1543,71 +1543,147 @@ InsertSelectResultIdPrefix(uint64 planId)
/*
* WrapSubquery wraps the given query as a subquery in a newly constructed
* "SELECT * FROM (...subquery...) citus_insert_select_subquery" query.
* Return true if the expression tree can change value within a single scan
* (i.e. the planner must treat it as VOLATILE).
* We just delegate to PostgreSQL's contain_volatile_functions() helper.
*/
static inline bool
expr_is_volatile(Node *node)
{
	/*
	 * Pure pass-through to contain_volatile_functions(): whatever that
	 * helper reports for the given expression tree is returned unchanged.
	 * Per the helper's behavior this also covers set-returning volatile
	 * functions as well as nextval()/currval().
	 */
	bool hasVolatileFunction = contain_volatile_functions(node);

	return hasVolatileFunction;
}
/*
* WrapSubquery
*
* Build a wrapper query:
*
* SELECT <outer-TL>
* FROM ( <subquery with any volatile items stripped> )
* citus_insert_select_subquery
*
* Purpose:
* - Preserve column numbering while lifting volatile expressions to the coordinator.
* - Volatile (non-deterministic) expressions not used in GROUP BY / ORDER BY
* are lifted to the outer SELECT to ensure they are evaluated only once.
* - Stable/immutable expressions or volatile ones required by GROUP BY / ORDER BY
* stay in the subquery and are accessed via Vars in the outer SELECT.
*/
Query *
WrapSubquery(Query *subquery)
{
/*
* 1. Build the wrapper skeleton: SELECT ... FROM (subquery) alias
*/
ParseState *pstate = make_parsestate(NULL);
List *newTargetList = NIL;
Query *outerQuery = makeNode(Query);
outerQuery->commandType = CMD_SELECT;
/* create range table entries */
Alias *selectAlias = makeAlias("citus_insert_select_subquery", NIL);
RangeTblEntry *newRangeTableEntry = RangeTableEntryFromNSItem(
addRangeTableEntryForSubquery(
pstate, subquery,
selectAlias, false, true));
outerQuery->rtable = list_make1(newRangeTableEntry);
Alias *alias = makeAlias("citus_insert_select_subquery", NIL);
RangeTblEntry *rte_subq =
RangeTableEntryFromNSItem(
addRangeTableEntryForSubquery(pstate,
subquery, /* still points to original subquery */
alias,
false, /* not LATERAL */
true)); /* in FROM clause */
#if PG_VERSION_NUM >= PG_VERSION_16
outerQuery->rtable = list_make1(rte_subq);
/*
* This part of the code is more of a sanity check for readability,
* it doesn't really do anything.
* addRangeTableEntryForSubquery doesn't add permission info
* because the range table is set to be RTE_SUBQUERY.
* Hence we should also have no perminfos here.
*/
Assert(newRangeTableEntry->rtekind == RTE_SUBQUERY &&
newRangeTableEntry->perminfoindex == 0);
#if PG_VERSION_NUM >= 160000
/* Ensure RTE_SUBQUERY has proper permission handling */
Assert(rte_subq->rtekind == RTE_SUBQUERY &&
rte_subq->perminfoindex == 0);
outerQuery->rteperminfos = NIL;
#endif
/* set the FROM expression to the subquery */
RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
newRangeTableRef->rtindex = 1;
outerQuery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
RangeTblRef *rtref = makeNode(RangeTblRef);
rtref->rtindex = 1; /* Only one RTE, so index is 1 */
outerQuery->jointree = makeFromExpr(list_make1(rtref), NULL);
/* create a target list that matches the SELECT */
TargetEntry *selectTargetEntry = NULL;
foreach_declared_ptr(selectTargetEntry, subquery->targetList)
{
/* exactly 1 entry in FROM */
int indexInRangeTable = 1;
/*
* 2. Create new target lists for inner (worker) and outer (coordinator)
*/
List *newInnerTL = NIL;
List *newOuterTL = NIL;
int nextResno = 1;
if (selectTargetEntry->resjunk)
TargetEntry *te = NULL;
foreach_declared_ptr(te, subquery->targetList)
{
if (te->resjunk)
{
/* Keep resjunk entries only in subquery (not in outer query) */
newInnerTL = lappend(newInnerTL, te);
continue;
}
Var *newSelectVar = makeVar(indexInRangeTable, selectTargetEntry->resno,
exprType((Node *) selectTargetEntry->expr),
exprTypmod((Node *) selectTargetEntry->expr),
exprCollation((Node *) selectTargetEntry->expr), 0);
bool isVolatile = expr_is_volatile((Node *) te->expr);
bool usedInSort = (te->ressortgroupref != 0);
TargetEntry *newSelectTargetEntry = makeTargetEntry((Expr *) newSelectVar,
selectTargetEntry->resno,
selectTargetEntry->resname,
selectTargetEntry->resjunk);
if (isVolatile && !usedInSort)
{
/*
* Lift volatile expression to outer query so it's evaluated once.
* In inner query, place a NULL of the same type to preserve column position.
*/
TargetEntry *outerTE =
makeTargetEntry(copyObject(te->expr),
list_length(newOuterTL) + 1,
te->resname,
false);
newOuterTL = lappend(newOuterTL, outerTE);
newTargetList = lappend(newTargetList, newSelectTargetEntry);
Const *nullConst = makeNullConst(exprType((Node *) te->expr),
exprTypmod((Node *) te->expr),
exprCollation((Node *) te->expr));
TargetEntry *placeholder =
makeTargetEntry((Expr *) nullConst,
nextResno++, /* preserve column position */
te->resname,
false); /* visible, not resjunk */
newInnerTL = lappend(newInnerTL, placeholder);
}
else
{
/*
* Either:
* - expression is stable or immutable, or
* - volatile but needed for sorting or grouping
*
* In both cases, keep it in subquery and reference it using a Var.
*/
TargetEntry *innerTE = te; /* reuse original node */
innerTE->resno = nextResno++;
newInnerTL = lappend(newInnerTL, innerTE);
Var *v = makeVar(/* subquery reference index is 1 */
1,
innerTE->resno,
exprType((Node *) innerTE->expr),
exprTypmod((Node *) innerTE->expr),
exprCollation((Node *) innerTE->expr),
0);
TargetEntry *outerTE =
makeTargetEntry((Expr *) v,
list_length(newOuterTL) + 1,
innerTE->resname,
false);
newOuterTL = lappend(newOuterTL, outerTE);
}
}
outerQuery->targetList = newTargetList;
/*
* 3. Assign target lists and return the wrapper query
*/
subquery->targetList = newInnerTL;
outerQuery->targetList = newOuterTL;
return outerQuery;
}

View File

@ -4478,46 +4478,43 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
return;
}
if (candidateColumn->varlevelsup > 0)
/* Walk up varlevelsup as many times as needed */
while (candidateColumn->varlevelsup > 0)
{
/* Caller asked us to ignore any outer Vars → just bail out */
if (skipOuterVars)
{
/*
* we don't want to process outer vars, so we return early.
*/
return;
}
/*
* We currently don't support finding partition keys in the subqueries
* that reference outer subqueries. For example, in correlated
* subqueries in WHERE clause, we don't support use of partition keys
* in the subquery that is referred from the outer query.
*/
int parentQueryIndex = list_length(parentQueryList) -
candidateColumn->varlevelsup;
if (!(IsIndexInRange(parentQueryList, parentQueryIndex)))
/* Locate the parent query that owns this Var */
int parentIdx =
list_length(parentQueryList) - candidateColumn->varlevelsup;
if (!IsIndexInRange(parentQueryList, parentIdx))
{
return;
return; /* malformed tree */
}
/*
* Before we recurse into the query tree, we should update the candidateColumn and we use copy of it.
* As we get the query from varlevelsup up, we reset the varlevelsup.
*/
/* Work on a fresh copy of the Var with varlevelsup reset */
candidateColumn = copyObject(candidateColumn);
candidateColumn->varlevelsup = 0;
/*
* We should be careful about these fields because they need to
* be updated correctly based on ctelevelsup and varlevelsup.
* Make a *completely private* copy of parentQueryList for the
* next recursion step. We copy the whole list and then truncate
* so every recursive branch owns its own list cells.
*/
query = list_nth(parentQueryList, parentQueryIndex);
parentQueryList = list_truncate(parentQueryList, parentQueryIndex);
List *newParent =
list_copy(parentQueryList); /* duplicates every cell */
newParent = list_truncate(newParent, parentIdx);
query = list_nth(parentQueryList, parentIdx);
parentQueryList = newParent; /* hand private copy down */
/* Loop again if still pointing to an outer level */
}
if (candidateColumn->varattno == InvalidAttrNumber)
{
/*

View File

@ -3492,5 +3492,150 @@ $$);
Task Count: 1
(2 rows)
---------------------------------------------------------------------
-- Regression Test Script for Issue #7784
-- This script tests INSERT ... SELECT with a CTE for:
-- 1. Schema based sharding.
-- 2. A distributed table.
---------------------------------------------------------------------
-- Enable schema-based sharding
SET citus.enable_schema_based_sharding TO ON;
-- Create a table for schema based sharding
CREATE TABLE version_sch_based (
id bigserial NOT NULL,
description varchar(255),
PRIMARY KEY (id)
);
-- Insert an initial row.
INSERT INTO version_sch_based (description) VALUES ('Version 1');
-- Duplicate the row using a CTE and INSERT ... SELECT.
WITH v AS (
SELECT * FROM version_sch_based WHERE description = 'Version 1'
)
INSERT INTO version_sch_based (description)
SELECT description FROM v;
-- Expected output:
-- id | description
-- ----+-------------
-- 1 | Version 1
-- 2 | Version 1
-- Query the table and order by id for consistency.
SELECT * FROM version_sch_based ORDER BY id;
id | description
---------------------------------------------------------------------
1 | Version 1
2 | Version 1
(2 rows)
---------------------------------------------------------------------
-- Case 2: Distributed Table Scenario
---------------------------------------------------------------------
SET citus.enable_schema_based_sharding TO OFF;
-- Create a table for the distributed test.
CREATE TABLE version_dist (
id bigserial NOT NULL,
description varchar(255),
PRIMARY KEY (id)
);
-- Register the table as distributed using the 'id' column as the distribution key.
SELECT create_distributed_table('version_dist', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Insert an initial row.
INSERT INTO version_dist (description) VALUES ('Version 1');
-- Duplicate the row using a CTE and INSERT ... SELECT.
WITH v AS (
SELECT * FROM version_dist WHERE description = 'Version 1'
)
INSERT INTO version_dist (description)
SELECT description FROM v;
-- Expected output:
-- id | description
-- ----+-------------
-- 1 | Version 1
-- 2 | Version 1
-- Query the table and order by id for consistency.
SELECT * FROM version_dist ORDER BY id;
id | description
---------------------------------------------------------------------
1 | Version 1
2 | Version 1
(2 rows)
---------------------------------------------------------------------
-- Case 3: Distributed INSERT … SELECT with nextval()
-- Verifies that nextval() is evaluated on the coordinator only.
---------------------------------------------------------------------
-- A fresh sequence for clarity
CREATE SEQUENCE seq_nextval_test START 100;
-- Table with DEFAULT nextval()
CREATE TABLE version_dist_seq (
id bigint DEFAULT nextval('seq_nextval_test'),
description text,
PRIMARY KEY (id)
);
SELECT create_distributed_table('version_dist_seq', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Seed one row (id = 100)
INSERT INTO version_dist_seq (description) VALUES ('row0');
-- CTE duplication should produce **exactly one** new sequence value (id = 101)
WITH v AS (
SELECT * FROM version_dist_seq WHERE description = 'row0'
)
INSERT INTO version_dist_seq (description)
SELECT description FROM v;
-- Expected: ids are 100 and 101 (no gaps, no duplicates)
SELECT id, description FROM version_dist_seq ORDER BY id;
id | description
---------------------------------------------------------------------
100 | row0
101 | row0
(2 rows)
---------------------------------------------------------------------
-- Case 4: UNION ALL + nextval() in distributed INSERT … SELECT
---------------------------------------------------------------------
CREATE SEQUENCE seq_union_test START 200;
CREATE TABLE version_dist_union (
id bigint DEFAULT nextval('seq_union_test'),
val int,
PRIMARY KEY (id)
);
SELECT create_distributed_table('version_dist_union', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Seed rows
INSERT INTO version_dist_union (val) VALUES (1), (2);
-- UNION ALL duplication; each leg returns two rows -> four inserts total
WITH src AS (
SELECT val FROM version_dist_union
UNION ALL
SELECT val FROM version_dist_union
)
INSERT INTO version_dist_union(val)
SELECT val FROM src;
-- Expected IDs: 200,201,202,203,204,205
SELECT id, val FROM version_dist_union ORDER BY id;
id | val
---------------------------------------------------------------------
200 | 1
201 | 2
202 | 1
203 | 2
204 | 1
205 | 2
(6 rows)
-- End of Issue #7784
SET client_min_messages TO ERROR;
DROP SCHEMA multi_insert_select CASCADE;

View File

@ -2452,5 +2452,136 @@ SELECT coordinator_plan($$
SELECT id FROM dist_table_5 JOIN cte_1 USING(id);
$$);
-------------------------------
-- Regression Test Script for Issue #7784
-- This script tests INSERT ... SELECT with a CTE for:
-- 1. Schema based sharding.
-- 2. A distributed table.
-------------------------------
-- Enable schema-based sharding
SET citus.enable_schema_based_sharding TO ON;
-- Create a table for schema based sharding
CREATE TABLE version_sch_based (
id bigserial NOT NULL,
description varchar(255),
PRIMARY KEY (id)
);
-- Insert an initial row.
INSERT INTO version_sch_based (description) VALUES ('Version 1');
-- Duplicate the row using a CTE and INSERT ... SELECT.
WITH v AS (
SELECT * FROM version_sch_based WHERE description = 'Version 1'
)
INSERT INTO version_sch_based (description)
SELECT description FROM v;
-- Expected output:
-- id | description
-- ----+-------------
-- 1 | Version 1
-- 2 | Version 1
-- Query the table and order by id for consistency.
SELECT * FROM version_sch_based ORDER BY id;
--------------------------------------------------
-- Case 2: Distributed Table Scenario
--------------------------------------------------
SET citus.enable_schema_based_sharding TO OFF;
-- Create a table for the distributed test.
CREATE TABLE version_dist (
id bigserial NOT NULL,
description varchar(255),
PRIMARY KEY (id)
);
-- Register the table as distributed using the 'id' column as the distribution key.
SELECT create_distributed_table('version_dist', 'id');
-- Insert an initial row.
INSERT INTO version_dist (description) VALUES ('Version 1');
-- Duplicate the row using a CTE and INSERT ... SELECT.
WITH v AS (
SELECT * FROM version_dist WHERE description = 'Version 1'
)
INSERT INTO version_dist (description)
SELECT description FROM v;
-- Expected output:
-- id | description
-- ----+-------------
-- 1 | Version 1
-- 2 | Version 1
-- Query the table and order by id for consistency.
SELECT * FROM version_dist ORDER BY id;
-------------------------------
-- Case 3: Distributed INSERT … SELECT with nextval()
-- Verifies that nextval() is evaluated on the coordinator only.
-------------------------------
-- A fresh sequence for clarity
CREATE SEQUENCE seq_nextval_test START 100;
-- Table with DEFAULT nextval()
CREATE TABLE version_dist_seq (
id bigint DEFAULT nextval('seq_nextval_test'),
description text,
PRIMARY KEY (id)
);
SELECT create_distributed_table('version_dist_seq', 'id');
-- Seed one row (id = 100)
INSERT INTO version_dist_seq (description) VALUES ('row0');
-- CTE duplication should produce **exactly one** new sequence value (id = 101)
WITH v AS (
SELECT * FROM version_dist_seq WHERE description = 'row0'
)
INSERT INTO version_dist_seq (description)
SELECT description FROM v;
-- Expected: ids are 100 and 101 (no gaps, no duplicates)
SELECT id, description FROM version_dist_seq ORDER BY id;
-------------------------------
-- Case 4: UNION ALL + nextval() in distributed INSERT … SELECT
-------------------------------
CREATE SEQUENCE seq_union_test START 200;
CREATE TABLE version_dist_union (
id bigint DEFAULT nextval('seq_union_test'),
val int,
PRIMARY KEY (id)
);
SELECT create_distributed_table('version_dist_union', 'id');
-- Seed rows
INSERT INTO version_dist_union (val) VALUES (1), (2);
-- UNION ALL duplication; each leg returns two rows -> four inserts total
WITH src AS (
SELECT val FROM version_dist_union
UNION ALL
SELECT val FROM version_dist_union
)
INSERT INTO version_dist_union(val)
SELECT val FROM src;
-- Expected IDs: 200,201,202,203,204,205
SELECT id, val FROM version_dist_union ORDER BY id;
-- End of Issue #7784
SET client_min_messages TO ERROR;
DROP SCHEMA multi_insert_select CASCADE;