Loosen the requirement to push down a subquery with ref tables (#4110)

AllTargetExpressionsAreColumnReferences would return false if a query
had a target entry that references the outer query, or one that is not
a plain column. It seems safe to skip this check when the query has no
distributed tables, e.g. when it only has reference tables. We already
have separate checks for other cases such as having limits.
pull/4120/head
SaitTalhaNisanci 2020-08-14 12:11:15 +03:00 committed by GitHub
parent 679bf0d2b2
commit 20c39fae9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 269 additions and 73 deletions

View File

@ -75,7 +75,6 @@ typedef bool (*CheckNodeFunc)(Node *);
static RuleApplyFunction RuleApplyFunctionArray[JOIN_RULE_LAST] = { 0 }; /* join rules */
/* Local functions forward declarations */
static bool AllTargetExpressionsAreColumnReferences(List *targetEntryList);
static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query);
static Oid NodeTryGetRteRelid(Node *node);
static bool FullCompositeFieldList(List *compositeFieldList);
@ -264,14 +263,12 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
/*
* We could still behave as if the target list is on partition column if
* all range table entries are reference tables or intermediate results,
* and all target expressions are column references to the given query level.
* range table entries don't contain a distributed table.
*/
if (!targetListOnPartitionColumn)
{
if (!FindNodeMatchingCheckFunctionInRangeTableList(query->rtable,
IsDistributedTableRTE) &&
AllTargetExpressionsAreColumnReferences(targetEntryList))
IsDistributedTableRTE))
{
targetListOnPartitionColumn = true;
}
@ -281,59 +278,6 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
}
/*
 * AllTargetExpressionsAreColumnReferences checks that every entry of the
 * given target list is a column of the current query level.
 *
 * After implicit coercions are stripped, an entry qualifies only when it is
 * a Var, or a FieldSelect whose argument is a Var. The function returns
 * false as soon as it meets an entry that is not such a column, or whose
 * column has varlevelsup > 0 (i.e. it points at an outer query level, as
 * can happen when the query is a sublink that references another query in
 * the from list). It returns true when no entry is rejected, which includes
 * the empty list.
 *
 * Rejecting every non-column expression might be too restrictive, but given
 * the very specific type of queries handled here, that seems acceptable.
 */
static bool
AllTargetExpressionsAreColumnReferences(List *targetEntryList)
{
	ListCell *cell = NULL;

	foreach(cell, targetEntryList)
	{
		TargetEntry *entry = lfirst(cell);
		Node *bareExpression = strip_implicit_coercions((Node *) entry->expr);

		/* for a composite field access, inspect the expression it selects from */
		if (IsA(bareExpression, FieldSelect))
		{
			bareExpression = (Node *) ((FieldSelect *) bareExpression)->arg;
		}

		/* we don't support target entries that are not columns */
		if (!IsA(bareExpression, Var))
		{
			return false;
		}

		/* a column that belongs to an outer query level disqualifies the list */
		if (((Var *) bareExpression)->varlevelsup > 0)
		{
			return false;
		}
	}

	return true;
}
/*
* FindNodeMatchingCheckFunctionInRangeTableList finds a node for which the checker
* function returns true.

View File

@ -976,14 +976,12 @@ FROM
DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM intermediate_result_pruning.accounts
DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (intermediate_result_pruning.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id))) inner_query
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
DEBUG: Subplan XXX_3 will be written to local file
DEBUG: Subplan XXX_4 will be written to local file
sum
---------------------------------------------------------------------
100
@ -1015,13 +1013,11 @@ FROM
DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM intermediate_result_pruning.accounts
DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (intermediate_result_pruning.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id))) inner_query
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx
sum
---------------------------------------------------------------------
100

View File

@ -553,7 +553,6 @@ DEBUG: push down of limit count: 5
(5 rows)
SET client_min_messages TO DEFAULT;
-- not supported since GROUP BY references to an upper level query
SELECT
user_id
FROM
@ -573,10 +572,13 @@ GROUP BY user_id
HAVING count(*) > 3
ORDER BY user_id
LIMIT 5;
ERROR: cannot push down this subquery
DETAIL: Group by list without partition column is currently unsupported when a subquery references a column from another query
user_id
---------------------------------------------------------------------
2
5
(2 rows)
-- similar query with slightly more complex group by
-- though the error message is a bit confusing
SELECT
user_id
FROM
@ -596,5 +598,9 @@ GROUP BY user_id
HAVING count(*) > 3
ORDER BY user_id
LIMIT 5;
ERROR: cannot push down this subquery
DETAIL: Group by list without partition column is currently unsupported when a subquery references a column from another query
user_id
---------------------------------------------------------------------
2
5
(2 rows)

View File

@ -526,6 +526,160 @@ FROM
WHERE foo.user_id = bar.user_id
ORDER BY 1 DESC;
ERROR: recursive CTEs are not supported in distributed queries
CREATE TABLE ref_table_1 (a int);
SELECT create_reference_table('ref_table_1');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE ref_table_2 (a int);
SELECT create_reference_table('ref_table_2');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE dist (a int, b text);
SELECT create_distributed_table('dist', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO ref_table_1 SELECT * FROM generate_series(1, 10);
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO ref_table_2 SELECT * FROM generate_series(1, 10);
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO dist SELECT * FROM generate_series(1, 10);
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
SELECT count(*) FROM
(SELECT DISTINCT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a)) as foo
JOIN
dist
ON(dist.a = foo.a);
count
---------------------------------------------------------------------
9
(1 row)
SELECT count(*) FROM
(SELECT DISTINCT ref_table_1.a + 1 +ref_table_2.a + ref_table_1.a as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a)) as foo
JOIN
dist
ON(dist.a = foo.a);
count
---------------------------------------------------------------------
3
(1 row)
SELECT count(*) FROM
(SELECT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + 1) as foo
JOIN
dist
ON(dist.a = foo.a);
count
---------------------------------------------------------------------
9
(1 row)
SELECT count(*) FROM
(SELECT ref_table_1.a + ref_table_2.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + ref_table_2.a + 1) as foo
JOIN
dist
ON(dist.a = foo.a);
count
---------------------------------------------------------------------
4
(1 row)
SELECT count(*) FROM (
SELECT
a, lag(a) OVER my_win as lag_event_type, row_number() OVER my_win as row_no
FROM
ref_table_1 WINDOW my_win AS (PARTITION BY a + 1)) as foo
JOIN
dist
ON(dist.a = foo.a);
count
---------------------------------------------------------------------
10
(1 row)
SET citus.enable_cte_inlining to true;
WITH foo AS (
SELECT DISTINCT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a)
)
SELECT count(*) FROM
foo
JOIN
dist
ON(dist.a = foo.a);
DEBUG: CTE foo is going to be inlined via distributed planning
count
---------------------------------------------------------------------
9
(1 row)
WITH foo AS (
SELECT DISTINCT ref_table_1.a + 1 +ref_table_2.a + ref_table_1.a as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a)
)
SELECT count(*) FROM
foo
JOIN
dist
ON(dist.a = foo.a);
DEBUG: CTE foo is going to be inlined via distributed planning
count
---------------------------------------------------------------------
3
(1 row)
WITH foo AS (
SELECT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + 1
)
SELECT count(*) FROM
foo
JOIN
dist
ON(dist.a = foo.a);
DEBUG: CTE foo is going to be inlined via distributed planning
count
---------------------------------------------------------------------
9
(1 row)
WITH foo AS (
SELECT ref_table_1.a + ref_table_2.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + ref_table_2.a + 1
)
SELECT count(*) FROM
foo
JOIN
dist
ON(dist.a = foo.a);
DEBUG: CTE foo is going to be inlined via distributed planning
count
---------------------------------------------------------------------
4
(1 row)
WITH foo AS (
SELECT
a, lag(a) OVER my_win as lag_event_type, row_number() OVER my_win as row_no
FROM
ref_table_1 WINDOW my_win AS (PARTITION BY a + 1)
)
SELECT count(*) FROM foo JOIN dist ON(dist.a = foo.a);
DEBUG: CTE foo is going to be inlined via distributed planning
count
---------------------------------------------------------------------
10
(1 row)
SET citus.enable_cte_inlining to false;
-- We error-out when there's an error in execution of the query. By repeating it
-- multiple times, we increase the chance of this test failing before PR #1903.
SET client_min_messages TO ERROR;
@ -566,10 +720,13 @@ ERROR: (3/3) failed to execute one of the tasks
CONTEXT: PL/pgSQL function inline_code_block line 31 at RAISE
SET client_min_messages TO DEFAULT;
DROP SCHEMA subquery_and_ctes CASCADE;
NOTICE: drop cascades to 5 other objects
NOTICE: drop cascades to 8 other objects
DETAIL: drop cascades to table users_table
drop cascades to table events_table
drop cascades to table users_table_local
drop cascades to table dist_table
drop cascades to function func()
drop cascades to table ref_table_1
drop cascades to table ref_table_2
drop cascades to table dist
SET search_path TO public;

View File

@ -469,7 +469,6 @@ LIMIT 5;
SET client_min_messages TO DEFAULT;
-- not supported since GROUP BY references to an upper level query
SELECT
user_id
FROM
@ -491,7 +490,6 @@ ORDER BY user_id
LIMIT 5;
-- similar query with slightly more complex group by
-- though the error message is a bit confusing
SELECT
user_id
FROM

View File

@ -381,6 +381,101 @@ FROM
WHERE foo.user_id = bar.user_id
ORDER BY 1 DESC;
CREATE TABLE ref_table_1 (a int);
SELECT create_reference_table('ref_table_1');
CREATE TABLE ref_table_2 (a int);
SELECT create_reference_table('ref_table_2');
CREATE TABLE dist (a int, b text);
SELECT create_distributed_table('dist', 'a');
INSERT INTO ref_table_1 SELECT * FROM generate_series(1, 10);
INSERT INTO ref_table_2 SELECT * FROM generate_series(1, 10);
INSERT INTO dist SELECT * FROM generate_series(1, 10);
SELECT count(*) FROM
(SELECT DISTINCT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a)) as foo
JOIN
dist
ON(dist.a = foo.a);
SELECT count(*) FROM
(SELECT DISTINCT ref_table_1.a + 1 +ref_table_2.a + ref_table_1.a as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a)) as foo
JOIN
dist
ON(dist.a = foo.a);
SELECT count(*) FROM
(SELECT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + 1) as foo
JOIN
dist
ON(dist.a = foo.a);
SELECT count(*) FROM
(SELECT ref_table_1.a + ref_table_2.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + ref_table_2.a + 1) as foo
JOIN
dist
ON(dist.a = foo.a);
SELECT count(*) FROM (
SELECT
a, lag(a) OVER my_win as lag_event_type, row_number() OVER my_win as row_no
FROM
ref_table_1 WINDOW my_win AS (PARTITION BY a + 1)) as foo
JOIN
dist
ON(dist.a = foo.a);
SET citus.enable_cte_inlining to true;
WITH foo AS (
SELECT DISTINCT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a)
)
SELECT count(*) FROM
foo
JOIN
dist
ON(dist.a = foo.a);
WITH foo AS (
SELECT DISTINCT ref_table_1.a + 1 +ref_table_2.a + ref_table_1.a as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a)
)
SELECT count(*) FROM
foo
JOIN
dist
ON(dist.a = foo.a);
WITH foo AS (
SELECT ref_table_1.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + 1
)
SELECT count(*) FROM
foo
JOIN
dist
ON(dist.a = foo.a);
WITH foo AS (
SELECT ref_table_1.a + ref_table_2.a + 1 as a FROM ref_table_1 JOIN ref_table_2 ON (ref_table_1.a = ref_table_2.a) GROUP BY ref_table_1.a + ref_table_2.a + 1
)
SELECT count(*) FROM
foo
JOIN
dist
ON(dist.a = foo.a);
WITH foo AS (
SELECT
a, lag(a) OVER my_win as lag_event_type, row_number() OVER my_win as row_no
FROM
ref_table_1 WINDOW my_win AS (PARTITION BY a + 1)
)
SELECT count(*) FROM foo JOIN dist ON(dist.a = foo.a);
SET citus.enable_cte_inlining to false;
-- We error-out when there's an error in execution of the query. By repeating it
-- multiple times, we increase the chance of this test failing before PR #1903.
SET client_min_messages TO ERROR;