From 832c91e28c76d331b5b5b9f8acee1b4e783afe62 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 11 Apr 2018 16:59:42 +0300 Subject: [PATCH] Move processing each part of the query into its own functions This commit doesn't change any of the logic at all. Instead, the goal is to: * Get rid of any code duplication * Incremental changes to the optimizer made it slightly hard to follow the code, improve that and make it easier to implement new features * Simplify the code by moving each part of query processing (e.g., DISTINCT, LIMIT etc) into its own function * Make the interaction between each part of the query more obvious (e.g., How DISTINCT affects LIMIT etc) --- .../planner/extended_op_node_utils.c | 4 + .../planner/multi_logical_optimizer.c | 824 +++++++++++++----- 2 files changed, 609 insertions(+), 219 deletions(-) diff --git a/src/backend/distributed/planner/extended_op_node_utils.c b/src/backend/distributed/planner/extended_op_node_utils.c index 088d4e54d..0a14b12bc 100644 --- a/src/backend/distributed/planner/extended_op_node_utils.c +++ b/src/backend/distributed/planner/extended_op_node_utils.c @@ -123,6 +123,10 @@ GroupedByDisjointPartitionColumn(List *tableNodeList, MultiExtendedOp *opNode) } +/* + * ExtendedOpNodeContainsRepartitionSubquery is a utility function that + * returns true if the extended op node contains a re-partition subquery. + */ static bool ExtendedOpNodeContainsRepartitionSubquery(MultiExtendedOp *originalOpNode) { diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 9c3611ec9..c31cf9748 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -75,6 +75,80 @@ typedef struct WorkerAggregateWalkerContext } WorkerAggregateWalkerContext; +/* + * QueryTargetList encapsulates the necessary fields to form + * worker query's target list. + */ +typedef struct QueryTargetList +{ + List *targetEntryList; /* the list of target entries */ + AttrNumber targetProjectionNumber; /* the index of the last entry */ +} QueryTargetList; + + +/* + * QueryGroupClause encapsulates the necessary fields to form + * worker query's group by clause. + */ +typedef struct QueryGroupClause +{ + List *groupClauseList; /* the list of group clause entries */ + Index *nextSortGroupRefIndex; /* pointer to the index of the largest sort group reference index */ +} QueryGroupClause; + + +/* + * QueryDistinctClause encapsulates the necessary fields to form + * worker query's DISTINCT/DISTINCT ON parts. + */ +typedef struct QueryDistinctClause +{ + List *workerDistinctClause; /* the list of distinct clause entries */ + bool workerHasDistinctOn; +} QueryDistinctClause; + + +/* + * QueryWindowClause encapsulates the necessary fields to form + * worker query's window clause. + */ +typedef struct QueryWindowClause +{ + List *workerWindowClauseList; /* the list of window clause entries */ + bool hasWindowFunctions; + Index *nextSortGroupRefIndex; /* see QueryGroupClause */ +} QueryWindowClause; + + +/* + * QueryOrderByLimit encapsulates the necessary fields to form + * worker query's order by and limit clauses. Note that we don't + * keep track of limit offset clause since it is incorporated + * into the limit clause during the processing. + */ +typedef struct QueryOrderByLimit +{ + Node *workerLimitCount; + List *workerSortClauseList; + Index *nextSortGroupRefIndex; /* see QueryGroupClause */ +} QueryOrderByLimit; + + +/* + * OrderByLimitReference a structure that is used commonly while + * processing sort and limit clauses. + */ +typedef struct OrderByLimitReference +{ + bool groupedByDisjointPartitionColumn; + bool groupClauseIsEmpty; + bool sortClauseIsEmpty; + bool hasOrderByAggregate; + bool canApproximate; + bool hasDistinctOn; +} OrderByLimitReference; + + /* Local functions forward declarations */ static MultiSelect * AndSelectNode(MultiSelect *selectNode); static MultiSelect * OrSelectNode(MultiSelect *selectNode); @@ -129,20 +203,49 @@ static Expr * AddTypeConversion(Node *originalAggregate, Node *newExpression); static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, ExtendedOpNodeProperties * extendedOpNodeProperties); +static bool TargetListHasAggragates(List *targetEntryList); +static void ProcessTargetListForWorkerQuery(List *targetEntryList, + ExtendedOpNodeProperties * + extendedOpNodeProperties, + QueryTargetList *queryTargetList, + QueryGroupClause *queryGroupClause); +static void ProcessHavingClauseForWorkerQuery(Node *havingQual, + ExtendedOpNodeProperties * + extendedOpNodeProperties, + Node **workerHavingQual, + QueryTargetList *queryTargetList, + QueryGroupClause *queryGroupClause); +static void ProcessDistinctClauseForWorkerQuery(List *distinctClause, bool hasDistinctOn, + List *groupClauseList, + bool queryHasAggregates, + QueryDistinctClause *queryDistinctClause, + bool *distinctPreventsLimitPushdown); +static void ProcessWindowFunctionsForWorkerQuery(List *windowClauseList, + List *originalTargetEntryList, + QueryWindowClause *queryWindowClause, + QueryTargetList *queryTargetList); +static void ProcessLimitOrderByForWorkerQuery(OrderByLimitReference orderByLimitReference, + Node *originalLimitCount, Node *limitOffset, + List *sortClauseList, List *groupClauseList, + List *originalTargetList, + QueryOrderByLimit *queryOrderByLimit, + QueryTargetList *queryTargetList); +static OrderByLimitReference BuildOrderByLimitReference(bool hasDistinctOn, bool + groupedByDisjointPartitionColumn, + List *groupClause, + List *sortClauseList, + List *targetList); static void ExpandWorkerTargetEntry(List *expressionList, TargetEntry *originalTargetEntry, bool addToGroupByClause, - List **newTargetEntryList, - AttrNumber *targetProjectionNumber, - List **groupClauseList, - Index *nextSortGroupRefIndex); + QueryTargetList *queryTargetList, + QueryGroupClause *queryGroupClause); static Index GetNextSortGroupRef(List *targetEntryList); static TargetEntry * GenerateWorkerTargetEntry(TargetEntry *targetEntry, Expr *workerExpression, AttrNumber targetProjectionNumber); static void AppendTargetEntryToGroupClause(TargetEntry *targetEntry, - List **groupClause, - Index *nextSortGroupRefIndex); + QueryGroupClause *queryGroupClause); static bool WorkerAggregateWalker(Node *node, WorkerAggregateWalkerContext *walkerContext); static List * WorkerAggregateExpressionList(Aggref *originalAggregate, @@ -174,10 +277,11 @@ static bool TablePartitioningSupportsDistinct(List *tableNodeList, Var *distinctColumn); /* Local functions forward declarations for limit clauses */ -static Node * WorkerLimitCount(MultiExtendedOp *originalOpNode, - bool groupedByDisjointPartitionColumn); -static List * WorkerSortClauseList(MultiExtendedOp *originalOpNode, - bool groupedByDisjointPartitionColumn); +static Node * WorkerLimitCount(Node *limitCount, Node *limitOffset, OrderByLimitReference + orderByLimitReference); +static List * WorkerSortClauseList(Node *limitCount, + List *groupClauseList, List *sortClauseList, + OrderByLimitReference orderByLimitReference); static List * GenerateNewTargetEntriesForSortClauses(List *originalTargetList, List *sortClauseList, AttrNumber *targetProjectionNumber, @@ -1821,41 +1925,166 @@ AddTypeConversion(Node *originalAggregate, Node *newExpression) /* * WorkerExtendedOpNode creates the worker extended operator node from the given - * target entries. The function walks over these target entries; and for entries - * with aggregates in them, this function calls the recursive aggregate walker - * function to create aggregates for the worker nodes. Also, the function checks - * if we can push down the limit to worker nodes; and if we can, sets the limit - * count and sort clause list fields in the new operator node. It provides special - * treatment for count distinct operator if it is used in repartition subqueries - * or on non-partition columns. Each column in count distinct aggregate is added - * to target list, and group by list of worker extended operator. + * originalOpNode and extendedOpNodeProperties. + * + * For the details of the processing see the comments of the functions that + * are called from this function. */ static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, ExtendedOpNodeProperties *extendedOpNodeProperties) { MultiExtendedOp *workerExtendedOpNode = NULL; - List *targetEntryList = originalOpNode->targetList; - ListCell *targetEntryCell = NULL; - List *newTargetEntryList = NIL; - List *groupClauseList = copyObject(originalOpNode->groupClauseList); - Node *havingQual = originalOpNode->havingQual; - AttrNumber targetProjectionNumber = 1; - WorkerAggregateWalkerContext *walkerContext = - palloc0(sizeof(WorkerAggregateWalkerContext)); Index nextSortGroupRefIndex = 0; - bool queryHasAggregates = false; - bool distinctClauseSupersetofGroupClause = false; bool distinctPreventsLimitPushdown = false; - bool createNewGroupByClause = false; + bool groupByExtended = false; bool groupedByDisjointPartitionColumn = extendedOpNodeProperties->groupedByDisjointPartitionColumn; - bool pushDownWindowFunction = extendedOpNodeProperties->pushDownWindowFunctions; - walkerContext->expressionList = NIL; - walkerContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns; + QueryTargetList queryTargetList; + QueryGroupClause queryGroupClause; + QueryDistinctClause queryDistinctClause; + QueryWindowClause queryWindowClause; + QueryOrderByLimit queryOrderByLimit; + Node *queryHavingQual = NULL; - nextSortGroupRefIndex = GetNextSortGroupRef(targetEntryList); + List *originalTargetEntryList = originalOpNode->targetList; + List *originalGroupClauseList = originalOpNode->groupClauseList; + List *originalSortClauseList = originalOpNode->sortClauseList; + Node *originalHavingQual = originalOpNode->havingQual; + Node *originalLimitCount = originalOpNode->limitCount; + Node *originalLimitOffset = originalOpNode->limitOffset; + List *originalWindowClause = originalOpNode->windowClause; + List *originalDistinctClause = originalOpNode->distinctClause; + bool hasDistinctOn = originalOpNode->hasDistinctOn; + + int originalGroupClauseLength = list_length(originalGroupClauseList); + bool queryHasAggregates = TargetListHasAggragates(originalTargetEntryList); + + /* initialize to default values */ + memset(&queryTargetList, 0, sizeof(queryGroupClause)); + memset(&queryGroupClause, 0, sizeof(queryGroupClause)); + memset(&queryDistinctClause, 0, sizeof(queryGroupClause)); + memset(&queryWindowClause, 0, sizeof(queryGroupClause)); + memset(&queryOrderByLimit, 0, sizeof(queryGroupClause)); + + /* calculate the next sort group index based on the original target list */ + nextSortGroupRefIndex = GetNextSortGroupRef(originalTargetEntryList); + + /* targetProjectionNumber starts from 1 */ + queryTargetList.targetProjectionNumber = 1; + + /* worker query always include all the group by entries in the query */ + queryGroupClause.groupClauseList = copyObject(originalGroupClauseList); + + /* + * nextSortGroupRefIndex is used by group by, window and order by clauses. + * Thus, we pass a reference to a single nextSortGroupRefIndex and expect + * it modified separately while processing those parts of the query. + */ + queryGroupClause.nextSortGroupRefIndex = &nextSortGroupRefIndex; + queryWindowClause.nextSortGroupRefIndex = &nextSortGroupRefIndex; + queryOrderByLimit.nextSortGroupRefIndex = &nextSortGroupRefIndex; + + /* process each part of the query in order to generate the worker query's parts */ + ProcessTargetListForWorkerQuery(originalTargetEntryList, extendedOpNodeProperties, + &queryTargetList, &queryGroupClause); + + ProcessHavingClauseForWorkerQuery(originalHavingQual, extendedOpNodeProperties, + &queryHavingQual, &queryTargetList, + &queryGroupClause); + + ProcessDistinctClauseForWorkerQuery(originalDistinctClause, hasDistinctOn, + queryGroupClause.groupClauseList, + queryHasAggregates, &queryDistinctClause, + &distinctPreventsLimitPushdown); + + ProcessWindowFunctionsForWorkerQuery(originalWindowClause, originalTargetEntryList, + &queryWindowClause, &queryTargetList); + + /* + * Order by and limit clauses are relevant to each other, and processing + * them together makes it handy for us. + * + * The other parts of the query might have already prohibited pushing down + * LIMIT and ORDER BY clauses as described below: + * (1) Creating a new group by clause during aggregate mutation, or + * (2) Distinct clause is not pushed down + */ + groupByExtended = + list_length(queryGroupClause.groupClauseList) > originalGroupClauseLength; + if (!groupByExtended && !distinctPreventsLimitPushdown) + { + /* both sort and limit clauses rely on similar information */ + OrderByLimitReference limitOrderByReference = + BuildOrderByLimitReference(hasDistinctOn, + groupedByDisjointPartitionColumn, + originalGroupClauseList, + originalSortClauseList, + originalTargetEntryList); + + ProcessLimitOrderByForWorkerQuery(limitOrderByReference, originalLimitCount, + originalLimitOffset, originalSortClauseList, + originalGroupClauseList, + originalTargetEntryList, + &queryOrderByLimit, + &queryTargetList); + } + + /* finally, fill the extended op node with the data we gathered */ + workerExtendedOpNode = CitusMakeNode(MultiExtendedOp); + + workerExtendedOpNode->targetList = queryTargetList.targetEntryList; + workerExtendedOpNode->groupClauseList = queryGroupClause.groupClauseList; + workerExtendedOpNode->havingQual = queryHavingQual; + workerExtendedOpNode->hasDistinctOn = queryDistinctClause.workerHasDistinctOn; + workerExtendedOpNode->distinctClause = queryDistinctClause.workerDistinctClause; + workerExtendedOpNode->hasWindowFuncs = queryWindowClause.hasWindowFunctions; + workerExtendedOpNode->windowClause = queryWindowClause.workerWindowClauseList; + workerExtendedOpNode->sortClauseList = queryOrderByLimit.workerSortClauseList; + workerExtendedOpNode->limitCount = queryOrderByLimit.workerLimitCount; + + return workerExtendedOpNode; +} + + +/* + * ProcessTargetListForWorkerQuery gets the inputs and modifies the outputs + * such that the worker query's target list and group by clauses are extended + * for the given inputs. + * + * The function walks over the input targetEntryList. For the entries + * with aggregates in them, it calls the recursive aggregate walker function to + * create aggregates for the worker nodes. For example, the avg() is sent to + * the worker with two expressions count() and sum(). Thus, a single target entry + * might end up with multiple expressions in the worker query. + * + * The function doesn't change the aggragates in the window functions and sends them + * as-is. The reason is that Citus currently only supports pushing down window + * functions as-is. As we implement pull-to-master window functions, we should + * revisit here as well. + * + * The function also handles count distinct operator if it is used in repartition + * subqueries or on non-partition columns (e.g., cannot be pushed down). Each + * column in count distinct aggregate is added to target list, and group by + * list of worker extended operator. This approach guarantees the distinctness + * in the worker queries. + * + * inputs: targetEntryList, extendedOpNodeProperties + * outputs: queryTargetList, queryGroupClause + */ +static void +ProcessTargetListForWorkerQuery(List *targetEntryList, + ExtendedOpNodeProperties *extendedOpNodeProperties, + QueryTargetList *queryTargetList, + QueryGroupClause *queryGroupClause) +{ + ListCell *targetEntryCell = NULL; + WorkerAggregateWalkerContext *workerAggContext = + palloc0(sizeof(WorkerAggregateWalkerContext)); + + workerAggContext->expressionList = NIL; + workerAggContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns; /* iterate over original target entries */ foreach(targetEntryCell, targetEntryList) @@ -1867,8 +2096,8 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, bool hasWindowFunction = contain_window_function((Node *) originalExpression); /* reset walker context */ - walkerContext->expressionList = NIL; - walkerContext->createGroupByClause = false; + workerAggContext->expressionList = NIL; + workerAggContext->createGroupByClause = false; /* * If the expression uses aggregates inside window function contain agg @@ -1877,143 +2106,69 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, */ if (hasAggregates && !hasWindowFunction) { - WorkerAggregateWalker((Node *) originalExpression, walkerContext); + WorkerAggregateWalker((Node *) originalExpression, workerAggContext); - newExpressionList = walkerContext->expressionList; - queryHasAggregates = true; + newExpressionList = workerAggContext->expressionList; } else { newExpressionList = list_make1(originalExpression); } - createNewGroupByClause = walkerContext->createGroupByClause; - ExpandWorkerTargetEntry(newExpressionList, originalTargetEntry, - createNewGroupByClause, &newTargetEntryList, - &targetProjectionNumber, &groupClauseList, - &nextSortGroupRefIndex); + workerAggContext->createGroupByClause, + queryTargetList, queryGroupClause); } +} - /* we also need to add having expressions to worker target list */ - if (havingQual != NULL) + +/* + * ProcessHavingClauseForWorkerQuery gets the inputs and modifies the outputs + * such that the worker query's target list and group by clauses are extended + * based on the inputs. + * + * The rule is that Citus always applies the HAVING clause on the + * coordinator. Thus, it pulls the necessary data from the workers. Also, when the + * having clause is safe to pushdown to the workers, workerHavingQual is set to + * be the original having clause. + * + * TODO: Citus currently always pulls the expressions in the having clause to the + * coordinator and apply it on the coordinator. Do we really need to pull those + * expressions to the coordinator and apply the having on the coordinator if we're + * already pushing down the HAVING clause? + * + * inputs: originalHavingQual, extendedOpNodeProperties + * outputs: workerHavingQual, queryTargetList, queryGroupClause + */ +static void +ProcessHavingClauseForWorkerQuery(Node *originalHavingQual, + ExtendedOpNodeProperties *extendedOpNodeProperties, + Node **workerHavingQual, + QueryTargetList *queryTargetList, + QueryGroupClause *queryGroupClause) +{ + List *newExpressionList = NIL; + TargetEntry *targetEntry = NULL; + WorkerAggregateWalkerContext *workerAggContext = NULL; + + if (originalHavingQual == NULL) { - List *newExpressionList = NIL; - TargetEntry *targetEntry = NULL; - - /* reset walker context */ - walkerContext->expressionList = NIL; - walkerContext->createGroupByClause = false; - - WorkerAggregateWalker(havingQual, walkerContext); - newExpressionList = walkerContext->expressionList; - - createNewGroupByClause = walkerContext->createGroupByClause; - - ExpandWorkerTargetEntry(newExpressionList, targetEntry, - createNewGroupByClause, &newTargetEntryList, - &targetProjectionNumber, &groupClauseList, - &nextSortGroupRefIndex); + return; } - workerExtendedOpNode = CitusMakeNode(MultiExtendedOp); - workerExtendedOpNode->distinctClause = NIL; - workerExtendedOpNode->hasDistinctOn = false; - workerExtendedOpNode->hasWindowFuncs = originalOpNode->hasWindowFuncs; - workerExtendedOpNode->windowClause = originalOpNode->windowClause; - workerExtendedOpNode->groupClauseList = groupClauseList; + *workerHavingQual = NULL; - if (originalOpNode->distinctClause) - { - bool shouldPushdownDistinct = false; - if (groupClauseList == NIL || - IsGroupBySubsetOfDistinct(groupClauseList, - originalOpNode->distinctClause)) - { - distinctClauseSupersetofGroupClause = true; - } - else - { - distinctClauseSupersetofGroupClause = false; + workerAggContext = palloc0(sizeof(WorkerAggregateWalkerContext)); + workerAggContext->expressionList = NIL; + workerAggContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns; + workerAggContext->createGroupByClause = false; - /* - * GROUP BY being a subset of DISTINCT guarantees the - * distinctness on the workers. Otherwise, pushing down - * LIMIT might cause missing the necessary data from - * the worker query - */ - distinctPreventsLimitPushdown = true; - } + WorkerAggregateWalker(originalHavingQual, workerAggContext); + newExpressionList = workerAggContext->expressionList; - /* - * Distinct is pushed down to worker query only if the query does not - * contain an aggregate in which master processing might be required to - * complete the final result before distinct operation. We also prevent - * distinct pushdown if distinct clause is missing some entries that - * group by clause has. - */ - shouldPushdownDistinct = !queryHasAggregates && - distinctClauseSupersetofGroupClause; - if (shouldPushdownDistinct) - { - workerExtendedOpNode->distinctClause = originalOpNode->distinctClause; - workerExtendedOpNode->hasDistinctOn = originalOpNode->hasDistinctOn; - } - } - - /* - * Order by and limit clauses are pushed down only if - * (1) We do not create a new group by clause during aggregate mutation, and - * (2) There distinct clause does not prevent limit pushdown - */ - if (!createNewGroupByClause && !distinctPreventsLimitPushdown) - { - List *newTargetEntryListForSortClauses = NIL; - - /* if we can push down the limit, also set related fields */ - workerExtendedOpNode->limitCount = WorkerLimitCount(originalOpNode, - groupedByDisjointPartitionColumn); - workerExtendedOpNode->sortClauseList = - WorkerSortClauseList(originalOpNode, groupedByDisjointPartitionColumn); - - newTargetEntryListForSortClauses = - GenerateNewTargetEntriesForSortClauses(originalOpNode->targetList, - workerExtendedOpNode->sortClauseList, - &targetProjectionNumber, - &nextSortGroupRefIndex); - - newTargetEntryList = list_concat(newTargetEntryList, - newTargetEntryListForSortClauses); - } - - if (workerExtendedOpNode->windowClause) - { - List *windowClauseList = workerExtendedOpNode->windowClause; - ListCell *windowClauseCell = NULL; - - foreach(windowClauseCell, windowClauseList) - { - WindowClause *windowClause = (WindowClause *) lfirst(windowClauseCell); - - List *partitionClauseTargetList = - GenerateNewTargetEntriesForSortClauses(originalOpNode->targetList, - windowClause->partitionClause, - &targetProjectionNumber, - &nextSortGroupRefIndex); - List *orderClauseTargetList = - GenerateNewTargetEntriesForSortClauses(originalOpNode->targetList, - windowClause->orderClause, - &targetProjectionNumber, - &nextSortGroupRefIndex); - - newTargetEntryList = list_concat(newTargetEntryList, - partitionClauseTargetList); - newTargetEntryList = list_concat(newTargetEntryList, - orderClauseTargetList); - } - } - - workerExtendedOpNode->targetList = newTargetEntryList; + ExpandWorkerTargetEntry(newExpressionList, targetEntry, + workerAggContext->createGroupByClause, + queryTargetList, queryGroupClause); /* * If grouped by a partition column whose values are shards have disjoint sets @@ -2026,11 +2181,9 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, * if there is a group by, it contains distribution column). * */ - if (havingQual != NULL && - (groupedByDisjointPartitionColumn || pushDownWindowFunction)) + if (extendedOpNodeProperties->groupedByDisjointPartitionColumn || + extendedOpNodeProperties->pushDownWindowFunctions) { - workerExtendedOpNode->havingQual = originalOpNode->havingQual; - /* * We converted the having expression to a list in subquery pushdown * planner. However, this query cannot be parsed as it is in the worker. @@ -2038,14 +2191,263 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, * so that it can be parsed when it hits to the standard planner in * worker. */ - if (IsA(workerExtendedOpNode->havingQual, List)) + if (IsA(originalHavingQual, List)) { - workerExtendedOpNode->havingQual = - (Node *) make_ands_explicit((List *) workerExtendedOpNode->havingQual); + *workerHavingQual = + (Node *) make_ands_explicit((List *) originalHavingQual); + } + else + { + *workerHavingQual = originalHavingQual; + } + } +} + + +/* + * PrcoessDistinctClauseForWorkerQuery gets the inputs and modifies the outputs + * such that worker query's DISTINCT and DISTINCT ON clauses are set accordingly. + * Note the the function may or may not decide to pushdown the DISTINCT and DISTINCT + * on clauses based on the inputs. + * + * See the detailed comments in the function for the rules of pushing down DISTINCT + * and DISTINCT ON clauses to the worker queries. + * + * The function also sets distinctPreventsLimitPushdown. As the name reveals, + * distinct could prevent pushwing down LIMIT clauses later in the planning. + * For the details, see the comments in the function. + * + * inputs: distinctClause, hasDistinctOn, groupClauseList, queryHasAggregates + * outputs: queryDistinctClause, distinctPreventsLimitPushdown + * + */ +static void +ProcessDistinctClauseForWorkerQuery(List *distinctClause, bool hasDistinctOn, + List *groupClauseList, + bool queryHasAggregates, + QueryDistinctClause *queryDistinctClause, + bool *distinctPreventsLimitPushdown) +{ + bool distinctClauseSupersetofGroupClause = false; + bool shouldPushdownDistinct = false; + + if (distinctClause == NIL) + { + return; + } + + *distinctPreventsLimitPushdown = false; + + if (groupClauseList == NIL || + IsGroupBySubsetOfDistinct(groupClauseList, distinctClause)) + { + distinctClauseSupersetofGroupClause = true; + } + else + { + distinctClauseSupersetofGroupClause = false; + + /* + * GROUP BY being a subset of DISTINCT guarantees the + * distinctness on the workers. Otherwise, pushing down + * LIMIT might cause missing the necessary data from + * the worker query + */ + *distinctPreventsLimitPushdown = true; + } + + /* + * Distinct is pushed down to worker query only if the query does not + * contain an aggregate in which master processing might be required to + * complete the final result before distinct operation. We also prevent + * distinct pushdown if distinct clause is missing some entries that + * group by clause has. + */ + shouldPushdownDistinct = !queryHasAggregates && + distinctClauseSupersetofGroupClause; + if (shouldPushdownDistinct) + { + queryDistinctClause->workerDistinctClause = distinctClause; + queryDistinctClause->workerHasDistinctOn = hasDistinctOn; + } +} + + +/* + * ProcessWindowFunctionsForWorkerQuery gets the inputs and modifies the outputs such + * that worker query's workerWindowClauseList is set when the window clauses are safe to + * pushdown. + * + * TODO: Citus only supports pushing down window clauses as-is under certain circumstances. + * And, at this point in the planning, we are guaraanted to process a window function + * which is safe to pushdown as-is. It should also be possible to pull the relevant data + * to the coordinator and apply the window clauses for the remaining cases. + * + * Note that even though Citus only pushes down the window functions, it may need to + * modify the target list of the worker query when the window function refers to + * an avg(). The reason is that any aggragate which is also referred by other + * target entries would be mutated by Citus. Thus, we add a copy of the same aggragate + * to the worker target list to make sure that the window function refers to the + * non-mutated aggragate. + * + * inputs: windowClauseList, originalTargetEntryList + * outputs: queryWindowClause, queryTargetList + * + */ +static void +ProcessWindowFunctionsForWorkerQuery(List *windowClauseList, + List *originalTargetEntryList, + QueryWindowClause *queryWindowClause, + QueryTargetList *queryTargetList) +{ + ListCell *windowClauseCell = NULL; + + if (windowClauseList == NIL) + { + queryWindowClause->hasWindowFunctions = false; + + return; + } + + foreach(windowClauseCell, windowClauseList) + { + WindowClause *windowClause = (WindowClause *) lfirst(windowClauseCell); + + List *partitionClauseTargetList = + GenerateNewTargetEntriesForSortClauses(originalTargetEntryList, + windowClause->partitionClause, + &(queryTargetList-> + targetProjectionNumber), + queryWindowClause-> + nextSortGroupRefIndex); + List *orderClauseTargetList = + GenerateNewTargetEntriesForSortClauses(originalTargetEntryList, + windowClause->orderClause, + &(queryTargetList-> + targetProjectionNumber), + queryWindowClause-> + nextSortGroupRefIndex); + + /* + * Note that even Citus does push down the window clauses as-is, we may still need to + * add the generated entries to the target list. The reason is that the same aggragates + * might be referred from another target entry that is a bare aggragate (e.g., no window + * functions), which would have been mutated. For instance, when an average aggragate + * is mutated on the target list, the window function would refer to a sum aggragate, + * which is obviously wrong. + */ + queryTargetList->targetEntryList = list_concat(queryTargetList->targetEntryList, + partitionClauseTargetList); + queryTargetList->targetEntryList = list_concat(queryTargetList->targetEntryList, + orderClauseTargetList); + } + + queryWindowClause->workerWindowClauseList = windowClauseList; + queryWindowClause->hasWindowFunctions = true; +} + + +/* + * ProcessLimitOrderByForWorkerQuery gets the inputs and modifies the outputs + * such that worker query's LIMIT and ORDER BY clauses are set accordingly. + * Adding entries to ORDER BY might trigger adding new entries to newTargetEntryList. + * See GenerateNewTargetEntriesForSortClauses() for the details. + * + * For the decisions on whether and how to pushdown LIMIT and ORDER BY are documented + * in the functions that are called from this function. + * + * inputs: sortLimitReference, originalLimitCount, limitOffset, + * sortClauseList, groupClauseList, originalTargetList + * outputs: queryOrderByLimit, queryTargetList + */ +static void +ProcessLimitOrderByForWorkerQuery(OrderByLimitReference orderByLimitReference, + Node *originalLimitCount, Node *limitOffset, + List *sortClauseList, List *groupClauseList, + List *originalTargetList, + QueryOrderByLimit *queryOrderByLimit, + QueryTargetList *queryTargetList) +{ + List *newTargetEntryListForSortClauses = NIL; + + queryOrderByLimit->workerLimitCount = + WorkerLimitCount(originalLimitCount, limitOffset, orderByLimitReference); + + queryOrderByLimit->workerSortClauseList = + WorkerSortClauseList(originalLimitCount, + groupClauseList, + sortClauseList, + orderByLimitReference); + + /* + * TODO: Do we really need to add the target entries if we're not pushing + * down ORDER BY? + */ + newTargetEntryListForSortClauses = + GenerateNewTargetEntriesForSortClauses(originalTargetList, + queryOrderByLimit->workerSortClauseList, + &(queryTargetList->targetProjectionNumber), + queryOrderByLimit->nextSortGroupRefIndex); + + queryTargetList->targetEntryList = + list_concat(queryTargetList->targetEntryList, newTargetEntryListForSortClauses); +} + + +/* + * BuildLimitOrderByReference is a helper function that simply builds + * the necessary information for processing the limit and order by. + * The return value should be used in a read-only manner. + */ +static OrderByLimitReference +BuildOrderByLimitReference(bool hasDistinctOn, bool groupedByDisjointPartitionColumn, + List *groupClause, List *sortClauseList, List *targetList) +{ + OrderByLimitReference limitOrderByReference; + + limitOrderByReference.groupedByDisjointPartitionColumn = + groupedByDisjointPartitionColumn; + limitOrderByReference.hasDistinctOn = hasDistinctOn; + limitOrderByReference.groupClauseIsEmpty = (groupClause == NIL); + limitOrderByReference.sortClauseIsEmpty = (sortClauseList == NIL); + limitOrderByReference.canApproximate = + CanPushDownLimitApproximate(sortClauseList, targetList); + limitOrderByReference.hasOrderByAggregate = + HasOrderByAggregate(sortClauseList, targetList); + + return limitOrderByReference; +} + + +/* + * TargetListHasAggragates returns true if any of the elements in the + * target list contain aggragates that are not inside the window functions. + */ +static bool +TargetListHasAggragates(List *targetEntryList) +{ + ListCell *targetEntryCell = NULL; + + /* iterate over original target entries */ + foreach(targetEntryCell, targetEntryList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + Expr *targetExpr = targetEntry->expr; + bool hasAggregates = contain_agg_clause((Node *) targetExpr); + bool hasWindowFunction = contain_window_function((Node *) targetExpr); + + /* + * If the expression uses aggregates inside window function contain agg + * clause still returns true. We want to make sure it is not a part of + * window function before we proceed. + */ + if (hasAggregates && !hasWindowFunction) + { + return true; } } - return workerExtendedOpNode; + return false; } @@ -2064,24 +2466,11 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, * if the expressionList is not generated from any target entry. * - addToGroupByClause: True if the expressionList should also be added to the * worker query's GROUP BY clause. - * Outputs: - * - newTargetEntryList: Worker query's target list. The function adds one target entry - * to this list for each expression that is in expressionList. - * - targetProjectionNumber: A pointer to the worker query target list index. Should be - * incremented by one for each target entry added to - * newTargetEntryList. - * - groupClauseList: Worker query's group clause list. The function adds one entry to - * this list for any expression that is in expressionList when - * addToGroupByClause flag is true. - * - nextSortGroupRefIndex: A pointer to worker query's sort group reference index. - * Should be incremented by one for each element added to - * the groupClauseList. */ static void ExpandWorkerTargetEntry(List *expressionList, TargetEntry *originalTargetEntry, - bool addToGroupByClause, List **newTargetEntryList, - AttrNumber *targetProjectionNumber, List **groupClauseList, - Index *nextSortGroupRefIndex) + bool addToGroupByClause, QueryTargetList *queryTargetList, + QueryGroupClause *queryGroupClause) { ListCell *newExpressionCell = NULL; @@ -2094,9 +2483,10 @@ ExpandWorkerTargetEntry(List *expressionList, TargetEntry *originalTargetEntry, /* generate and add the new target entry to the target list */ newTargetEntry = GenerateWorkerTargetEntry(originalTargetEntry, newExpression, - *targetProjectionNumber); - (*targetProjectionNumber)++; - (*newTargetEntryList) = lappend(*newTargetEntryList, newTargetEntry); + queryTargetList->targetProjectionNumber); + (queryTargetList->targetProjectionNumber)++; + queryTargetList->targetEntryList = + lappend(queryTargetList->targetEntryList, newTargetEntry); /* * Detect new targets of type Var and add it to group clause list. @@ -2107,8 +2497,7 @@ ExpandWorkerTargetEntry(List *expressionList, TargetEntry *originalTargetEntry, */ if (IsA(newExpression, Var) && addToGroupByClause) { - AppendTargetEntryToGroupClause(newTargetEntry, groupClauseList, - nextSortGroupRefIndex); + AppendTargetEntryToGroupClause(newTargetEntry, queryGroupClause); } } } @@ -2200,8 +2589,8 @@ GenerateWorkerTargetEntry(TargetEntry *targetEntry, Expr *workerExpression, * appended to the group clause and the index is incremented by one. */ static void -AppendTargetEntryToGroupClause(TargetEntry *targetEntry, List **groupClause, - Index *nextSortGroupRefIndex) +AppendTargetEntryToGroupClause(TargetEntry *targetEntry, + QueryGroupClause *queryGroupClause) { Expr *targetExpr PG_USED_FOR_ASSERTS_ONLY = targetEntry->expr; Var *targetColumn = NULL; @@ -2214,14 +2603,15 @@ AppendTargetEntryToGroupClause(TargetEntry *targetEntry, List **groupClause, groupByClause = CreateSortGroupClause(targetColumn); /* the target entry should have an index */ - targetEntry->ressortgroupref = *nextSortGroupRefIndex; + targetEntry->ressortgroupref = *queryGroupClause->nextSortGroupRefIndex; /* the group by clause entry should point to the correct index in the target list */ - groupByClause->tleSortGroupRef = *nextSortGroupRefIndex; + groupByClause->tleSortGroupRef = *queryGroupClause->nextSortGroupRefIndex; /* update the group by list and the index's value */ - *groupClause = lappend(*groupClause, groupByClause); - (*nextSortGroupRefIndex)++; + queryGroupClause->groupClauseList = + lappend(queryGroupClause->groupClauseList, groupByClause); + (*queryGroupClause->nextSortGroupRefIndex)++; } @@ -3388,7 +3778,7 @@ ExtractQueryWalker(Node *node, List **queryList) /* - * WorkerLimitCount checks if the given extended node contains a limit node, and + * WorkerLimitCount checks if the given input contains a valid limit node, and * if that node can be pushed down. For this, the function checks if this limit * count or a meaningful approximation of it can be pushed down to worker nodes. * If they can, the function returns the limit count. @@ -3413,19 +3803,15 @@ ExtractQueryWalker(Node *node, List **queryList) * returns null. */ static Node * -WorkerLimitCount(MultiExtendedOp *originalOpNode, - bool groupedByDisjointPartitionColumn) +WorkerLimitCount(Node *limitCount, Node *limitOffset, OrderByLimitReference + orderByLimitReference) { Node *workerLimitNode = NULL; - List *groupClauseList = originalOpNode->groupClauseList; - List *sortClauseList = originalOpNode->sortClauseList; - List *targetList = originalOpNode->targetList; - bool hasOrderByAggregate = HasOrderByAggregate(sortClauseList, targetList); bool canPushDownLimit = false; bool canApproximate = false; /* no limit node to push down */ - if (originalOpNode->limitCount == NULL) + if (limitCount == NULL) { return NULL; } @@ -3435,9 +3821,8 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode, * certain expressions such as parameters are not evaluated and converted * into Consts on the op node. */ - Assert(IsA(originalOpNode->limitCount, Const)); - Assert(originalOpNode->limitOffset == NULL || - IsA(originalOpNode->limitOffset, Const)); + Assert(IsA(limitCount, Const)); + Assert(limitOffset == NULL || IsA(limitOffset, Const)); /* * If we don't have group by clauses, or we have group by partition column, @@ -3445,31 +3830,32 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode, * original limit. Else if we have order by clauses with commutative aggregates, * we can push down approximate limits. */ - if (groupClauseList == NIL || groupedByDisjointPartitionColumn) + if (orderByLimitReference.groupClauseIsEmpty || + orderByLimitReference.groupedByDisjointPartitionColumn) { canPushDownLimit = true; } - else if (sortClauseList == NIL) + else if (orderByLimitReference.sortClauseIsEmpty) { canPushDownLimit = false; } - else if (!hasOrderByAggregate) + else if (!orderByLimitReference.hasOrderByAggregate) { canPushDownLimit = true; } else { - canApproximate = CanPushDownLimitApproximate(sortClauseList, targetList); + canApproximate = orderByLimitReference.canApproximate; } /* create the workerLimitNode according to the decisions above */ if (canPushDownLimit) { - workerLimitNode = (Node *) copyObject(originalOpNode->limitCount); + workerLimitNode = (Node *) copyObject(limitCount); } else if (canApproximate) { - Const *workerLimitConst = (Const *) copyObject(originalOpNode->limitCount); + Const *workerLimitConst = (Const *) copyObject(limitCount); int64 workerLimitCount = (int64) LimitClauseRowFetchCount; workerLimitConst->constvalue = Int64GetDatum(workerLimitCount); @@ -3480,10 +3866,10 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode, * If offset clause is present and limit can be pushed down (whether exactly or * approximately), add the offset value to limit on workers */ - if (workerLimitNode != NULL && originalOpNode->limitOffset != NULL) + if (workerLimitNode != NULL && limitOffset != NULL) { Const *workerLimitConst = (Const *) workerLimitNode; - Const *workerOffsetConst = (Const *) originalOpNode->limitOffset; + Const *workerOffsetConst = (Const *) limitOffset; int64 workerLimitCount = DatumGetInt64(workerLimitConst->constvalue); int64 workerOffsetCount = DatumGetInt64(workerOffsetConst->constvalue); @@ -3506,27 +3892,26 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode, /* - * WorkerSortClauseList first checks if the given extended node contains a limit + * WorkerSortClauseList first checks if the given input contains a limit * or hasDistinctOn that can be pushed down. If it does, the function then * checks if we need to add any sorting and grouping clauses to the sort list we * push down for the limit. If we do, the function adds these clauses and * returns them. Otherwise, the function returns null. */ static List * -WorkerSortClauseList(MultiExtendedOp *originalOpNode, - bool groupedByDisjointPartitionColumn) +WorkerSortClauseList(Node *limitCount, List *groupClauseList, List *sortClauseList, + OrderByLimitReference orderByLimitReference) { List *workerSortClauseList = NIL; - List *groupClauseList = originalOpNode->groupClauseList; - List *sortClauseList = copyObject(originalOpNode->sortClauseList); - List *targetList = originalOpNode->targetList; /* if no limit node and no hasDistinctOn, no need to push down sort clauses */ - if (originalOpNode->limitCount == NULL && !originalOpNode->hasDistinctOn) + if (limitCount == NULL && !orderByLimitReference.hasDistinctOn) { return NIL; } + sortClauseList = copyObject(sortClauseList); + /* * If we are pushing down the limit, push down any order by clauses. Also if * we are pushing down the limit because the order by clauses don't have any @@ -3535,14 +3920,15 @@ WorkerSortClauseList(MultiExtendedOp *originalOpNode, * in different task results. By ordering on the group by clause, we ensure * that query results are consistent. */ - if (groupClauseList == NIL || groupedByDisjointPartitionColumn) + if (orderByLimitReference.groupClauseIsEmpty || + orderByLimitReference.groupedByDisjointPartitionColumn) { workerSortClauseList = sortClauseList; } else if (sortClauseList != NIL) { - bool orderByNonAggregates = !(HasOrderByAggregate(sortClauseList, targetList)); - bool canApproximate = CanPushDownLimitApproximate(sortClauseList, targetList); + bool orderByNonAggregates = !orderByLimitReference.hasOrderByAggregate; + bool canApproximate = orderByLimitReference.canApproximate; if (orderByNonAggregates) {