Merge pull request #2107 from citusdata/simplify_optimizer_phase_3

Simplify optimizer [Phase 3]  - Move processing each part of the query into its own functions
pull/1991/head
Önder Kalacı 2018-04-29 12:56:17 +03:00 committed by GitHub
commit b1e6636398
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 609 additions and 219 deletions

View File

@ -123,6 +123,10 @@ GroupedByDisjointPartitionColumn(List *tableNodeList, MultiExtendedOp *opNode)
} }
/*
* ExtendedOpNodeContainsRepartitionSubquery is a utility function that
* returns true if the extended op node contains a re-partition subquery.
*/
static bool static bool
ExtendedOpNodeContainsRepartitionSubquery(MultiExtendedOp *originalOpNode) ExtendedOpNodeContainsRepartitionSubquery(MultiExtendedOp *originalOpNode)
{ {

View File

@ -75,6 +75,80 @@ typedef struct WorkerAggregateWalkerContext
} WorkerAggregateWalkerContext; } WorkerAggregateWalkerContext;
/*
* QueryTargetList encapsulates the necessary fields to form
* worker query's target list.
*/
typedef struct QueryTargetList
{
List *targetEntryList; /* the list of target entries */
AttrNumber targetProjectionNumber; /* the index of the last entry */
} QueryTargetList;
/*
* QueryGroupClause encapsulates the necessary fields to form
* worker query's group by clause.
*/
typedef struct QueryGroupClause
{
List *groupClauseList; /* the list of group clause entries */
Index *nextSortGroupRefIndex; /* pointer to the index of the largest sort group reference index */
} QueryGroupClause;
/*
* QueryDistinctClause encapsulates the necessary fields to form
* worker query's DISTINCT/DISTINCT ON parts.
*/
typedef struct QueryDistinctClause
{
List *workerDistinctClause; /* the list of distinct clause entries */
bool workerHasDistinctOn;
} QueryDistinctClause;
/*
* QueryWindowClause encapsulates the necessary fields to form
* worker query's window clause.
*/
typedef struct QueryWindowClause
{
List *workerWindowClauseList; /* the list of window clause entries */
bool hasWindowFunctions;
Index *nextSortGroupRefIndex; /* see QueryGroupClause */
} QueryWindowClause;
/*
* QueryOrderByLimit encapsulates the necessary fields to form
* worker query's order by and limit clauses. Note that we don't
* keep track of limit offset clause since it is incorporated
* into the limit clause during the processing.
*/
typedef struct QueryOrderByLimit
{
Node *workerLimitCount;
List *workerSortClauseList;
Index *nextSortGroupRefIndex; /* see QueryGroupClause */
} QueryOrderByLimit;
/*
* OrderByLimitReference a structure that is used commonly while
* processing sort and limit clauses.
*/
typedef struct OrderByLimitReference
{
bool groupedByDisjointPartitionColumn;
bool groupClauseIsEmpty;
bool sortClauseIsEmpty;
bool hasOrderByAggregate;
bool canApproximate;
bool hasDistinctOn;
} OrderByLimitReference;
/* Local functions forward declarations */ /* Local functions forward declarations */
static MultiSelect * AndSelectNode(MultiSelect *selectNode); static MultiSelect * AndSelectNode(MultiSelect *selectNode);
static MultiSelect * OrSelectNode(MultiSelect *selectNode); static MultiSelect * OrSelectNode(MultiSelect *selectNode);
@ -129,20 +203,49 @@ static Expr * AddTypeConversion(Node *originalAggregate, Node *newExpression);
static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
ExtendedOpNodeProperties * ExtendedOpNodeProperties *
extendedOpNodeProperties); extendedOpNodeProperties);
static bool TargetListHasAggragates(List *targetEntryList);
static void ProcessTargetListForWorkerQuery(List *targetEntryList,
ExtendedOpNodeProperties *
extendedOpNodeProperties,
QueryTargetList *queryTargetList,
QueryGroupClause *queryGroupClause);
static void ProcessHavingClauseForWorkerQuery(Node *havingQual,
ExtendedOpNodeProperties *
extendedOpNodeProperties,
Node **workerHavingQual,
QueryTargetList *queryTargetList,
QueryGroupClause *queryGroupClause);
static void ProcessDistinctClauseForWorkerQuery(List *distinctClause, bool hasDistinctOn,
List *groupClauseList,
bool queryHasAggregates,
QueryDistinctClause *queryDistinctClause,
bool *distinctPreventsLimitPushdown);
static void ProcessWindowFunctionsForWorkerQuery(List *windowClauseList,
List *originalTargetEntryList,
QueryWindowClause *queryWindowClause,
QueryTargetList *queryTargetList);
static void ProcessLimitOrderByForWorkerQuery(OrderByLimitReference orderByLimitReference,
Node *originalLimitCount, Node *limitOffset,
List *sortClauseList, List *groupClauseList,
List *originalTargetList,
QueryOrderByLimit *queryOrderByLimit,
QueryTargetList *queryTargetList);
static OrderByLimitReference BuildOrderByLimitReference(bool hasDistinctOn, bool
groupedByDisjointPartitionColumn,
List *groupClause,
List *sortClauseList,
List *targetList);
static void ExpandWorkerTargetEntry(List *expressionList, static void ExpandWorkerTargetEntry(List *expressionList,
TargetEntry *originalTargetEntry, TargetEntry *originalTargetEntry,
bool addToGroupByClause, bool addToGroupByClause,
List **newTargetEntryList, QueryTargetList *queryTargetList,
AttrNumber *targetProjectionNumber, QueryGroupClause *queryGroupClause);
List **groupClauseList,
Index *nextSortGroupRefIndex);
static Index GetNextSortGroupRef(List *targetEntryList); static Index GetNextSortGroupRef(List *targetEntryList);
static TargetEntry * GenerateWorkerTargetEntry(TargetEntry *targetEntry, static TargetEntry * GenerateWorkerTargetEntry(TargetEntry *targetEntry,
Expr *workerExpression, Expr *workerExpression,
AttrNumber targetProjectionNumber); AttrNumber targetProjectionNumber);
static void AppendTargetEntryToGroupClause(TargetEntry *targetEntry, static void AppendTargetEntryToGroupClause(TargetEntry *targetEntry,
List **groupClause, QueryGroupClause *queryGroupClause);
Index *nextSortGroupRefIndex);
static bool WorkerAggregateWalker(Node *node, static bool WorkerAggregateWalker(Node *node,
WorkerAggregateWalkerContext *walkerContext); WorkerAggregateWalkerContext *walkerContext);
static List * WorkerAggregateExpressionList(Aggref *originalAggregate, static List * WorkerAggregateExpressionList(Aggref *originalAggregate,
@ -174,10 +277,11 @@ static bool TablePartitioningSupportsDistinct(List *tableNodeList,
Var *distinctColumn); Var *distinctColumn);
/* Local functions forward declarations for limit clauses */ /* Local functions forward declarations for limit clauses */
static Node * WorkerLimitCount(MultiExtendedOp *originalOpNode, static Node * WorkerLimitCount(Node *limitCount, Node *limitOffset, OrderByLimitReference
bool groupedByDisjointPartitionColumn); orderByLimitReference);
static List * WorkerSortClauseList(MultiExtendedOp *originalOpNode, static List * WorkerSortClauseList(Node *limitCount,
bool groupedByDisjointPartitionColumn); List *groupClauseList, List *sortClauseList,
OrderByLimitReference orderByLimitReference);
static List * GenerateNewTargetEntriesForSortClauses(List *originalTargetList, static List * GenerateNewTargetEntriesForSortClauses(List *originalTargetList,
List *sortClauseList, List *sortClauseList,
AttrNumber *targetProjectionNumber, AttrNumber *targetProjectionNumber,
@ -1821,41 +1925,166 @@ AddTypeConversion(Node *originalAggregate, Node *newExpression)
/* /*
* WorkerExtendedOpNode creates the worker extended operator node from the given * WorkerExtendedOpNode creates the worker extended operator node from the given
* target entries. The function walks over these target entries; and for entries * originalOpNode and extendedOpNodeProperties.
* with aggregates in them, this function calls the recursive aggregate walker *
* function to create aggregates for the worker nodes. Also, the function checks * For the details of the processing see the comments of the functions that
* if we can push down the limit to worker nodes; and if we can, sets the limit * are called from this function.
* count and sort clause list fields in the new operator node. It provides special
* treatment for count distinct operator if it is used in repartition subqueries
* or on non-partition columns. Each column in count distinct aggregate is added
* to target list, and group by list of worker extended operator.
*/ */
static MultiExtendedOp * static MultiExtendedOp *
WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
ExtendedOpNodeProperties *extendedOpNodeProperties) ExtendedOpNodeProperties *extendedOpNodeProperties)
{ {
MultiExtendedOp *workerExtendedOpNode = NULL; MultiExtendedOp *workerExtendedOpNode = NULL;
List *targetEntryList = originalOpNode->targetList;
ListCell *targetEntryCell = NULL;
List *newTargetEntryList = NIL;
List *groupClauseList = copyObject(originalOpNode->groupClauseList);
Node *havingQual = originalOpNode->havingQual;
AttrNumber targetProjectionNumber = 1;
WorkerAggregateWalkerContext *walkerContext =
palloc0(sizeof(WorkerAggregateWalkerContext));
Index nextSortGroupRefIndex = 0; Index nextSortGroupRefIndex = 0;
bool queryHasAggregates = false;
bool distinctClauseSupersetofGroupClause = false;
bool distinctPreventsLimitPushdown = false; bool distinctPreventsLimitPushdown = false;
bool createNewGroupByClause = false; bool groupByExtended = false;
bool groupedByDisjointPartitionColumn = bool groupedByDisjointPartitionColumn =
extendedOpNodeProperties->groupedByDisjointPartitionColumn; extendedOpNodeProperties->groupedByDisjointPartitionColumn;
bool pushDownWindowFunction = extendedOpNodeProperties->pushDownWindowFunctions;
walkerContext->expressionList = NIL; QueryTargetList queryTargetList;
walkerContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns; QueryGroupClause queryGroupClause;
QueryDistinctClause queryDistinctClause;
QueryWindowClause queryWindowClause;
QueryOrderByLimit queryOrderByLimit;
Node *queryHavingQual = NULL;
nextSortGroupRefIndex = GetNextSortGroupRef(targetEntryList); List *originalTargetEntryList = originalOpNode->targetList;
List *originalGroupClauseList = originalOpNode->groupClauseList;
List *originalSortClauseList = originalOpNode->sortClauseList;
Node *originalHavingQual = originalOpNode->havingQual;
Node *originalLimitCount = originalOpNode->limitCount;
Node *originalLimitOffset = originalOpNode->limitOffset;
List *originalWindowClause = originalOpNode->windowClause;
List *originalDistinctClause = originalOpNode->distinctClause;
bool hasDistinctOn = originalOpNode->hasDistinctOn;
int originalGroupClauseLength = list_length(originalGroupClauseList);
bool queryHasAggregates = TargetListHasAggragates(originalTargetEntryList);
/* initialize to default values */
memset(&queryTargetList, 0, sizeof(queryGroupClause));
memset(&queryGroupClause, 0, sizeof(queryGroupClause));
memset(&queryDistinctClause, 0, sizeof(queryGroupClause));
memset(&queryWindowClause, 0, sizeof(queryGroupClause));
memset(&queryOrderByLimit, 0, sizeof(queryGroupClause));
/* calculate the next sort group index based on the original target list */
nextSortGroupRefIndex = GetNextSortGroupRef(originalTargetEntryList);
/* targetProjectionNumber starts from 1 */
queryTargetList.targetProjectionNumber = 1;
/* worker query always include all the group by entries in the query */
queryGroupClause.groupClauseList = copyObject(originalGroupClauseList);
/*
* nextSortGroupRefIndex is used by group by, window and order by clauses.
* Thus, we pass a reference to a single nextSortGroupRefIndex and expect
* it modified separately while processing those parts of the query.
*/
queryGroupClause.nextSortGroupRefIndex = &nextSortGroupRefIndex;
queryWindowClause.nextSortGroupRefIndex = &nextSortGroupRefIndex;
queryOrderByLimit.nextSortGroupRefIndex = &nextSortGroupRefIndex;
/* process each part of the query in order to generate the worker query's parts */
ProcessTargetListForWorkerQuery(originalTargetEntryList, extendedOpNodeProperties,
&queryTargetList, &queryGroupClause);
ProcessHavingClauseForWorkerQuery(originalHavingQual, extendedOpNodeProperties,
&queryHavingQual, &queryTargetList,
&queryGroupClause);
ProcessDistinctClauseForWorkerQuery(originalDistinctClause, hasDistinctOn,
queryGroupClause.groupClauseList,
queryHasAggregates, &queryDistinctClause,
&distinctPreventsLimitPushdown);
ProcessWindowFunctionsForWorkerQuery(originalWindowClause, originalTargetEntryList,
&queryWindowClause, &queryTargetList);
/*
* Order by and limit clauses are relevant to each other, and processing
* them together makes it handy for us.
*
* The other parts of the query might have already prohibited pushing down
* LIMIT and ORDER BY clauses as described below:
* (1) Creating a new group by clause during aggregate mutation, or
* (2) Distinct clause is not pushed down
*/
groupByExtended =
list_length(queryGroupClause.groupClauseList) > originalGroupClauseLength;
if (!groupByExtended && !distinctPreventsLimitPushdown)
{
/* both sort and limit clauses rely on similar information */
OrderByLimitReference limitOrderByReference =
BuildOrderByLimitReference(hasDistinctOn,
groupedByDisjointPartitionColumn,
originalGroupClauseList,
originalSortClauseList,
originalTargetEntryList);
ProcessLimitOrderByForWorkerQuery(limitOrderByReference, originalLimitCount,
originalLimitOffset, originalSortClauseList,
originalGroupClauseList,
originalTargetEntryList,
&queryOrderByLimit,
&queryTargetList);
}
/* finally, fill the extended op node with the data we gathered */
workerExtendedOpNode = CitusMakeNode(MultiExtendedOp);
workerExtendedOpNode->targetList = queryTargetList.targetEntryList;
workerExtendedOpNode->groupClauseList = queryGroupClause.groupClauseList;
workerExtendedOpNode->havingQual = queryHavingQual;
workerExtendedOpNode->hasDistinctOn = queryDistinctClause.workerHasDistinctOn;
workerExtendedOpNode->distinctClause = queryDistinctClause.workerDistinctClause;
workerExtendedOpNode->hasWindowFuncs = queryWindowClause.hasWindowFunctions;
workerExtendedOpNode->windowClause = queryWindowClause.workerWindowClauseList;
workerExtendedOpNode->sortClauseList = queryOrderByLimit.workerSortClauseList;
workerExtendedOpNode->limitCount = queryOrderByLimit.workerLimitCount;
return workerExtendedOpNode;
}
/*
* ProcessTargetListForWorkerQuery gets the inputs and modifies the outputs
* such that the worker query's target list and group by clauses are extended
* for the given inputs.
*
* The function walks over the input targetEntryList. For the entries
* with aggregates in them, it calls the recursive aggregate walker function to
* create aggregates for the worker nodes. For example, the avg() is sent to
* the worker with two expressions count() and sum(). Thus, a single target entry
* might end up with multiple expressions in the worker query.
*
* The function doesn't change the aggragates in the window functions and sends them
* as-is. The reason is that Citus currently only supports pushing down window
* functions as-is. As we implement pull-to-master window functions, we should
* revisit here as well.
*
* The function also handles count distinct operator if it is used in repartition
* subqueries or on non-partition columns (e.g., cannot be pushed down). Each
* column in count distinct aggregate is added to target list, and group by
* list of worker extended operator. This approach guarantees the distinctness
* in the worker queries.
*
* inputs: targetEntryList, extendedOpNodeProperties
* outputs: queryTargetList, queryGroupClause
*/
static void
ProcessTargetListForWorkerQuery(List *targetEntryList,
ExtendedOpNodeProperties *extendedOpNodeProperties,
QueryTargetList *queryTargetList,
QueryGroupClause *queryGroupClause)
{
ListCell *targetEntryCell = NULL;
WorkerAggregateWalkerContext *workerAggContext =
palloc0(sizeof(WorkerAggregateWalkerContext));
workerAggContext->expressionList = NIL;
workerAggContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns;
/* iterate over original target entries */ /* iterate over original target entries */
foreach(targetEntryCell, targetEntryList) foreach(targetEntryCell, targetEntryList)
@ -1867,8 +2096,8 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
bool hasWindowFunction = contain_window_function((Node *) originalExpression); bool hasWindowFunction = contain_window_function((Node *) originalExpression);
/* reset walker context */ /* reset walker context */
walkerContext->expressionList = NIL; workerAggContext->expressionList = NIL;
walkerContext->createGroupByClause = false; workerAggContext->createGroupByClause = false;
/* /*
* If the expression uses aggregates inside window function contain agg * If the expression uses aggregates inside window function contain agg
@ -1877,58 +2106,140 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
*/ */
if (hasAggregates && !hasWindowFunction) if (hasAggregates && !hasWindowFunction)
{ {
WorkerAggregateWalker((Node *) originalExpression, walkerContext); WorkerAggregateWalker((Node *) originalExpression, workerAggContext);
newExpressionList = walkerContext->expressionList; newExpressionList = workerAggContext->expressionList;
queryHasAggregates = true;
} }
else else
{ {
newExpressionList = list_make1(originalExpression); newExpressionList = list_make1(originalExpression);
} }
createNewGroupByClause = walkerContext->createGroupByClause;
ExpandWorkerTargetEntry(newExpressionList, originalTargetEntry, ExpandWorkerTargetEntry(newExpressionList, originalTargetEntry,
createNewGroupByClause, &newTargetEntryList, workerAggContext->createGroupByClause,
&targetProjectionNumber, &groupClauseList, queryTargetList, queryGroupClause);
&nextSortGroupRefIndex);
} }
}
/* we also need to add having expressions to worker target list */
if (havingQual != NULL) /*
{ * ProcessHavingClauseForWorkerQuery gets the inputs and modifies the outputs
* such that the worker query's target list and group by clauses are extended
* based on the inputs.
*
* The rule is that Citus always applies the HAVING clause on the
* coordinator. Thus, it pulls the necessary data from the workers. Also, when the
* having clause is safe to pushdown to the workers, workerHavingQual is set to
* be the original having clause.
*
* TODO: Citus currently always pulls the expressions in the having clause to the
* coordinator and apply it on the coordinator. Do we really need to pull those
* expressions to the coordinator and apply the having on the coordinator if we're
* already pushing down the HAVING clause?
*
* inputs: originalHavingQual, extendedOpNodeProperties
* outputs: workerHavingQual, queryTargetList, queryGroupClause
*/
static void
ProcessHavingClauseForWorkerQuery(Node *originalHavingQual,
ExtendedOpNodeProperties *extendedOpNodeProperties,
Node **workerHavingQual,
QueryTargetList *queryTargetList,
QueryGroupClause *queryGroupClause)
{
List *newExpressionList = NIL; List *newExpressionList = NIL;
TargetEntry *targetEntry = NULL; TargetEntry *targetEntry = NULL;
WorkerAggregateWalkerContext *workerAggContext = NULL;
/* reset walker context */ if (originalHavingQual == NULL)
walkerContext->expressionList = NIL; {
walkerContext->createGroupByClause = false; return;
WorkerAggregateWalker(havingQual, walkerContext);
newExpressionList = walkerContext->expressionList;
createNewGroupByClause = walkerContext->createGroupByClause;
ExpandWorkerTargetEntry(newExpressionList, targetEntry,
createNewGroupByClause, &newTargetEntryList,
&targetProjectionNumber, &groupClauseList,
&nextSortGroupRefIndex);
} }
workerExtendedOpNode = CitusMakeNode(MultiExtendedOp); *workerHavingQual = NULL;
workerExtendedOpNode->distinctClause = NIL;
workerExtendedOpNode->hasDistinctOn = false;
workerExtendedOpNode->hasWindowFuncs = originalOpNode->hasWindowFuncs;
workerExtendedOpNode->windowClause = originalOpNode->windowClause;
workerExtendedOpNode->groupClauseList = groupClauseList;
if (originalOpNode->distinctClause) workerAggContext = palloc0(sizeof(WorkerAggregateWalkerContext));
workerAggContext->expressionList = NIL;
workerAggContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns;
workerAggContext->createGroupByClause = false;
WorkerAggregateWalker(originalHavingQual, workerAggContext);
newExpressionList = workerAggContext->expressionList;
ExpandWorkerTargetEntry(newExpressionList, targetEntry,
workerAggContext->createGroupByClause,
queryTargetList, queryGroupClause);
/*
* If grouped by a partition column whose values are shards have disjoint sets
* of partition values, we can push down the having qualifier.
*
* When a query with subquery is provided, we can't determine if
* groupedByDisjointPartitionColumn, therefore we also check if there is a
* window function too. If there is a window function we would know that it
* is safe to push down (i.e. it is partitioned on distribution column, and
* if there is a group by, it contains distribution column).
*
*/
if (extendedOpNodeProperties->groupedByDisjointPartitionColumn ||
extendedOpNodeProperties->pushDownWindowFunctions)
{ {
/*
* We converted the having expression to a list in subquery pushdown
* planner. However, this query cannot be parsed as it is in the worker.
* We should convert this back to being explicit for worker query
* so that it can be parsed when it hits to the standard planner in
* worker.
*/
if (IsA(originalHavingQual, List))
{
*workerHavingQual =
(Node *) make_ands_explicit((List *) originalHavingQual);
}
else
{
*workerHavingQual = originalHavingQual;
}
}
}
/*
* PrcoessDistinctClauseForWorkerQuery gets the inputs and modifies the outputs
* such that worker query's DISTINCT and DISTINCT ON clauses are set accordingly.
* Note the the function may or may not decide to pushdown the DISTINCT and DISTINCT
* on clauses based on the inputs.
*
* See the detailed comments in the function for the rules of pushing down DISTINCT
* and DISTINCT ON clauses to the worker queries.
*
* The function also sets distinctPreventsLimitPushdown. As the name reveals,
* distinct could prevent pushwing down LIMIT clauses later in the planning.
* For the details, see the comments in the function.
*
* inputs: distinctClause, hasDistinctOn, groupClauseList, queryHasAggregates
* outputs: queryDistinctClause, distinctPreventsLimitPushdown
*
*/
static void
ProcessDistinctClauseForWorkerQuery(List *distinctClause, bool hasDistinctOn,
List *groupClauseList,
bool queryHasAggregates,
QueryDistinctClause *queryDistinctClause,
bool *distinctPreventsLimitPushdown)
{
bool distinctClauseSupersetofGroupClause = false;
bool shouldPushdownDistinct = false; bool shouldPushdownDistinct = false;
if (distinctClause == NIL)
{
return;
}
*distinctPreventsLimitPushdown = false;
if (groupClauseList == NIL || if (groupClauseList == NIL ||
IsGroupBySubsetOfDistinct(groupClauseList, IsGroupBySubsetOfDistinct(groupClauseList, distinctClause))
originalOpNode->distinctClause))
{ {
distinctClauseSupersetofGroupClause = true; distinctClauseSupersetofGroupClause = true;
} }
@ -1942,7 +2253,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
* LIMIT might cause missing the necessary data from * LIMIT might cause missing the necessary data from
* the worker query * the worker query
*/ */
distinctPreventsLimitPushdown = true; *distinctPreventsLimitPushdown = true;
} }
/* /*
@ -1956,96 +2267,187 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
distinctClauseSupersetofGroupClause; distinctClauseSupersetofGroupClause;
if (shouldPushdownDistinct) if (shouldPushdownDistinct)
{ {
workerExtendedOpNode->distinctClause = originalOpNode->distinctClause; queryDistinctClause->workerDistinctClause = distinctClause;
workerExtendedOpNode->hasDistinctOn = originalOpNode->hasDistinctOn; queryDistinctClause->workerHasDistinctOn = hasDistinctOn;
}
} }
}
/*
* Order by and limit clauses are pushed down only if /*
* (1) We do not create a new group by clause during aggregate mutation, and * ProcessWindowFunctionsForWorkerQuery gets the inputs and modifies the outputs such
* (2) There distinct clause does not prevent limit pushdown * that worker query's workerWindowClauseList is set when the window clauses are safe to
* pushdown.
*
* TODO: Citus only supports pushing down window clauses as-is under certain circumstances.
* And, at this point in the planning, we are guaraanted to process a window function
* which is safe to pushdown as-is. It should also be possible to pull the relevant data
* to the coordinator and apply the window clauses for the remaining cases.
*
* Note that even though Citus only pushes down the window functions, it may need to
* modify the target list of the worker query when the window function refers to
* an avg(). The reason is that any aggragate which is also referred by other
* target entries would be mutated by Citus. Thus, we add a copy of the same aggragate
* to the worker target list to make sure that the window function refers to the
* non-mutated aggragate.
*
* inputs: windowClauseList, originalTargetEntryList
* outputs: queryWindowClause, queryTargetList
*
*/ */
if (!createNewGroupByClause && !distinctPreventsLimitPushdown) static void
{ ProcessWindowFunctionsForWorkerQuery(List *windowClauseList,
List *newTargetEntryListForSortClauses = NIL; List *originalTargetEntryList,
QueryWindowClause *queryWindowClause,
/* if we can push down the limit, also set related fields */ QueryTargetList *queryTargetList)
workerExtendedOpNode->limitCount = WorkerLimitCount(originalOpNode, {
groupedByDisjointPartitionColumn);
workerExtendedOpNode->sortClauseList =
WorkerSortClauseList(originalOpNode, groupedByDisjointPartitionColumn);
newTargetEntryListForSortClauses =
GenerateNewTargetEntriesForSortClauses(originalOpNode->targetList,
workerExtendedOpNode->sortClauseList,
&targetProjectionNumber,
&nextSortGroupRefIndex);
newTargetEntryList = list_concat(newTargetEntryList,
newTargetEntryListForSortClauses);
}
if (workerExtendedOpNode->windowClause)
{
List *windowClauseList = workerExtendedOpNode->windowClause;
ListCell *windowClauseCell = NULL; ListCell *windowClauseCell = NULL;
if (windowClauseList == NIL)
{
queryWindowClause->hasWindowFunctions = false;
return;
}
foreach(windowClauseCell, windowClauseList) foreach(windowClauseCell, windowClauseList)
{ {
WindowClause *windowClause = (WindowClause *) lfirst(windowClauseCell); WindowClause *windowClause = (WindowClause *) lfirst(windowClauseCell);
List *partitionClauseTargetList = List *partitionClauseTargetList =
GenerateNewTargetEntriesForSortClauses(originalOpNode->targetList, GenerateNewTargetEntriesForSortClauses(originalTargetEntryList,
windowClause->partitionClause, windowClause->partitionClause,
&targetProjectionNumber, &(queryTargetList->
&nextSortGroupRefIndex); targetProjectionNumber),
queryWindowClause->
nextSortGroupRefIndex);
List *orderClauseTargetList = List *orderClauseTargetList =
GenerateNewTargetEntriesForSortClauses(originalOpNode->targetList, GenerateNewTargetEntriesForSortClauses(originalTargetEntryList,
windowClause->orderClause, windowClause->orderClause,
&targetProjectionNumber, &(queryTargetList->
&nextSortGroupRefIndex); targetProjectionNumber),
queryWindowClause->
nextSortGroupRefIndex);
newTargetEntryList = list_concat(newTargetEntryList, /*
* Note that even Citus does push down the window clauses as-is, we may still need to
* add the generated entries to the target list. The reason is that the same aggragates
* might be referred from another target entry that is a bare aggragate (e.g., no window
* functions), which would have been mutated. For instance, when an average aggragate
* is mutated on the target list, the window function would refer to a sum aggragate,
* which is obviously wrong.
*/
queryTargetList->targetEntryList = list_concat(queryTargetList->targetEntryList,
partitionClauseTargetList); partitionClauseTargetList);
newTargetEntryList = list_concat(newTargetEntryList, queryTargetList->targetEntryList = list_concat(queryTargetList->targetEntryList,
orderClauseTargetList); orderClauseTargetList);
} }
}
workerExtendedOpNode->targetList = newTargetEntryList; queryWindowClause->workerWindowClauseList = windowClauseList;
queryWindowClause->hasWindowFunctions = true;
}
/*
* ProcessLimitOrderByForWorkerQuery gets the inputs and modifies the outputs
* such that worker query's LIMIT and ORDER BY clauses are set accordingly.
* Adding entries to ORDER BY might trigger adding new entries to newTargetEntryList.
* See GenerateNewTargetEntriesForSortClauses() for the details.
*
* For the decisions on whether and how to pushdown LIMIT and ORDER BY are documented
* in the functions that are called from this function.
*
* inputs: sortLimitReference, originalLimitCount, limitOffset,
* sortClauseList, groupClauseList, originalTargetList
* outputs: queryOrderByLimit, queryTargetList
*/
static void
ProcessLimitOrderByForWorkerQuery(OrderByLimitReference orderByLimitReference,
Node *originalLimitCount, Node *limitOffset,
List *sortClauseList, List *groupClauseList,
List *originalTargetList,
QueryOrderByLimit *queryOrderByLimit,
QueryTargetList *queryTargetList)
{
List *newTargetEntryListForSortClauses = NIL;
queryOrderByLimit->workerLimitCount =
WorkerLimitCount(originalLimitCount, limitOffset, orderByLimitReference);
queryOrderByLimit->workerSortClauseList =
WorkerSortClauseList(originalLimitCount,
groupClauseList,
sortClauseList,
orderByLimitReference);
/* /*
* If grouped by a partition column whose values are shards have disjoint sets * TODO: Do we really need to add the target entries if we're not pushing
* of partition values, we can push down the having qualifier. * down ORDER BY?
*
* When a query with subquery is provided, we can't determine if
* groupedByDisjointPartitionColumn, therefore we also check if there is a
* window function too. If there is a window function we would know that it
* is safe to push down (i.e. it is partitioned on distribution column, and
* if there is a group by, it contains distribution column).
*
*/ */
if (havingQual != NULL && newTargetEntryListForSortClauses =
(groupedByDisjointPartitionColumn || pushDownWindowFunction)) GenerateNewTargetEntriesForSortClauses(originalTargetList,
queryOrderByLimit->workerSortClauseList,
&(queryTargetList->targetProjectionNumber),
queryOrderByLimit->nextSortGroupRefIndex);
queryTargetList->targetEntryList =
list_concat(queryTargetList->targetEntryList, newTargetEntryListForSortClauses);
}
/*
* BuildLimitOrderByReference is a helper function that simply builds
* the necessary information for processing the limit and order by.
* The return value should be used in a read-only manner.
*/
static OrderByLimitReference
BuildOrderByLimitReference(bool hasDistinctOn, bool groupedByDisjointPartitionColumn,
List *groupClause, List *sortClauseList, List *targetList)
{
OrderByLimitReference limitOrderByReference;
limitOrderByReference.groupedByDisjointPartitionColumn =
groupedByDisjointPartitionColumn;
limitOrderByReference.hasDistinctOn = hasDistinctOn;
limitOrderByReference.groupClauseIsEmpty = (groupClause == NIL);
limitOrderByReference.sortClauseIsEmpty = (sortClauseList == NIL);
limitOrderByReference.canApproximate =
CanPushDownLimitApproximate(sortClauseList, targetList);
limitOrderByReference.hasOrderByAggregate =
HasOrderByAggregate(sortClauseList, targetList);
return limitOrderByReference;
}
/*
* TargetListHasAggragates returns true if any of the elements in the
* target list contain aggragates that are not inside the window functions.
*/
static bool
TargetListHasAggragates(List *targetEntryList)
{
ListCell *targetEntryCell = NULL;
/* iterate over original target entries */
foreach(targetEntryCell, targetEntryList)
{ {
workerExtendedOpNode->havingQual = originalOpNode->havingQual; TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
Expr *targetExpr = targetEntry->expr;
bool hasAggregates = contain_agg_clause((Node *) targetExpr);
bool hasWindowFunction = contain_window_function((Node *) targetExpr);
/* /*
* We converted the having expression to a list in subquery pushdown * If the expression uses aggregates inside window function contain agg
* planner. However, this query cannot be parsed as it is in the worker. * clause still returns true. We want to make sure it is not a part of
* We should convert this back to being explicit for worker query * window function before we proceed.
* so that it can be parsed when it hits to the standard planner in
* worker.
*/ */
if (IsA(workerExtendedOpNode->havingQual, List)) if (hasAggregates && !hasWindowFunction)
{ {
workerExtendedOpNode->havingQual = return true;
(Node *) make_ands_explicit((List *) workerExtendedOpNode->havingQual);
} }
} }
return workerExtendedOpNode; return false;
} }
@ -2064,24 +2466,11 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
* if the expressionList is not generated from any target entry. * if the expressionList is not generated from any target entry.
* - addToGroupByClause: True if the expressionList should also be added to the * - addToGroupByClause: True if the expressionList should also be added to the
* worker query's GROUP BY clause. * worker query's GROUP BY clause.
* Outputs:
* - newTargetEntryList: Worker query's target list. The function adds one target entry
* to this list for each expression that is in expressionList.
* - targetProjectionNumber: A pointer to the worker query target list index. Should be
* incremented by one for each target entry added to
* newTargetEntryList.
* - groupClauseList: Worker query's group clause list. The function adds one entry to
* this list for any expression that is in expressionList when
* addToGroupByClause flag is true.
* - nextSortGroupRefIndex: A pointer to worker query's sort group reference index.
* Should be incremented by one for each element added to
* the groupClauseList.
*/ */
static void static void
ExpandWorkerTargetEntry(List *expressionList, TargetEntry *originalTargetEntry, ExpandWorkerTargetEntry(List *expressionList, TargetEntry *originalTargetEntry,
bool addToGroupByClause, List **newTargetEntryList, bool addToGroupByClause, QueryTargetList *queryTargetList,
AttrNumber *targetProjectionNumber, List **groupClauseList, QueryGroupClause *queryGroupClause)
Index *nextSortGroupRefIndex)
{ {
ListCell *newExpressionCell = NULL; ListCell *newExpressionCell = NULL;
@ -2094,9 +2483,10 @@ ExpandWorkerTargetEntry(List *expressionList, TargetEntry *originalTargetEntry,
/* generate and add the new target entry to the target list */ /* generate and add the new target entry to the target list */
newTargetEntry = newTargetEntry =
GenerateWorkerTargetEntry(originalTargetEntry, newExpression, GenerateWorkerTargetEntry(originalTargetEntry, newExpression,
*targetProjectionNumber); queryTargetList->targetProjectionNumber);
(*targetProjectionNumber)++; (queryTargetList->targetProjectionNumber)++;
(*newTargetEntryList) = lappend(*newTargetEntryList, newTargetEntry); queryTargetList->targetEntryList =
lappend(queryTargetList->targetEntryList, newTargetEntry);
/* /*
* Detect new targets of type Var and add it to group clause list. * Detect new targets of type Var and add it to group clause list.
@ -2107,8 +2497,7 @@ ExpandWorkerTargetEntry(List *expressionList, TargetEntry *originalTargetEntry,
*/ */
if (IsA(newExpression, Var) && addToGroupByClause) if (IsA(newExpression, Var) && addToGroupByClause)
{ {
AppendTargetEntryToGroupClause(newTargetEntry, groupClauseList, AppendTargetEntryToGroupClause(newTargetEntry, queryGroupClause);
nextSortGroupRefIndex);
} }
} }
} }
@ -2200,8 +2589,8 @@ GenerateWorkerTargetEntry(TargetEntry *targetEntry, Expr *workerExpression,
* appended to the group clause and the index is incremented by one. * appended to the group clause and the index is incremented by one.
*/ */
static void static void
AppendTargetEntryToGroupClause(TargetEntry *targetEntry, List **groupClause, AppendTargetEntryToGroupClause(TargetEntry *targetEntry,
Index *nextSortGroupRefIndex) QueryGroupClause *queryGroupClause)
{ {
Expr *targetExpr PG_USED_FOR_ASSERTS_ONLY = targetEntry->expr; Expr *targetExpr PG_USED_FOR_ASSERTS_ONLY = targetEntry->expr;
Var *targetColumn = NULL; Var *targetColumn = NULL;
@ -2214,14 +2603,15 @@ AppendTargetEntryToGroupClause(TargetEntry *targetEntry, List **groupClause,
groupByClause = CreateSortGroupClause(targetColumn); groupByClause = CreateSortGroupClause(targetColumn);
/* the target entry should have an index */ /* the target entry should have an index */
targetEntry->ressortgroupref = *nextSortGroupRefIndex; targetEntry->ressortgroupref = *queryGroupClause->nextSortGroupRefIndex;
/* the group by clause entry should point to the correct index in the target list */ /* the group by clause entry should point to the correct index in the target list */
groupByClause->tleSortGroupRef = *nextSortGroupRefIndex; groupByClause->tleSortGroupRef = *queryGroupClause->nextSortGroupRefIndex;
/* update the group by list and the index's value */ /* update the group by list and the index's value */
*groupClause = lappend(*groupClause, groupByClause); queryGroupClause->groupClauseList =
(*nextSortGroupRefIndex)++; lappend(queryGroupClause->groupClauseList, groupByClause);
(*queryGroupClause->nextSortGroupRefIndex)++;
} }
@ -3388,7 +3778,7 @@ ExtractQueryWalker(Node *node, List **queryList)
/* /*
* WorkerLimitCount checks if the given extended node contains a limit node, and * WorkerLimitCount checks if the given input contains a valid limit node, and
* if that node can be pushed down. For this, the function checks if this limit * if that node can be pushed down. For this, the function checks if this limit
* count or a meaningful approximation of it can be pushed down to worker nodes. * count or a meaningful approximation of it can be pushed down to worker nodes.
* If they can, the function returns the limit count. * If they can, the function returns the limit count.
@ -3413,19 +3803,15 @@ ExtractQueryWalker(Node *node, List **queryList)
* returns null. * returns null.
*/ */
static Node * static Node *
WorkerLimitCount(MultiExtendedOp *originalOpNode, WorkerLimitCount(Node *limitCount, Node *limitOffset, OrderByLimitReference
bool groupedByDisjointPartitionColumn) orderByLimitReference)
{ {
Node *workerLimitNode = NULL; Node *workerLimitNode = NULL;
List *groupClauseList = originalOpNode->groupClauseList;
List *sortClauseList = originalOpNode->sortClauseList;
List *targetList = originalOpNode->targetList;
bool hasOrderByAggregate = HasOrderByAggregate(sortClauseList, targetList);
bool canPushDownLimit = false; bool canPushDownLimit = false;
bool canApproximate = false; bool canApproximate = false;
/* no limit node to push down */ /* no limit node to push down */
if (originalOpNode->limitCount == NULL) if (limitCount == NULL)
{ {
return NULL; return NULL;
} }
@ -3435,9 +3821,8 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode,
* certain expressions such as parameters are not evaluated and converted * certain expressions such as parameters are not evaluated and converted
* into Consts on the op node. * into Consts on the op node.
*/ */
Assert(IsA(originalOpNode->limitCount, Const)); Assert(IsA(limitCount, Const));
Assert(originalOpNode->limitOffset == NULL || Assert(limitOffset == NULL || IsA(limitOffset, Const));
IsA(originalOpNode->limitOffset, Const));
/* /*
* If we don't have group by clauses, or we have group by partition column, * If we don't have group by clauses, or we have group by partition column,
@ -3445,31 +3830,32 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode,
* original limit. Else if we have order by clauses with commutative aggregates, * original limit. Else if we have order by clauses with commutative aggregates,
* we can push down approximate limits. * we can push down approximate limits.
*/ */
if (groupClauseList == NIL || groupedByDisjointPartitionColumn) if (orderByLimitReference.groupClauseIsEmpty ||
orderByLimitReference.groupedByDisjointPartitionColumn)
{ {
canPushDownLimit = true; canPushDownLimit = true;
} }
else if (sortClauseList == NIL) else if (orderByLimitReference.sortClauseIsEmpty)
{ {
canPushDownLimit = false; canPushDownLimit = false;
} }
else if (!hasOrderByAggregate) else if (!orderByLimitReference.hasOrderByAggregate)
{ {
canPushDownLimit = true; canPushDownLimit = true;
} }
else else
{ {
canApproximate = CanPushDownLimitApproximate(sortClauseList, targetList); canApproximate = orderByLimitReference.canApproximate;
} }
/* create the workerLimitNode according to the decisions above */ /* create the workerLimitNode according to the decisions above */
if (canPushDownLimit) if (canPushDownLimit)
{ {
workerLimitNode = (Node *) copyObject(originalOpNode->limitCount); workerLimitNode = (Node *) copyObject(limitCount);
} }
else if (canApproximate) else if (canApproximate)
{ {
Const *workerLimitConst = (Const *) copyObject(originalOpNode->limitCount); Const *workerLimitConst = (Const *) copyObject(limitCount);
int64 workerLimitCount = (int64) LimitClauseRowFetchCount; int64 workerLimitCount = (int64) LimitClauseRowFetchCount;
workerLimitConst->constvalue = Int64GetDatum(workerLimitCount); workerLimitConst->constvalue = Int64GetDatum(workerLimitCount);
@ -3480,10 +3866,10 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode,
* If offset clause is present and limit can be pushed down (whether exactly or * If offset clause is present and limit can be pushed down (whether exactly or
* approximately), add the offset value to limit on workers * approximately), add the offset value to limit on workers
*/ */
if (workerLimitNode != NULL && originalOpNode->limitOffset != NULL) if (workerLimitNode != NULL && limitOffset != NULL)
{ {
Const *workerLimitConst = (Const *) workerLimitNode; Const *workerLimitConst = (Const *) workerLimitNode;
Const *workerOffsetConst = (Const *) originalOpNode->limitOffset; Const *workerOffsetConst = (Const *) limitOffset;
int64 workerLimitCount = DatumGetInt64(workerLimitConst->constvalue); int64 workerLimitCount = DatumGetInt64(workerLimitConst->constvalue);
int64 workerOffsetCount = DatumGetInt64(workerOffsetConst->constvalue); int64 workerOffsetCount = DatumGetInt64(workerOffsetConst->constvalue);
@ -3506,27 +3892,26 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode,
/* /*
* WorkerSortClauseList first checks if the given extended node contains a limit * WorkerSortClauseList first checks if the given input contains a limit
* or hasDistinctOn that can be pushed down. If it does, the function then * or hasDistinctOn that can be pushed down. If it does, the function then
* checks if we need to add any sorting and grouping clauses to the sort list we * checks if we need to add any sorting and grouping clauses to the sort list we
* push down for the limit. If we do, the function adds these clauses and * push down for the limit. If we do, the function adds these clauses and
* returns them. Otherwise, the function returns null. * returns them. Otherwise, the function returns null.
*/ */
static List * static List *
WorkerSortClauseList(MultiExtendedOp *originalOpNode, WorkerSortClauseList(Node *limitCount, List *groupClauseList, List *sortClauseList,
bool groupedByDisjointPartitionColumn) OrderByLimitReference orderByLimitReference)
{ {
List *workerSortClauseList = NIL; List *workerSortClauseList = NIL;
List *groupClauseList = originalOpNode->groupClauseList;
List *sortClauseList = copyObject(originalOpNode->sortClauseList);
List *targetList = originalOpNode->targetList;
/* if no limit node and no hasDistinctOn, no need to push down sort clauses */ /* if no limit node and no hasDistinctOn, no need to push down sort clauses */
if (originalOpNode->limitCount == NULL && !originalOpNode->hasDistinctOn) if (limitCount == NULL && !orderByLimitReference.hasDistinctOn)
{ {
return NIL; return NIL;
} }
sortClauseList = copyObject(sortClauseList);
/* /*
* If we are pushing down the limit, push down any order by clauses. Also if * If we are pushing down the limit, push down any order by clauses. Also if
* we are pushing down the limit because the order by clauses don't have any * we are pushing down the limit because the order by clauses don't have any
@ -3535,14 +3920,15 @@ WorkerSortClauseList(MultiExtendedOp *originalOpNode,
* in different task results. By ordering on the group by clause, we ensure * in different task results. By ordering on the group by clause, we ensure
* that query results are consistent. * that query results are consistent.
*/ */
if (groupClauseList == NIL || groupedByDisjointPartitionColumn) if (orderByLimitReference.groupClauseIsEmpty ||
orderByLimitReference.groupedByDisjointPartitionColumn)
{ {
workerSortClauseList = sortClauseList; workerSortClauseList = sortClauseList;
} }
else if (sortClauseList != NIL) else if (sortClauseList != NIL)
{ {
bool orderByNonAggregates = !(HasOrderByAggregate(sortClauseList, targetList)); bool orderByNonAggregates = !orderByLimitReference.hasOrderByAggregate;
bool canApproximate = CanPushDownLimitApproximate(sortClauseList, targetList); bool canApproximate = orderByLimitReference.canApproximate;
if (orderByNonAggregates) if (orderByNonAggregates)
{ {