mirror of https://github.com/citusdata/citus.git
Merge pull request #2090 from citusdata/simplify_optimizer_phase_2
Refactor logical optimizer [Phase 2] - Eliminate code duplication in `WorkerExtendedOpNode()`pull/2117/head
commit
f0b50f2f99
|
@ -129,6 +129,20 @@ static Expr * AddTypeConversion(Node *originalAggregate, Node *newExpression);
|
|||
static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
||||
ExtendedOpNodeProperties *
|
||||
extendedOpNodeProperties);
|
||||
static void ExpandWorkerTargetEntry(List *expressionList,
|
||||
TargetEntry *originalTargetEntry,
|
||||
bool addToGroupByClause,
|
||||
List **newTargetEntryList,
|
||||
AttrNumber *targetProjectionNumber,
|
||||
List **groupClauseList,
|
||||
Index *nextSortGroupRefIndex);
|
||||
static Index GetNextSortGroupRef(List *targetEntryList);
|
||||
static TargetEntry * GenerateWorkerTargetEntry(TargetEntry *targetEntry,
|
||||
Expr *workerExpression,
|
||||
AttrNumber targetProjectionNumber);
|
||||
static void AppendTargetEntryToGroupClause(TargetEntry *targetEntry,
|
||||
List **groupClause,
|
||||
Index *nextSortGroupRefIndex);
|
||||
static bool WorkerAggregateWalker(Node *node,
|
||||
WorkerAggregateWalkerContext *walkerContext);
|
||||
static List * WorkerAggregateExpressionList(Aggref *originalAggregate,
|
||||
|
@ -1833,7 +1847,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
|||
bool queryHasAggregates = false;
|
||||
bool distinctClauseSupersetofGroupClause = false;
|
||||
bool distinctPreventsLimitPushdown = false;
|
||||
bool createdNewGroupByClause = false;
|
||||
bool createNewGroupByClause = false;
|
||||
bool groupedByDisjointPartitionColumn =
|
||||
extendedOpNodeProperties->groupedByDisjointPartitionColumn;
|
||||
bool pushDownWindowFunction = extendedOpNodeProperties->pushDownWindowFunctions;
|
||||
|
@ -1841,18 +1855,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
|||
walkerContext->expressionList = NIL;
|
||||
walkerContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns;
|
||||
|
||||
/* find max of sort group ref index */
|
||||
foreach(targetEntryCell, targetEntryList)
|
||||
{
|
||||
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||
if (targetEntry->ressortgroupref > nextSortGroupRefIndex)
|
||||
{
|
||||
nextSortGroupRefIndex = targetEntry->ressortgroupref;
|
||||
}
|
||||
}
|
||||
|
||||
/* next group ref index starts from max group ref index + 1 */
|
||||
nextSortGroupRefIndex++;
|
||||
nextSortGroupRefIndex = GetNextSortGroupRef(targetEntryList);
|
||||
|
||||
/* iterate over original target entries */
|
||||
foreach(targetEntryCell, targetEntryList)
|
||||
|
@ -1860,7 +1863,6 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
|||
TargetEntry *originalTargetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||
Expr *originalExpression = originalTargetEntry->expr;
|
||||
List *newExpressionList = NIL;
|
||||
ListCell *newExpressionCell = NULL;
|
||||
bool hasAggregates = contain_agg_clause((Node *) originalExpression);
|
||||
bool hasWindowFunction = contain_window_function((Node *) originalExpression);
|
||||
|
||||
|
@ -1885,55 +1887,19 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
|||
newExpressionList = list_make1(originalExpression);
|
||||
}
|
||||
|
||||
/* now create target entries for each new expression */
|
||||
foreach(newExpressionCell, newExpressionList)
|
||||
{
|
||||
Expr *newExpression = (Expr *) lfirst(newExpressionCell);
|
||||
TargetEntry *newTargetEntry = copyObject(originalTargetEntry);
|
||||
newTargetEntry->expr = newExpression;
|
||||
createNewGroupByClause = walkerContext->createGroupByClause;
|
||||
|
||||
/*
|
||||
* Detect new targets of type Var and add it to group clause list.
|
||||
* This case is expected only if the target entry has aggregates and
|
||||
* it is inside a repartitioned subquery. We create group by entry
|
||||
* for each Var in target list. This code does not check if this
|
||||
* Var was already in the target list or in group by clauses.
|
||||
*/
|
||||
if (IsA(newExpression, Var) && walkerContext->createGroupByClause)
|
||||
{
|
||||
Var *column = (Var *) newTargetEntry->expr;
|
||||
SortGroupClause *groupByClause = CreateSortGroupClause(column);
|
||||
newTargetEntry->ressortgroupref = nextSortGroupRefIndex;
|
||||
groupByClause->tleSortGroupRef = nextSortGroupRefIndex;
|
||||
|
||||
groupClauseList = lappend(groupClauseList, groupByClause);
|
||||
nextSortGroupRefIndex++;
|
||||
|
||||
createdNewGroupByClause = true;
|
||||
}
|
||||
|
||||
if (newTargetEntry->resname == NULL)
|
||||
{
|
||||
StringInfo columnNameString = makeStringInfo();
|
||||
appendStringInfo(columnNameString, WORKER_COLUMN_FORMAT,
|
||||
targetProjectionNumber);
|
||||
|
||||
newTargetEntry->resname = columnNameString->data;
|
||||
}
|
||||
|
||||
/* force resjunk to false as we may need this on the master */
|
||||
newTargetEntry->resjunk = false;
|
||||
newTargetEntry->resno = targetProjectionNumber;
|
||||
targetProjectionNumber++;
|
||||
newTargetEntryList = lappend(newTargetEntryList, newTargetEntry);
|
||||
}
|
||||
ExpandWorkerTargetEntry(newExpressionList, originalTargetEntry,
|
||||
createNewGroupByClause, &newTargetEntryList,
|
||||
&targetProjectionNumber, &groupClauseList,
|
||||
&nextSortGroupRefIndex);
|
||||
}
|
||||
|
||||
/* we also need to add having expressions to worker target list */
|
||||
if (havingQual != NULL)
|
||||
{
|
||||
List *newExpressionList = NIL;
|
||||
ListCell *newExpressionCell = NULL;
|
||||
TargetEntry *targetEntry = NULL;
|
||||
|
||||
/* reset walker context */
|
||||
walkerContext->expressionList = NIL;
|
||||
|
@ -1942,39 +1908,12 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
|||
WorkerAggregateWalker(havingQual, walkerContext);
|
||||
newExpressionList = walkerContext->expressionList;
|
||||
|
||||
/* now create target entries for each new expression */
|
||||
foreach(newExpressionCell, newExpressionList)
|
||||
{
|
||||
TargetEntry *newTargetEntry = makeNode(TargetEntry);
|
||||
StringInfo columnNameString = makeStringInfo();
|
||||
createNewGroupByClause = walkerContext->createGroupByClause;
|
||||
|
||||
Expr *newExpression = (Expr *) lfirst(newExpressionCell);
|
||||
newTargetEntry->expr = newExpression;
|
||||
|
||||
appendStringInfo(columnNameString, WORKER_COLUMN_FORMAT,
|
||||
targetProjectionNumber);
|
||||
newTargetEntry->resname = columnNameString->data;
|
||||
|
||||
/* force resjunk to false as we may need this on the master */
|
||||
newTargetEntry->resjunk = false;
|
||||
newTargetEntry->resno = targetProjectionNumber;
|
||||
|
||||
if (IsA(newExpression, Var) && walkerContext->createGroupByClause)
|
||||
{
|
||||
Var *column = (Var *) newTargetEntry->expr;
|
||||
SortGroupClause *groupByClause = CreateSortGroupClause(column);
|
||||
newTargetEntry->ressortgroupref = nextSortGroupRefIndex;
|
||||
groupByClause->tleSortGroupRef = nextSortGroupRefIndex;
|
||||
|
||||
groupClauseList = lappend(groupClauseList, groupByClause);
|
||||
nextSortGroupRefIndex++;
|
||||
|
||||
createdNewGroupByClause = true;
|
||||
}
|
||||
|
||||
newTargetEntryList = lappend(newTargetEntryList, newTargetEntry);
|
||||
targetProjectionNumber++;
|
||||
}
|
||||
ExpandWorkerTargetEntry(newExpressionList, targetEntry,
|
||||
createNewGroupByClause, &newTargetEntryList,
|
||||
&targetProjectionNumber, &groupClauseList,
|
||||
&nextSortGroupRefIndex);
|
||||
}
|
||||
|
||||
workerExtendedOpNode = CitusMakeNode(MultiExtendedOp);
|
||||
|
@ -2027,7 +1966,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
|||
* (1) We do not create a new group by clause during aggregate mutation, and
|
||||
* (2) There distinct clause does not prevent limit pushdown
|
||||
*/
|
||||
if (!createdNewGroupByClause && !distinctPreventsLimitPushdown)
|
||||
if (!createNewGroupByClause && !distinctPreventsLimitPushdown)
|
||||
{
|
||||
List *newTargetEntryListForSortClauses = NIL;
|
||||
|
||||
|
@ -2110,6 +2049,182 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExpandWorkerTargetEntry is a utility function which processes the
|
||||
* expressions that are intended to be added to the worker target list.
|
||||
*
|
||||
* In summary, the function gets a list of expressions, converts them to target
|
||||
* entries and updates all the necessary fields such that the expression is correctly
|
||||
* added to the worker query's target list.
|
||||
*
|
||||
* Inputs:
|
||||
* - expressionList: The list of expressions that should be added to the worker query's
|
||||
* target list.
|
||||
* - originalTargetEntry: Target entry that the expressionList generated for. NULL
|
||||
* if the expressionList is not generated from any target entry.
|
||||
* - addToGroupByClause: True if the expressionList should also be added to the
|
||||
* worker query's GROUP BY clause.
|
||||
* Outputs:
|
||||
* - newTargetEntryList: Worker query's target list. The function adds one target entry
|
||||
* to this list for each expression that is in expressionList.
|
||||
* - targetProjectionNumber: A pointer to the worker query target list index. Should be
|
||||
* incremented by one for each target entry added to
|
||||
* newTargetEntryList.
|
||||
* - groupClauseList: Worker query's group clause list. The function adds one entry to
|
||||
* this list for any expression that is in expressionList when
|
||||
* addToGroupByClause flag is true.
|
||||
* - nextSortGroupRefIndex: A pointer to worker query's sort group reference index.
|
||||
* Should be incremented by one for each element added to
|
||||
* the groupClauseList.
|
||||
*/
|
||||
static void
|
||||
ExpandWorkerTargetEntry(List *expressionList, TargetEntry *originalTargetEntry,
|
||||
bool addToGroupByClause, List **newTargetEntryList,
|
||||
AttrNumber *targetProjectionNumber, List **groupClauseList,
|
||||
Index *nextSortGroupRefIndex)
|
||||
{
|
||||
ListCell *newExpressionCell = NULL;
|
||||
|
||||
/* now create target entries for each new expression */
|
||||
foreach(newExpressionCell, expressionList)
|
||||
{
|
||||
Expr *newExpression = (Expr *) lfirst(newExpressionCell);
|
||||
TargetEntry *newTargetEntry = NULL;
|
||||
|
||||
/* generate and add the new target entry to the target list */
|
||||
newTargetEntry =
|
||||
GenerateWorkerTargetEntry(originalTargetEntry, newExpression,
|
||||
*targetProjectionNumber);
|
||||
(*targetProjectionNumber)++;
|
||||
(*newTargetEntryList) = lappend(*newTargetEntryList, newTargetEntry);
|
||||
|
||||
/*
|
||||
* Detect new targets of type Var and add it to group clause list.
|
||||
* This case is expected only if the target entry has aggregates and
|
||||
* it is inside a repartitioned subquery. We create group by entry
|
||||
* for each Var in target list. This code does not check if this
|
||||
* Var was already in the target list or in group by clauses.
|
||||
*/
|
||||
if (IsA(newExpression, Var) && addToGroupByClause)
|
||||
{
|
||||
AppendTargetEntryToGroupClause(newTargetEntry, groupClauseList,
|
||||
nextSortGroupRefIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetNextSortGroupRef gets a target list entry and returns
|
||||
* the next ressortgroupref that should be used based on the
|
||||
* input target list.
|
||||
*/
|
||||
static Index
|
||||
GetNextSortGroupRef(List *targetEntryList)
|
||||
{
|
||||
ListCell *targetEntryCell = NULL;
|
||||
Index nextSortGroupRefIndex = 0;
|
||||
|
||||
/* find max of sort group ref index */
|
||||
foreach(targetEntryCell, targetEntryList)
|
||||
{
|
||||
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||
|
||||
if (targetEntry->ressortgroupref > nextSortGroupRefIndex)
|
||||
{
|
||||
nextSortGroupRefIndex = targetEntry->ressortgroupref;
|
||||
}
|
||||
}
|
||||
|
||||
/* next group ref index starts from max group ref index + 1 */
|
||||
nextSortGroupRefIndex++;
|
||||
|
||||
return nextSortGroupRefIndex;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GenerateWorkerTargetEntry is a simple utility function which gets a
|
||||
* target entry, an expression and a targetProjectionNumber.
|
||||
*
|
||||
* The function returns a newly allocated target entry which can be added
|
||||
* to the worker's target list.
|
||||
*/
|
||||
static TargetEntry *
|
||||
GenerateWorkerTargetEntry(TargetEntry *targetEntry, Expr *workerExpression,
|
||||
AttrNumber targetProjectionNumber)
|
||||
{
|
||||
TargetEntry *newTargetEntry = NULL;
|
||||
|
||||
/*
|
||||
* If a target entry is already provided, use a copy of
|
||||
* it because some of the callers rely on resorigtbl and
|
||||
* resorigcol.
|
||||
*/
|
||||
if (targetEntry)
|
||||
{
|
||||
newTargetEntry = copyObject(targetEntry);
|
||||
}
|
||||
else
|
||||
{
|
||||
newTargetEntry = makeNode(TargetEntry);
|
||||
}
|
||||
|
||||
if (newTargetEntry->resname == NULL)
|
||||
{
|
||||
StringInfo columnNameString = makeStringInfo();
|
||||
|
||||
appendStringInfo(columnNameString, WORKER_COLUMN_FORMAT,
|
||||
targetProjectionNumber);
|
||||
|
||||
newTargetEntry->resname = columnNameString->data;
|
||||
}
|
||||
|
||||
/* we can generate a target entry without any expressions */
|
||||
Assert(workerExpression != NULL);
|
||||
|
||||
/* force resjunk to false as we may need this on the master */
|
||||
newTargetEntry->expr = workerExpression;
|
||||
newTargetEntry->resjunk = false;
|
||||
newTargetEntry->resno = targetProjectionNumber;
|
||||
|
||||
return newTargetEntry;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AppendTargetEntryToGroupClause gets a target entry, pointer to group list
|
||||
* and the ressortgroupref index.
|
||||
*
|
||||
* The function modifies all of the three input such that the target entry is
|
||||
* appended to the group clause and the index is incremented by one.
|
||||
*/
|
||||
static void
|
||||
AppendTargetEntryToGroupClause(TargetEntry *targetEntry, List **groupClause,
|
||||
Index *nextSortGroupRefIndex)
|
||||
{
|
||||
Expr *targetExpr PG_USED_FOR_ASSERTS_ONLY = targetEntry->expr;
|
||||
Var *targetColumn = NULL;
|
||||
SortGroupClause *groupByClause = NULL;
|
||||
|
||||
/* we currently only support appending Var target entries */
|
||||
AssertArg(IsA(targetExpr, Var));
|
||||
|
||||
targetColumn = (Var *) targetEntry->expr;
|
||||
groupByClause = CreateSortGroupClause(targetColumn);
|
||||
|
||||
/* the target entry should have an index */
|
||||
targetEntry->ressortgroupref = *nextSortGroupRefIndex;
|
||||
|
||||
/* the group by clause entry should point to the correct index in the target list */
|
||||
groupByClause->tleSortGroupRef = *nextSortGroupRefIndex;
|
||||
|
||||
/* update the group by list and the index's value */
|
||||
*groupClause = lappend(*groupClause, groupByClause);
|
||||
(*nextSortGroupRefIndex)++;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* WorkerAggregateWalker walks over the original target entry expression, and
|
||||
* creates the list of expression trees (potentially more than one) to execute
|
||||
|
|
Loading…
Reference in New Issue