diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 632b9dd72..cf7314e94 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -582,7 +582,7 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery, */ if (ContainsUnionSubquery(originalQuery)) { - if (!SafeToPushdownUnionSubquery(plannerRestrictionContext)) + if (!SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "cannot pushdown the subquery since not all subqueries " diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 209714f74..b2a8140da 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -1013,7 +1013,7 @@ ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *contex PlannerRestrictionContext *filteredRestrictionContext = FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query); - if (!SafeToPushdownUnionSubquery(filteredRestrictionContext)) + if (!SafeToPushdownUnionSubquery(query, filteredRestrictionContext)) { /* * The distribution column is not in the same place in all sides diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index fb6a6a306..6a2c1521c 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -59,6 +59,13 @@ typedef struct AttributeEquivalenceClass Index unionQueryPartitionKeyIndex; } AttributeEquivalenceClass; +typedef struct FindQueryContainingRteIdentityContext +{ + int targetRTEIdentity; + Query *query; + Query *tempQuery; +}FindQueryContainingRteIdentityContext; + /* * AttributeEquivalenceClassMember - one member expression of an * AttributeEquivalenceClass. The important thing to consider is that @@ -142,8 +149,9 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass firstClass, AttributeEquivalenceClass * secondClass); -static Index RelationRestrictionPartitionKeyIndex(RelationRestriction * - relationRestriction); +static Var * RelationRestrictionPartitionKeyIndex(Query *query, RelationRestriction * + relationRestriction, + Index *partitionKeyIndex); static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext * restrictionContext); static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node); @@ -155,6 +163,11 @@ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEn queryRteIdentities); static Relids QueryRteIdentities(Query *queryTree); +static bool FindQueryContainingRTEIdentityInternal(Node *node, + FindQueryContainingRteIdentityContext * + context); +static Query * FindQueryContainingRTEIdentity(Query *mainQuery, int rteIndex); + #if PG_VERSION_NUM >= PG_VERSION_13 static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo); #endif @@ -194,7 +207,7 @@ AllDistributionKeysInQueryAreEqual(Query *originalQuery, if (originalQuery->setOperations || ContainsUnionSubquery(originalQuery)) { - return SafeToPushdownUnionSubquery(plannerRestrictionContext); + return SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext); } return false; @@ -244,7 +257,8 @@ ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext) * safe to push down, the function would fail to return true. */ bool -SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext) +SafeToPushdownUnionSubquery(Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext) { RelationRestrictionContext *restrictionContext = plannerRestrictionContext->relationRestrictionContext; @@ -267,10 +281,8 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext RelationRestriction *relationRestriction = lfirst(relationRestrictionCell); Index partitionKeyIndex = InvalidAttrNumber; PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo; - List *targetList = relationPlannerRoot->parse->targetList; List *appendRelList = relationPlannerRoot->append_rel_list; Var *varToBeAdded = NULL; - TargetEntry *targetEntryToAdd = NULL; /* * We first check whether UNION ALLs are pulled up or not. Note that Postgres @@ -294,8 +306,9 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext } else { - partitionKeyIndex = - RelationRestrictionPartitionKeyIndex(relationRestriction); + varToBeAdded = + RelationRestrictionPartitionKeyIndex(originalQuery, relationRestriction, + &partitionKeyIndex); /* union does not have partition key in the target list */ if (partitionKeyIndex == 0) @@ -303,13 +316,12 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext continue; } - targetEntryToAdd = list_nth(targetList, partitionKeyIndex - 1); - if (!IsA(targetEntryToAdd->expr, Var)) - { - continue; - } - - varToBeAdded = (Var *) targetEntryToAdd->expr; + /* + * we update the varno because we use the original parse tree for finding the + * var. However the rest of the code relies on a query tree that might be different + * than the original parse tree because of postgres optimizations. + */ + varToBeAdded->varno = relationRestriction->index; } /* @@ -1760,16 +1772,29 @@ ContainsUnionSubquery(Query *queryTree) * index that the partition key of the relation exists in the query. The query is * found in the planner info of the relation restriction. */ -static Index -RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction) +static Var * +RelationRestrictionPartitionKeyIndex(Query *originalQuery, + RelationRestriction *relationRestriction, + Index *partitionKeyIndex) { + int targetRTEIndex = GetRTEIdentity(relationRestriction->rte); + Query *originalQueryContainingRTEIdentity = + FindQueryContainingRTEIdentity(originalQuery, targetRTEIndex); + if (!originalQueryContainingRTEIdentity) + { + /* + * We should always find the query but we have this check for sanity. + * This check makes sure that if there is a bug while finding the query, + * we don't get a crash etc. and the only downside will be we might be recursively + * planning a query that could be pushed down. + */ + return NULL; + } + + List *relationTargetList = originalQueryContainingRTEIdentity->targetList; + ListCell *targetEntryCell = NULL; Index partitionKeyTargetAttrIndex = 0; - - PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo; - Query *relationPlannerParseQuery = relationPlannerRoot->parse; - List *relationTargetList = relationPlannerParseQuery->targetList; - foreach(targetEntryCell, relationTargetList) { TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); @@ -1779,18 +1804,86 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction) if (!targetEntry->resjunk && IsA(targetExpression, Var) && - IsPartitionColumn(targetExpression, relationPlannerParseQuery)) + IsPartitionColumn(targetExpression, originalQueryContainingRTEIdentity)) { Var *targetColumn = (Var *) targetExpression; - if (targetColumn->varno == relationRestriction->index) + Oid relationId = InvalidOid; + Var *column = NULL; + FindReferencedTableColumn(targetExpression, NIL, + originalQueryContainingRTEIdentity, &relationId, + &column); + + targetColumn = column; + RangeTblEntry *resRTE = + (RangeTblEntry *) list_nth(originalQueryContainingRTEIdentity->rtable, + targetColumn->varno - 1); + if (resRTE->rtekind == RTE_RELATION && GetRTEIdentity(resRTE) == + targetRTEIndex) { - return partitionKeyTargetAttrIndex; + *partitionKeyIndex = partitionKeyTargetAttrIndex; + return (Var *) copyObject(targetColumn); } } } - return InvalidAttrNumber; + return NULL; +} + + +/* + * FindQueryContainingRTEIdentity finds the query/subquery that has an RTE + * with rteIndex in its rtable. + */ +static Query * +FindQueryContainingRTEIdentity(Query *mainQuery, int rteIndex) +{ + FindQueryContainingRteIdentityContext *findRteIdentityContext = + palloc0(sizeof(FindQueryContainingRteIdentityContext)); + findRteIdentityContext->targetRTEIdentity = rteIndex; + FindQueryContainingRTEIdentityInternal((Node *) mainQuery, findRteIdentityContext); + return findRteIdentityContext->query; +} + + +/* + * FindQueryContainingRTEIdentityInternal walks on the given node to find a query + * which has an RTE that has a given rteIdentity. + */ +static bool +FindQueryContainingRTEIdentityInternal(Node *node, + FindQueryContainingRteIdentityContext *context) +{ + if (node == NULL) + { + return false; + } + if (IsA(node, Query)) + { + Query *query = (Query *) node; + Query *prevQuery = context->tempQuery; + context->tempQuery = query; + query_tree_walker(query, FindQueryContainingRTEIdentityInternal, context, + QTW_EXAMINE_RTES_BEFORE); + context->tempQuery = prevQuery; + return false; + } + + if (!IsA(node, RangeTblEntry)) + { + return expression_tree_walker(node, FindQueryContainingRTEIdentityInternal, + context); + } + RangeTblEntry *rte = (RangeTblEntry *) node; + if (rte->rtekind == RTE_RELATION) + { + if (GetRTEIdentity(rte) == context->targetRTEIdentity) + { + context->query = context->tempQuery; + return true; + } + } + return false; } diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index e1ece87e6..ccd50a6db 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -20,7 +20,7 @@ extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); -extern bool SafeToPushdownUnionSubquery(PlannerRestrictionContext * +extern bool SafeToPushdownUnionSubquery(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); extern bool ContainsUnionSubquery(Query *queryTree); extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext * diff --git a/src/test/regress/expected/local_dist_join_mixed.out b/src/test/regress/expected/local_dist_join_mixed.out index 2baf2064d..0e3e0aad4 100644 --- a/src/test/regress/expected/local_dist_join_mixed.out +++ b/src/test/regress/expected/local_dist_join_mixed.out @@ -740,10 +740,7 @@ DEBUG: Wrapping relation "local" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true DEBUG: Wrapping relation "local" to a subquery DEBUG: generating subplan XXX_2 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true -DEBUG: generating subplan XXX_3 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo -DEBUG: generating subplan XXX_4 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba -DEBUG: generating subplan XXX_5 for subquery SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone) UNION ALL SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) bar +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT fo.id, fo.name, fo.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo UNION ALL SELECT ba.id, ba.name, ba.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba) bar count --------------------------------------------------------------------- 202 diff --git a/src/test/regress/expected/union_pushdown.out b/src/test/regress/expected/union_pushdown.out index ec56f3949..183c53e26 100644 --- a/src/test/regress/expected/union_pushdown.out +++ b/src/test/regress/expected/union_pushdown.out @@ -912,14 +912,13 @@ $$); f (1 row) --- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781 SELECT public.explain_has_distributed_subplan($$ EXPLAIN SELECT COUNT(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ @@ -928,7 +927,7 @@ SELECT AVG(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ @@ -937,7 +936,7 @@ SELECT SUM(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ @@ -946,7 +945,81 @@ SELECT MAX(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f +(1 row) + +CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f (1 row) -- order by prevents postgres from optimizing fields so can be pushed down @@ -966,7 +1039,7 @@ SELECT k, COUNT(*) FROM v GROUP BY k ORDER BY k; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) RESET client_min_messages; diff --git a/src/test/regress/sql/union_pushdown.sql b/src/test/regress/sql/union_pushdown.sql index c40f7c6fd..04e28f584 100644 --- a/src/test/regress/sql/union_pushdown.sql +++ b/src/test/regress/sql/union_pushdown.sql @@ -682,7 +682,6 @@ EXPLAIN SELECT MAX(id) FROM v; $$); --- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781 SELECT public.explain_has_distributed_subplan($$ EXPLAIN SELECT COUNT(k) FROM v; @@ -703,6 +702,49 @@ EXPLAIN SELECT MAX(k) FROM v; $$); +CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1; + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + +CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + -- order by prevents postgres from optimizing fields so can be pushed down SELECT public.explain_has_distributed_subplan($$ EXPLAIN