diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 663cfd5cb..c87aa816c 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -76,6 +76,7 @@ #include "nodes/pg_list.h" #include "nodes/primnodes.h" #include "nodes/relation.h" +#include "optimizer/clauses.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/lsyscache.h" @@ -480,6 +481,7 @@ RecursivelyPlanNonColocatedJoinWalker(Node *joinNode, if (rte->rtekind == RTE_RELATION) { subquery = WrapRteRelationIntoSubquery(rte); + if (!SubqueryColocated(subquery, colocatedJoinChecker)) { RecursivelyPlanRTERelation(rte, recursivePlanningContext); @@ -1076,15 +1078,24 @@ IsLocalTableRTE(Node *node) * (e.g., users_table becomes (SELECT * FROM users_table) as users_table). * * Later, the subquery is recursively planned via RecursivelyPlanSubquery(). + * + * The function pushes down all the restriction appering on the table to the + * subquery in order to restrict the total amount of data that'd be pulled and + * pushed back. */ static void RecursivelyPlanRTERelation(RangeTblEntry *relationRte, RecursivePlanningContext *planningContext) { + PlannerRestrictionContext *plannerRestrictionContext = + planningContext->plannerRestrictionContext; + List *restrictionExprListOnTable = + GetRestrictInfoListForRelation(relationRte, plannerRestrictionContext); + Query *subquery = WrapRteRelationIntoSubquery(relationRte); RangeTblEntry *wrappedSubqueryEntry = makeNode(RangeTblEntry); ListCell *columnListCell = NULL; - List *colNames = NIL; + List *columnNames = NIL; char *relationName = get_rel_name(relationRte->relid); wrappedSubqueryEntry->rtekind = RTE_SUBQUERY; @@ -1094,17 +1105,23 @@ RecursivelyPlanRTERelation(RangeTblEntry *relationRte, { TargetEntry *tle = (TargetEntry *) lfirst(columnListCell); - colNames = - lappend(colNames, makeString(pstrdup(tle->resname))); + columnNames = lappend(columnNames, makeString(pstrdup(tle->resname))); } - wrappedSubqueryEntry->eref = makeAlias(pstrdup(relationName), copyObject(colNames)); - wrappedSubqueryEntry->alias = makeAlias(pstrdup(relationName), copyObject(colNames)); + wrappedSubqueryEntry->eref = + makeAlias(pstrdup(relationName), copyObject(columnNames)); + wrappedSubqueryEntry->alias = + makeAlias(pstrdup(relationName), copyObject(columnNames)); wrappedSubqueryEntry->inFromCl = true; + wrappedSubqueryEntry->subquery->jointree->quals = + (Node *) make_ands_explicit(restrictionExprListOnTable); + + /* replace RTE_RELATION with the corresponding RTE_SUBQUERY */ memcpy(relationRte, wrappedSubqueryEntry, sizeof(RangeTblEntry)); + /* simply call the main logic for recursive query planning */ RecursivelyPlanSubquery(wrappedSubqueryEntry->subquery, planningContext); } diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index 42196933b..3ff606dfb 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -22,6 +22,9 @@ #include "nodes/relation.h" #include "parser/parsetree.h" #include "optimizer/pathnode.h" +#include "optimizer/var.h" +#include "optimizer/paths.h" + static uint32 attributeEquivalenceId = 1; @@ -141,6 +144,7 @@ static Relids QueryRteIdentities(Query *queryTree); static bool JoinRestrictionListExistsInContext(JoinRestriction *joinRestrictionInput, JoinRestrictionContext * joinRestrictionContext); +static bool IsParam(Node *node); /* @@ -1978,3 +1982,94 @@ JoinRestrictionListExistsInContext(JoinRestriction *joinRestrictionInput, return false; } + + +/* + * GetRestrictInfoListForRelation gets a range table entry and planner restriction context. + * The function returns a list of expressions that appear in the restriction context for + * only the given relation. And, all the varnos are set to 1 since the returned list can + * only be used with a single range table in a query. + */ +List * +GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry, + PlannerRestrictionContext *plannerRestrictionContext) +{ + int rteIdentity = GetRTEIdentity(rangeTblEntry); + RelationRestrictionContext *relationRestrictionContext = + plannerRestrictionContext->relationRestrictionContext; + + Relids queryRteIdentities = bms_make_singleton(rteIdentity); + RelationRestrictionContext *filteredRelationRestrictionContext = + FilterRelationRestrictionContext(relationRestrictionContext, queryRteIdentities); + List *filteredRelationRestrictionList = + filteredRelationRestrictionContext->relationRestrictionList; + RelationRestriction *relationRestriction = + (RelationRestriction *) linitial(filteredRelationRestrictionList); + + RelOptInfo *relOptInfo = relationRestriction->relOptInfo; + List *baseRestrictInfo = relOptInfo->baserestrictinfo; + ListCell *restrictCell = NULL; + + List *restrictExprList = NIL; + + foreach(restrictCell, baseRestrictInfo) + { + RestrictInfo *restrictInfo = (RestrictInfo *) lfirst(restrictCell); + Expr *restrictionClause = restrictInfo->clause; + List *varClauses = NIL; + ListCell *varClauseCell = NULL; + Relids varnos = NULL; + + Expr *copyOfRestrictClause = NULL; + + /* we cannot process Params beacuse they are not known at this point */ + if (FindNodeCheck((Node *) restrictionClause, IsParam)) + { + continue; + } + + /* + * If the restriction involves multiple tables, we cannot add it to + * input relation's expression list. + */ + varnos = pull_varnos((Node *) restrictionClause); + if (bms_num_members(varnos) != 1) + { + continue; + } + + /* + * We're going to add this restriction expression to a subquery + * which consists of only one relation in its jointree. Thus, + * simply set the varnos accordingly. + */ + copyOfRestrictClause = (Expr *) copyObject((Node *) restrictionClause); + varClauses = pull_var_clause_default((Node *) copyOfRestrictClause); + foreach(varClauseCell, varClauses) + { + Var *column = (Var *) lfirst(varClauseCell); + + column->varno = 1; + column->varnoold = 1; + } + + restrictExprList = lappend(restrictExprList, copyOfRestrictClause); + } + + return restrictExprList; +} + + +/* + * IsParam determines whether the given node is a param. + */ +static bool +IsParam(Node *node) +{ + if (IsA(node, Param)) + { + return true; + } + + return false; +} diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index 13fda045f..2e291b175 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -42,4 +42,7 @@ extern bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceL RelationRestrictionContext * restrictionContext); +List * GetRestrictInfoListForRelation(RangeTblEntry *rte, + PlannerRestrictionContext *plannerRestrictionContext); + #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */ diff --git a/src/test/regress/expected/non_colocated_subquery_joins.out b/src/test/regress/expected/non_colocated_subquery_joins.out index 37c1060a2..bfa9cfe9c 100644 --- a/src/test/regress/expected/non_colocated_subquery_joins.out +++ b/src/test/regress/expected/non_colocated_subquery_joins.out @@ -638,7 +638,7 @@ SELECT true AS valid FROM explain_json_2($$ FROM (users_table u1 JOIN users_table u2 using(value_1)) a JOIN (SELECT value_1, random() FROM users_table) as u3 USING (value_1); $$); -DEBUG: generating subplan 66_1 for subquery SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.users_table u2 +DEBUG: generating subplan 66_1 for subquery SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.users_table u2 WHERE true DEBUG: generating subplan 66_2 for subquery SELECT value_1, random() AS random FROM public.users_table DEBUG: Plan 66 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((public.users_table u1 JOIN (SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.value_1, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('66_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, value_1 integer, value_2 integer, value_3 double precision, value_4 bigint)) users_table(user_id, "time", value_1, value_2, value_3, value_4) USING (value_1)) a(value_1, user_id, "time", value_2, value_3, value_4, user_id_1, time_1, value_2_1, value_3_1, value_4_1) JOIN (SELECT intermediate_result.value_1, intermediate_result.random FROM read_intermediate_result('66_2'::text, 'binary'::citus_copy_format) intermediate_result(value_1 integer, random double precision)) u3 USING (value_1)) valid