mirror of https://github.com/citusdata/citus.git
Eliminate code duplication in WorkerExtendedOpNode()
Before this commit, we had code duplication in the WorkerExtendedOpNode(). The duplication was noticeable and any change is prone to bugs. The PR consists of 4 commits. Each commit incrementally fixes the problem by moving certain parts of the duplicated code into smaller, better-documented functions.pull/2090/head
parent
8d4c4d5c58
commit
ac8f2f1e6d
|
@ -129,6 +129,20 @@ static Expr * AddTypeConversion(Node *originalAggregate, Node *newExpression);
|
||||||
static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
||||||
ExtendedOpNodeProperties *
|
ExtendedOpNodeProperties *
|
||||||
extendedOpNodeProperties);
|
extendedOpNodeProperties);
|
||||||
|
static void ExpandWorkerTargetEntry(List *expressionList,
|
||||||
|
TargetEntry *originalTargetEntry,
|
||||||
|
bool addToGroupByClause,
|
||||||
|
List **newTargetEntryList,
|
||||||
|
AttrNumber *targetProjectionNumber,
|
||||||
|
List **groupClauseList,
|
||||||
|
Index *nextSortGroupRefIndex);
|
||||||
|
static Index GetNextSortGroupRef(List *targetEntryList);
|
||||||
|
static TargetEntry * GenerateWorkerTargetEntry(TargetEntry *targetEntry,
|
||||||
|
Expr *workerExpression,
|
||||||
|
AttrNumber targetProjectionNumber);
|
||||||
|
static void AppendTargetEntryToGroupClause(TargetEntry *targetEntry,
|
||||||
|
List **groupClause,
|
||||||
|
Index *nextSortGroupRefIndex);
|
||||||
static bool WorkerAggregateWalker(Node *node,
|
static bool WorkerAggregateWalker(Node *node,
|
||||||
WorkerAggregateWalkerContext *walkerContext);
|
WorkerAggregateWalkerContext *walkerContext);
|
||||||
static List * WorkerAggregateExpressionList(Aggref *originalAggregate,
|
static List * WorkerAggregateExpressionList(Aggref *originalAggregate,
|
||||||
|
@ -1833,7 +1847,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
||||||
bool queryHasAggregates = false;
|
bool queryHasAggregates = false;
|
||||||
bool distinctClauseSupersetofGroupClause = false;
|
bool distinctClauseSupersetofGroupClause = false;
|
||||||
bool distinctPreventsLimitPushdown = false;
|
bool distinctPreventsLimitPushdown = false;
|
||||||
bool createdNewGroupByClause = false;
|
bool createNewGroupByClause = false;
|
||||||
bool groupedByDisjointPartitionColumn =
|
bool groupedByDisjointPartitionColumn =
|
||||||
extendedOpNodeProperties->groupedByDisjointPartitionColumn;
|
extendedOpNodeProperties->groupedByDisjointPartitionColumn;
|
||||||
bool pushDownWindowFunction = extendedOpNodeProperties->pushDownWindowFunctions;
|
bool pushDownWindowFunction = extendedOpNodeProperties->pushDownWindowFunctions;
|
||||||
|
@ -1841,18 +1855,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
||||||
walkerContext->expressionList = NIL;
|
walkerContext->expressionList = NIL;
|
||||||
walkerContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns;
|
walkerContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns;
|
||||||
|
|
||||||
/* find max of sort group ref index */
|
nextSortGroupRefIndex = GetNextSortGroupRef(targetEntryList);
|
||||||
foreach(targetEntryCell, targetEntryList)
|
|
||||||
{
|
|
||||||
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
|
||||||
if (targetEntry->ressortgroupref > nextSortGroupRefIndex)
|
|
||||||
{
|
|
||||||
nextSortGroupRefIndex = targetEntry->ressortgroupref;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* next group ref index starts from max group ref index + 1 */
|
|
||||||
nextSortGroupRefIndex++;
|
|
||||||
|
|
||||||
/* iterate over original target entries */
|
/* iterate over original target entries */
|
||||||
foreach(targetEntryCell, targetEntryList)
|
foreach(targetEntryCell, targetEntryList)
|
||||||
|
@ -1860,7 +1863,6 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
||||||
TargetEntry *originalTargetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
TargetEntry *originalTargetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||||
Expr *originalExpression = originalTargetEntry->expr;
|
Expr *originalExpression = originalTargetEntry->expr;
|
||||||
List *newExpressionList = NIL;
|
List *newExpressionList = NIL;
|
||||||
ListCell *newExpressionCell = NULL;
|
|
||||||
bool hasAggregates = contain_agg_clause((Node *) originalExpression);
|
bool hasAggregates = contain_agg_clause((Node *) originalExpression);
|
||||||
bool hasWindowFunction = contain_window_function((Node *) originalExpression);
|
bool hasWindowFunction = contain_window_function((Node *) originalExpression);
|
||||||
|
|
||||||
|
@ -1885,55 +1887,19 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
||||||
newExpressionList = list_make1(originalExpression);
|
newExpressionList = list_make1(originalExpression);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* now create target entries for each new expression */
|
createNewGroupByClause = walkerContext->createGroupByClause;
|
||||||
foreach(newExpressionCell, newExpressionList)
|
|
||||||
{
|
|
||||||
Expr *newExpression = (Expr *) lfirst(newExpressionCell);
|
|
||||||
TargetEntry *newTargetEntry = copyObject(originalTargetEntry);
|
|
||||||
newTargetEntry->expr = newExpression;
|
|
||||||
|
|
||||||
/*
|
ExpandWorkerTargetEntry(newExpressionList, originalTargetEntry,
|
||||||
* Detect new targets of type Var and add it to group clause list.
|
createNewGroupByClause, &newTargetEntryList,
|
||||||
* This case is expected only if the target entry has aggregates and
|
&targetProjectionNumber, &groupClauseList,
|
||||||
* it is inside a repartitioned subquery. We create group by entry
|
&nextSortGroupRefIndex);
|
||||||
* for each Var in target list. This code does not check if this
|
|
||||||
* Var was already in the target list or in group by clauses.
|
|
||||||
*/
|
|
||||||
if (IsA(newExpression, Var) && walkerContext->createGroupByClause)
|
|
||||||
{
|
|
||||||
Var *column = (Var *) newTargetEntry->expr;
|
|
||||||
SortGroupClause *groupByClause = CreateSortGroupClause(column);
|
|
||||||
newTargetEntry->ressortgroupref = nextSortGroupRefIndex;
|
|
||||||
groupByClause->tleSortGroupRef = nextSortGroupRefIndex;
|
|
||||||
|
|
||||||
groupClauseList = lappend(groupClauseList, groupByClause);
|
|
||||||
nextSortGroupRefIndex++;
|
|
||||||
|
|
||||||
createdNewGroupByClause = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (newTargetEntry->resname == NULL)
|
|
||||||
{
|
|
||||||
StringInfo columnNameString = makeStringInfo();
|
|
||||||
appendStringInfo(columnNameString, WORKER_COLUMN_FORMAT,
|
|
||||||
targetProjectionNumber);
|
|
||||||
|
|
||||||
newTargetEntry->resname = columnNameString->data;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* force resjunk to false as we may need this on the master */
|
|
||||||
newTargetEntry->resjunk = false;
|
|
||||||
newTargetEntry->resno = targetProjectionNumber;
|
|
||||||
targetProjectionNumber++;
|
|
||||||
newTargetEntryList = lappend(newTargetEntryList, newTargetEntry);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* we also need to add having expressions to worker target list */
|
/* we also need to add having expressions to worker target list */
|
||||||
if (havingQual != NULL)
|
if (havingQual != NULL)
|
||||||
{
|
{
|
||||||
List *newExpressionList = NIL;
|
List *newExpressionList = NIL;
|
||||||
ListCell *newExpressionCell = NULL;
|
TargetEntry *targetEntry = NULL;
|
||||||
|
|
||||||
/* reset walker context */
|
/* reset walker context */
|
||||||
walkerContext->expressionList = NIL;
|
walkerContext->expressionList = NIL;
|
||||||
|
@ -1942,39 +1908,12 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
||||||
WorkerAggregateWalker(havingQual, walkerContext);
|
WorkerAggregateWalker(havingQual, walkerContext);
|
||||||
newExpressionList = walkerContext->expressionList;
|
newExpressionList = walkerContext->expressionList;
|
||||||
|
|
||||||
/* now create target entries for each new expression */
|
createNewGroupByClause = walkerContext->createGroupByClause;
|
||||||
foreach(newExpressionCell, newExpressionList)
|
|
||||||
{
|
|
||||||
TargetEntry *newTargetEntry = makeNode(TargetEntry);
|
|
||||||
StringInfo columnNameString = makeStringInfo();
|
|
||||||
|
|
||||||
Expr *newExpression = (Expr *) lfirst(newExpressionCell);
|
ExpandWorkerTargetEntry(newExpressionList, targetEntry,
|
||||||
newTargetEntry->expr = newExpression;
|
createNewGroupByClause, &newTargetEntryList,
|
||||||
|
&targetProjectionNumber, &groupClauseList,
|
||||||
appendStringInfo(columnNameString, WORKER_COLUMN_FORMAT,
|
&nextSortGroupRefIndex);
|
||||||
targetProjectionNumber);
|
|
||||||
newTargetEntry->resname = columnNameString->data;
|
|
||||||
|
|
||||||
/* force resjunk to false as we may need this on the master */
|
|
||||||
newTargetEntry->resjunk = false;
|
|
||||||
newTargetEntry->resno = targetProjectionNumber;
|
|
||||||
|
|
||||||
if (IsA(newExpression, Var) && walkerContext->createGroupByClause)
|
|
||||||
{
|
|
||||||
Var *column = (Var *) newTargetEntry->expr;
|
|
||||||
SortGroupClause *groupByClause = CreateSortGroupClause(column);
|
|
||||||
newTargetEntry->ressortgroupref = nextSortGroupRefIndex;
|
|
||||||
groupByClause->tleSortGroupRef = nextSortGroupRefIndex;
|
|
||||||
|
|
||||||
groupClauseList = lappend(groupClauseList, groupByClause);
|
|
||||||
nextSortGroupRefIndex++;
|
|
||||||
|
|
||||||
createdNewGroupByClause = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
newTargetEntryList = lappend(newTargetEntryList, newTargetEntry);
|
|
||||||
targetProjectionNumber++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
workerExtendedOpNode = CitusMakeNode(MultiExtendedOp);
|
workerExtendedOpNode = CitusMakeNode(MultiExtendedOp);
|
||||||
|
@ -2027,7 +1966,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
||||||
* (1) We do not create a new group by clause during aggregate mutation, and
|
* (1) We do not create a new group by clause during aggregate mutation, and
|
||||||
* (2) There distinct clause does not prevent limit pushdown
|
* (2) There distinct clause does not prevent limit pushdown
|
||||||
*/
|
*/
|
||||||
if (!createdNewGroupByClause && !distinctPreventsLimitPushdown)
|
if (!createNewGroupByClause && !distinctPreventsLimitPushdown)
|
||||||
{
|
{
|
||||||
List *newTargetEntryListForSortClauses = NIL;
|
List *newTargetEntryListForSortClauses = NIL;
|
||||||
|
|
||||||
|
@ -2110,6 +2049,182 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExpandWorkerTargetEntry is a utility function which processes the
|
||||||
|
* expressions that are intended to be added to the worker target list.
|
||||||
|
*
|
||||||
|
* In summary, the function gets a list of expressions, converts them to target
|
||||||
|
* entries and updates all the necessary fields such that the expression is correctly
|
||||||
|
* added to the worker query's target list.
|
||||||
|
*
|
||||||
|
* Inputs:
|
||||||
|
* - expressionList: The list of expressions that should be added to the worker query's
|
||||||
|
* target list.
|
||||||
|
* - originalTargetEntry: Target entry that the expressionList generated for. NULL
|
||||||
|
* if the expressionList is not generated from any target entry.
|
||||||
|
* - addToGroupByClause: True if the expressionList should also be added to the
|
||||||
|
* worker query's GROUP BY clause.
|
||||||
|
* Outputs:
|
||||||
|
* - newTargetEntryList: Worker query's target list. The function adds one target entry
|
||||||
|
* to this list for each expression that is in expressionList.
|
||||||
|
* - targetProjectionNumber: A pointer to the worker query target list index. Should be
|
||||||
|
* incremented by one for each target entry added to
|
||||||
|
* newTargetEntryList.
|
||||||
|
* - groupClauseList: Worker query's group clause list. The function adds one entry to
|
||||||
|
* this list for any expression that is in expressionList when
|
||||||
|
* addToGroupByClause flag is true.
|
||||||
|
* - nextSortGroupRefIndex: A pointer to worker query's sort group reference index.
|
||||||
|
* Should be incremented by one for each element added to
|
||||||
|
* the groupClauseList.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ExpandWorkerTargetEntry(List *expressionList, TargetEntry *originalTargetEntry,
|
||||||
|
bool addToGroupByClause, List **newTargetEntryList,
|
||||||
|
AttrNumber *targetProjectionNumber, List **groupClauseList,
|
||||||
|
Index *nextSortGroupRefIndex)
|
||||||
|
{
|
||||||
|
ListCell *newExpressionCell = NULL;
|
||||||
|
|
||||||
|
/* now create target entries for each new expression */
|
||||||
|
foreach(newExpressionCell, expressionList)
|
||||||
|
{
|
||||||
|
Expr *newExpression = (Expr *) lfirst(newExpressionCell);
|
||||||
|
TargetEntry *newTargetEntry = NULL;
|
||||||
|
|
||||||
|
/* generate and add the new target entry to the target list */
|
||||||
|
newTargetEntry =
|
||||||
|
GenerateWorkerTargetEntry(originalTargetEntry, newExpression,
|
||||||
|
*targetProjectionNumber);
|
||||||
|
(*targetProjectionNumber)++;
|
||||||
|
(*newTargetEntryList) = lappend(*newTargetEntryList, newTargetEntry);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Detect new targets of type Var and add it to group clause list.
|
||||||
|
* This case is expected only if the target entry has aggregates and
|
||||||
|
* it is inside a repartitioned subquery. We create group by entry
|
||||||
|
* for each Var in target list. This code does not check if this
|
||||||
|
* Var was already in the target list or in group by clauses.
|
||||||
|
*/
|
||||||
|
if (IsA(newExpression, Var) && addToGroupByClause)
|
||||||
|
{
|
||||||
|
AppendTargetEntryToGroupClause(newTargetEntry, groupClauseList,
|
||||||
|
nextSortGroupRefIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetNextSortGroupRef gets a target list entry and returns
|
||||||
|
* the next ressortgroupref that should be used based on the
|
||||||
|
* input target list.
|
||||||
|
*/
|
||||||
|
static Index
|
||||||
|
GetNextSortGroupRef(List *targetEntryList)
|
||||||
|
{
|
||||||
|
ListCell *targetEntryCell = NULL;
|
||||||
|
Index nextSortGroupRefIndex = 0;
|
||||||
|
|
||||||
|
/* find max of sort group ref index */
|
||||||
|
foreach(targetEntryCell, targetEntryList)
|
||||||
|
{
|
||||||
|
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||||
|
|
||||||
|
if (targetEntry->ressortgroupref > nextSortGroupRefIndex)
|
||||||
|
{
|
||||||
|
nextSortGroupRefIndex = targetEntry->ressortgroupref;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* next group ref index starts from max group ref index + 1 */
|
||||||
|
nextSortGroupRefIndex++;
|
||||||
|
|
||||||
|
return nextSortGroupRefIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GenerateWorkerTargetEntry is a simple utility function which gets a
|
||||||
|
* target entry, an expression and a targetProjectionNumber.
|
||||||
|
*
|
||||||
|
* The function returns a newly allocated target entry which can be added
|
||||||
|
* to the worker's target list.
|
||||||
|
*/
|
||||||
|
static TargetEntry *
|
||||||
|
GenerateWorkerTargetEntry(TargetEntry *targetEntry, Expr *workerExpression,
|
||||||
|
AttrNumber targetProjectionNumber)
|
||||||
|
{
|
||||||
|
TargetEntry *newTargetEntry = NULL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If a target entry is already provided, use a copy of
|
||||||
|
* it because some of the callers rely on resorigtbl and
|
||||||
|
* resorigcol.
|
||||||
|
*/
|
||||||
|
if (targetEntry)
|
||||||
|
{
|
||||||
|
newTargetEntry = copyObject(targetEntry);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
newTargetEntry = makeNode(TargetEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newTargetEntry->resname == NULL)
|
||||||
|
{
|
||||||
|
StringInfo columnNameString = makeStringInfo();
|
||||||
|
|
||||||
|
appendStringInfo(columnNameString, WORKER_COLUMN_FORMAT,
|
||||||
|
targetProjectionNumber);
|
||||||
|
|
||||||
|
newTargetEntry->resname = columnNameString->data;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* we can generate a target entry without any expressions */
|
||||||
|
Assert(workerExpression != NULL);
|
||||||
|
|
||||||
|
/* force resjunk to false as we may need this on the master */
|
||||||
|
newTargetEntry->expr = workerExpression;
|
||||||
|
newTargetEntry->resjunk = false;
|
||||||
|
newTargetEntry->resno = targetProjectionNumber;
|
||||||
|
|
||||||
|
return newTargetEntry;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AppendTargetEntryToGroupClause gets a target entry, pointer to group list
|
||||||
|
* and the ressortgroupref index.
|
||||||
|
*
|
||||||
|
* The function modifies all of the three input such that the target entry is
|
||||||
|
* appended to the group clause and the index is incremented by one.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
AppendTargetEntryToGroupClause(TargetEntry *targetEntry, List **groupClause,
|
||||||
|
Index *nextSortGroupRefIndex)
|
||||||
|
{
|
||||||
|
Expr *targetExpr PG_USED_FOR_ASSERTS_ONLY = targetEntry->expr;
|
||||||
|
Var *targetColumn = NULL;
|
||||||
|
SortGroupClause *groupByClause = NULL;
|
||||||
|
|
||||||
|
/* we currently only support appending Var target entries */
|
||||||
|
AssertArg(IsA(targetExpr, Var));
|
||||||
|
|
||||||
|
targetColumn = (Var *) targetEntry->expr;
|
||||||
|
groupByClause = CreateSortGroupClause(targetColumn);
|
||||||
|
|
||||||
|
/* the target entry should have an index */
|
||||||
|
targetEntry->ressortgroupref = *nextSortGroupRefIndex;
|
||||||
|
|
||||||
|
/* the group by clause entry should point to the correct index in the target list */
|
||||||
|
groupByClause->tleSortGroupRef = *nextSortGroupRefIndex;
|
||||||
|
|
||||||
|
/* update the group by list and the index's value */
|
||||||
|
*groupClause = lappend(*groupClause, groupByClause);
|
||||||
|
(*nextSortGroupRefIndex)++;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* WorkerAggregateWalker walks over the original target entry expression, and
|
* WorkerAggregateWalker walks over the original target entry expression, and
|
||||||
* creates the list of expression trees (potentially more than one) to execute
|
* creates the list of expression trees (potentially more than one) to execute
|
||||||
|
|
Loading…
Reference in New Issue