From 3ed6fea1cf622bd5a69c425fcdef33a336686fc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96nder=20Kalac=C4=B1?= Date: Thu, 25 Aug 2022 12:27:08 +0200 Subject: [PATCH] Prevent Merge command on distributed tables [PG 15] (#6238) --- .../distributed/planner/distributed_planner.c | 78 ++++++++- .../planner/fast_path_router_planner.c | 11 ++ src/test/regress/expected/pg15.out | 154 +++++++++++++++++- src/test/regress/sql/pg15.sql | 89 ++++++++++ 4 files changed, 326 insertions(+), 6 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index d16aa6785..35664d7c7 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -74,6 +74,8 @@ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ int PlannerLevel = 0; +static void ErrorIfQueryHasMergeCommand(Query *queryTree); +static bool ContainsMergeCommandWalker(Node *node); static bool ListContainsDistributedTableRTE(List *rangeTableList, bool *maybeHasForeignDistributedTable); static bool IsUpdateOrDelete(Query *query); @@ -148,6 +150,12 @@ distributed_planner(Query *parse, /* this cursor flag could only be set when Citus has been loaded */ Assert(CitusHasBeenLoaded()); + /* + * We cannot have merge command for this path as well because + * there cannot be recursively planned merge command. + */ + Assert(!ContainsMergeCommandWalker((Node *) parse)); + needsDistributedPlanning = true; } else if (CitusHasBeenLoaded()) @@ -187,13 +195,20 @@ distributed_planner(Query *parse, planContext.originalQuery = copyObject(parse); - /* - * When there are partitioned tables (not applicable to fast path), - * pretend that they are regular tables to avoid unnecessary work - * in standard_planner. - */ + if (!fastPathRouterQuery) { + /* + * Fast path queries cannot have merge command, and we + * prevent the remaining here. + */ + ErrorIfQueryHasMergeCommand(parse); + + /* + * When there are partitioned tables (not applicable to fast path), + * pretend that they are regular tables to avoid unnecessary work + * in standard_planner. + */ bool setPartitionedTablesInherited = false; AdjustPartitioningForDistributedPlanning(rangeTableList, setPartitionedTablesInherited); @@ -287,6 +302,59 @@ distributed_planner(Query *parse, } +/* + * ErrorIfQueryHasMergeCommand walks over the query tree and throws error + * if there are any Merge command (e.g., CMD_MERGE) in the query tree. + */ +static void +ErrorIfQueryHasMergeCommand(Query *queryTree) +{ + /* + * Postgres currently doesn't support Merge queries inside subqueries and + * ctes, but lets be defensive and do query tree walk anyway. + * + * We do not call this path for fast-path queries to avoid this additional + * overhead. + */ + if (ContainsMergeCommandWalker((Node *) queryTree)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("MERGE command is not supported on Citus tables yet"))); + } +} + + +/* + * ContainsMergeCommandWalker walks over the node and finds if there are any + * Merge command (e.g., CMD_MERGE) in the node. + */ +static bool +ContainsMergeCommandWalker(Node *node) +{ +#if PG_VERSION_NUM >= PG_VERSION_15 + if (node == NULL) + { + return false; + } + + if (IsA(node, Query)) + { + Query *query = (Query *) node; + if (query->commandType == CMD_MERGE) + { + return true; + } + + return query_tree_walker((Query *) node, ContainsMergeCommandWalker, NULL, 0); + } + + return expression_tree_walker(node, ContainsMergeCommandWalker, NULL); +#endif + + return false; +} + + /* * ExtractRangeTableEntryList is a wrapper around ExtractRangeTableEntryWalker. * The function traverses the input query and returns all the range table diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 5d02be07c..5be585c67 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -162,6 +162,17 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue) return false; } +#if PG_VERSION_NUM >= PG_VERSION_15 + if (query->commandType == CMD_MERGE) + { + /* + * Citus doesn't support MERGE command, lets return + * early and explicitly for fast-path queries. + */ + return false; + } +#endif + /* * We want to deal with only very simple queries. Some of the * checks might be too restrictive, still we prefer this way. diff --git a/src/test/regress/expected/pg15.out b/src/test/regress/expected/pg15.out index c07ff56f3..f59586b7a 100644 --- a/src/test/regress/expected/pg15.out +++ b/src/test/regress/expected/pg15.out @@ -238,9 +238,159 @@ NOTICE: renaming the new table to pg15.generated_stored_ref (5 rows) ROLLBACK; +SELECT undistribute_table('generated_stored_ref'); +NOTICE: creating a new table for pg15.generated_stored_ref +NOTICE: moving the data of pg15.generated_stored_ref +NOTICE: dropping the old pg15.generated_stored_ref +NOTICE: renaming the new table to pg15.generated_stored_ref + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +-- +-- In PG15, there is a new command called MERGE +-- It is currently not supported for Citus tables +-- Test the behavior with various commands with Citus table types +-- Relevant PG Commit: 7103ebb7aae8ab8076b7e85f335ceb8fe799097c +-- +CREATE TABLE tbl1 +( + x INT +); +CREATE TABLE tbl2 +( + x INT +); +-- on local tables works fine +MERGE INTO tbl1 USING tbl2 ON (true) +WHEN MATCHED THEN DELETE; +-- add coordinator node as a worker +SET client_min_messages to ERROR; +SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +RESET client_min_messages; +-- one table is Citus local table, fails +SELECT citus_add_local_table_to_metadata('tbl1'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO tbl1 USING tbl2 ON (true) +WHEN MATCHED THEN DELETE; +ERROR: MERGE command is not supported on Citus tables yet +SELECT undistribute_table('tbl1'); +NOTICE: creating a new table for pg15.tbl1 +NOTICE: moving the data of pg15.tbl1 +NOTICE: dropping the old pg15.tbl1 +NOTICE: renaming the new table to pg15.tbl1 + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +-- the other table is Citus local table, fails +SELECT citus_add_local_table_to_metadata('tbl2'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO tbl1 USING tbl2 ON (true) +WHEN MATCHED THEN DELETE; +ERROR: MERGE command is not supported on Citus tables yet +-- one table is reference, the other local, not supported +SELECT create_reference_table('tbl2'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO tbl1 USING tbl2 ON (true) +WHEN MATCHED THEN DELETE; +ERROR: MERGE command is not supported on Citus tables yet +-- now, both are reference, still not supported +SELECT create_reference_table('tbl1'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO tbl1 USING tbl2 ON (true) +WHEN MATCHED THEN DELETE; +ERROR: MERGE command is not supported on Citus tables yet +-- now, both distributed, not works +SELECT undistribute_table('tbl1'); +NOTICE: creating a new table for pg15.tbl1 +NOTICE: moving the data of pg15.tbl1 +NOTICE: dropping the old pg15.tbl1 +NOTICE: renaming the new table to pg15.tbl1 + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT undistribute_table('tbl2'); +NOTICE: creating a new table for pg15.tbl2 +NOTICE: moving the data of pg15.tbl2 +NOTICE: dropping the old pg15.tbl2 +NOTICE: renaming the new table to pg15.tbl2 + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT 1 FROM citus_remove_node('localhost', :master_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT create_distributed_table('tbl1', 'x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('tbl2', 'x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO tbl1 USING tbl2 ON (true) +WHEN MATCHED THEN DELETE; +ERROR: MERGE command is not supported on Citus tables yet +-- also, not inside subqueries & ctes +WITH targq AS ( + SELECT * FROM tbl2 +) +MERGE INTO tbl1 USING targq ON (true) +WHEN MATCHED THEN DELETE; +ERROR: MERGE command is not supported on Citus tables yet +-- crashes on beta3, fixed on 15 stable +--WITH foo AS ( +-- MERGE INTO tbl1 USING tbl2 ON (true) +-- WHEN MATCHED THEN DELETE +--) SELECT * FROM foo; +--COPY ( +-- MERGE INTO tbl1 USING tbl2 ON (true) +-- WHEN MATCHED THEN DELETE +--) TO stdout; +MERGE INTO tbl1 t +USING tbl2 +ON (true) +WHEN MATCHED THEN + UPDATE SET x = (SELECT count(*) FROM tbl2); +ERROR: MERGE command is not supported on Citus tables yet -- Clean up DROP SCHEMA pg15 CASCADE; -NOTICE: drop cascades to 7 other objects +NOTICE: drop cascades to 9 other objects DETAIL: drop cascades to collation german_phonebook_test drop cascades to collation default_provider drop cascades to table sale @@ -248,3 +398,5 @@ drop cascades to table record_sale drop cascades to function record_sale() drop cascades to view sale_triggers drop cascades to table generated_stored_ref +drop cascades to table tbl1 +drop cascades to table tbl2 diff --git a/src/test/regress/sql/pg15.sql b/src/test/regress/sql/pg15.sql index 1920d78c0..cefc419e8 100644 --- a/src/test/regress/sql/pg15.sql +++ b/src/test/regress/sql/pg15.sql @@ -153,5 +153,94 @@ BEGIN; SELECT * FROM generated_stored_REF ORDER BY 1; ROLLBACK; +SELECT undistribute_table('generated_stored_ref'); + +-- +-- In PG15, there is a new command called MERGE +-- It is currently not supported for Citus tables +-- Test the behavior with various commands with Citus table types +-- Relevant PG Commit: 7103ebb7aae8ab8076b7e85f335ceb8fe799097c +-- + +CREATE TABLE tbl1 +( + x INT +); + +CREATE TABLE tbl2 +( + x INT +); + +-- on local tables works fine +MERGE INTO tbl1 USING tbl2 ON (true) +WHEN MATCHED THEN DELETE; + +-- add coordinator node as a worker +SET client_min_messages to ERROR; +SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); +RESET client_min_messages; + +-- one table is Citus local table, fails +SELECT citus_add_local_table_to_metadata('tbl1'); + +MERGE INTO tbl1 USING tbl2 ON (true) +WHEN MATCHED THEN DELETE; + +SELECT undistribute_table('tbl1'); + +-- the other table is Citus local table, fails +SELECT citus_add_local_table_to_metadata('tbl2'); + +MERGE INTO tbl1 USING tbl2 ON (true) +WHEN MATCHED THEN DELETE; + +-- one table is reference, the other local, not supported +SELECT create_reference_table('tbl2'); + +MERGE INTO tbl1 USING tbl2 ON (true) +WHEN MATCHED THEN DELETE; + +-- now, both are reference, still not supported +SELECT create_reference_table('tbl1'); + +MERGE INTO tbl1 USING tbl2 ON (true) +WHEN MATCHED THEN DELETE; + +-- now, both distributed, not works +SELECT undistribute_table('tbl1'); +SELECT undistribute_table('tbl2'); +SELECT 1 FROM citus_remove_node('localhost', :master_port); + +SELECT create_distributed_table('tbl1', 'x'); +SELECT create_distributed_table('tbl2', 'x'); + +MERGE INTO tbl1 USING tbl2 ON (true) +WHEN MATCHED THEN DELETE; + +-- also, not inside subqueries & ctes +WITH targq AS ( + SELECT * FROM tbl2 +) +MERGE INTO tbl1 USING targq ON (true) +WHEN MATCHED THEN DELETE; + +-- crashes on beta3, fixed on 15 stable +--WITH foo AS ( +-- MERGE INTO tbl1 USING tbl2 ON (true) +-- WHEN MATCHED THEN DELETE +--) SELECT * FROM foo; + +--COPY ( +-- MERGE INTO tbl1 USING tbl2 ON (true) +-- WHEN MATCHED THEN DELETE +--) TO stdout; + +MERGE INTO tbl1 t +USING tbl2 +ON (true) +WHEN MATCHED THEN + UPDATE SET x = (SELECT count(*) FROM tbl2); + -- Clean up DROP SCHEMA pg15 CASCADE;