From 93ab49702d0f0d37083e2a9e47ae2a0ab07140fe Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Mon, 14 Apr 2025 15:46:40 +0000 Subject: [PATCH] Add regression tests for INSERT ... SELECT with CTE for schema-based and distributed tables --- .../planner/insert_select_planner.c | 106 ++++++++++++++++-- src/test/regress/expected/issue_7784.out | 85 ++++++++++++++ src/test/regress/multi_schedule | 2 +- src/test/regress/sql/issue_7784.sql | 81 +++++++++++++ 4 files changed, 262 insertions(+), 12 deletions(-) create mode 100644 src/test/regress/expected/issue_7784.out create mode 100644 src/test/regress/sql/issue_7784.sql diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index db5c3d4ff..3455e6aea 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -108,6 +108,24 @@ static void ProcessEntryPair(TargetEntry *insertEntry, TargetEntry *selectEntry, Form_pg_attribute attr, int targetEntryIndex, List **projectedEntries, List **nonProjectedEntries); +/* + * DecrementCteLevelWalkerContext + * + * 'offset' is how much we shift ctelevelsup by (e.g. -1), + * 'level' tracks the current query nesting level, + * so we know if RTE_CTE is referencing this level. + */ +typedef struct DecrementCteLevelWalkerContext +{ + int oldLevel; + int newLevel; +} DecrementCteLevelWalkerContext; + + +static void +DecrementCteLevelForQuery(Query *query, int oldLevel, int newLevel); +static bool +DecrementCteLevelWalker(Node *node, DecrementCteLevelWalkerContext *context); /* depth of current insert/select planner. */ static int insertSelectPlannerLevel = 0; @@ -537,22 +555,88 @@ PrepareInsertSelectForCitusPlanner(Query *insertSelectQuery) if (list_length(insertSelectQuery->cteList) > 0) { - if (!isWrapped) - { - /* - * By wrapping the SELECT in a subquery, we can avoid adjusting - * ctelevelsup in RTE's that point to the CTEs. - */ - selectRte->subquery = WrapSubquery(selectRte->subquery); - } + /* we physically unify ctes from top-level into subquery, + * then want references in the subquery from ctelevelsup=1 => 0 + */ + elog(DEBUG1, "Unifying top-level cteList with subquery cteList"); - /* copy CTEs from the INSERT ... SELECT statement into outer SELECT */ - selectRte->subquery->cteList = copyObject(insertSelectQuery->cteList); - selectRte->subquery->hasModifyingCTE = insertSelectQuery->hasModifyingCTE; + selectRte->subquery->cteList = + list_concat(selectRte->subquery->cteList, + copyObject(insertSelectQuery->cteList)); insertSelectQuery->cteList = NIL; + + DecrementCteLevelForQuery(selectRte->subquery, 1, 0); + elog(DEBUG1, "Done shifting ctelevelsup 1->0 for subquery references"); } + } +/* + * DecrementCteLevelWalker + * + */ +static bool +DecrementCteLevelWalker(Node *node, DecrementCteLevelWalkerContext *context) +{ + if (node == NULL) + return false; + + if (IsA(node, RangeTblEntry)) + { + RangeTblEntry *rte = (RangeTblEntry *) node; + + if (rte->rtekind == RTE_CTE && rte->ctelevelsup == context->oldLevel) + { + rte->ctelevelsup = context->newLevel; + } + + return false; + } + else if (IsA(node, Query)) + { + Query *query = (Query *) node; + + /* + * Use QTW_EXAMINE_RTES_BEFORE so that this walker is called + * on each RangeTblEntry in query->rtable, giving us a chance + * to adjust ctelevelsup before we do the rest of the query tree. + */ + query_tree_walker(query, + DecrementCteLevelWalker, + (void *) context, + QTW_EXAMINE_RTES_BEFORE); + + return false; + } + else + { + /* fallback for expression nodes, etc. */ + return expression_tree_walker(node, + DecrementCteLevelWalker, + (void *) context); + } +} + + +/* + * DecrementCteLevelForQuery + */ +static void +DecrementCteLevelForQuery(Query *query, int oldLevel, int newLevel) +{ + DecrementCteLevelWalkerContext ctx; + + ctx.oldLevel = oldLevel; + ctx.newLevel = newLevel; + + query_tree_walker(query, + DecrementCteLevelWalker, + (void *) &ctx, + QTW_EXAMINE_RTES_BEFORE); +} + + + /* * CreateCombineQueryForRouterPlan is used for creating a dummy combineQuery diff --git a/src/test/regress/expected/issue_7784.out b/src/test/regress/expected/issue_7784.out new file mode 100644 index 000000000..f3ee3074c --- /dev/null +++ b/src/test/regress/expected/issue_7784.out @@ -0,0 +1,85 @@ +--------------------------------------------------------------------- +-- Regression Test Script for Issue 7784 +-- This script tests INSERT ... SELECT with a CTE for: +-- 1. Schema based sharding. +-- 2. A distributed table. +--------------------------------------------------------------------- +-- Enable schema-based sharding +SET citus.shard_replication_factor TO 1; +SET citus.enable_schema_based_sharding TO ON; +--------------------------------------------------------------------- +-- Case 1: Schema based sharding +--------------------------------------------------------------------- +CREATE SCHEMA issue_7784_schema_based; +SET search_path = issue_7784_schema_based, public; +-- Create a table for schema based sharding +CREATE TABLE version_local ( + id bigserial NOT NULL, + description varchar(255), + PRIMARY KEY (id) +); +-- Insert an initial row. +INSERT INTO version_local (description) VALUES ('Version 1'); +-- Duplicate the row using a CTE and INSERT ... SELECT. +WITH v AS ( + SELECT * FROM version_local WHERE description = 'Version 1' +) +INSERT INTO version_local (description) +SELECT description FROM v; +-- Expected output: +-- id | description +-- ----+------------- +-- 1 | Version 1 +-- 2 | Version 1 +-- Query the table and order by id for consistency. +SELECT * FROM version_local ORDER BY id; + id | description +--------------------------------------------------------------------- + 1 | Version 1 + 2 | Version 1 +(2 rows) + +--------------------------------------------------------------------- +-- Case 2: Distributed Table Scenario +--------------------------------------------------------------------- +SET citus.enable_schema_based_sharding TO OFF; +CREATE SCHEMA issue_7784_distributed; +SET search_path = issue_7784_distributed, public; +-- Create a table for the distributed test. +CREATE TABLE version_dist ( + id bigserial NOT NULL, + description varchar(255), + PRIMARY KEY (id) +); +-- Register the table as distributed using the 'id' column as the distribution key. +SELECT create_distributed_table('version_dist', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Insert an initial row. +INSERT INTO version_dist (description) VALUES ('Version 1'); +-- Duplicate the row using a CTE and INSERT ... SELECT. +WITH v AS ( + SELECT * FROM version_dist WHERE description = 'Version 1' +) +INSERT INTO version_dist (description) +SELECT description FROM v; +-- Expected output: +-- id | description +-- ----+------------- +-- 1 | Version 1 +-- 2 | Version 1 +-- Query the table and order by id for consistency. +SELECT * FROM version_dist ORDER BY id; + id | description +--------------------------------------------------------------------- + 1 | Version 1 + 2 | Version 1 +(2 rows) + +DROP SCHEMA issue_7784_schema_based CASCADE; +NOTICE: drop cascades to table issue_7784_schema_based.version_local +DROP SCHEMA issue_7784_distributed CASCADE; +NOTICE: drop cascades to table version_dist diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 3d7bd6e98..a5b81ebd2 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -104,7 +104,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement test: binary_protocol test: alter_table_set_access_method test: alter_distributed_table -test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891 +test: issue_5248 issue_5099 issue_5763 issue_6543 issue_6758 issue_7477 issue_7891 issue_7784 test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes diff --git a/src/test/regress/sql/issue_7784.sql b/src/test/regress/sql/issue_7784.sql new file mode 100644 index 000000000..c8cb0c4c0 --- /dev/null +++ b/src/test/regress/sql/issue_7784.sql @@ -0,0 +1,81 @@ +------------------------------- +-- Regression Test Script for Issue 7784 +-- This script tests INSERT ... SELECT with a CTE for: +-- 1. Schema based sharding. +-- 2. A distributed table. +------------------------------- + +-- Enable schema-based sharding +SET citus.shard_replication_factor TO 1; +SET citus.enable_schema_based_sharding TO ON; + +-------------------------------------------------- +-- Case 1: Schema based sharding +-------------------------------------------------- +CREATE SCHEMA issue_7784_schema_based; +SET search_path = issue_7784_schema_based, public; + +-- Create a table for schema based sharding +CREATE TABLE version_local ( + id bigserial NOT NULL, + description varchar(255), + PRIMARY KEY (id) +); + +-- Insert an initial row. +INSERT INTO version_local (description) VALUES ('Version 1'); + +-- Duplicate the row using a CTE and INSERT ... SELECT. +WITH v AS ( + SELECT * FROM version_local WHERE description = 'Version 1' +) +INSERT INTO version_local (description) +SELECT description FROM v; + +-- Expected output: +-- id | description +-- ----+------------- +-- 1 | Version 1 +-- 2 | Version 1 + +-- Query the table and order by id for consistency. +SELECT * FROM version_local ORDER BY id; + +-------------------------------------------------- +-- Case 2: Distributed Table Scenario +-------------------------------------------------- +SET citus.enable_schema_based_sharding TO OFF; +CREATE SCHEMA issue_7784_distributed; +SET search_path = issue_7784_distributed, public; + +-- Create a table for the distributed test. +CREATE TABLE version_dist ( + id bigserial NOT NULL, + description varchar(255), + PRIMARY KEY (id) +); + +-- Register the table as distributed using the 'id' column as the distribution key. +SELECT create_distributed_table('version_dist', 'id'); + +-- Insert an initial row. +INSERT INTO version_dist (description) VALUES ('Version 1'); + +-- Duplicate the row using a CTE and INSERT ... SELECT. +WITH v AS ( + SELECT * FROM version_dist WHERE description = 'Version 1' +) +INSERT INTO version_dist (description) +SELECT description FROM v; + +-- Expected output: +-- id | description +-- ----+------------- +-- 1 | Version 1 +-- 2 | Version 1 + +-- Query the table and order by id for consistency. +SELECT * FROM version_dist ORDER BY id; + +DROP SCHEMA issue_7784_schema_based CASCADE; +DROP SCHEMA issue_7784_distributed CASCADE; \ No newline at end of file