From 4d70c86645bfb25818bae81b85fd44b26b6bfc66 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 13 Feb 2018 16:22:35 +0200 Subject: [PATCH] Leaf level recursive planning for non colocated subqueries With this commit, we enable recursive planning for the subqueries that are not joined on the distribution keys. --- .../distributed/planner/recursive_planning.c | 75 ++++++++++++++++++- .../relation_restriction_equivalence.c | 58 +++++++++----- src/test/regress/expected/multi_subquery.out | 58 +++++++++++--- .../multi_subquery_complex_queries.out | 67 ++++++++++++++--- src/test/regress/expected/multi_view.out | 2 +- src/test/regress/expected/set_operations.out | 6 +- src/test/regress/sql/multi_subquery.sql | 23 +++++- .../sql/multi_subquery_complex_queries.sql | 39 ++++++++-- 8 files changed, 279 insertions(+), 49 deletions(-) diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 5b55afce2..f1afe8e54 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -86,6 +86,7 @@ typedef struct RecursivePlanningContext { int level; uint64 planId; + bool queryContainsDistributionKeyEquality; /* used for some optimizations */ List *subPlanList; PlannerRestrictionContext *plannerRestrictionContext; } RecursivePlanningContext; @@ -121,7 +122,11 @@ static bool RecursivelyPlanAllSubqueries(Node *node, static DeferredErrorMessage * RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *context); static bool RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context); -static bool ShouldRecursivelyPlanSubquery(Query *subquery); +static bool ShouldRecursivelyPlanSubquery(Query *subquery, + RecursivePlanningContext *context); +static bool SubqueryContainsDistributionKeyEquality(Query *subquery, + PlannerRestrictionContext * + restrictionContext); static bool ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *context); static void RecursivelyPlanSetOperations(Query *query, Node *node, @@ -162,6 +167,21 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, context.subPlanList = NIL; context.plannerRestrictionContext = plannerRestrictionContext; + /* + * Calculating the distribution key equality upfront is a trade-off for us. + * + * When the originalQuery contains the distribution key equality, we'd be + * able to skip further checks for each lower level subqueries (i.e., if the + * all query contains distribution key equality, each subquery also contains + * distribution key equality.) + * + * When the originalQuery doesn't contain the distribution key equality, + * calculating this wouldn't help us at all, we should individually check + * each each subquery and subquery joins among subqueries. + */ + context.queryContainsDistributionKeyEquality = + QueryContainsDistributionKeyEquality(plannerRestrictionContext, originalQuery); + error = RecursivelyPlanSubqueriesAndCTEs(originalQuery, &context); if (error != NULL) { @@ -497,7 +517,7 @@ RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context) * Recursively plan this subquery if it cannot be pushed down and is * eligible for recursive planning. */ - if (ShouldRecursivelyPlanSubquery(query)) + if (ShouldRecursivelyPlanSubquery(query, context)) { RecursivelyPlanSubquery(query, context); } @@ -517,7 +537,7 @@ RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context) * For the details, see the cases in the function. */ static bool -ShouldRecursivelyPlanSubquery(Query *subquery) +ShouldRecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *context) { if (FindNodeCheckInRangeTableList(subquery->rtable, IsLocalTableRTE)) { @@ -533,6 +553,22 @@ ShouldRecursivelyPlanSubquery(Query *subquery) } else if (DeferErrorIfCannotPushdownSubquery(subquery, false) == NULL) { + /* + * We should do one more check for the distribution key equality. + * + * If the input query to the planner doesn't contain distribution key equality, + * we should further check whether this individual subquery contains or not. + * + * If all the relations are not joined on their distribution keys for the + * subquery, we cannot pushdown the it, thus, recursively plan it. + */ + if (!context->queryContainsDistributionKeyEquality && + !SubqueryContainsDistributionKeyEquality(subquery, + context->plannerRestrictionContext)) + { + return true; + } + /* * Citus can pushdown this subquery, no need to recursively * plan which is much expensive than pushdown. @@ -553,6 +589,39 @@ ShouldRecursivelyPlanSubquery(Query *subquery) } +/* + * SubqueryContainsDistributionKeyEquality is a wrapper function + * for QueryContainsDistributionKeyEquality(). Here, we filter the + * planner restrictions for the given subquery and do the restriction + * equality checks on the filtered restriction. + */ +static bool +SubqueryContainsDistributionKeyEquality(Query *subquery, + PlannerRestrictionContext *restrictionContext) +{ + bool queryContainsDistributionKeyEquality = false; + PlannerRestrictionContext *filteredRestrictionContext = NULL; + + /* we don't support distribution eq. checks for CTEs yet */ + if (subquery->cteList != NIL) + { + return false; + } + + filteredRestrictionContext = + FilterPlannerRestrictionForQuery(restrictionContext, subquery); + + queryContainsDistributionKeyEquality = + QueryContainsDistributionKeyEquality(filteredRestrictionContext, subquery); + if (!queryContainsDistributionKeyEquality) + { + return false; + } + + return true; +} + + /* * ShouldRecursivelyPlanSetOperation determines whether the leaf queries of a * set operations tree need to be recursively planned in order to support the diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index b558844ca..717f825c9 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -63,6 +63,7 @@ typedef struct AttributeEquivalenceClassMember } AttributeEquivalenceClassMember; +static bool ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext); static Var * FindTranslatedVar(List *appendRelList, Oid relationOid, Index relationRteIndex, Index *partitionKeyIndex); static bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList, @@ -154,9 +155,24 @@ bool QueryContainsDistributionKeyEquality(PlannerRestrictionContext *plannerRestrictionContext, Query *originalQuery) { - bool restrictionEquivalenceForPartitionKeys = - RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext); + bool restrictionEquivalenceForPartitionKeys = false; + RelationRestrictionContext *restrictionContext = NULL; + /* we don't support distribution key equality checks for CTEs yet */ + if (originalQuery->cteList != NIL) + { + return false; + } + + /* we don't support distribution key equality checks for local tables */ + restrictionContext = plannerRestrictionContext->relationRestrictionContext; + if (ContextContainsLocalRelation(restrictionContext)) + { + return false; + } + + restrictionEquivalenceForPartitionKeys = + RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext); if (restrictionEquivalenceForPartitionKeys) { return true; @@ -171,6 +187,29 @@ QueryContainsDistributionKeyEquality(PlannerRestrictionContext *plannerRestricti } +/* + * ContextContainsLocalRelation determines whether the given + * RelationRestrictionContext contains any local tables. + */ +static bool +ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext) +{ + ListCell *relationRestrictionCell = NULL; + + foreach(relationRestrictionCell, restrictionContext->relationRestrictionList) + { + RelationRestriction *relationRestriction = lfirst(relationRestrictionCell); + + if (!relationRestriction->distributedRelation) + { + return true; + } + } + + return false; +} + + /* * SafeToPushdownUnionSubquery returns true if all the relations are returns * partition keys in the same ordinal position and there is no reference table @@ -215,7 +254,6 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext foreach(relationRestrictionCell, restrictionContext->relationRestrictionList) { RelationRestriction *relationRestriction = lfirst(relationRestrictionCell); - Oid relationId = relationRestriction->relationId; Index partitionKeyIndex = InvalidAttrNumber; PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo; List *targetList = relationPlannerRoot->parse->targetList; @@ -223,20 +261,6 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext Var *varToBeAdded = NULL; TargetEntry *targetEntryToAdd = NULL; - /* - * Although it is not the best place to error out when facing with reference - * tables, we decide to error out here. Otherwise, we need to add equality - * for each reference table and it is more complex to implement. In the - * future implementation all checks will be gathered to single point. - */ - if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot pushdown this query"), - errdetail( - "Reference tables are not allowed with set operations"))); - } - /* * We first check whether UNION ALLs are pulled up or not. Note that Postgres * planner creates AppendRelInfos per each UNION ALL query that is pulled up. diff --git a/src/test/regress/expected/multi_subquery.out b/src/test/regress/expected/multi_subquery.out index 283117669..838999f2f 100644 --- a/src/test/regress/expected/multi_subquery.out +++ b/src/test/regress/expected/multi_subquery.out @@ -27,6 +27,7 @@ SET shardmaxvalue = '14947' WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'orders_subquery'::regclass ORDER BY shardid DESC LIMIT 1); +SET client_min_messages TO DEBUG1; -- If group by is not on partition column then we recursively plan SELECT avg(order_count) @@ -38,12 +39,14 @@ FROM lineitem_subquery GROUP BY l_suppkey) AS order_counts; +DEBUG: generating subplan 2_1 for subquery SELECT l_suppkey, count(*) AS order_count FROM public.lineitem_subquery GROUP BY l_suppkey +DEBUG: Plan 2 query after replacing subqueries and CTEs: SELECT avg(order_count) AS avg FROM (SELECT intermediate_result.l_suppkey, intermediate_result.order_count FROM read_intermediate_result('2_1'::text, 'binary'::citus_copy_format) intermediate_result(l_suppkey integer, order_count bigint)) order_counts avg -------------------- 1.7199369356456930 (1 row) --- Check that we error out if join is not on partition columns. +-- Check that we recursively plan if join is not on partition columns. SELECT avg(unit_price) FROM @@ -55,7 +58,35 @@ FROM orders_subquery GROUP BY l_orderkey) AS unit_prices; -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +ERROR: cannot perform distributed planning on this query +DETAIL: Cartesian products are currently unsupported +-- this query is only required to execute +-- the following query given that recursive planning +-- (in general real-time queries in transactions) +-- do not execute shard fetch tasks and the next +-- query relies on that +SELECT + l_orderkey, + avg(o_totalprice / l_quantity) AS unit_price + FROM + lineitem_subquery, + orders_subquery + WHERE + l_orderkey = o_custkey + GROUP BY + l_orderkey + ORDER BY 2 DESC, 1 DESC + LIMIT 5; +DEBUG: push down of limit count: 5 + l_orderkey | unit_price +------------+------------------------ + 1124 | 56102.2804738959822181 + 230 | 54613.8568599033816703 + 935 | 51688.6111227238944448 + 451 | 51673.9297867063492063 + 646 | 50919.3957476807927619 +(5 rows) + SELECT avg(unit_price) FROM @@ -69,7 +100,14 @@ FROM l_orderkey = o_custkey GROUP BY l_orderkey) AS unit_prices; -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +DEBUG: generating subplan 7_1 for subquery SELECT lineitem_subquery.l_orderkey, avg((orders_subquery.o_totalprice / lineitem_subquery.l_quantity)) AS unit_price FROM public.lineitem_subquery, public.orders_subquery WHERE (lineitem_subquery.l_orderkey = orders_subquery.o_custkey) GROUP BY lineitem_subquery.l_orderkey +DEBUG: Plan 7 query after replacing subqueries and CTEs: SELECT avg(unit_price) AS avg FROM (SELECT intermediate_result.l_orderkey, intermediate_result.unit_price FROM read_intermediate_result('7_1'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint, unit_price numeric)) unit_prices + avg +------------------------ + 12973.7343244916919367 +(1 row) + +RESET client_min_messages; -- Subqueries without relation with a volatile functions (non-constant) are planned recursively SELECT count(*) FROM ( SELECT l_orderkey FROM lineitem_subquery JOIN (SELECT random()::int r) sub ON (l_orderkey = r) WHERE r > 10 @@ -86,11 +124,11 @@ SELECT count(*) FROM (SELECT l_orderkey FROM lineitem_subquery) UNION ALL (SELECT 1::bigint) ) b; -DEBUG: generating subplan 7_1 for subquery SELECT l_orderkey FROM public.lineitem_subquery +DEBUG: generating subplan 10_1 for subquery SELECT l_orderkey FROM public.lineitem_subquery DEBUG: Creating router plan DEBUG: Plan is router executable -DEBUG: generating subplan 7_2 for subquery SELECT intermediate_result.l_orderkey FROM read_intermediate_result('7_1'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint) UNION ALL SELECT (1)::bigint AS int8 -DEBUG: Plan 7 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.l_orderkey FROM read_intermediate_result('7_2'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint)) b +DEBUG: generating subplan 10_2 for subquery SELECT intermediate_result.l_orderkey FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint) UNION ALL SELECT (1)::bigint AS int8 +DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.l_orderkey FROM read_intermediate_result('10_2'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint)) b DEBUG: Creating router plan DEBUG: Plan is router executable count @@ -104,12 +142,12 @@ SELECT count(*) FROM (SELECT l_orderkey FROM lineitem_subquery) UNION (SELECT l_partkey FROM lineitem_subquery) ) b; -DEBUG: generating subplan 10_1 for subquery SELECT l_orderkey FROM public.lineitem_subquery -DEBUG: generating subplan 10_2 for subquery SELECT l_partkey FROM public.lineitem_subquery +DEBUG: generating subplan 13_1 for subquery SELECT l_orderkey FROM public.lineitem_subquery +DEBUG: generating subplan 13_2 for subquery SELECT l_partkey FROM public.lineitem_subquery DEBUG: Creating router plan DEBUG: Plan is router executable -DEBUG: generating subplan 10_3 for subquery SELECT intermediate_result.l_orderkey FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint) UNION SELECT intermediate_result.l_partkey FROM read_intermediate_result('10_2'::text, 'binary'::citus_copy_format) intermediate_result(l_partkey integer) -DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.l_orderkey FROM read_intermediate_result('10_3'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint)) b +DEBUG: generating subplan 13_3 for subquery SELECT intermediate_result.l_orderkey FROM read_intermediate_result('13_1'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint) UNION SELECT intermediate_result.l_partkey FROM read_intermediate_result('13_2'::text, 'binary'::citus_copy_format) intermediate_result(l_partkey integer) +DEBUG: Plan 13 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.l_orderkey FROM read_intermediate_result('13_3'::text, 'binary'::citus_copy_format) intermediate_result(l_orderkey bigint)) b DEBUG: Creating router plan DEBUG: Plan is router executable count diff --git a/src/test/regress/expected/multi_subquery_complex_queries.out b/src/test/regress/expected/multi_subquery_complex_queries.out index 1fb18774f..b139ee711 100644 --- a/src/test/regress/expected/multi_subquery_complex_queries.out +++ b/src/test/regress/expected/multi_subquery_complex_queries.out @@ -352,7 +352,9 @@ ORDER BY 3 | 75 (3 rows) --- not supported since events_subquery_5 is not joined on partition key +SET citus.enable_repartition_joins to ON; +SET client_min_messages TO DEBUG1; +-- recursively planned since events_subquery_5 is not joined on partition key SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType FROM ( SELECT @@ -423,7 +425,23 @@ GROUP BY types ORDER BY types; -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +DEBUG: cannot use real time executor with repartition jobs +HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. +DEBUG: generating subplan 16_1 for subquery SELECT max(events."time") AS max, 0 AS event, events.user_id FROM public.events_table events, public.users_table users WHERE ((events.user_id = users.value_2) AND (events.event_type = ANY (ARRAY[1, 2]))) GROUP BY events.user_id +DEBUG: generating subplan 16_2 for subquery SELECT "time", event, user_id FROM (SELECT events."time", 0 AS event, events.user_id FROM public.events_table events WHERE (events.event_type = ANY (ARRAY[1, 2]))) events_subquery_1 +DEBUG: generating subplan 16_3 for subquery SELECT "time", event, user_id FROM (SELECT events."time", 2 AS event, events.user_id FROM public.events_table events WHERE (events.event_type = ANY (ARRAY[3, 4]))) events_subquery_3 +DEBUG: generating subplan 16_4 for subquery SELECT "time", event, user_id FROM (SELECT events."time", 3 AS event, events.user_id FROM public.events_table events WHERE (events.event_type = ANY (ARRAY[5, 6]))) events_subquery_4 +DEBUG: generating subplan 16_5 for subquery SELECT intermediate_result."time", intermediate_result.event, intermediate_result.user_id FROM read_intermediate_result('16_2'::text, 'binary'::citus_copy_format) intermediate_result("time" timestamp without time zone, event integer, user_id integer) UNION SELECT events_subquery_2.max, events_subquery_2.event, events_subquery_2.user_id FROM (SELECT events_subquery_5.max, events_subquery_5.event, events_subquery_5.user_id FROM (SELECT intermediate_result.max, intermediate_result.event, intermediate_result.user_id FROM read_intermediate_result('16_1'::text, 'binary'::citus_copy_format) intermediate_result(max timestamp without time zone, event integer, user_id integer)) events_subquery_5) events_subquery_2 UNION SELECT intermediate_result."time", intermediate_result.event, intermediate_result.user_id FROM read_intermediate_result('16_3'::text, 'binary'::citus_copy_format) intermediate_result("time" timestamp without time zone, event integer, user_id integer) UNION SELECT intermediate_result."time", intermediate_result.event, intermediate_result.user_id FROM read_intermediate_result('16_4'::text, 'binary'::citus_copy_format) intermediate_result("time" timestamp without time zone, event integer, user_id integer) +DEBUG: Plan 16 query after replacing subqueries and CTEs: SELECT event_types AS types, count(*) AS sumofeventtype FROM (SELECT q.user_id, q."time", q.event_types, t.user_id, random() AS random FROM ((SELECT t_1.user_id, t_1."time", unnest(t_1.collected_events) AS event_types FROM (SELECT t1.user_id, min(t1."time") AS "time", array_agg(t1.event ORDER BY t1."time", t1.event DESC) AS collected_events FROM (SELECT intermediate_result."time", intermediate_result.event, intermediate_result.user_id FROM read_intermediate_result('16_5'::text, 'binary'::citus_copy_format) intermediate_result("time" timestamp without time zone, event integer, user_id integer)) t1 GROUP BY t1.user_id) t_1) q JOIN (SELECT users.user_id FROM public.users_table users WHERE ((users.value_1 > 0) AND (users.value_1 < 4))) t ON ((t.user_id = q.user_id)))) final_query(user_id, "time", event_types, user_id_1, random) GROUP BY event_types ORDER BY event_types + types | sumofeventtype +-------+---------------- + 0 | 449 + 2 | 433 + 3 | 75 +(3 rows) + +RESET client_min_messages; +SET citus.enable_repartition_joins to OFF; -- not supported since the join is not equi join SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType FROM @@ -1461,7 +1479,9 @@ ORDER BY user_id DESC LIMIT 10; ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator --- not supported since the inner JOIN is not on the partition key +SET citus.enable_repartition_joins to ON; +SET client_min_messages TO DEBUG1; +-- recursively planner since the inner JOIN is not on the partition key SELECT user_id, lastseen FROM (SELECT @@ -1514,7 +1534,13 @@ FROM ORDER BY user_id DESC LIMIT 10; -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +DEBUG: cannot use real time executor with repartition jobs +HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. +DEBUG: generating subplan 55_1 for subquery SELECT user_where_1_1.user_id FROM ((SELECT users.user_id FROM public.users_table users WHERE ((users.user_id > 1) AND (users.user_id < 4) AND (users.value_1 > 2))) user_where_1_1 JOIN (SELECT users.user_id, users.value_1 FROM public.users_table users WHERE ((users.user_id > 1) AND (users.user_id < 4) AND (users.value_2 > 3))) user_where_1_join_1 ON ((user_where_1_1.user_id = user_where_1_join_1.value_1))) +ERROR: cannot push down this subquery +DETAIL: Limit in subquery is currently unsupported when a subquery references a column from another query +SET citus.enable_repartition_joins to OFF; +RESET client_min_messages; -- not supported since upper LATERAL JOIN is not equi join SELECT user_id, lastseen FROM @@ -1676,7 +1702,9 @@ ORDER BY 1 | 0 (4 rows) --- not supported since the first inner join is not on the partition key +SET citus.enable_repartition_joins to ON; +SET client_min_messages TO DEBUG1; +-- recursively planned since the first inner join is not on the partition key SELECT count(*) AS value, "generated_group_field" FROM @@ -1718,8 +1746,20 @@ GROUP BY "generated_group_field" ORDER BY generated_group_field DESC, value DESC; -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator --- not supported since the first inner join is not an equi join +DEBUG: cannot use real time executor with repartition jobs +HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. +DEBUG: generating subplan 63_1 for subquery SELECT user_where_1_1.real_user_id FROM ((SELECT users.user_id AS real_user_id FROM public.users_table users WHERE ((users.user_id > 1) AND (users.user_id < 4) AND (users.value_2 > 3))) user_where_1_1 JOIN (SELECT users.user_id, users.value_2 FROM public.users_table users WHERE ((users.user_id > 1) AND (users.user_id < 4) AND (users.value_3 > (3)::double precision))) user_where_1_join_1 ON ((user_where_1_1.real_user_id = user_where_1_join_1.value_2))) +DEBUG: generating subplan 63_2 for subquery SELECT DISTINCT real_user_id, generated_group_field FROM (SELECT "eventQuery".real_user_id, "eventQuery"."time", random() AS random, "eventQuery".value_2 AS generated_group_field FROM (SELECT temp_data_queries."time", temp_data_queries.user_id, temp_data_queries.value_2, user_filters_1.real_user_id FROM ((SELECT events."time", events.user_id, events.value_2 FROM public.events_table events WHERE ((events.user_id > 1) AND (events.user_id < 4) AND (events.event_type = ANY (ARRAY[4, 5])))) temp_data_queries JOIN (SELECT intermediate_result.real_user_id FROM read_intermediate_result('63_1'::text, 'binary'::citus_copy_format) intermediate_result(real_user_id integer)) user_filters_1 ON ((temp_data_queries.user_id = user_filters_1.real_user_id)))) "eventQuery") "pushedDownQuery" +DEBUG: Plan 63 query after replacing subqueries and CTEs: SELECT count(*) AS value, generated_group_field FROM (SELECT intermediate_result.real_user_id, intermediate_result.generated_group_field FROM read_intermediate_result('63_2'::text, 'binary'::citus_copy_format) intermediate_result(real_user_id integer, generated_group_field integer)) "pushedDownQuery" GROUP BY generated_group_field ORDER BY generated_group_field DESC, (count(*)) DESC + value | generated_group_field +-------+----------------------- + 1 | 5 + 2 | 2 + 2 | 1 + 1 | 0 +(4 rows) + +-- recursive planning didn't kick-in since the non-equi join is among subqueries SELECT count(*) AS value, "generated_group_field" FROM @@ -1762,6 +1802,9 @@ GROUP BY ORDER BY generated_group_field DESC, value DESC; ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator + +SET citus.enable_repartition_joins to OFF; +RESET client_min_messages; -- single level inner joins SELECT "value_3", count(*) AS cnt @@ -1809,7 +1852,10 @@ ORDER BY cnt, value_3 DESC LIMIT 10; 2 | 35 (6 rows) --- not supported since there is no partition column equality at all +SET citus.enable_repartition_joins to ON; +SET client_min_messages TO DEBUG1; +-- not supported since there is no column equality at all +-- but still recursive planning is tried SELECT "value_3", count(*) AS cnt FROM @@ -1846,7 +1892,10 @@ FROM ) segmentalias_1) "tempQuery" GROUP BY "value_3" ORDER BY cnt, value_3 DESC LIMIT 10; -ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator +ERROR: cannot perform distributed planning on this query +DETAIL: Cartesian products are currently unsupported +SET citus.enable_repartition_joins to OFF; +RESET client_min_messages; -- nested LATERAL JOINs SET citus.subquery_pushdown to ON; SELECT * diff --git a/src/test/regress/expected/multi_view.out b/src/test/regress/expected/multi_view.out index aefa72866..a34c38b16 100644 --- a/src/test/regress/expected/multi_view.out +++ b/src/test/regress/expected/multi_view.out @@ -845,7 +845,7 @@ EXPLAIN (COSTS FALSE) SELECT et.* FROM recent_10_users JOIN events_table et USIN -> Sort Sort Key: remote_scan."time" DESC -> Custom Scan (Citus Real-Time) - -> Distributed Subplan 91_1 + -> Distributed Subplan 93_1 -> Limit -> Sort Sort Key: max((max(remote_scan.lastseen))) DESC diff --git a/src/test/regress/expected/set_operations.out b/src/test/regress/expected/set_operations.out index 9d53a7d50..b976ac7fd 100644 --- a/src/test/regress/expected/set_operations.out +++ b/src/test/regress/expected/set_operations.out @@ -776,7 +776,6 @@ DEBUG: Plan is router executable -- repartition is recursively planned with the set operation (SELECT x FROM test) INTERSECT (SELECT t1.x FROM test as t1, test as t2 WHERE t1.x = t2.y) ORDER BY 1 DESC; -DEBUG: generating subplan 164_1 for subquery SELECT x FROM recursive_union.test DEBUG: join prunable for task partitionId 0 and 1 DEBUG: join prunable for task partitionId 0 and 2 DEBUG: join prunable for task partitionId 0 and 3 @@ -807,8 +806,9 @@ DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 24 DEBUG: cannot use real time executor with repartition jobs HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. -DEBUG: generating subplan 164_2 for subquery SELECT t1.x FROM recursive_union.test t1, recursive_union.test t2 WHERE (t1.x = t2.y) -DEBUG: Plan 164 query after replacing subqueries and CTEs: SELECT intermediate_result.x FROM read_intermediate_result('164_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer) INTERSECT SELECT intermediate_result.x FROM read_intermediate_result('164_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer) ORDER BY 1 DESC +DEBUG: generating subplan 164_1 for subquery SELECT t1.x FROM recursive_union.test t1, recursive_union.test t2 WHERE (t1.x = t2.y) +DEBUG: generating subplan 164_2 for subquery SELECT x FROM recursive_union.test +DEBUG: Plan 164 query after replacing subqueries and CTEs: SELECT intermediate_result.x FROM read_intermediate_result('164_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer) INTERSECT SELECT intermediate_result.x FROM read_intermediate_result('164_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer) ORDER BY 1 DESC DEBUG: Creating router plan DEBUG: Plan is router executable x diff --git a/src/test/regress/sql/multi_subquery.sql b/src/test/regress/sql/multi_subquery.sql index 070162447..c22f436e7 100644 --- a/src/test/regress/sql/multi_subquery.sql +++ b/src/test/regress/sql/multi_subquery.sql @@ -29,6 +29,7 @@ SET WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'orders_subquery'::regclass ORDER BY shardid DESC LIMIT 1); +SET client_min_messages TO DEBUG1; -- If group by is not on partition column then we recursively plan SELECT avg(order_count) @@ -41,7 +42,7 @@ FROM GROUP BY l_suppkey) AS order_counts; --- Check that we error out if join is not on partition columns. +-- Check that we recursively plan if join is not on partition columns. SELECT avg(unit_price) FROM @@ -54,6 +55,24 @@ FROM GROUP BY l_orderkey) AS unit_prices; +-- this query is only required to execute +-- the following query given that recursive planning +-- (in general real-time queries in transactions) +-- do not execute shard fetch tasks and the next +-- query relies on that +SELECT + l_orderkey, + avg(o_totalprice / l_quantity) AS unit_price + FROM + lineitem_subquery, + orders_subquery + WHERE + l_orderkey = o_custkey + GROUP BY + l_orderkey + ORDER BY 2 DESC, 1 DESC + LIMIT 5; + SELECT avg(unit_price) FROM @@ -68,6 +87,8 @@ FROM GROUP BY l_orderkey) AS unit_prices; +RESET client_min_messages; + -- Subqueries without relation with a volatile functions (non-constant) are planned recursively SELECT count(*) FROM ( SELECT l_orderkey FROM lineitem_subquery JOIN (SELECT random()::int r) sub ON (l_orderkey = r) WHERE r > 10 diff --git a/src/test/regress/sql/multi_subquery_complex_queries.sql b/src/test/regress/sql/multi_subquery_complex_queries.sql index 8502b66a5..b5b089c8a 100644 --- a/src/test/regress/sql/multi_subquery_complex_queries.sql +++ b/src/test/regress/sql/multi_subquery_complex_queries.sql @@ -319,7 +319,10 @@ GROUP BY ORDER BY types; --- not supported since events_subquery_5 is not joined on partition key +SET citus.enable_repartition_joins to ON; +SET client_min_messages TO DEBUG1; + +-- recursively planned since events_subquery_5 is not joined on partition key SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType FROM ( SELECT @@ -391,6 +394,9 @@ GROUP BY ORDER BY types; +RESET client_min_messages; +SET citus.enable_repartition_joins to OFF; + -- not supported since the join is not equi join SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType FROM @@ -1357,7 +1363,11 @@ ORDER BY user_id DESC LIMIT 10; --- not supported since the inner JOIN is not on the partition key + +SET citus.enable_repartition_joins to ON; +SET client_min_messages TO DEBUG1; + +-- recursively planner since the inner JOIN is not on the partition key SELECT user_id, lastseen FROM (SELECT @@ -1412,6 +1422,9 @@ ORDER BY LIMIT 10; +SET citus.enable_repartition_joins to OFF; +RESET client_min_messages; + -- not supported since upper LATERAL JOIN is not equi join SELECT user_id, lastseen FROM @@ -1565,7 +1578,11 @@ GROUP BY ORDER BY generated_group_field DESC, value DESC; --- not supported since the first inner join is not on the partition key + +SET citus.enable_repartition_joins to ON; +SET client_min_messages TO DEBUG1; + +-- recursively planned since the first inner join is not on the partition key SELECT count(*) AS value, "generated_group_field" FROM @@ -1608,7 +1625,7 @@ GROUP BY ORDER BY generated_group_field DESC, value DESC; --- not supported since the first inner join is not an equi join +-- recursive planning didn't kick-in since the non-equi join is among subqueries SELECT count(*) AS value, "generated_group_field" FROM @@ -1651,6 +1668,10 @@ GROUP BY ORDER BY generated_group_field DESC, value DESC; + +SET citus.enable_repartition_joins to OFF; +RESET client_min_messages; + -- single level inner joins SELECT "value_3", count(*) AS cnt @@ -1690,7 +1711,12 @@ GROUP BY "value_3" ORDER BY cnt, value_3 DESC LIMIT 10; --- not supported since there is no partition column equality at all + +SET citus.enable_repartition_joins to ON; +SET client_min_messages TO DEBUG1; + +-- not supported since there is no column equality at all +-- but still recursive planning is tried SELECT "value_3", count(*) AS cnt FROM @@ -1728,6 +1754,9 @@ FROM GROUP BY "value_3" ORDER BY cnt, value_3 DESC LIMIT 10; +SET citus.enable_repartition_joins to OFF; +RESET client_min_messages; + -- nested LATERAL JOINs SET citus.subquery_pushdown to ON; SELECT *