diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index b63a70681..9c3611ec9 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -129,6 +129,20 @@ static Expr * AddTypeConversion(Node *originalAggregate, Node *newExpression); static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, ExtendedOpNodeProperties * extendedOpNodeProperties); +static void ExpandWorkerTargetEntry(List *expressionList, + TargetEntry *originalTargetEntry, + bool addToGroupByClause, + List **newTargetEntryList, + AttrNumber *targetProjectionNumber, + List **groupClauseList, + Index *nextSortGroupRefIndex); +static Index GetNextSortGroupRef(List *targetEntryList); +static TargetEntry * GenerateWorkerTargetEntry(TargetEntry *targetEntry, + Expr *workerExpression, + AttrNumber targetProjectionNumber); +static void AppendTargetEntryToGroupClause(TargetEntry *targetEntry, + List **groupClause, + Index *nextSortGroupRefIndex); static bool WorkerAggregateWalker(Node *node, WorkerAggregateWalkerContext *walkerContext); static List * WorkerAggregateExpressionList(Aggref *originalAggregate, @@ -1833,7 +1847,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, bool queryHasAggregates = false; bool distinctClauseSupersetofGroupClause = false; bool distinctPreventsLimitPushdown = false; - bool createdNewGroupByClause = false; + bool createNewGroupByClause = false; bool groupedByDisjointPartitionColumn = extendedOpNodeProperties->groupedByDisjointPartitionColumn; bool pushDownWindowFunction = extendedOpNodeProperties->pushDownWindowFunctions; @@ -1841,18 +1855,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, walkerContext->expressionList = NIL; walkerContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns; - /* find max of sort group ref index */ - foreach(targetEntryCell, targetEntryList) - { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - if (targetEntry->ressortgroupref > nextSortGroupRefIndex) - { - nextSortGroupRefIndex = targetEntry->ressortgroupref; - } - } - - /* next group ref index starts from max group ref index + 1 */ - nextSortGroupRefIndex++; + nextSortGroupRefIndex = GetNextSortGroupRef(targetEntryList); /* iterate over original target entries */ foreach(targetEntryCell, targetEntryList) @@ -1860,7 +1863,6 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, TargetEntry *originalTargetEntry = (TargetEntry *) lfirst(targetEntryCell); Expr *originalExpression = originalTargetEntry->expr; List *newExpressionList = NIL; - ListCell *newExpressionCell = NULL; bool hasAggregates = contain_agg_clause((Node *) originalExpression); bool hasWindowFunction = contain_window_function((Node *) originalExpression); @@ -1885,55 +1887,19 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, newExpressionList = list_make1(originalExpression); } - /* now create target entries for each new expression */ - foreach(newExpressionCell, newExpressionList) - { - Expr *newExpression = (Expr *) lfirst(newExpressionCell); - TargetEntry *newTargetEntry = copyObject(originalTargetEntry); - newTargetEntry->expr = newExpression; + createNewGroupByClause = walkerContext->createGroupByClause; - /* - * Detect new targets of type Var and add it to group clause list. - * This case is expected only if the target entry has aggregates and - * it is inside a repartitioned subquery. We create group by entry - * for each Var in target list. This code does not check if this - * Var was already in the target list or in group by clauses. - */ - if (IsA(newExpression, Var) && walkerContext->createGroupByClause) - { - Var *column = (Var *) newTargetEntry->expr; - SortGroupClause *groupByClause = CreateSortGroupClause(column); - newTargetEntry->ressortgroupref = nextSortGroupRefIndex; - groupByClause->tleSortGroupRef = nextSortGroupRefIndex; - - groupClauseList = lappend(groupClauseList, groupByClause); - nextSortGroupRefIndex++; - - createdNewGroupByClause = true; - } - - if (newTargetEntry->resname == NULL) - { - StringInfo columnNameString = makeStringInfo(); - appendStringInfo(columnNameString, WORKER_COLUMN_FORMAT, - targetProjectionNumber); - - newTargetEntry->resname = columnNameString->data; - } - - /* force resjunk to false as we may need this on the master */ - newTargetEntry->resjunk = false; - newTargetEntry->resno = targetProjectionNumber; - targetProjectionNumber++; - newTargetEntryList = lappend(newTargetEntryList, newTargetEntry); - } + ExpandWorkerTargetEntry(newExpressionList, originalTargetEntry, + createNewGroupByClause, &newTargetEntryList, + &targetProjectionNumber, &groupClauseList, + &nextSortGroupRefIndex); } /* we also need to add having expressions to worker target list */ if (havingQual != NULL) { List *newExpressionList = NIL; - ListCell *newExpressionCell = NULL; + TargetEntry *targetEntry = NULL; /* reset walker context */ walkerContext->expressionList = NIL; @@ -1942,39 +1908,12 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, WorkerAggregateWalker(havingQual, walkerContext); newExpressionList = walkerContext->expressionList; - /* now create target entries for each new expression */ - foreach(newExpressionCell, newExpressionList) - { - TargetEntry *newTargetEntry = makeNode(TargetEntry); - StringInfo columnNameString = makeStringInfo(); + createNewGroupByClause = walkerContext->createGroupByClause; - Expr *newExpression = (Expr *) lfirst(newExpressionCell); - newTargetEntry->expr = newExpression; - - appendStringInfo(columnNameString, WORKER_COLUMN_FORMAT, - targetProjectionNumber); - newTargetEntry->resname = columnNameString->data; - - /* force resjunk to false as we may need this on the master */ - newTargetEntry->resjunk = false; - newTargetEntry->resno = targetProjectionNumber; - - if (IsA(newExpression, Var) && walkerContext->createGroupByClause) - { - Var *column = (Var *) newTargetEntry->expr; - SortGroupClause *groupByClause = CreateSortGroupClause(column); - newTargetEntry->ressortgroupref = nextSortGroupRefIndex; - groupByClause->tleSortGroupRef = nextSortGroupRefIndex; - - groupClauseList = lappend(groupClauseList, groupByClause); - nextSortGroupRefIndex++; - - createdNewGroupByClause = true; - } - - newTargetEntryList = lappend(newTargetEntryList, newTargetEntry); - targetProjectionNumber++; - } + ExpandWorkerTargetEntry(newExpressionList, targetEntry, + createNewGroupByClause, &newTargetEntryList, + &targetProjectionNumber, &groupClauseList, + &nextSortGroupRefIndex); } workerExtendedOpNode = CitusMakeNode(MultiExtendedOp); @@ -2027,7 +1966,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, * (1) We do not create a new group by clause during aggregate mutation, and * (2) There distinct clause does not prevent limit pushdown */ - if (!createdNewGroupByClause && !distinctPreventsLimitPushdown) + if (!createNewGroupByClause && !distinctPreventsLimitPushdown) { List *newTargetEntryListForSortClauses = NIL; @@ -2110,6 +2049,182 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, } +/* + * ExpandWorkerTargetEntry is a utility function which processes the + * expressions that are intended to be added to the worker target list. + * + * In summary, the function gets a list of expressions, converts them to target + * entries and updates all the necessary fields such that the expression is correctly + * added to the worker query's target list. + * + * Inputs: + * - expressionList: The list of expressions that should be added to the worker query's + * target list. + * - originalTargetEntry: Target entry that the expressionList generated for. NULL + * if the expressionList is not generated from any target entry. + * - addToGroupByClause: True if the expressionList should also be added to the + * worker query's GROUP BY clause. + * Outputs: + * - newTargetEntryList: Worker query's target list. The function adds one target entry + * to this list for each expression that is in expressionList. + * - targetProjectionNumber: A pointer to the worker query target list index. Should be + * incremented by one for each target entry added to + * newTargetEntryList. + * - groupClauseList: Worker query's group clause list. The function adds one entry to + * this list for any expression that is in expressionList when + * addToGroupByClause flag is true. + * - nextSortGroupRefIndex: A pointer to worker query's sort group reference index. + * Should be incremented by one for each element added to + * the groupClauseList. + */ +static void +ExpandWorkerTargetEntry(List *expressionList, TargetEntry *originalTargetEntry, + bool addToGroupByClause, List **newTargetEntryList, + AttrNumber *targetProjectionNumber, List **groupClauseList, + Index *nextSortGroupRefIndex) +{ + ListCell *newExpressionCell = NULL; + + /* now create target entries for each new expression */ + foreach(newExpressionCell, expressionList) + { + Expr *newExpression = (Expr *) lfirst(newExpressionCell); + TargetEntry *newTargetEntry = NULL; + + /* generate and add the new target entry to the target list */ + newTargetEntry = + GenerateWorkerTargetEntry(originalTargetEntry, newExpression, + *targetProjectionNumber); + (*targetProjectionNumber)++; + (*newTargetEntryList) = lappend(*newTargetEntryList, newTargetEntry); + + /* + * Detect new targets of type Var and add it to group clause list. + * This case is expected only if the target entry has aggregates and + * it is inside a repartitioned subquery. We create group by entry + * for each Var in target list. This code does not check if this + * Var was already in the target list or in group by clauses. + */ + if (IsA(newExpression, Var) && addToGroupByClause) + { + AppendTargetEntryToGroupClause(newTargetEntry, groupClauseList, + nextSortGroupRefIndex); + } + } +} + + +/* + * GetNextSortGroupRef gets a target list entry and returns + * the next ressortgroupref that should be used based on the + * input target list. + */ +static Index +GetNextSortGroupRef(List *targetEntryList) +{ + ListCell *targetEntryCell = NULL; + Index nextSortGroupRefIndex = 0; + + /* find max of sort group ref index */ + foreach(targetEntryCell, targetEntryList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + + if (targetEntry->ressortgroupref > nextSortGroupRefIndex) + { + nextSortGroupRefIndex = targetEntry->ressortgroupref; + } + } + + /* next group ref index starts from max group ref index + 1 */ + nextSortGroupRefIndex++; + + return nextSortGroupRefIndex; +} + + +/* + * GenerateWorkerTargetEntry is a simple utility function which gets a + * target entry, an expression and a targetProjectionNumber. + * + * The function returns a newly allocated target entry which can be added + * to the worker's target list. + */ +static TargetEntry * +GenerateWorkerTargetEntry(TargetEntry *targetEntry, Expr *workerExpression, + AttrNumber targetProjectionNumber) +{ + TargetEntry *newTargetEntry = NULL; + + /* + * If a target entry is already provided, use a copy of + * it because some of the callers rely on resorigtbl and + * resorigcol. + */ + if (targetEntry) + { + newTargetEntry = copyObject(targetEntry); + } + else + { + newTargetEntry = makeNode(TargetEntry); + } + + if (newTargetEntry->resname == NULL) + { + StringInfo columnNameString = makeStringInfo(); + + appendStringInfo(columnNameString, WORKER_COLUMN_FORMAT, + targetProjectionNumber); + + newTargetEntry->resname = columnNameString->data; + } + + /* we can generate a target entry without any expressions */ + Assert(workerExpression != NULL); + + /* force resjunk to false as we may need this on the master */ + newTargetEntry->expr = workerExpression; + newTargetEntry->resjunk = false; + newTargetEntry->resno = targetProjectionNumber; + + return newTargetEntry; +} + + +/* + * AppendTargetEntryToGroupClause gets a target entry, pointer to group list + * and the ressortgroupref index. + * + * The function modifies all of the three input such that the target entry is + * appended to the group clause and the index is incremented by one. + */ +static void +AppendTargetEntryToGroupClause(TargetEntry *targetEntry, List **groupClause, + Index *nextSortGroupRefIndex) +{ + Expr *targetExpr PG_USED_FOR_ASSERTS_ONLY = targetEntry->expr; + Var *targetColumn = NULL; + SortGroupClause *groupByClause = NULL; + + /* we currently only support appending Var target entries */ + AssertArg(IsA(targetExpr, Var)); + + targetColumn = (Var *) targetEntry->expr; + groupByClause = CreateSortGroupClause(targetColumn); + + /* the target entry should have an index */ + targetEntry->ressortgroupref = *nextSortGroupRefIndex; + + /* the group by clause entry should point to the correct index in the target list */ + groupByClause->tleSortGroupRef = *nextSortGroupRefIndex; + + /* update the group by list and the index's value */ + *groupClause = lappend(*groupClause, groupByClause); + (*nextSortGroupRefIndex)++; +} + + /* * WorkerAggregateWalker walks over the original target entry expression, and * creates the list of expression trees (potentially more than one) to execute