Pushdown filters on recursive relation planning

PostgreSQL already keeps track of the restrictions that are on the relation.
With this commit, Citus uses that information and pushes down the filters
to the subquery that is recursively planned for the table that is in
considiration.
recursively_plan_tables_w
Onder Kalaci 2018-11-26 00:05:28 +03:00
parent 4d1165848b
commit de5e36e850
4 changed files with 121 additions and 6 deletions

View File

@ -76,6 +76,7 @@
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "nodes/primnodes.h" #include "nodes/primnodes.h"
#include "nodes/relation.h" #include "nodes/relation.h"
#include "optimizer/clauses.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/guc.h" #include "utils/guc.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
@ -478,6 +479,7 @@ RecursivelyPlanNonColocatedJoinWalker(Node *joinNode,
if (rte->rtekind == RTE_RELATION) if (rte->rtekind == RTE_RELATION)
{ {
subquery = WrapRteRelationIntoSubquery(rte); subquery = WrapRteRelationIntoSubquery(rte);
if (!SubqueryColocated(subquery, colocatedJoinChecker)) if (!SubqueryColocated(subquery, colocatedJoinChecker))
{ {
RecursivelyPlanRTERelation(rte, recursivePlanningContext); RecursivelyPlanRTERelation(rte, recursivePlanningContext);
@ -1090,15 +1092,24 @@ IsLocalTableRTE(Node *node)
* (e.g., users_table becomes (SELECT * FROM users_table) as users_table). * (e.g., users_table becomes (SELECT * FROM users_table) as users_table).
* *
* Later, the subquery is recursively planned via RecursivelyPlanSubquery(). * Later, the subquery is recursively planned via RecursivelyPlanSubquery().
*
* The function pushes down all the restriction appering on the table to the
* subquery in order to restrict the total amount of data that'd be pulled and
* pushed back.
*/ */
static void static void
RecursivelyPlanRTERelation(RangeTblEntry *relationRte, RecursivelyPlanRTERelation(RangeTblEntry *relationRte,
RecursivePlanningContext *planningContext) RecursivePlanningContext *planningContext)
{ {
PlannerRestrictionContext *plannerRestrictionContext =
planningContext->plannerRestrictionContext;
List *restrictionExprListOnTable =
GetRestrictInfoListForRelation(relationRte, plannerRestrictionContext);
Query *subquery = WrapRteRelationIntoSubquery(relationRte); Query *subquery = WrapRteRelationIntoSubquery(relationRte);
RangeTblEntry *wrappedSubqueryEntry = makeNode(RangeTblEntry); RangeTblEntry *wrappedSubqueryEntry = makeNode(RangeTblEntry);
ListCell *columnListCell = NULL; ListCell *columnListCell = NULL;
List *colNames = NIL; List *columnNames = NIL;
char *relationName = get_rel_name(relationRte->relid); char *relationName = get_rel_name(relationRte->relid);
wrappedSubqueryEntry->rtekind = RTE_SUBQUERY; wrappedSubqueryEntry->rtekind = RTE_SUBQUERY;
@ -1108,17 +1119,23 @@ RecursivelyPlanRTERelation(RangeTblEntry *relationRte,
{ {
TargetEntry *tle = (TargetEntry *) lfirst(columnListCell); TargetEntry *tle = (TargetEntry *) lfirst(columnListCell);
colNames = columnNames = lappend(columnNames, makeString(pstrdup(tle->resname)));
lappend(colNames, makeString(pstrdup(tle->resname)));
} }
wrappedSubqueryEntry->eref = makeAlias(pstrdup(relationName), copyObject(colNames)); wrappedSubqueryEntry->eref =
wrappedSubqueryEntry->alias = makeAlias(pstrdup(relationName), copyObject(colNames)); makeAlias(pstrdup(relationName), copyObject(columnNames));
wrappedSubqueryEntry->alias =
makeAlias(pstrdup(relationName), copyObject(columnNames));
wrappedSubqueryEntry->inFromCl = true; wrappedSubqueryEntry->inFromCl = true;
wrappedSubqueryEntry->subquery->jointree->quals =
(Node *) make_ands_explicit(restrictionExprListOnTable);
/* replace RTE_RELATION with the corresponding RTE_SUBQUERY */
memcpy(relationRte, wrappedSubqueryEntry, sizeof(RangeTblEntry)); memcpy(relationRte, wrappedSubqueryEntry, sizeof(RangeTblEntry));
/* simply call the main logic for recursive query planning */
RecursivelyPlanSubquery(wrappedSubqueryEntry->subquery, planningContext); RecursivelyPlanSubquery(wrappedSubqueryEntry->subquery, planningContext);
} }

View File

@ -22,6 +22,9 @@
#include "nodes/relation.h" #include "nodes/relation.h"
#include "parser/parsetree.h" #include "parser/parsetree.h"
#include "optimizer/pathnode.h" #include "optimizer/pathnode.h"
#include "optimizer/var.h"
#include "optimizer/paths.h"
static uint32 attributeEquivalenceId = 1; static uint32 attributeEquivalenceId = 1;
@ -141,6 +144,7 @@ static Relids QueryRteIdentities(Query *queryTree);
static bool JoinRestrictionListExistsInContext(JoinRestriction *joinRestrictionInput, static bool JoinRestrictionListExistsInContext(JoinRestriction *joinRestrictionInput,
JoinRestrictionContext * JoinRestrictionContext *
joinRestrictionContext); joinRestrictionContext);
static bool IsParam(Node *node);
/* /*
@ -1978,3 +1982,94 @@ JoinRestrictionListExistsInContext(JoinRestriction *joinRestrictionInput,
return false; return false;
} }
/*
* GetRestrictInfoListForRelation gets a range table entry and planner restriction context.
* The function returns a list of expressions that appear in the restriction context for
* only the given relation. And, all the varnos are set to 1 since the returned list can
* only be used with a single range table in a query.
*/
List *
GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry,
PlannerRestrictionContext *plannerRestrictionContext)
{
int rteIdentity = GetRTEIdentity(rangeTblEntry);
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
Relids queryRteIdentities = bms_make_singleton(rteIdentity);
RelationRestrictionContext *filteredRelationRestrictionContext =
FilterRelationRestrictionContext(relationRestrictionContext, queryRteIdentities);
List *filteredRelationRestrictionList =
filteredRelationRestrictionContext->relationRestrictionList;
RelationRestriction *relationRestriction =
(RelationRestriction *) linitial(filteredRelationRestrictionList);
RelOptInfo *relOptInfo = relationRestriction->relOptInfo;
List *baseRestrictInfo = relOptInfo->baserestrictinfo;
ListCell *restrictCell = NULL;
List *restrictExprList = NIL;
foreach(restrictCell, baseRestrictInfo)
{
RestrictInfo *restrictInfo = (RestrictInfo *) lfirst(restrictCell);
Expr *restrictionClause = restrictInfo->clause;
List *varClauses = NIL;
ListCell *varClauseCell = NULL;
Relids varnos = NULL;
Expr *copyOfRestrictClause = NULL;
/* we cannot process Params beacuse they are not known at this point */
if (FindNodeCheck((Node *) restrictionClause, IsParam))
{
continue;
}
/*
* If the restriction involves multiple tables, we cannot add it to
* input relation's expression list.
*/
varnos = pull_varnos((Node *) restrictionClause);
if (bms_num_members(varnos) != 1)
{
continue;
}
/*
* We're going to add this restriction expression to a subquery
* which consists of only one relation in its jointree. Thus,
* simply set the varnos accordingly.
*/
copyOfRestrictClause = (Expr *) copyObject((Node *) restrictionClause);
varClauses = pull_var_clause_default((Node *) copyOfRestrictClause);
foreach(varClauseCell, varClauses)
{
Var *column = (Var *) lfirst(varClauseCell);
column->varno = 1;
column->varnoold = 1;
}
restrictExprList = lappend(restrictExprList, copyOfRestrictClause);
}
return restrictExprList;
}
/*
* IsParam determines whether the given node is a param.
*/
static bool
IsParam(Node *node)
{
if (IsA(node, Param))
{
return true;
}
return false;
}

View File

@ -42,4 +42,7 @@ extern bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceL
RelationRestrictionContext * RelationRestrictionContext *
restrictionContext); restrictionContext);
List * GetRestrictInfoListForRelation(RangeTblEntry *rte,
PlannerRestrictionContext *plannerRestrictionContext);
#endif /* RELATION_RESTRICTION_EQUIVALENCE_H */ #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */

View File

@ -642,7 +642,7 @@ SELECT true AS valid FROM explain_json_2($$
FROM FROM
(users_table u1 JOIN users_table u2 using(value_1)) a JOIN (SELECT value_1, random() FROM users_table) as u3 USING (value_1); (users_table u1 JOIN users_table u2 using(value_1)) a JOIN (SELECT value_1, random() FROM users_table) as u3 USING (value_1);
$$); $$);
DEBUG: generating subplan 66_1 for subquery SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.users_table u2 DEBUG: generating subplan 66_1 for subquery SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.users_table u2 WHERE true
DEBUG: generating subplan 66_2 for subquery SELECT value_1, random() AS random FROM public.users_table DEBUG: generating subplan 66_2 for subquery SELECT value_1, random() AS random FROM public.users_table
DEBUG: Plan 66 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((public.users_table u1 JOIN (SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.value_1, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('66_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, value_1 integer, value_2 integer, value_3 double precision, value_4 bigint)) users_table(user_id, "time", value_1, value_2, value_3, value_4) USING (value_1)) a(value_1, user_id, "time", value_2, value_3, value_4, user_id_1, time_1, value_2_1, value_3_1, value_4_1) JOIN (SELECT intermediate_result.value_1, intermediate_result.random FROM read_intermediate_result('66_2'::text, 'binary'::citus_copy_format) intermediate_result(value_1 integer, random double precision)) u3 USING (value_1)) DEBUG: Plan 66 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((public.users_table u1 JOIN (SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.value_1, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('66_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, value_1 integer, value_2 integer, value_3 double precision, value_4 bigint)) users_table(user_id, "time", value_1, value_2, value_3, value_4) USING (value_1)) a(value_1, user_id, "time", value_2, value_3, value_4, user_id_1, time_1, value_2_1, value_3_1, value_4_1) JOIN (SELECT intermediate_result.value_1, intermediate_result.random FROM read_intermediate_result('66_2'::text, 'binary'::citus_copy_format) intermediate_result(value_1 integer, random double precision)) u3 USING (value_1))
valid valid