diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 906ae4a49..d28103d81 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -373,8 +373,7 @@ AssignRTEIdentities(List *rangeTableList, int rteIdCounter) * Note that we're only interested in RTE_RELATIONs and thus assigning * identifiers to those RTEs only. */ - if (rangeTableEntry->rtekind == RTE_RELATION && - rangeTableEntry->values_lists == NIL) + if (rangeTableEntry->rtekind == RTE_RELATION && rangeTableEntry->values_lists == NIL) { AssignRTEIdentity(rangeTableEntry, rteIdCounter++); } diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 632b9dd72..cf7314e94 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -582,7 +582,7 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery, */ if (ContainsUnionSubquery(originalQuery)) { - if (!SafeToPushdownUnionSubquery(plannerRestrictionContext)) + if (!SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "cannot pushdown the subquery since not all subqueries " diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 209714f74..b2a8140da 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -1013,7 +1013,7 @@ ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *contex PlannerRestrictionContext *filteredRestrictionContext = FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query); - if (!SafeToPushdownUnionSubquery(filteredRestrictionContext)) + if (!SafeToPushdownUnionSubquery(query, filteredRestrictionContext)) { /* * The distribution column is not in the same place in all sides diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index fb6a6a306..c00ce5821 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -59,6 +59,11 @@ typedef struct AttributeEquivalenceClass Index unionQueryPartitionKeyIndex; } AttributeEquivalenceClass; +typedef struct FindRteIdentityContext { + uint32 rteIdentity; + Query* query; +}FindRteIdentityContext; + /* * AttributeEquivalenceClassMember - one member expression of an * AttributeEquivalenceClass. The important thing to consider is that @@ -142,8 +147,8 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass firstClass, AttributeEquivalenceClass * secondClass); -static Index RelationRestrictionPartitionKeyIndex(RelationRestriction * - relationRestriction); +static Var* RelationRestrictionPartitionKeyIndex(Query* query, RelationRestriction * + relationRestriction, Index* partitionKeyIndex); static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext * restrictionContext); static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node); @@ -155,6 +160,9 @@ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEn queryRteIdentities); static Relids QueryRteIdentities(Query *queryTree); +static bool +findRteIdentityWalker(Node *node, FindRteIdentityContext* context); + #if PG_VERSION_NUM >= PG_VERSION_13 static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo); #endif @@ -194,7 +202,7 @@ AllDistributionKeysInQueryAreEqual(Query *originalQuery, if (originalQuery->setOperations || ContainsUnionSubquery(originalQuery)) { - return SafeToPushdownUnionSubquery(plannerRestrictionContext); + return SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext); } return false; @@ -244,7 +252,7 @@ ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext) * safe to push down, the function would fail to return true. */ bool -SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext) +SafeToPushdownUnionSubquery(Query* originalQuery, PlannerRestrictionContext *plannerRestrictionContext) { RelationRestrictionContext *restrictionContext = plannerRestrictionContext->relationRestrictionContext; @@ -267,10 +275,8 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext RelationRestriction *relationRestriction = lfirst(relationRestrictionCell); Index partitionKeyIndex = InvalidAttrNumber; PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo; - List *targetList = relationPlannerRoot->parse->targetList; List *appendRelList = relationPlannerRoot->append_rel_list; Var *varToBeAdded = NULL; - TargetEntry *targetEntryToAdd = NULL; /* * We first check whether UNION ALLs are pulled up or not. Note that Postgres @@ -294,8 +300,8 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext } else { - partitionKeyIndex = - RelationRestrictionPartitionKeyIndex(relationRestriction); + varToBeAdded = + RelationRestrictionPartitionKeyIndex(originalQuery, relationRestriction, &partitionKeyIndex); /* union does not have partition key in the target list */ if (partitionKeyIndex == 0) @@ -303,13 +309,12 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext continue; } - targetEntryToAdd = list_nth(targetList, partitionKeyIndex - 1); - if (!IsA(targetEntryToAdd->expr, Var)) - { - continue; - } - - varToBeAdded = (Var *) targetEntryToAdd->expr; + /* + * we update the varno because we use the original parse tree for finding the + * var. However the rest of the code relies on a query tree that might be different + * than the original parse tree because of postgres optimizations. + */ + varToBeAdded->varno = relationRestriction->index; } /* @@ -1760,15 +1765,23 @@ ContainsUnionSubquery(Query *queryTree) * index that the partition key of the relation exists in the query. The query is * found in the planner info of the relation restriction. */ -static Index -RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction) +static Var* +RelationRestrictionPartitionKeyIndex(Query* originalQuery, RelationRestriction *relationRestriction, Index* partitionKeyIndex) { ListCell *targetEntryCell = NULL; Index partitionKeyTargetAttrIndex = 0; - PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo; - Query *relationPlannerParseQuery = relationPlannerRoot->parse; - List *relationTargetList = relationPlannerParseQuery->targetList; + + FindRteIdentityContext* findRteIdentityContext = palloc0(sizeof(FindRteIdentityContext)); + findRteIdentityContext->rteIdentity = GetRTEIdentity(relationRestriction->rte); + findRteIdentityWalker((Node*) originalQuery, findRteIdentityContext); + if (findRteIdentityContext->query == NULL) { + return NULL; + } + List *relationTargetList = findRteIdentityContext->query->targetList; + + Query *relationPlannerParseQuery = findRteIdentityContext->query; + foreach(targetEntryCell, relationTargetList) { @@ -1782,15 +1795,47 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction) IsPartitionColumn(targetExpression, relationPlannerParseQuery)) { Var *targetColumn = (Var *) targetExpression; - - if (targetColumn->varno == relationRestriction->index) + RangeTblEntry* res = (RangeTblEntry*) list_nth(relationPlannerParseQuery->rtable, targetColumn->varno - 1); + if (res->rtekind == RTE_RELATION && GetRTEIdentity(res) == findRteIdentityContext->rteIdentity) { - return partitionKeyTargetAttrIndex; + *partitionKeyIndex = partitionKeyTargetAttrIndex; + return (Var*)copyObject(targetColumn); } } } - return InvalidAttrNumber; + return NULL; +} + +/* + * findRteIdentityWalker walks on the given node to find a query + * which has an RTE that has a given rteIdentity. + */ +static bool +findRteIdentityWalker(Node *node, FindRteIdentityContext* context) +{ + if (node == NULL) + { + return false; + } + if (IsA(node, Query)) + { + Query *query = (Query *) node; + RangeTblEntry* rte = NULL; + foreach_ptr(rte, query->rtable) { + if (rte->rtekind == RTE_RELATION) { + if (GetRTEIdentity(rte) == context->rteIdentity) { + context->query = query; + return true; + } + } + } + query_tree_walker(query, findRteIdentityWalker, context, + QTW_EXAMINE_RTES_BEFORE); + + return false; + } + return false; } diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index e1ece87e6..7375f0aa2 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -20,7 +20,7 @@ extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); -extern bool SafeToPushdownUnionSubquery(PlannerRestrictionContext * +extern bool SafeToPushdownUnionSubquery(Query* originalQuery, PlannerRestrictionContext * plannerRestrictionContext); extern bool ContainsUnionSubquery(Query *queryTree); extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext * diff --git a/src/test/regress/expected/local_dist_join_mixed.out b/src/test/regress/expected/local_dist_join_mixed.out index 2baf2064d..0e3e0aad4 100644 --- a/src/test/regress/expected/local_dist_join_mixed.out +++ b/src/test/regress/expected/local_dist_join_mixed.out @@ -740,10 +740,7 @@ DEBUG: Wrapping relation "local" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true DEBUG: Wrapping relation "local" to a subquery DEBUG: generating subplan XXX_2 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true -DEBUG: generating subplan XXX_3 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo -DEBUG: generating subplan XXX_4 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba -DEBUG: generating subplan XXX_5 for subquery SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone) UNION ALL SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) bar +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT fo.id, fo.name, fo.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo UNION ALL SELECT ba.id, ba.name, ba.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba) bar count --------------------------------------------------------------------- 202 diff --git a/src/test/regress/expected/union_oushdown.out b/src/test/regress/expected/union_oushdown.out new file mode 100644 index 000000000..e69de29bb diff --git a/src/test/regress/expected/union_pushdown.out b/src/test/regress/expected/union_pushdown.out index ec56f3949..c1d5b0fa1 100644 --- a/src/test/regress/expected/union_pushdown.out +++ b/src/test/regress/expected/union_pushdown.out @@ -236,12 +236,8 @@ FROM UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 8 GROUP BY user_id)) AS bar WHERE user_id < 2000 ) as level_1 ) as level_2 ) as level_3 ORDER BY 1 DESC LIMIT 10; -DEBUG: push down of limit count: 10 - count ---------------------------------------------------------------------- - 78 -(1 row) - +DEBUG: generating subplan XXX_1 for subquery SELECT user_id FROM union_pushdown.users_table_part WHERE (value_2 OPERATOR(pg_catalog.=) 3) +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns -- safe to pushdown SELECT DISTINCT user_id FROM ( SELECT * FROM @@ -912,14 +908,13 @@ $$); f (1 row) --- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781 SELECT public.explain_has_distributed_subplan($$ EXPLAIN SELECT COUNT(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ @@ -928,7 +923,7 @@ SELECT AVG(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ @@ -937,7 +932,7 @@ SELECT SUM(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ @@ -946,7 +941,81 @@ SELECT MAX(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f +(1 row) + +CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f (1 row) -- order by prevents postgres from optimizing fields so can be pushed down @@ -966,7 +1035,7 @@ SELECT k, COUNT(*) FROM v GROUP BY k ORDER BY k; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) RESET client_min_messages; diff --git a/src/test/regress/sql/union_pushdown.sql b/src/test/regress/sql/union_pushdown.sql index c40f7c6fd..04e28f584 100644 --- a/src/test/regress/sql/union_pushdown.sql +++ b/src/test/regress/sql/union_pushdown.sql @@ -682,7 +682,6 @@ EXPLAIN SELECT MAX(id) FROM v; $$); --- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781 SELECT public.explain_has_distributed_subplan($$ EXPLAIN SELECT COUNT(k) FROM v; @@ -703,6 +702,49 @@ EXPLAIN SELECT MAX(k) FROM v; $$); +CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1; + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + +CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + -- order by prevents postgres from optimizing fields so can be pushed down SELECT public.explain_has_distributed_subplan($$ EXPLAIN