Add regression tests for INSERT ... SELECT with CTE for schema-based and distributed tables

mehmet/issue_7784
Mehmet Yilmaz 2025-04-14 15:46:40 +00:00
parent bb9d90ecc3
commit 93ab49702d
4 changed files with 262 additions and 12 deletions

View File

@ -108,6 +108,24 @@ static void ProcessEntryPair(TargetEntry *insertEntry, TargetEntry *selectEntry,
Form_pg_attribute attr, int targetEntryIndex,
List **projectedEntries, List **nonProjectedEntries);
/*
* DecrementCteLevelWalkerContext
*
* 'offset' is how much we shift ctelevelsup by (e.g. -1),
* 'level' tracks the current query nesting level,
* so we know if RTE_CTE is referencing this level.
*/
typedef struct DecrementCteLevelWalkerContext
{
int oldLevel;
int newLevel;
} DecrementCteLevelWalkerContext;
static void
DecrementCteLevelForQuery(Query *query, int oldLevel, int newLevel);
static bool
DecrementCteLevelWalker(Node *node, DecrementCteLevelWalkerContext *context);
/* depth of current insert/select planner. */
static int insertSelectPlannerLevel = 0;
@ -537,23 +555,89 @@ PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery)
if (list_length(insertSelectQuery->cteList) > 0)
{
if (!isWrapped)
{
/*
* By wrapping the SELECT in a subquery, we can avoid adjusting
* ctelevelsup in RTE's that point to the CTEs.
/* we physically unify ctes from top-level into subquery,
* then want references in the subquery from ctelevelsup=1 => 0
*/
selectRte->subquery = WrapSubquery(selectRte->subquery);
elog(DEBUG1, "Unifying top-level cteList with subquery cteList");
selectRte->subquery->cteList =
list_concat(selectRte->subquery->cteList,
copyObject(insertSelectQuery->cteList));
insertSelectQuery->cteList = NIL;
DecrementCteLevelForQuery(selectRte->subquery, 1, 0);
elog(DEBUG1, "Done shifting ctelevelsup 1->0 for subquery references");
}
/* copy CTEs from the INSERT ... SELECT statement into outer SELECT */
selectRte->subquery->cteList = copyObject(insertSelectQuery->cteList);
selectRte->subquery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE;
insertSelectQuery->cteList = NIL;
}
/*
* DecrementCteLevelWalker
*
*/
static bool
DecrementCteLevelWalker(Node *node, DecrementCteLevelWalkerContext *context)
{
if (node == NULL)
return false;
if (IsA(node, RangeTblEntry))
{
RangeTblEntry *rte = (RangeTblEntry *) node;
if (rte->rtekind == RTE_CTE && rte->ctelevelsup == context->oldLevel)
{
rte->ctelevelsup = context->newLevel;
}
return false;
}
else if (IsA(node, Query))
{
Query *query = (Query *) node;
/*
* Use QTW_EXAMINE_RTES_BEFORE so that this walker is called
* on each RangeTblEntry in query->rtable, giving us a chance
* to adjust ctelevelsup before we do the rest of the query tree.
*/
query_tree_walker(query,
DecrementCteLevelWalker,
(void *) context,
QTW_EXAMINE_RTES_BEFORE);
return false;
}
else
{
/* fallback for expression nodes, etc. */
return expression_tree_walker(node,
DecrementCteLevelWalker,
(void *) context);
}
}
/*
* DecrementCteLevelForQuery
*/
static void
DecrementCteLevelForQuery(Query *query, int oldLevel, int newLevel)
{
DecrementCteLevelWalkerContext ctx;
ctx.oldLevel = oldLevel;
ctx.newLevel = newLevel;
query_tree_walker(query,
DecrementCteLevelWalker,
(void *) &ctx,
QTW_EXAMINE_RTES_BEFORE);
}
/*
* CreateCombineQueryForRouterPlan is used for creating a dummy combineQuery
* for a router plan, since router plans normally don't have one.

View File

@ -0,0 +1,85 @@
---------------------------------------------------------------------
-- Regression Test Script for Issue 7784
-- This script tests INSERT ... SELECT with a CTE for:
-- 1. Schema based sharding.
-- 2. A distributed table.
---------------------------------------------------------------------
-- Enable schema-based sharding
SET citus.shard_replication_factor TO 1;
SET citus.enable_schema_based_sharding TO ON;
---------------------------------------------------------------------
-- Case 1: Schema based sharding
---------------------------------------------------------------------
CREATE SCHEMA issue_7784_schema_based;
SET search_path = issue_7784_schema_based, public;
-- Create a table for schema based sharding
CREATE TABLE version_local (
id bigserial NOT NULL,
description varchar(255),
PRIMARY KEY (id)
);
-- Insert an initial row.
INSERT INTO version_local (description) VALUES ('Version 1');
-- Duplicate the row using a CTE and INSERT ... SELECT.
WITH v AS (
SELECT * FROM version_local WHERE description = 'Version 1'
)
INSERT INTO version_local (description)
SELECT description FROM v;
-- Expected output:
-- id | description
-- ----+-------------
-- 1 | Version 1
-- 2 | Version 1
-- Query the table and order by id for consistency.
SELECT * FROM version_local ORDER BY id;
id | description
---------------------------------------------------------------------
1 | Version 1
2 | Version 1
(2 rows)
---------------------------------------------------------------------
-- Case 2: Distributed Table Scenario
---------------------------------------------------------------------
SET citus.enable_schema_based_sharding TO OFF;
CREATE SCHEMA issue_7784_distributed;
SET search_path = issue_7784_distributed, public;
-- Create a table for the distributed test.
CREATE TABLE version_dist (
id bigserial NOT NULL,
description varchar(255),
PRIMARY KEY (id)
);
-- Register the table as distributed using the 'id' column as the distribution key.
SELECT create_distributed_table('version_dist', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Insert an initial row.
INSERT INTO version_dist (description) VALUES ('Version 1');
-- Duplicate the row using a CTE and INSERT ... SELECT.
WITH v AS (
SELECT * FROM version_dist WHERE description = 'Version 1'
)
INSERT INTO version_dist (description)
SELECT description FROM v;
-- Expected output:
-- id | description
-- ----+-------------
-- 1 | Version 1
-- 2 | Version 1
-- Query the table and order by id for consistency.
SELECT * FROM version_dist ORDER BY id;
id | description
---------------------------------------------------------------------
1 | Version 1
2 | Version 1
(2 rows)
DROP SCHEMA issue_7784_schema_based CASCADE;
NOTICE: drop cascades to table issue_7784_schema_based.version_local
DROP SCHEMA issue_7784_distributed CASCADE;
NOTICE: drop cascades to table version_dist

View File

@ -104,7 +104,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement
test: binary_protocol
test: alter_table_set_access_method
test: alter_distributed_table
test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891
test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891 issue_7784
test: object_propagation_debug
test: undistribute_table
test: run_command_on_all_nodes

View File

@ -0,0 +1,81 @@
-------------------------------
-- Regression Test Script for Issue 7784
-- This script tests INSERT ... SELECT with a CTE for:
-- 1. Schema based sharding.
-- 2. A distributed table.
-------------------------------
-- Enable schema-based sharding
SET citus.shard_replication_factor TO 1;
SET citus.enable_schema_based_sharding TO ON;
--------------------------------------------------
-- Case 1: Schema based sharding
--------------------------------------------------
CREATE SCHEMA issue_7784_schema_based;
SET search_path = issue_7784_schema_based, public;
-- Create a table for schema based sharding
CREATE TABLE version_local (
id bigserial NOT NULL,
description varchar(255),
PRIMARY KEY (id)
);
-- Insert an initial row.
INSERT INTO version_local (description) VALUES ('Version 1');
-- Duplicate the row using a CTE and INSERT ... SELECT.
WITH v AS (
SELECT * FROM version_local WHERE description = 'Version 1'
)
INSERT INTO version_local (description)
SELECT description FROM v;
-- Expected output:
-- id | description
-- ----+-------------
-- 1 | Version 1
-- 2 | Version 1
-- Query the table and order by id for consistency.
SELECT * FROM version_local ORDER BY id;
--------------------------------------------------
-- Case 2: Distributed Table Scenario
--------------------------------------------------
SET citus.enable_schema_based_sharding TO OFF;
CREATE SCHEMA issue_7784_distributed;
SET search_path = issue_7784_distributed, public;
-- Create a table for the distributed test.
CREATE TABLE version_dist (
id bigserial NOT NULL,
description varchar(255),
PRIMARY KEY (id)
);
-- Register the table as distributed using the 'id' column as the distribution key.
SELECT create_distributed_table('version_dist', 'id');
-- Insert an initial row.
INSERT INTO version_dist (description) VALUES ('Version 1');
-- Duplicate the row using a CTE and INSERT ... SELECT.
WITH v AS (
SELECT * FROM version_dist WHERE description = 'Version 1'
)
INSERT INTO version_dist (description)
SELECT description FROM v;
-- Expected output:
-- id | description
-- ----+-------------
-- 1 | Version 1
-- 2 | Version 1
-- Query the table and order by id for consistency.
SELECT * FROM version_dist ORDER BY id;
DROP SCHEMA issue_7784_schema_based CASCADE;
DROP SCHEMA issue_7784_distributed CASCADE;