From 3ac4c1c3a2d59213133ec0f47b585ad4c6a5bce6 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 26 Nov 2018 00:05:28 +0300 Subject: [PATCH] Pushdown filters on recursive relation planning PostgreSQL already keeps track of the restrictions that are on the relation. With this commit, Citus uses that information and pushes down the filters to the subquery that is recursively planned for the table that is in considiration. --- .../distributed/planner/recursive_planning.c | 27 +++++- .../relation_restriction_equivalence.c | 95 +++++++++++++++++++ .../relation_restriction_equivalence.h | 3 + .../expected/non_colocated_subquery_joins.out | 2 +- 4 files changed, 121 insertions(+), 6 deletions(-) diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 663cfd5cb..c87aa816c 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -76,6 +76,7 @@ #include "nodes/pg_list.h" #include "nodes/primnodes.h" #include "nodes/relation.h" +#include "optimizer/clauses.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/lsyscache.h" @@ -480,6 +481,7 @@ RecursivelyPlanNonColocatedJoinWalker(Node *joinNode, if (rte->rtekind == RTE_RELATION) { subquery = WrapRteRelationIntoSubquery(rte); + if (!SubqueryColocated(subquery, colocatedJoinChecker)) { RecursivelyPlanRTERelation(rte, recursivePlanningContext); @@ -1076,15 +1078,24 @@ IsLocalTableRTE(Node *node) * (e.g., users_table becomes (SELECT * FROM users_table) as users_table). * * Later, the subquery is recursively planned via RecursivelyPlanSubquery(). + * + * The function pushes down all the restriction appering on the table to the + * subquery in order to restrict the total amount of data that'd be pulled and + * pushed back. */ static void RecursivelyPlanRTERelation(RangeTblEntry *relationRte, RecursivePlanningContext *planningContext) { + PlannerRestrictionContext *plannerRestrictionContext = + planningContext->plannerRestrictionContext; + List *restrictionExprListOnTable = + GetRestrictInfoListForRelation(relationRte, plannerRestrictionContext); + Query *subquery = WrapRteRelationIntoSubquery(relationRte); RangeTblEntry *wrappedSubqueryEntry = makeNode(RangeTblEntry); ListCell *columnListCell = NULL; - List *colNames = NIL; + List *columnNames = NIL; char *relationName = get_rel_name(relationRte->relid); wrappedSubqueryEntry->rtekind = RTE_SUBQUERY; @@ -1094,17 +1105,23 @@ RecursivelyPlanRTERelation(RangeTblEntry *relationRte, { TargetEntry *tle = (TargetEntry *) lfirst(columnListCell); - colNames = - lappend(colNames, makeString(pstrdup(tle->resname))); + columnNames = lappend(columnNames, makeString(pstrdup(tle->resname))); } - wrappedSubqueryEntry->eref = makeAlias(pstrdup(relationName), copyObject(colNames)); - wrappedSubqueryEntry->alias = makeAlias(pstrdup(relationName), copyObject(colNames)); + wrappedSubqueryEntry->eref = + makeAlias(pstrdup(relationName), copyObject(columnNames)); + wrappedSubqueryEntry->alias = + makeAlias(pstrdup(relationName), copyObject(columnNames)); wrappedSubqueryEntry->inFromCl = true; + wrappedSubqueryEntry->subquery->jointree->quals = + (Node *) make_ands_explicit(restrictionExprListOnTable); + + /* replace RTE_RELATION with the corresponding RTE_SUBQUERY */ memcpy(relationRte, wrappedSubqueryEntry, sizeof(RangeTblEntry)); + /* simply call the main logic for recursive query planning */ RecursivelyPlanSubquery(wrappedSubqueryEntry->subquery, planningContext); } diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index 42196933b..3ff606dfb 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -22,6 +22,9 @@ #include "nodes/relation.h" #include "parser/parsetree.h" #include "optimizer/pathnode.h" +#include "optimizer/var.h" +#include "optimizer/paths.h" + static uint32 attributeEquivalenceId = 1; @@ -141,6 +144,7 @@ static Relids QueryRteIdentities(Query *queryTree); static bool JoinRestrictionListExistsInContext(JoinRestriction *joinRestrictionInput, JoinRestrictionContext * joinRestrictionContext); +static bool IsParam(Node *node); /* @@ -1978,3 +1982,94 @@ JoinRestrictionListExistsInContext(JoinRestriction *joinRestrictionInput, return false; } + + +/* + * GetRestrictInfoListForRelation gets a range table entry and planner restriction context. + * The function returns a list of expressions that appear in the restriction context for + * only the given relation. And, all the varnos are set to 1 since the returned list can + * only be used with a single range table in a query. + */ +List * +GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry, + PlannerRestrictionContext *plannerRestrictionContext) +{ + int rteIdentity = GetRTEIdentity(rangeTblEntry); + RelationRestrictionContext *relationRestrictionContext = + plannerRestrictionContext->relationRestrictionContext; + + Relids queryRteIdentities = bms_make_singleton(rteIdentity); + RelationRestrictionContext *filteredRelationRestrictionContext = + FilterRelationRestrictionContext(relationRestrictionContext, queryRteIdentities); + List *filteredRelationRestrictionList = + filteredRelationRestrictionContext->relationRestrictionList; + RelationRestriction *relationRestriction = + (RelationRestriction *) linitial(filteredRelationRestrictionList); + + RelOptInfo *relOptInfo = relationRestriction->relOptInfo; + List *baseRestrictInfo = relOptInfo->baserestrictinfo; + ListCell *restrictCell = NULL; + + List *restrictExprList = NIL; + + foreach(restrictCell, baseRestrictInfo) + { + RestrictInfo *restrictInfo = (RestrictInfo *) lfirst(restrictCell); + Expr *restrictionClause = restrictInfo->clause; + List *varClauses = NIL; + ListCell *varClauseCell = NULL; + Relids varnos = NULL; + + Expr *copyOfRestrictClause = NULL; + + /* we cannot process Params beacuse they are not known at this point */ + if (FindNodeCheck((Node *) restrictionClause, IsParam)) + { + continue; + } + + /* + * If the restriction involves multiple tables, we cannot add it to + * input relation's expression list. + */ + varnos = pull_varnos((Node *) restrictionClause); + if (bms_num_members(varnos) != 1) + { + continue; + } + + /* + * We're going to add this restriction expression to a subquery + * which consists of only one relation in its jointree. Thus, + * simply set the varnos accordingly. + */ + copyOfRestrictClause = (Expr *) copyObject((Node *) restrictionClause); + varClauses = pull_var_clause_default((Node *) copyOfRestrictClause); + foreach(varClauseCell, varClauses) + { + Var *column = (Var *) lfirst(varClauseCell); + + column->varno = 1; + column->varnoold = 1; + } + + restrictExprList = lappend(restrictExprList, copyOfRestrictClause); + } + + return restrictExprList; +} + + +/* + * IsParam determines whether the given node is a param. + */ +static bool +IsParam(Node *node) +{ + if (IsA(node, Param)) + { + return true; + } + + return false; +} diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index 13fda045f..2e291b175 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -42,4 +42,7 @@ extern bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceL RelationRestrictionContext * restrictionContext); +List * GetRestrictInfoListForRelation(RangeTblEntry *rte, + PlannerRestrictionContext *plannerRestrictionContext); + #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */ diff --git a/src/test/regress/expected/non_colocated_subquery_joins.out b/src/test/regress/expected/non_colocated_subquery_joins.out index 37c1060a2..bfa9cfe9c 100644 --- a/src/test/regress/expected/non_colocated_subquery_joins.out +++ b/src/test/regress/expected/non_colocated_subquery_joins.out @@ -638,7 +638,7 @@ SELECT true AS valid FROM explain_json_2($$ FROM (users_table u1 JOIN users_table u2 using(value_1)) a JOIN (SELECT value_1, random() FROM users_table) as u3 USING (value_1); $$); -DEBUG: generating subplan 66_1 for subquery SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.users_table u2 +DEBUG: generating subplan 66_1 for subquery SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.users_table u2 WHERE true DEBUG: generating subplan 66_2 for subquery SELECT value_1, random() AS random FROM public.users_table DEBUG: Plan 66 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((public.users_table u1 JOIN (SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.value_1, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('66_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, value_1 integer, value_2 integer, value_3 double precision, value_4 bigint)) users_table(user_id, "time", value_1, value_2, value_3, value_4) USING (value_1)) a(value_1, user_id, "time", value_2, value_3, value_4, user_id_1, time_1, value_2_1, value_3_1, value_4_1) JOIN (SELECT intermediate_result.value_1, intermediate_result.random FROM read_intermediate_result('66_2'::text, 'binary'::citus_copy_format) intermediate_result(value_1 integer, random double precision)) u3 USING (value_1)) valid