mirror of https://github.com/citusdata/citus.git
Merge pull request #1833 from citusdata/granular_subquery_pushdown
Refactor relation restriction equivalence checks to be more granular for subqueriespull/1826/head
commit
74b9bc409c
|
@ -45,9 +45,11 @@ static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *origin
|
|||
Query *query, ParamListInfo boundParams,
|
||||
PlannerRestrictionContext *
|
||||
plannerRestrictionContext);
|
||||
static void AdjustParseTree(Query *parse, bool assignRTEIdentities,
|
||||
bool setPartitionedTablesInherited);
|
||||
|
||||
static void AssignRTEIdentities(Query *queryTree);
|
||||
static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
|
||||
static void AdjustPartitioningForDistributedPlanning(Query *parse,
|
||||
bool setPartitionedTablesInherited);
|
||||
static PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
|
||||
DistributedPlan *distributedPlan);
|
||||
static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan,
|
||||
|
@ -71,20 +73,23 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
bool needsDistributedPlanning = NeedsDistributedPlanning(parse);
|
||||
Query *originalQuery = NULL;
|
||||
PlannerRestrictionContext *plannerRestrictionContext = NULL;
|
||||
bool assignRTEIdentities = false;
|
||||
bool setPartitionedTablesInherited = false;
|
||||
|
||||
/*
|
||||
* standard_planner scribbles on it's input, but for deparsing we need the
|
||||
* unmodified form. So copy once we're sure it's a distributed query.
|
||||
* unmodified form. Note that we keep RTE_RELATIONs with their identities
|
||||
* set, which doesn't break our goals, but, prevents us keeping an extra copy
|
||||
* of the query tree. Note that we copy the query tree once we're sure it's a
|
||||
* distributed query.
|
||||
*/
|
||||
if (needsDistributedPlanning)
|
||||
{
|
||||
originalQuery = copyObject(parse);
|
||||
assignRTEIdentities = true;
|
||||
setPartitionedTablesInherited = false;
|
||||
|
||||
AdjustParseTree(parse, assignRTEIdentities, setPartitionedTablesInherited);
|
||||
AssignRTEIdentities(parse);
|
||||
originalQuery = copyObject(parse);
|
||||
|
||||
AdjustPartitioningForDistributedPlanning(parse, setPartitionedTablesInherited);
|
||||
}
|
||||
|
||||
/* create a restriction context and put it at the end if context list */
|
||||
|
@ -114,10 +119,9 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
|
||||
if (needsDistributedPlanning)
|
||||
{
|
||||
assignRTEIdentities = false;
|
||||
setPartitionedTablesInherited = true;
|
||||
|
||||
AdjustParseTree(parse, assignRTEIdentities, setPartitionedTablesInherited);
|
||||
AdjustPartitioningForDistributedPlanning(parse, setPartitionedTablesInherited);
|
||||
}
|
||||
|
||||
/* remove the context from the context list */
|
||||
|
@ -144,18 +148,15 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
|||
|
||||
|
||||
/*
|
||||
* AdjustParseTree function modifies query tree by adding RTE identities to the
|
||||
* RTE_RELATIONs and changing inh flag and relkind of partitioned tables. We
|
||||
* perform these operations to ensure PostgreSQL's standard planner behaves as
|
||||
* we need.
|
||||
* AssignRTEIdentities function modifies query tree by adding RTE identities to the
|
||||
* RTE_RELATIONs.
|
||||
*
|
||||
* Please note that, we want to avoid modifying query tree as much as possible
|
||||
* because if PostgreSQL changes the way it uses modified fields, that may break
|
||||
* our logic.
|
||||
*/
|
||||
static void
|
||||
AdjustParseTree(Query *queryTree, bool assignRTEIdentities,
|
||||
bool setPartitionedTablesInherited)
|
||||
AssignRTEIdentities(Query *queryTree)
|
||||
{
|
||||
List *rangeTableList = NIL;
|
||||
ListCell *rangeTableCell = NULL;
|
||||
|
@ -177,10 +178,38 @@ AdjustParseTree(Query *queryTree, bool assignRTEIdentities,
|
|||
* Note that we're only interested in RTE_RELATIONs and thus assigning
|
||||
* identifiers to those RTEs only.
|
||||
*/
|
||||
if (assignRTEIdentities && rangeTableEntry->rtekind == RTE_RELATION)
|
||||
if (rangeTableEntry->rtekind == RTE_RELATION)
|
||||
{
|
||||
AssignRTEIdentity(rangeTableEntry, rteIdentifier++);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AdjustPartitioningForDistributedPlanning function modifies query tree by
|
||||
* changing inh flag and relkind of partitioned tables. We want Postgres to
|
||||
* treat partitioned tables as regular relations (i.e. we do not want to
|
||||
* expand them to their partitions) since it breaks Citus planning in different
|
||||
* ways. We let anything related to partitioning happen on the shards.
|
||||
*
|
||||
* Please note that, we want to avoid modifying query tree as much as possible
|
||||
* because if PostgreSQL changes the way it uses modified fields, that may break
|
||||
* our logic.
|
||||
*/
|
||||
static void
|
||||
AdjustPartitioningForDistributedPlanning(Query *queryTree,
|
||||
bool setPartitionedTablesInherited)
|
||||
{
|
||||
List *rangeTableList = NIL;
|
||||
ListCell *rangeTableCell = NULL;
|
||||
|
||||
/* extract range table entries for simple relations only */
|
||||
ExtractRangeTableEntryWalker((Node *) queryTree, &rangeTableList);
|
||||
|
||||
foreach(rangeTableCell, rangeTableList)
|
||||
{
|
||||
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||
|
||||
/*
|
||||
* We want Postgres to behave partitioned tables as regular relations
|
||||
|
|
|
@ -85,6 +85,17 @@ static DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryPushdown(Query *
|
|||
PlannerRestrictionContext
|
||||
*
|
||||
plannerRestrictionContext);
|
||||
static RelationRestrictionContext * FilterRelationRestrictionContext(
|
||||
RelationRestrictionContext *relationRestrictionContext,
|
||||
Relids
|
||||
queryRteIdentities);
|
||||
static JoinRestrictionContext * FilterJoinRestrictionContext(
|
||||
JoinRestrictionContext *joinRestrictionContext, Relids
|
||||
queryRteIdentities);
|
||||
static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int
|
||||
rangeTableArrayLength, Relids
|
||||
queryRteIdentities);
|
||||
static Relids QueryRteIdentities(Query *queryTree);
|
||||
static DeferredErrorMessage * DeferErrorIfUnsupportedSublinkAndReferenceTable(
|
||||
Query *queryTree);
|
||||
static DeferredErrorMessage * DeferErrorIfUnsupportedFilters(Query *subquery);
|
||||
|
@ -704,6 +715,210 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* FilterPlannerRestrictionForQuery gets a planner restriction context and
|
||||
* set of rte identities. It returns the restrictions that that appear
|
||||
* in the queryRteIdentities and returns a newly allocated
|
||||
* PlannerRestrictionContext. The function also sets all the other fields of
|
||||
* the PlannerRestrictionContext with respect to the filtered restrictions.
|
||||
*/
|
||||
PlannerRestrictionContext *
|
||||
FilterPlannerRestrictionForQuery(PlannerRestrictionContext *plannerRestrictionContext,
|
||||
Query *query)
|
||||
{
|
||||
PlannerRestrictionContext *filteredPlannerRestrictionContext = NULL;
|
||||
int referenceRelationCount = 0;
|
||||
int totalRelationCount = 0;
|
||||
|
||||
Relids queryRteIdentities = QueryRteIdentities(query);
|
||||
|
||||
RelationRestrictionContext *relationRestrictionContext =
|
||||
plannerRestrictionContext->relationRestrictionContext;
|
||||
JoinRestrictionContext *joinRestrictionContext =
|
||||
plannerRestrictionContext->joinRestrictionContext;
|
||||
|
||||
RelationRestrictionContext *filteredRelationRestrictionContext =
|
||||
FilterRelationRestrictionContext(relationRestrictionContext, queryRteIdentities);
|
||||
|
||||
JoinRestrictionContext *filtererdJoinRestrictionContext =
|
||||
FilterJoinRestrictionContext(joinRestrictionContext, queryRteIdentities);
|
||||
|
||||
/* allocate the filtered planner restriction context and set all the fields */
|
||||
filteredPlannerRestrictionContext = palloc0(sizeof(PlannerRestrictionContext));
|
||||
|
||||
filteredPlannerRestrictionContext->memoryContext =
|
||||
plannerRestrictionContext->memoryContext;
|
||||
|
||||
totalRelationCount = list_length(
|
||||
filteredRelationRestrictionContext->relationRestrictionList);
|
||||
referenceRelationCount = ReferenceRelationCount(filteredRelationRestrictionContext);
|
||||
|
||||
filteredRelationRestrictionContext->allReferenceTables =
|
||||
(totalRelationCount == referenceRelationCount);
|
||||
|
||||
/* we currently don't support local relations and we cannot come up to this point */
|
||||
filteredRelationRestrictionContext->hasLocalRelation = false;
|
||||
filteredRelationRestrictionContext->hasDistributedRelation = true;
|
||||
|
||||
/* finally set the relation and join restriction contexts */
|
||||
filteredPlannerRestrictionContext->relationRestrictionContext =
|
||||
filteredRelationRestrictionContext;
|
||||
filteredPlannerRestrictionContext->joinRestrictionContext =
|
||||
filtererdJoinRestrictionContext;
|
||||
|
||||
return filteredPlannerRestrictionContext;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FilterRelationRestrictionContext gets a relation restriction context and
|
||||
* set of rte identities. It returns the relation restrictions that that appear
|
||||
* in the queryRteIdentities and returns a newly allocated
|
||||
* RelationRestrictionContext.
|
||||
*/
|
||||
static RelationRestrictionContext *
|
||||
FilterRelationRestrictionContext(RelationRestrictionContext *relationRestrictionContext,
|
||||
Relids queryRteIdentities)
|
||||
{
|
||||
RelationRestrictionContext *filteredRestrictionContext =
|
||||
palloc0(sizeof(RelationRestrictionContext));
|
||||
|
||||
ListCell *relationRestrictionCell = NULL;
|
||||
|
||||
foreach(relationRestrictionCell, relationRestrictionContext->relationRestrictionList)
|
||||
{
|
||||
RelationRestriction *relationRestriction =
|
||||
(RelationRestriction *) lfirst(relationRestrictionCell);
|
||||
|
||||
int rteIdentity = GetRTEIdentity(relationRestriction->rte);
|
||||
|
||||
if (bms_is_member(rteIdentity, queryRteIdentities))
|
||||
{
|
||||
filteredRestrictionContext->relationRestrictionList =
|
||||
lappend(filteredRestrictionContext->relationRestrictionList,
|
||||
relationRestriction);
|
||||
}
|
||||
}
|
||||
|
||||
return filteredRestrictionContext;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FilterJoinRestrictionContext gets a join restriction context and
|
||||
* set of rte identities. It returns the join restrictions that that appear
|
||||
* in the queryRteIdentities and returns a newly allocated
|
||||
* JoinRestrictionContext.
|
||||
*
|
||||
* Note that the join restriction is added to the return context as soon as
|
||||
* any range table entry that appear in the join belongs to queryRteIdentities.
|
||||
*/
|
||||
static JoinRestrictionContext *
|
||||
FilterJoinRestrictionContext(JoinRestrictionContext *joinRestrictionContext, Relids
|
||||
queryRteIdentities)
|
||||
{
|
||||
JoinRestrictionContext *filtererdJoinRestrictionContext =
|
||||
palloc0(sizeof(JoinRestrictionContext));
|
||||
|
||||
ListCell *joinRestrictionCell = NULL;
|
||||
|
||||
foreach(joinRestrictionCell, joinRestrictionContext->joinRestrictionList)
|
||||
{
|
||||
JoinRestriction *joinRestriction =
|
||||
(JoinRestriction *) lfirst(joinRestrictionCell);
|
||||
RangeTblEntry **rangeTableEntries =
|
||||
joinRestriction->plannerInfo->simple_rte_array;
|
||||
int rangeTableArrayLength = joinRestriction->plannerInfo->simple_rel_array_size;
|
||||
|
||||
if (RangeTableArrayContainsAnyRTEIdentities(rangeTableEntries,
|
||||
rangeTableArrayLength,
|
||||
queryRteIdentities))
|
||||
{
|
||||
filtererdJoinRestrictionContext->joinRestrictionList = lappend(
|
||||
filtererdJoinRestrictionContext->joinRestrictionList,
|
||||
joinRestriction);
|
||||
}
|
||||
}
|
||||
|
||||
return filtererdJoinRestrictionContext;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RangeTableArrayContainsAnyRTEIdentities returns true if any of the range table entries
|
||||
* int rangeTableEntries array is an range table relation specified in queryRteIdentities.
|
||||
*/
|
||||
static bool
|
||||
RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int
|
||||
rangeTableArrayLength, Relids queryRteIdentities)
|
||||
{
|
||||
int rteIndex = 0;
|
||||
|
||||
/* simple_rte_array starts from 1, see plannerInfo struct */
|
||||
for (rteIndex = 1; rteIndex < rangeTableArrayLength; ++rteIndex)
|
||||
{
|
||||
RangeTblEntry *rangeTableEntry = rangeTableEntries[rteIndex];
|
||||
List *rangeTableRelationList = NULL;
|
||||
ListCell *rteRelationCell = NULL;
|
||||
|
||||
/*
|
||||
* Get list of all RTE_RELATIONs in the given range table entry
|
||||
* (i.e.,rangeTableEntry could be a subquery where we're interested
|
||||
* in relations).
|
||||
*/
|
||||
ExtractRangeTableRelationWalker((Node *) rangeTableEntry,
|
||||
&rangeTableRelationList);
|
||||
|
||||
foreach(rteRelationCell, rangeTableRelationList)
|
||||
{
|
||||
RangeTblEntry *rteRelation = (RangeTblEntry *) lfirst(rteRelationCell);
|
||||
int rteIdentity = 0;
|
||||
|
||||
Assert(rteRelation->rtekind == RTE_RELATION);
|
||||
|
||||
rteIdentity = GetRTEIdentity(rteRelation);
|
||||
if (bms_is_member(rteIdentity, queryRteIdentities))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* QueryRteIdentities gets a queryTree, find get all the rte identities assigned by
|
||||
* us.
|
||||
*/
|
||||
static Relids
|
||||
QueryRteIdentities(Query *queryTree)
|
||||
{
|
||||
List *rangeTableList = NULL;
|
||||
ListCell *rangeTableCell = NULL;
|
||||
Relids queryRteIdentities = NULL;
|
||||
|
||||
/* extract range table entries for simple relations only */
|
||||
ExtractRangeTableRelationWalker((Node *) queryTree, &rangeTableList);
|
||||
|
||||
foreach(rangeTableCell, rangeTableList)
|
||||
{
|
||||
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||
int rteIdentity = 0;
|
||||
|
||||
/* we're only interested in relations */
|
||||
Assert(rangeTableEntry->rtekind == RTE_RELATION);
|
||||
|
||||
rteIdentity = GetRTEIdentity(rangeTableEntry);
|
||||
|
||||
queryRteIdentities = bms_add_member(queryRteIdentities, rteIdentity);
|
||||
}
|
||||
|
||||
return queryRteIdentities;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DeferErrorIfUnsupportedSublinkAndReferenceTable returns a deferred error if the
|
||||
* given query is not suitable for subquery pushdown.
|
||||
|
@ -3385,24 +3600,46 @@ NeedsDistributedPlanning(Query *queryTree)
|
|||
|
||||
|
||||
/*
|
||||
* ExtractRangeTableRelationWalker gathers all range table entries in a query
|
||||
* and filters them to preserve only those of the RTE_RELATION type.
|
||||
* ExtractRangeTableRelationWalker gathers all range table relation entries
|
||||
* in a query.
|
||||
*/
|
||||
bool
|
||||
ExtractRangeTableRelationWalker(Node *node, List **rangeTableRelationList)
|
||||
{
|
||||
List *rangeTableList = NIL;
|
||||
ListCell *rangeTableCell = NULL;
|
||||
bool walkIsComplete = ExtractRangeTableEntryWalker(node, &rangeTableList);
|
||||
bool walkIsComplete = false;
|
||||
|
||||
foreach(rangeTableCell, rangeTableList)
|
||||
if (node == NULL)
|
||||
{
|
||||
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||
if (rangeTableEntry->rtekind == RTE_RELATION &&
|
||||
rangeTableEntry->relkind != RELKIND_VIEW)
|
||||
return false;
|
||||
}
|
||||
|
||||
if (IsA(node, RangeTblEntry))
|
||||
{
|
||||
RangeTblEntry *rangeTable = (RangeTblEntry *) node;
|
||||
|
||||
if (rangeTable->rtekind == RTE_RELATION && rangeTable->relkind != RELKIND_VIEW)
|
||||
{
|
||||
(*rangeTableRelationList) = lappend(*rangeTableRelationList, rangeTableEntry);
|
||||
(*rangeTableRelationList) = lappend(*rangeTableRelationList, rangeTable);
|
||||
|
||||
walkIsComplete = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
walkIsComplete = range_table_walker(list_make1(rangeTable),
|
||||
ExtractRangeTableRelationWalker,
|
||||
rangeTableRelationList, 0);
|
||||
}
|
||||
}
|
||||
else if (IsA(node, Query))
|
||||
{
|
||||
walkIsComplete = query_tree_walker((Query *) node,
|
||||
ExtractRangeTableRelationWalker,
|
||||
rangeTableRelationList, QTW_EXAMINE_RTES);
|
||||
}
|
||||
else
|
||||
{
|
||||
walkIsComplete = expression_tree_walker(node, ExtractRangeTableRelationWalker,
|
||||
rangeTableRelationList);
|
||||
}
|
||||
|
||||
return walkIsComplete;
|
||||
|
@ -3431,7 +3668,8 @@ ExtractRangeTableEntryWalker(Node *node, List **rangeTableList)
|
|||
}
|
||||
else if (IsA(node, Query))
|
||||
{
|
||||
walkIsComplete = query_tree_walker((Query *) node, ExtractRangeTableEntryWalker,
|
||||
walkIsComplete = query_tree_walker((Query *) node,
|
||||
ExtractRangeTableEntryWalker,
|
||||
rangeTableList, QTW_EXAMINE_RTES);
|
||||
}
|
||||
else
|
||||
|
|
|
@ -62,7 +62,6 @@ typedef struct AttributeEquivalenceClassMember
|
|||
} AttributeEquivalenceClassMember;
|
||||
|
||||
|
||||
static uint32 ReferenceRelationCount(RelationRestrictionContext *restrictionContext);
|
||||
static Var * FindTranslatedVar(List *appendRelList, Oid relationOid,
|
||||
Index relationRteIndex, Index *partitionKeyIndex);
|
||||
static bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
|
||||
|
@ -408,7 +407,7 @@ RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *
|
|||
* ReferenceRelationCount iterates over the relations and returns the reference table
|
||||
* relation count.
|
||||
*/
|
||||
static uint32
|
||||
uint32
|
||||
ReferenceRelationCount(RelationRestrictionContext *restrictionContext)
|
||||
{
|
||||
ListCell *relationRestrictionCell = NULL;
|
||||
|
|
|
@ -187,6 +187,9 @@ extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *query
|
|||
PlannerRestrictionContext *
|
||||
plannerRestrictionContext,
|
||||
ParamListInfo boundParams);
|
||||
extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery(
|
||||
PlannerRestrictionContext *plannerRestrictionContext,
|
||||
Query *query);
|
||||
extern bool SafeToPushdownWindowFunction(Query *query, StringInfo *errorDetail);
|
||||
extern bool TargetListOnPartitionColumn(Query *query, List *targetEntryList);
|
||||
extern bool NeedsDistributedPlanning(Query *queryTree);
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
extern bool ContainsUnionSubquery(Query *queryTree);
|
||||
extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *
|
||||
plannerRestrictionContext);
|
||||
extern uint32 ReferenceRelationCount(RelationRestrictionContext *restrictionContext);
|
||||
extern bool SafeToPushdownUnionSubquery(RelationRestrictionContext *restrictionContext);
|
||||
extern List * RelationIdList(Query *query);
|
||||
|
||||
|
|
Loading…
Reference in New Issue