Remove function GenerateNewTargetEntriesForSortClauses

pull/5010/head
Ahmet Gedemenli 2021-05-31 19:49:55 +03:00
parent d3feee37ea
commit 9638933d9d
4 changed files with 135 additions and 130 deletions

View File

@ -318,10 +318,6 @@ static Node * WorkerLimitCount(Node *limitCount, Node *limitOffset, OrderByLimit
static List * WorkerSortClauseList(Node *limitCount,
List *groupClauseList, List *sortClauseList,
OrderByLimitReference orderByLimitReference);
static List * GenerateNewTargetEntriesForSortClauses(List *originalTargetList,
List *sortClauseList,
AttrNumber *targetProjectionNumber,
Index *nextSortGroupRefIndex);
static bool CanPushDownLimitApproximate(List *sortClauseList, List *targetList);
static bool HasOrderByAggregate(List *sortClauseList, List *targetList);
static bool HasOrderByNonCommutativeAggregate(List *sortClauseList, List *targetList);
@ -2701,38 +2697,6 @@ ProcessWindowFunctionsForWorkerQuery(List *windowClauseList,
return;
}
WindowClause *windowClause = NULL;
foreach_ptr(windowClause, windowClauseList)
{
List *partitionClauseTargetList =
GenerateNewTargetEntriesForSortClauses(originalTargetEntryList,
windowClause->partitionClause,
&(queryTargetList->
targetProjectionNumber),
queryWindowClause->
nextSortGroupRefIndex);
List *orderClauseTargetList =
GenerateNewTargetEntriesForSortClauses(originalTargetEntryList,
windowClause->orderClause,
&(queryTargetList->
targetProjectionNumber),
queryWindowClause->
nextSortGroupRefIndex);
/*
* Note that even Citus does push down the window clauses as-is, we may still need to
* add the generated entries to the target list. The reason is that the same aggregates
* might be referred from another target entry that is a bare aggregate (e.g., no window
* functions), which would have been mutated. For instance, when an average aggregate
* is mutated on the target list, the window function would refer to a sum aggregate,
* which is obviously wrong.
*/
queryTargetList->targetEntryList = list_concat(queryTargetList->targetEntryList,
partitionClauseTargetList);
queryTargetList->targetEntryList = list_concat(queryTargetList->targetEntryList,
orderClauseTargetList);
}
queryWindowClause->workerWindowClauseList = windowClauseList;
queryWindowClause->hasWindowFunctions = true;
}
@ -2798,19 +2762,6 @@ ProcessLimitOrderByForWorkerQuery(OrderByLimitReference orderByLimitReference,
groupClauseList,
sortClauseList,
orderByLimitReference);
/*
* TODO: Do we really need to add the target entries if we're not pushing
* down ORDER BY?
*/
List *newTargetEntryListForSortClauses =
GenerateNewTargetEntriesForSortClauses(originalTargetList,
queryOrderByLimit->workerSortClauseList,
&(queryTargetList->targetProjectionNumber),
queryOrderByLimit->nextSortGroupRefIndex);
queryTargetList->targetEntryList =
list_concat(queryTargetList->targetEntryList, newTargetEntryListForSortClauses);
}
@ -4795,87 +4746,6 @@ WorkerSortClauseList(Node *limitCount, List *groupClauseList, List *sortClauseLi
}
/*
* GenerateNewTargetEntriesForSortClauses goes over provided sort clause lists and
* creates new target entries if needed to make sure sort clauses has correct
* references. The function returns list of new target entries, caller is
* responsible to add those target entries to the end of worker target list.
*
* The function is required because we change the target entry if it contains an
* expression having an aggregate operation, or just the AVG aggregate.
* Afterwards any order by clause referring to original target entry starts
* to point to a wrong expression.
*
* Note the function modifies SortGroupClause items in sortClauseList,
* targetProjectionNumber, and nextSortGroupRefIndex.
*/
static List *
GenerateNewTargetEntriesForSortClauses(List *originalTargetList,
List *sortClauseList,
AttrNumber *targetProjectionNumber,
Index *nextSortGroupRefIndex)
{
List *createdTargetList = NIL;
SortGroupClause *sgClause = NULL;
foreach_ptr(sgClause, sortClauseList)
{
TargetEntry *targetEntry = get_sortgroupclause_tle(sgClause, originalTargetList);
Expr *targetExpr = targetEntry->expr;
bool containsAggregate = contain_aggs_of_level((Node *) targetExpr, 0);
bool createNewTargetEntry = false;
/* we are only interested in target entries containing aggregates */
if (!containsAggregate)
{
continue;
}
/*
* If the target expression is not an Aggref, it is either an expression
* on a single aggregate, or expression containing multiple aggregates.
* Worker query mutates these target entries to have a naked target entry
* per aggregate function. We want to use original target entries if this
* the case.
* If the original target expression is an avg aggref, we also want to use
* original target entry.
*/
if (!IsA(targetExpr, Aggref))
{
createNewTargetEntry = true;
}
else
{
Aggref *aggNode = (Aggref *) targetExpr;
AggregateType aggregateType = GetAggregateType(aggNode);
if (aggregateType == AGGREGATE_AVERAGE)
{
createNewTargetEntry = true;
}
}
if (createNewTargetEntry)
{
bool resJunk = true;
AttrNumber nextResNo = (*targetProjectionNumber);
Expr *newExpr = copyObject(targetExpr);
TargetEntry *newTargetEntry = makeTargetEntry(newExpr, nextResNo,
targetEntry->resname, resJunk);
newTargetEntry->ressortgroupref = *nextSortGroupRefIndex;
createdTargetList = lappend(createdTargetList, newTargetEntry);
sgClause->tleSortGroupRef = *nextSortGroupRefIndex;
(*nextSortGroupRefIndex)++;
(*targetProjectionNumber)++;
}
}
return createdTargetList;
}
/*
* CanPushDownLimitApproximate checks if we can push down the limit clause to
* the worker nodes, and get approximate and meaningful results. We can do this

View File

@ -95,6 +95,9 @@ ORDER BY
-- window function operates on the results of
-- a join
-- we also want to verify that this doesn't crash
-- when the logging level is DEBUG4
SET log_min_messages TO DEBUG4;
SELECT
us.user_id,
SUM(us.value_1) OVER (PARTITION BY us.user_id)
@ -1593,3 +1596,57 @@ from public.users_table as ut limit 1;
(1 row)
-- verify that this doesn't crash with DEBUG4
SET log_min_messages TO DEBUG4;
SELECT
user_id, max(value_1) OVER (PARTITION BY user_id, MIN(value_2))
FROM (
SELECT
DISTINCT us.user_id, us.value_2, value_1, random() as r1
FROM
users_table as us, events_table
WHERE
us.user_id = events_table.user_id AND event_type IN (1,2)
ORDER BY
user_id, value_2
) s
GROUP BY
1, value_1
ORDER BY
2 DESC, 1;
user_id | max
---------------------------------------------------------------------
1 | 5
3 | 5
3 | 5
4 | 5
5 | 5
5 | 5
6 | 5
6 | 5
1 | 4
2 | 4
3 | 4
3 | 4
3 | 4
4 | 4
4 | 4
5 | 4
5 | 4
1 | 3
2 | 3
2 | 3
2 | 3
6 | 3
2 | 2
4 | 2
4 | 2
4 | 2
6 | 2
1 | 1
3 | 1
5 | 1
6 | 1
5 | 0
(32 rows)

View File

@ -95,6 +95,9 @@ ORDER BY
-- window function operates on the results of
-- a join
-- we also want to verify that this doesn't crash
-- when the logging level is DEBUG4
SET log_min_messages TO DEBUG4;
SELECT
us.user_id,
SUM(us.value_1) OVER (PARTITION BY us.user_id)
@ -1589,3 +1592,57 @@ from public.users_table as ut limit 1;
(1 row)
-- verify that this doesn't crash with DEBUG4
SET log_min_messages TO DEBUG4;
SELECT
user_id, max(value_1) OVER (PARTITION BY user_id, MIN(value_2))
FROM (
SELECT
DISTINCT us.user_id, us.value_2, value_1, random() as r1
FROM
users_table as us, events_table
WHERE
us.user_id = events_table.user_id AND event_type IN (1,2)
ORDER BY
user_id, value_2
) s
GROUP BY
1, value_1
ORDER BY
2 DESC, 1;
user_id | max
---------------------------------------------------------------------
1 | 5
3 | 5
3 | 5
4 | 5
5 | 5
5 | 5
6 | 5
6 | 5
1 | 4
2 | 4
3 | 4
3 | 4
3 | 4
4 | 4
4 | 4
5 | 4
5 | 4
1 | 3
2 | 3
2 | 3
2 | 3
6 | 3
2 | 2
4 | 2
4 | 2
4 | 2
6 | 2
1 | 1
3 | 1
5 | 1
6 | 1
5 | 0
(32 rows)

View File

@ -44,6 +44,9 @@ ORDER BY
-- window function operates on the results of
-- a join
-- we also want to verify that this doesn't crash
-- when the logging level is DEBUG4
SET log_min_messages TO DEBUG4;
SELECT
us.user_id,
SUM(us.value_1) OVER (PARTITION BY us.user_id)
@ -616,3 +619,21 @@ GROUP BY 1 ORDER BY 1;
select null = sum(null::int2) over ()
from public.users_table as ut limit 1;
-- verify that this doesn't crash with DEBUG4
SET log_min_messages TO DEBUG4;
SELECT
user_id, max(value_1) OVER (PARTITION BY user_id, MIN(value_2))
FROM (
SELECT
DISTINCT us.user_id, us.value_2, value_1, random() as r1
FROM
users_table as us, events_table
WHERE
us.user_id = events_table.user_id AND event_type IN (1,2)
ORDER BY
user_id, value_2
) s
GROUP BY
1, value_1
ORDER BY
2 DESC, 1;