From 48f96bf3e54ade9827f0a1173668dd3abc284c13 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 23 Nov 2017 16:07:03 +0200 Subject: [PATCH] Enable non equi joins in subquery pushdown Subquery pushdown planning is based on relation restriction equivalnce. This brings us the opportuneatly to allow any other joins as long as there is an already equi join between the distributed tables. We already allow that for joins with reference tables and this commit allows that for joins among distributed tables. --- .../planner/multi_logical_planner.c | 61 -------- .../multi_subquery_behavioral_analytics.out | 145 ++++++++++++++++++ .../multi_subquery_in_where_clause.out | 53 ++++++- .../multi_subquery_behavioral_analytics.sql | 113 ++++++++++++++ .../sql/multi_subquery_in_where_clause.sql | 44 ++++++ 5 files changed, 354 insertions(+), 62 deletions(-) diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 36dbe4f8f..c6d39bae0 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -122,7 +122,6 @@ static bool RelationInfoContainsRecurringTuples(PlannerInfo *plannerInfo, RecurringTuplesType *recurType); static bool HasRecurringTuples(Node *node, RecurringTuplesType *recurType); static void ValidateClauseList(List *clauseList); -static void ValidateSubqueryPushdownClauseList(List *clauseList); static bool ExtractFromExpressionWalker(Node *node, QualifierWalkerContext *walkerContext); static List * MultiTableNodeList(List *tableEntryList, List *rangeTableList); @@ -132,7 +131,6 @@ static MultiNode * MultiJoinTree(List *joinOrderList, List *collectTableList, static MultiCollect * CollectNodeForTable(List *collectTableList, uint32 rangeTableId); static MultiSelect * MultiSelectNode(List *whereClauseList); static bool IsSelectClause(Node *clause); -static bool IsSublinkClause(Node *clause); static MultiProject * MultiProjectNode(List *targetEntryList); static MultiExtendedOp * MultiExtendedOpNode(Query *queryTree); @@ -2535,12 +2533,6 @@ ValidateClauseList(List *clauseList) { Node *clause = (Node *) lfirst(clauseCell); - /* - * There could never be sublinks here given that it is handled - * in subquery pushdown code-path. - */ - Assert(!IsSublinkClause(clause)); - if (!(IsSelectClause(clause) || IsJoinClause(clause) || or_clause(clause))) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -2550,33 +2542,6 @@ ValidateClauseList(List *clauseList) } -/* - * ValidateSubqueryPushdownClauseList walks over the given list of clauses, - * and checks that we can recognize all the clauses. This function ensures - * that we do not drop an unsupported clause type on the floor, and thus - * prevents erroneous results. - * - * Note that this function is slightly different than ValidateClauseList(), - * additionally allowing sublinks. - */ -static void -ValidateSubqueryPushdownClauseList(List *clauseList) -{ - ListCell *clauseCell = NULL; - foreach(clauseCell, clauseList) - { - Node *clause = (Node *) lfirst(clauseCell); - - if (!(IsSublinkClause(clause) || IsSelectClause(clause) || - IsJoinClause(clause) || or_clause(clause))) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("unsupported clause type"))); - } - } -} - - /* * JoinClauseList finds the join clauses from the given where clause expression * list, and returns them. The function does not iterate into nested OR clauses @@ -3056,23 +3021,6 @@ IsSelectClause(Node *clause) } -/* - * IsSublinkClause determines if the given node is a sublink or subplan. - */ -static bool -IsSublinkClause(Node *clause) -{ - NodeTag nodeTag = nodeTag(clause); - - if (nodeTag == T_SubLink || nodeTag == T_SubPlan) - { - return true; - } - - return false; -} - - /* * MultiProjectNode builds the project node using the target entry information * from the query tree. The project node only encapsulates projected columns, @@ -3722,7 +3670,6 @@ static MultiNode * SubqueryPushdownMultiNodeTree(Query *queryTree) { List *targetEntryList = queryTree->targetList; - List *qualifierList = NIL; List *columnList = NIL; List *targetColumnList = NIL; MultiCollect *subqueryCollectNode = CitusMakeNode(MultiCollect); @@ -3737,14 +3684,6 @@ SubqueryPushdownMultiNodeTree(Query *queryTree) /* verify we can perform distributed planning on this query */ ErrorIfQueryNotSupported(queryTree); - /* - * Extract qualifiers and verify we can plan for them. Note that since - * subquery pushdown join planning is based on restriction equivalence, - * checking for these qualifiers may not be necessary. - */ - qualifierList = QualifierList(queryTree->jointree); - ValidateSubqueryPushdownClauseList(qualifierList); - /* * We would be creating a new Query and pushing down top level query's * contents down to it. Join and filter clauses in higher level query would diff --git a/src/test/regress/expected/multi_subquery_behavioral_analytics.out b/src/test/regress/expected/multi_subquery_behavioral_analytics.out index f47f4b28b..7db8b758c 100644 --- a/src/test/regress/expected/multi_subquery_behavioral_analytics.out +++ b/src/test/regress/expected/multi_subquery_behavioral_analytics.out @@ -2040,6 +2040,21 @@ CREATE FUNCTION test_join_function_2(integer, integer) RETURNS bool LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT; +SELECT run_command_on_workers($f$ + +CREATE FUNCTION test_join_function_2(integer, integer) RETURNS bool + AS 'select $1 > $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; + +$f$); + run_command_on_workers +--------------------------------------- + (localhost,57637,t,"CREATE FUNCTION") + (localhost,57638,t,"CREATE FUNCTION") +(2 rows) + -- we don't support joins via functions SELECT user_id, array_length(events_table, 1) FROM ( @@ -2074,6 +2089,125 @@ FROM WHERE users_table.value_1 < 50 AND test_join_function_2(users_table.user_id, temp.user_id); ERROR: unsupported clause type +-- we do support the following since there is already an equality on the partition +-- key and we have an additional join via a function +SELECT + temp.user_id, users_table.value_1, prob +FROM + users_table + JOIN + (SELECT + ma.user_id, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob, random() + FROM + users_table AS ma, events_table as short_list + WHERE + short_list.user_id = ma.user_id and ma.value_1 < 50 and short_list.event_type < 100 AND + test_join_function_2(ma.value_1, short_list.value_2) + ) temp + ON users_table.user_id = temp.user_id + WHERE + users_table.value_1 < 50 + ORDER BY 2 DESC, 1 DESC + LIMIT 10; + user_id | value_1 | prob +---------+---------+------------------------ + 37 | 49 | 0.50000000000000000000 + 37 | 49 | 0.50000000000000000000 + 37 | 49 | 0.50000000000000000000 + 32 | 49 | 0.50000000000000000000 + 32 | 49 | 0.50000000000000000000 + 98 | 48 | 0.50000000000000000000 + 98 | 48 | 0.50000000000000000000 + 98 | 48 | 0.50000000000000000000 + 96 | 48 | 0.50000000000000000000 + 96 | 48 | 0.50000000000000000000 +(10 rows) + +-- similarly we do support non equi joins on columns if there is aready +-- an equality join +SELECT + count(*) +FROM + (SELECT + event_type, random() + FROM + events_table, users_table + WHERE + events_table.user_id = users_table.user_id AND + events_table.time > users_table.time AND + events_table.value_2 IN (10, 100) + ) as foo; + count +------- + 1015 +(1 row) + +-- the other way around is not supported +SELECT + count(*) +FROM + (SELECT + event_type, random() + FROM + events_table, users_table + WHERE + events_table.user_id > users_table.user_id AND + events_table.time = users_table.time AND + events_table.value_2 IN (10, 100) + ) as foo; +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. +-- we can even allow that on top level joins +SELECT + count(*) +FROM + (SELECT + event_type, random(), events_table.user_id + FROM + events_table, users_table + WHERE + events_table.user_id = users_table.user_id AND + events_table.value_2 IN (10, 100) + ) as foo, +(SELECT + event_type, random(), events_table.user_id + FROM + events_table, users_table + WHERE + events_table.user_id = users_table.user_id AND + events_table.value_2 IN (20, 200) + ) as bar +WHERE foo.event_type > bar.event_type +AND foo.user_id = bar.user_id; + count +------- + 14641 +(1 row) + +-- note that the following is not supported +-- since the top level join is not on the distribution key +SELECT + count(*) +FROM + (SELECT + event_type, random() + FROM + events_table, users_table + WHERE + events_table.user_id = users_table.user_id AND + events_table.value_2 IN (10, 100) + ) as foo, +(SELECT + event_type, random() + FROM + events_table, users_table + WHERE + events_table.user_id = users_table.user_id AND + events_table.value_2 IN (20, 200) + ) as bar +WHERE foo.event_type = bar.event_type; +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. -- DISTINCT in the outer query and DISTINCT in the subquery SELECT DISTINCT users_ids.user_id @@ -2152,5 +2286,16 @@ FROM (5 rows) DROP FUNCTION test_join_function_2(integer, integer); +SELECT run_command_on_workers($f$ + + DROP FUNCTION test_join_function_2(integer, integer); + +$f$); + run_command_on_workers +------------------------------------- + (localhost,57637,t,"DROP FUNCTION") + (localhost,57638,t,"DROP FUNCTION") +(2 rows) + SET citus.enable_router_execution TO TRUE; SET citus.subquery_pushdown to OFF; diff --git a/src/test/regress/expected/multi_subquery_in_where_clause.out b/src/test/regress/expected/multi_subquery_in_where_clause.out index a42ce49f4..d3a222d22 100644 --- a/src/test/regress/expected/multi_subquery_in_where_clause.out +++ b/src/test/regress/expected/multi_subquery_in_where_clause.out @@ -23,7 +23,7 @@ WHERE GROUP BY user_id HAVING count(*) > 66 ORDER BY user_id -LIMIT 5; +LIMIT 5; user_id --------- 49 @@ -32,6 +32,57 @@ LIMIT 5; 63 (4 rows) +-- same query with one additional join on non distribution column +SELECT + user_id +FROM + users_table +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_table + WHERE + users_table.user_id = events_table.user_id AND event_type = 50 AND + users_table.time > events_table.time + GROUP BY + user_id + ) +GROUP BY user_id +HAVING count(*) > 66 +ORDER BY user_id +LIMIT 5; + user_id +--------- + 55 + 56 + 63 +(3 rows) + +-- the other way around is not supported +SELECT + user_id +FROM + users_table +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_table + WHERE + users_table.user_id > events_table.user_id AND event_type = 50 AND + users_table.time = events_table.time + GROUP BY + user_id + ) +GROUP BY user_id +HAVING count(*) > 66 +ORDER BY user_id +LIMIT 5; +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. -- subqueries in where with ALL operator SELECT user_id diff --git a/src/test/regress/sql/multi_subquery_behavioral_analytics.sql b/src/test/regress/sql/multi_subquery_behavioral_analytics.sql index 2d8ed41b1..972867157 100644 --- a/src/test/regress/sql/multi_subquery_behavioral_analytics.sql +++ b/src/test/regress/sql/multi_subquery_behavioral_analytics.sql @@ -1626,6 +1626,16 @@ CREATE FUNCTION test_join_function_2(integer, integer) RETURNS bool IMMUTABLE RETURNS NULL ON NULL INPUT; +SELECT run_command_on_workers($f$ + +CREATE FUNCTION test_join_function_2(integer, integer) RETURNS bool + AS 'select $1 > $2;' + LANGUAGE SQL + IMMUTABLE + RETURNS NULL ON NULL INPUT; + +$f$); + -- we don't support joins via functions SELECT user_id, array_length(events_table, 1) FROM ( @@ -1659,6 +1669,102 @@ FROM WHERE users_table.value_1 < 50 AND test_join_function_2(users_table.user_id, temp.user_id); +-- we do support the following since there is already an equality on the partition +-- key and we have an additional join via a function +SELECT + temp.user_id, users_table.value_1, prob +FROM + users_table + JOIN + (SELECT + ma.user_id, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob, random() + FROM + users_table AS ma, events_table as short_list + WHERE + short_list.user_id = ma.user_id and ma.value_1 < 50 and short_list.event_type < 100 AND + test_join_function_2(ma.value_1, short_list.value_2) + ) temp + ON users_table.user_id = temp.user_id + WHERE + users_table.value_1 < 50 + ORDER BY 2 DESC, 1 DESC + LIMIT 10; + +-- similarly we do support non equi joins on columns if there is aready +-- an equality join +SELECT + count(*) +FROM + (SELECT + event_type, random() + FROM + events_table, users_table + WHERE + events_table.user_id = users_table.user_id AND + events_table.time > users_table.time AND + events_table.value_2 IN (10, 100) + ) as foo; + +-- the other way around is not supported +SELECT + count(*) +FROM + (SELECT + event_type, random() + FROM + events_table, users_table + WHERE + events_table.user_id > users_table.user_id AND + events_table.time = users_table.time AND + events_table.value_2 IN (10, 100) + ) as foo; + +-- we can even allow that on top level joins +SELECT + count(*) +FROM + (SELECT + event_type, random(), events_table.user_id + FROM + events_table, users_table + WHERE + events_table.user_id = users_table.user_id AND + events_table.value_2 IN (10, 100) + ) as foo, +(SELECT + event_type, random(), events_table.user_id + FROM + events_table, users_table + WHERE + events_table.user_id = users_table.user_id AND + events_table.value_2 IN (20, 200) + ) as bar +WHERE foo.event_type > bar.event_type +AND foo.user_id = bar.user_id; + +-- note that the following is not supported +-- since the top level join is not on the distribution key +SELECT + count(*) +FROM + (SELECT + event_type, random() + FROM + events_table, users_table + WHERE + events_table.user_id = users_table.user_id AND + events_table.value_2 IN (10, 100) + ) as foo, +(SELECT + event_type, random() + FROM + events_table, users_table + WHERE + events_table.user_id = users_table.user_id AND + events_table.value_2 IN (20, 200) + ) as bar +WHERE foo.event_type = bar.event_type; + -- DISTINCT in the outer query and DISTINCT in the subquery SELECT DISTINCT users_ids.user_id @@ -1712,7 +1818,14 @@ FROM ORDER BY 1,2 LIMIT 5; + DROP FUNCTION test_join_function_2(integer, integer); +SELECT run_command_on_workers($f$ + + DROP FUNCTION test_join_function_2(integer, integer); + +$f$); + SET citus.enable_router_execution TO TRUE; SET citus.subquery_pushdown to OFF; diff --git a/src/test/regress/sql/multi_subquery_in_where_clause.sql b/src/test/regress/sql/multi_subquery_in_where_clause.sql index 472e1bc3c..708c1ad87 100644 --- a/src/test/regress/sql/multi_subquery_in_where_clause.sql +++ b/src/test/regress/sql/multi_subquery_in_where_clause.sql @@ -24,6 +24,50 @@ WHERE GROUP BY user_id HAVING count(*) > 66 ORDER BY user_id +LIMIT 5; + +-- same query with one additional join on non distribution column +SELECT + user_id +FROM + users_table +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_table + WHERE + users_table.user_id = events_table.user_id AND event_type = 50 AND + users_table.time > events_table.time + GROUP BY + user_id + ) +GROUP BY user_id +HAVING count(*) > 66 +ORDER BY user_id +LIMIT 5; + +-- the other way around is not supported +SELECT + user_id +FROM + users_table +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_table + WHERE + users_table.user_id > events_table.user_id AND event_type = 50 AND + users_table.time = events_table.time + GROUP BY + user_id + ) +GROUP BY user_id +HAVING count(*) > 66 +ORDER BY user_id LIMIT 5; -- subqueries in where with ALL operator