From 20c39fae9a9b7eb9207f96c025473e5f03e326b9 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Fri, 14 Aug 2020 12:11:15 +0300 Subject: [PATCH] Loosen the requirement to pushdown a subquery with ref tables (#4110) AllTargetExpressionsAreColumnReferences would return false if a query had a target entry that references the outer query, or a target entry that is not a plain column reference. It seems safe to drop this check when the query contains no distributed tables, e.g. when it only reads from reference tables. We already have separate checks for other cases, such as having limits. --- .../planner/multi_logical_planner.c | 60 +------ .../expected/intermediate_result_pruning.out | 8 +- ...lti_subquery_in_where_reference_clause.out | 18 +- .../regress/expected/subquery_and_cte.out | 159 +++++++++++++++++- ...lti_subquery_in_where_reference_clause.sql | 2 - src/test/regress/sql/subquery_and_cte.sql | 95 +++++++++++ 6 files changed, 269 insertions(+), 73 deletions(-) diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index fa2668e88..fc6b7d4d6 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -75,7 +75,6 @@ typedef bool (*CheckNodeFunc)(Node *); static RuleApplyFunction RuleApplyFunctionArray[JOIN_RULE_LAST] = { 0 }; /* join rules */ /* Local functions forward declarations */ -static bool AllTargetExpressionsAreColumnReferences(List *targetEntryList); static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query); static Oid NodeTryGetRteRelid(Node *node); static bool FullCompositeFieldList(List *compositeFieldList); @@ -264,14 +263,12 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) /* * We could still behave as if the target list is on partition column if - * all range table entries are reference tables or intermediate results, - * and all target expressions are column references to the given query level. + * range table entries don't contain a distributed table. 
*/ if (!targetListOnPartitionColumn) { if (!FindNodeMatchingCheckFunctionInRangeTableList(query->rtable, - IsDistributedTableRTE) && - AllTargetExpressionsAreColumnReferences(targetEntryList)) + IsDistributedTableRTE)) { targetListOnPartitionColumn = true; } @@ -281,59 +278,6 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) } -/* - * AllTargetExpressionsAreColumnReferences returns true if none of the - * elements in the target entry list belong to an outer query (for - * example the query is a sublink and references to another query - * in the from list). - * - * The function also returns true if any of the target entries is not - * a column itself. This might be too restrictive, but, given that we're - * handling very specific type of queries, that seems acceptable for now. - */ -static bool -AllTargetExpressionsAreColumnReferences(List *targetEntryList) -{ - ListCell *targetEntryCell = NULL; - - foreach(targetEntryCell, targetEntryList) - { - TargetEntry *targetEntry = lfirst(targetEntryCell); - Var *candidateColumn = NULL; - Expr *strippedColumnExpression = (Expr *) strip_implicit_coercions( - (Node *) targetEntry->expr); - - if (IsA(strippedColumnExpression, Var)) - { - candidateColumn = (Var *) strippedColumnExpression; - } - else if (IsA(strippedColumnExpression, FieldSelect)) - { - FieldSelect *compositeField = (FieldSelect *) strippedColumnExpression; - Expr *fieldExpression = compositeField->arg; - - if (IsA(fieldExpression, Var)) - { - candidateColumn = (Var *) fieldExpression; - } - } - - /* we don't support target entries that are not columns */ - if (candidateColumn == NULL) - { - return false; - } - - if (candidateColumn->varlevelsup > 0) - { - return false; - } - } - - return true; -} - - /* * FindNodeMatchingCheckFunctionInRangeTableList finds a node for which the checker * function returns true. 
diff --git a/src/test/regress/expected/intermediate_result_pruning.out b/src/test/regress/expected/intermediate_result_pruning.out index dbf6acc02..89641bc48 100644 --- a/src/test/regress/expected/intermediate_result_pruning.out +++ b/src/test/regress/expected/intermediate_result_pruning.out @@ -976,14 +976,12 @@ FROM DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM intermediate_result_pruning.accounts DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (intermediate_result_pruning.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) -DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id)) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM 
read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id))) inner_query DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be written to local file -DEBUG: Subplan XXX_4 will be written to local file sum --------------------------------------------------------------------- 100 @@ -1015,13 +1013,11 @@ FROM DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM intermediate_result_pruning.accounts DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (intermediate_result_pruning.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM 
read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id)) -DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id)) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id))) inner_query DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx sum --------------------------------------------------------------------- 100 diff --git a/src/test/regress/expected/multi_subquery_in_where_reference_clause.out 
b/src/test/regress/expected/multi_subquery_in_where_reference_clause.out index 7a5f8dc2c..ea0882400 100644 --- a/src/test/regress/expected/multi_subquery_in_where_reference_clause.out +++ b/src/test/regress/expected/multi_subquery_in_where_reference_clause.out @@ -553,7 +553,6 @@ DEBUG: push down of limit count: 5 (5 rows) SET client_min_messages TO DEFAULT; --- not supported since GROUP BY references to an upper level query SELECT user_id FROM @@ -573,10 +572,13 @@ GROUP BY user_id HAVING count(*) > 3 ORDER BY user_id LIMIT 5; -ERROR: cannot push down this subquery -DETAIL: Group by list without partition column is currently unsupported when a subquery references a column from another query + user_id +--------------------------------------------------------------------- + 2 + 5 +(2 rows) + -- similar query with slightly more complex group by --- though the error message is a bit confusing SELECT user_id FROM @@ -596,5 +598,9 @@ GROUP BY user_id HAVING count(*) > 3 ORDER BY user_id LIMIT 5; -ERROR: cannot push down this subquery -DETAIL: Group by list without partition column is currently unsupported when a subquery references a column from another query + user_id +--------------------------------------------------------------------- + 2 + 5 +(2 rows) + diff --git a/src/test/regress/expected/subquery_and_cte.out b/src/test/regress/expected/subquery_and_cte.out index 878486d33..f2352bb0b 100644 --- a/src/test/regress/expected/subquery_and_cte.out +++ b/src/test/regress/expected/subquery_and_cte.out @@ -526,6 +526,160 @@ FROM WHERE foo.user_id = bar.user_id ORDER BY 1 DESC; ERROR: recursive CTEs are not supported in distributed queries +CREATE TABLE ref_table_1 (a int); +SELECT create_reference_table('ref_table_1'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE ref_table_2 (a int); +SELECT create_reference_table('ref_table_2'); + create_reference_table 
+--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE dist (a int, b text); +SELECT create_distributed_table('dist', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO ref_table_1 SELECT * FROM generate_series(1, 10); +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO ref_table_2 SELECT * FROM generate_series(1, 10); +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO dist SELECT * FROM generate_series(1, 10); +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +SELECT count(*) FROM + (SELECT DISTINCT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a)) as foo + JOIN + dist + ON(dist.a = foo.a); + count +--------------------------------------------------------------------- + 9 +(1 row) + +SELECT count(*) FROM + (SELECT DISTINCT ref_table_1.a + 1 +ref_table_2.a + ref_table_1.a as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a)) as foo + JOIN + dist + ON(dist.a = foo.a); + count +--------------------------------------------------------------------- + 3 +(1 row) + +SELECT count(*) FROM + (SELECT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + 1) as foo + JOIN + dist + ON(dist.a = foo.a); + count +--------------------------------------------------------------------- + 9 +(1 row) + +SELECT count(*) FROM + (SELECT ref_table_1.a + ref_table_2.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + ref_table_2.a + 1) as foo + JOIN + dist + ON(dist.a = foo.a); + count 
+--------------------------------------------------------------------- + 4 +(1 row) + +SELECT count(*) FROM ( + SELECT + a, lag(a) OVER my_win as lag_event_type, row_number() OVER my_win as row_no + FROM + ref_table_1 WINDOW my_win AS (PARTITION BY a + 1)) as foo + JOIN + dist + ON(dist.a = foo.a); + count +--------------------------------------------------------------------- + 10 +(1 row) + +SET citus.enable_cte_inlining to true; +WITH foo AS ( + SELECT DISTINCT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) +) +SELECT count(*) FROM + foo + JOIN + dist + ON(dist.a = foo.a); +DEBUG: CTE foo is going to be inlined via distributed planning + count +--------------------------------------------------------------------- + 9 +(1 row) + +WITH foo AS ( + SELECT DISTINCT ref_table_1.a + 1 +ref_table_2.a + ref_table_1.a as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) +) +SELECT count(*) FROM + foo + JOIN + dist + ON(dist.a = foo.a); +DEBUG: CTE foo is going to be inlined via distributed planning + count +--------------------------------------------------------------------- + 3 +(1 row) + +WITH foo AS ( + SELECT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + 1 +) +SELECT count(*) FROM + foo + JOIN + dist + ON(dist.a = foo.a); +DEBUG: CTE foo is going to be inlined via distributed planning + count +--------------------------------------------------------------------- + 9 +(1 row) + +WITH foo AS ( + SELECT ref_table_1.a + ref_table_2.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + ref_table_2.a + 1 +) +SELECT count(*) FROM + foo + JOIN + dist + ON(dist.a = foo.a); +DEBUG: CTE foo is going to be inlined via distributed planning + count +--------------------------------------------------------------------- + 4 +(1 row) + +WITH foo AS ( + SELECT + a, lag(a) OVER my_win as 
lag_event_type, row_number() OVER my_win as row_no + FROM + ref_table_1 WINDOW my_win AS (PARTITION BY a + 1) +) +SELECT count(*) FROM foo JOIN dist ON(dist.a = foo.a); +DEBUG: CTE foo is going to be inlined via distributed planning + count +--------------------------------------------------------------------- + 10 +(1 row) + +SET citus.enable_cte_inlining to false; -- We error-out when there's an error in execution of the query. By repeating it -- multiple times, we increase the chance of this test failing before PR #1903. SET client_min_messages TO ERROR; @@ -566,10 +720,13 @@ ERROR: (3/3) failed to execute one of the tasks CONTEXT: PL/pgSQL function inline_code_block line 31 at RAISE SET client_min_messages TO DEFAULT; DROP SCHEMA subquery_and_ctes CASCADE; -NOTICE: drop cascades to 5 other objects +NOTICE: drop cascades to 8 other objects DETAIL: drop cascades to table users_table drop cascades to table events_table drop cascades to table users_table_local drop cascades to table dist_table drop cascades to function func() +drop cascades to table ref_table_1 +drop cascades to table ref_table_2 +drop cascades to table dist SET search_path TO public; diff --git a/src/test/regress/sql/multi_subquery_in_where_reference_clause.sql b/src/test/regress/sql/multi_subquery_in_where_reference_clause.sql index 414542e85..a3dd9c06e 100644 --- a/src/test/regress/sql/multi_subquery_in_where_reference_clause.sql +++ b/src/test/regress/sql/multi_subquery_in_where_reference_clause.sql @@ -469,7 +469,6 @@ LIMIT 5; SET client_min_messages TO DEFAULT; --- not supported since GROUP BY references to an upper level query SELECT user_id FROM @@ -491,7 +490,6 @@ ORDER BY user_id LIMIT 5; -- similar query with slightly more complex group by --- though the error message is a bit confusing SELECT user_id FROM diff --git a/src/test/regress/sql/subquery_and_cte.sql b/src/test/regress/sql/subquery_and_cte.sql index 1752a2bb6..cf908e15f 100644 --- a/src/test/regress/sql/subquery_and_cte.sql +++ 
b/src/test/regress/sql/subquery_and_cte.sql @@ -381,6 +381,101 @@ FROM WHERE foo.user_id = bar.user_id ORDER BY 1 DESC; +CREATE TABLE ref_table_1 (a int); +SELECT create_reference_table('ref_table_1'); + +CREATE TABLE ref_table_2 (a int); +SELECT create_reference_table('ref_table_2'); + +CREATE TABLE dist (a int, b text); +SELECT create_distributed_table('dist', 'a'); + +INSERT INTO ref_table_1 SELECT * FROM generate_series(1, 10); +INSERT INTO ref_table_2 SELECT * FROM generate_series(1, 10); +INSERT INTO dist SELECT * FROM generate_series(1, 10); + +SELECT count(*) FROM + (SELECT DISTINCT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a)) as foo + JOIN + dist + ON(dist.a = foo.a); + +SELECT count(*) FROM + (SELECT DISTINCT ref_table_1.a + 1 +ref_table_2.a + ref_table_1.a as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a)) as foo + JOIN + dist + ON(dist.a = foo.a); + +SELECT count(*) FROM + (SELECT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + 1) as foo + JOIN + dist + ON(dist.a = foo.a); + +SELECT count(*) FROM + (SELECT ref_table_1.a + ref_table_2.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + ref_table_2.a + 1) as foo + JOIN + dist + ON(dist.a = foo.a); + +SELECT count(*) FROM ( + SELECT + a, lag(a) OVER my_win as lag_event_type, row_number() OVER my_win as row_no + FROM + ref_table_1 WINDOW my_win AS (PARTITION BY a + 1)) as foo + + JOIN + dist + ON(dist.a = foo.a); + +SET citus.enable_cte_inlining to true; + +WITH foo AS ( + SELECT DISTINCT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) +) +SELECT count(*) FROM + foo + JOIN + dist + ON(dist.a = foo.a); + +WITH foo AS ( + SELECT DISTINCT ref_table_1.a + 1 +ref_table_2.a + ref_table_1.a as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) +) +SELECT 
count(*) FROM + foo + JOIN + dist + ON(dist.a = foo.a); + +WITH foo AS ( + SELECT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + 1 +) +SELECT count(*) FROM + foo + JOIN + dist + ON(dist.a = foo.a); + +WITH foo AS ( + SELECT ref_table_1.a + ref_table_2.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + ref_table_2.a + 1 +) +SELECT count(*) FROM + foo + JOIN + dist + ON(dist.a = foo.a); + +WITH foo AS ( + SELECT + a, lag(a) OVER my_win as lag_event_type, row_number() OVER my_win as row_no + FROM + ref_table_1 WINDOW my_win AS (PARTITION BY a + 1) +) +SELECT count(*) FROM foo JOIN dist ON(dist.a = foo.a); + +SET citus.enable_cte_inlining to false; + -- We error-out when there's an error in execution of the query. By repeating it -- multiple times, we increase the chance of this test failing before PR #1903. SET client_min_messages TO ERROR;