From 4e3c6224fb1687e97b23eb7912179ac6b833313a Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Sat, 19 Jun 2021 00:51:27 +0300 Subject: [PATCH] Fix UNION not being pushdown Postgres optimizes column fields that are not needed in the output. We were relying on these fields to understand if it is safe to push down a union query. This fix looks at the parse query, which has the original column fields to detect if it is safe to push down a union query. --- .../planner/query_pushdown_planning.c | 2 +- .../distributed/planner/recursive_planning.c | 2 +- .../relation_restriction_equivalence.c | 145 ++++++++++++++---- .../relation_restriction_equivalence.h | 2 +- .../expected/local_dist_join_mixed.out | 5 +- src/test/regress/expected/union_pushdown.out | 85 +++++++++- src/test/regress/sql/union_pushdown.sql | 44 +++++- 7 files changed, 245 insertions(+), 40 deletions(-) diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 632b9dd72..cf7314e94 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -582,7 +582,7 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery, */ if (ContainsUnionSubquery(originalQuery)) { - if (!SafeToPushdownUnionSubquery(plannerRestrictionContext)) + if (!SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "cannot pushdown the subquery since not all subqueries " diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 209714f74..b2a8140da 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -1013,7 +1013,7 @@ ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *contex PlannerRestrictionContext *filteredRestrictionContext = FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query); - if (!SafeToPushdownUnionSubquery(filteredRestrictionContext)) + if (!SafeToPushdownUnionSubquery(query, filteredRestrictionContext)) { /* * The distribution column is not in the same place in all sides diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index fb6a6a306..6a2c1521c 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -59,6 +59,13 @@ typedef struct AttributeEquivalenceClass Index unionQueryPartitionKeyIndex; } AttributeEquivalenceClass; +typedef struct FindQueryContainingRteIdentityContext +{ + int targetRTEIdentity; + Query *query; + Query *tempQuery; +}FindQueryContainingRteIdentityContext; + /* * AttributeEquivalenceClassMember - one member expression of an * AttributeEquivalenceClass. The important thing to consider is that @@ -142,8 +149,9 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass firstClass, AttributeEquivalenceClass * secondClass); -static Index RelationRestrictionPartitionKeyIndex(RelationRestriction * - relationRestriction); +static Var * RelationRestrictionPartitionKeyIndex(Query *query, RelationRestriction * + relationRestriction, + Index *partitionKeyIndex); static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext * restrictionContext); static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node); @@ -155,6 +163,11 @@ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEn queryRteIdentities); static Relids QueryRteIdentities(Query *queryTree); +static bool FindQueryContainingRTEIdentityInternal(Node *node, + FindQueryContainingRteIdentityContext * + context); +static Query * FindQueryContainingRTEIdentity(Query *mainQuery, int rteIndex); + #if PG_VERSION_NUM >= PG_VERSION_13 static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo); #endif @@ -194,7 +207,7 @@ AllDistributionKeysInQueryAreEqual(Query *originalQuery, if (originalQuery->setOperations || ContainsUnionSubquery(originalQuery)) { - return SafeToPushdownUnionSubquery(plannerRestrictionContext); + return SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext); } return false; @@ -244,7 +257,8 @@ ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext) * safe to push down, the function would fail to return true. */ bool -SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext) +SafeToPushdownUnionSubquery(Query *originalQuery, + PlannerRestrictionContext *plannerRestrictionContext) { RelationRestrictionContext *restrictionContext = plannerRestrictionContext->relationRestrictionContext; @@ -267,10 +281,8 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext RelationRestriction *relationRestriction = lfirst(relationRestrictionCell); Index partitionKeyIndex = InvalidAttrNumber; PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo; - List *targetList = relationPlannerRoot->parse->targetList; List *appendRelList = relationPlannerRoot->append_rel_list; Var *varToBeAdded = NULL; - TargetEntry *targetEntryToAdd = NULL; /* * We first check whether UNION ALLs are pulled up or not. Note that Postgres @@ -294,8 +306,9 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext } else { - partitionKeyIndex = - RelationRestrictionPartitionKeyIndex(relationRestriction); + varToBeAdded = + RelationRestrictionPartitionKeyIndex(originalQuery, relationRestriction, + &partitionKeyIndex); /* union does not have partition key in the target list */ if (partitionKeyIndex == 0) @@ -303,13 +316,12 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext continue; } - targetEntryToAdd = list_nth(targetList, partitionKeyIndex - 1); - if (!IsA(targetEntryToAdd->expr, Var)) - { - continue; - } - - varToBeAdded = (Var *) targetEntryToAdd->expr; + /* + * we update the varno because we use the original parse tree for finding the + * var. However the rest of the code relies on a query tree that might be different + * than the original parse tree because of postgres optimizations. + */ + varToBeAdded->varno = relationRestriction->index; } /* @@ -1760,16 +1772,29 @@ ContainsUnionSubquery(Query *queryTree) * index that the partition key of the relation exists in the query. The query is * found in the planner info of the relation restriction. */ -static Index -RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction) +static Var * +RelationRestrictionPartitionKeyIndex(Query *originalQuery, + RelationRestriction *relationRestriction, + Index *partitionKeyIndex) { + int targetRTEIndex = GetRTEIdentity(relationRestriction->rte); + Query *originalQueryContainingRTEIdentity = + FindQueryContainingRTEIdentity(originalQuery, targetRTEIndex); + if (!originalQueryContainingRTEIdentity) + { + /* + * We should always find the query but we have this check for sanity. + * This check makes sure that if there is a bug while finding the query, + * we don't get a crash etc. and the only downside will be we might be recursively + * planning a query that could be pushed down. + */ + return NULL; + } + + List *relationTargetList = originalQueryContainingRTEIdentity->targetList; + ListCell *targetEntryCell = NULL; Index partitionKeyTargetAttrIndex = 0; - - PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo; - Query *relationPlannerParseQuery = relationPlannerRoot->parse; - List *relationTargetList = relationPlannerParseQuery->targetList; - foreach(targetEntryCell, relationTargetList) { TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); @@ -1779,18 +1804,86 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction) if (!targetEntry->resjunk && IsA(targetExpression, Var) && - IsPartitionColumn(targetExpression, relationPlannerParseQuery)) + IsPartitionColumn(targetExpression, originalQueryContainingRTEIdentity)) { Var *targetColumn = (Var *) targetExpression; - if (targetColumn->varno == relationRestriction->index) + Oid relationId = InvalidOid; + Var *column = NULL; + FindReferencedTableColumn(targetExpression, NIL, + originalQueryContainingRTEIdentity, &relationId, + &column); + + targetColumn = column; + RangeTblEntry *resRTE = + (RangeTblEntry *) list_nth(originalQueryContainingRTEIdentity->rtable, + targetColumn->varno - 1); + if (resRTE->rtekind == RTE_RELATION && GetRTEIdentity(resRTE) == + targetRTEIndex) { - return partitionKeyTargetAttrIndex; + *partitionKeyIndex = partitionKeyTargetAttrIndex; + return (Var *) copyObject(targetColumn); } } } - return InvalidAttrNumber; + return NULL; +} + + +/* + * FindQueryContainingRTEIdentity finds the query/subquery that has an RTE + * with rteIndex in its rtable. + */ +static Query * +FindQueryContainingRTEIdentity(Query *mainQuery, int rteIndex) +{ + FindQueryContainingRteIdentityContext *findRteIdentityContext = + palloc0(sizeof(FindQueryContainingRteIdentityContext)); + findRteIdentityContext->targetRTEIdentity = rteIndex; + FindQueryContainingRTEIdentityInternal((Node *) mainQuery, findRteIdentityContext); + return findRteIdentityContext->query; +} + + +/* + * FindQueryContainingRTEIdentityInternal walks on the given node to find a query + * which has an RTE that has a given rteIdentity. + */ +static bool +FindQueryContainingRTEIdentityInternal(Node *node, + FindQueryContainingRteIdentityContext *context) +{ + if (node == NULL) + { + return false; + } + if (IsA(node, Query)) + { + Query *query = (Query *) node; + Query *prevQuery = context->tempQuery; + context->tempQuery = query; + query_tree_walker(query, FindQueryContainingRTEIdentityInternal, context, + QTW_EXAMINE_RTES_BEFORE); + context->tempQuery = prevQuery; + return false; + } + + if (!IsA(node, RangeTblEntry)) + { + return expression_tree_walker(node, FindQueryContainingRTEIdentityInternal, + context); + } + RangeTblEntry *rte = (RangeTblEntry *) node; + if (rte->rtekind == RTE_RELATION) + { + if (GetRTEIdentity(rte) == context->targetRTEIdentity) + { + context->query = context->tempQuery; + return true; + } + } + return false; } diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index e1ece87e6..ccd50a6db 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -20,7 +20,7 @@ extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); -extern bool SafeToPushdownUnionSubquery(PlannerRestrictionContext * +extern bool SafeToPushdownUnionSubquery(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); extern bool ContainsUnionSubquery(Query *queryTree); extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext * diff --git a/src/test/regress/expected/local_dist_join_mixed.out b/src/test/regress/expected/local_dist_join_mixed.out index 2baf2064d..0e3e0aad4 100644 --- a/src/test/regress/expected/local_dist_join_mixed.out +++ b/src/test/regress/expected/local_dist_join_mixed.out @@ -740,10 +740,7 @@ DEBUG: Wrapping relation "local" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true DEBUG: Wrapping relation "local" to a subquery DEBUG: generating subplan XXX_2 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true -DEBUG: generating subplan XXX_3 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo -DEBUG: generating subplan XXX_4 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba -DEBUG: generating subplan XXX_5 for subquery SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone) UNION ALL SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) bar +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT fo.id, fo.name, fo.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo UNION ALL SELECT ba.id, ba.name, ba.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba) bar count --------------------------------------------------------------------- 202 diff --git a/src/test/regress/expected/union_pushdown.out b/src/test/regress/expected/union_pushdown.out index ec56f3949..183c53e26 100644 --- a/src/test/regress/expected/union_pushdown.out +++ b/src/test/regress/expected/union_pushdown.out @@ -912,14 +912,13 @@ $$); f (1 row) --- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781 SELECT public.explain_has_distributed_subplan($$ EXPLAIN SELECT COUNT(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ @@ -928,7 +927,7 @@ SELECT AVG(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ @@ -937,7 +936,7 @@ SELECT SUM(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) SELECT public.explain_has_distributed_subplan($$ @@ -946,7 +945,81 @@ SELECT MAX(k) FROM v; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f +(1 row) + +CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f (1 row) -- order by prevents postgres from optimizing fields so can be pushed down @@ -966,7 +1039,7 @@ SELECT k, COUNT(*) FROM v GROUP BY k ORDER BY k; $$); explain_has_distributed_subplan --------------------------------------------------------------------- - t + f (1 row) RESET client_min_messages; diff --git a/src/test/regress/sql/union_pushdown.sql b/src/test/regress/sql/union_pushdown.sql index c40f7c6fd..04e28f584 100644 --- a/src/test/regress/sql/union_pushdown.sql +++ b/src/test/regress/sql/union_pushdown.sql @@ -682,7 +682,6 @@ EXPLAIN SELECT MAX(id) FROM v; $$); --- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781 SELECT public.explain_has_distributed_subplan($$ EXPLAIN SELECT COUNT(k) FROM v; @@ -703,6 +702,49 @@ EXPLAIN SELECT MAX(k) FROM v; $$); +CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1; + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + +CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1; +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT COUNT(*) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT AVG(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT SUM(k) FROM v; +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT MAX(k) FROM v; +$$); + -- order by prevents postgres from optimizing fields so can be pushed down SELECT public.explain_has_distributed_subplan($$ EXPLAIN