mirror of https://github.com/citusdata/citus.git
Remove function GenerateNewTargetEntriesForSortClauses
(cherry picked from commit 9638933d9d
)
pull/5106/head
parent
3bcfadf2f1
commit
dd2dfac198
|
@ -322,10 +322,6 @@ static Node * WorkerLimitCount(Node *limitCount, Node *limitOffset, OrderByLimit
|
||||||
static List * WorkerSortClauseList(Node *limitCount,
|
static List * WorkerSortClauseList(Node *limitCount,
|
||||||
List *groupClauseList, List *sortClauseList,
|
List *groupClauseList, List *sortClauseList,
|
||||||
OrderByLimitReference orderByLimitReference);
|
OrderByLimitReference orderByLimitReference);
|
||||||
static List * GenerateNewTargetEntriesForSortClauses(List *originalTargetList,
|
|
||||||
List *sortClauseList,
|
|
||||||
AttrNumber *targetProjectionNumber,
|
|
||||||
Index *nextSortGroupRefIndex);
|
|
||||||
static bool CanPushDownLimitApproximate(List *sortClauseList, List *targetList);
|
static bool CanPushDownLimitApproximate(List *sortClauseList, List *targetList);
|
||||||
static bool HasOrderByAggregate(List *sortClauseList, List *targetList);
|
static bool HasOrderByAggregate(List *sortClauseList, List *targetList);
|
||||||
static bool HasOrderByNonCommutativeAggregate(List *sortClauseList, List *targetList);
|
static bool HasOrderByNonCommutativeAggregate(List *sortClauseList, List *targetList);
|
||||||
|
@ -2705,38 +2701,6 @@ ProcessWindowFunctionsForWorkerQuery(List *windowClauseList,
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
WindowClause *windowClause = NULL;
|
|
||||||
foreach_ptr(windowClause, windowClauseList)
|
|
||||||
{
|
|
||||||
List *partitionClauseTargetList =
|
|
||||||
GenerateNewTargetEntriesForSortClauses(originalTargetEntryList,
|
|
||||||
windowClause->partitionClause,
|
|
||||||
&(queryTargetList->
|
|
||||||
targetProjectionNumber),
|
|
||||||
queryWindowClause->
|
|
||||||
nextSortGroupRefIndex);
|
|
||||||
List *orderClauseTargetList =
|
|
||||||
GenerateNewTargetEntriesForSortClauses(originalTargetEntryList,
|
|
||||||
windowClause->orderClause,
|
|
||||||
&(queryTargetList->
|
|
||||||
targetProjectionNumber),
|
|
||||||
queryWindowClause->
|
|
||||||
nextSortGroupRefIndex);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Note that even Citus does push down the window clauses as-is, we may still need to
|
|
||||||
* add the generated entries to the target list. The reason is that the same aggregates
|
|
||||||
* might be referred from another target entry that is a bare aggregate (e.g., no window
|
|
||||||
* functions), which would have been mutated. For instance, when an average aggregate
|
|
||||||
* is mutated on the target list, the window function would refer to a sum aggregate,
|
|
||||||
* which is obviously wrong.
|
|
||||||
*/
|
|
||||||
queryTargetList->targetEntryList = list_concat(queryTargetList->targetEntryList,
|
|
||||||
partitionClauseTargetList);
|
|
||||||
queryTargetList->targetEntryList = list_concat(queryTargetList->targetEntryList,
|
|
||||||
orderClauseTargetList);
|
|
||||||
}
|
|
||||||
|
|
||||||
queryWindowClause->workerWindowClauseList = windowClauseList;
|
queryWindowClause->workerWindowClauseList = windowClauseList;
|
||||||
queryWindowClause->hasWindowFunctions = true;
|
queryWindowClause->hasWindowFunctions = true;
|
||||||
}
|
}
|
||||||
|
@ -2802,19 +2766,6 @@ ProcessLimitOrderByForWorkerQuery(OrderByLimitReference orderByLimitReference,
|
||||||
groupClauseList,
|
groupClauseList,
|
||||||
sortClauseList,
|
sortClauseList,
|
||||||
orderByLimitReference);
|
orderByLimitReference);
|
||||||
|
|
||||||
/*
|
|
||||||
* TODO: Do we really need to add the target entries if we're not pushing
|
|
||||||
* down ORDER BY?
|
|
||||||
*/
|
|
||||||
List *newTargetEntryListForSortClauses =
|
|
||||||
GenerateNewTargetEntriesForSortClauses(originalTargetList,
|
|
||||||
queryOrderByLimit->workerSortClauseList,
|
|
||||||
&(queryTargetList->targetProjectionNumber),
|
|
||||||
queryOrderByLimit->nextSortGroupRefIndex);
|
|
||||||
|
|
||||||
queryTargetList->targetEntryList =
|
|
||||||
list_concat(queryTargetList->targetEntryList, newTargetEntryListForSortClauses);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -4803,87 +4754,6 @@ WorkerSortClauseList(Node *limitCount, List *groupClauseList, List *sortClauseLi
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* GenerateNewTargetEntriesForSortClauses goes over provided sort clause lists and
|
|
||||||
* creates new target entries if needed to make sure sort clauses has correct
|
|
||||||
* references. The function returns list of new target entries, caller is
|
|
||||||
* responsible to add those target entries to the end of worker target list.
|
|
||||||
*
|
|
||||||
* The function is required because we change the target entry if it contains an
|
|
||||||
* expression having an aggregate operation, or just the AVG aggregate.
|
|
||||||
* Afterwards any order by clause referring to original target entry starts
|
|
||||||
* to point to a wrong expression.
|
|
||||||
*
|
|
||||||
* Note the function modifies SortGroupClause items in sortClauseList,
|
|
||||||
* targetProjectionNumber, and nextSortGroupRefIndex.
|
|
||||||
*/
|
|
||||||
static List *
|
|
||||||
GenerateNewTargetEntriesForSortClauses(List *originalTargetList,
|
|
||||||
List *sortClauseList,
|
|
||||||
AttrNumber *targetProjectionNumber,
|
|
||||||
Index *nextSortGroupRefIndex)
|
|
||||||
{
|
|
||||||
List *createdTargetList = NIL;
|
|
||||||
|
|
||||||
SortGroupClause *sgClause = NULL;
|
|
||||||
foreach_ptr(sgClause, sortClauseList)
|
|
||||||
{
|
|
||||||
TargetEntry *targetEntry = get_sortgroupclause_tle(sgClause, originalTargetList);
|
|
||||||
Expr *targetExpr = targetEntry->expr;
|
|
||||||
bool containsAggregate = contain_aggs_of_level((Node *) targetExpr, 0);
|
|
||||||
bool createNewTargetEntry = false;
|
|
||||||
|
|
||||||
/* we are only interested in target entries containing aggregates */
|
|
||||||
if (!containsAggregate)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If the target expression is not an Aggref, it is either an expression
|
|
||||||
* on a single aggregate, or expression containing multiple aggregates.
|
|
||||||
* Worker query mutates these target entries to have a naked target entry
|
|
||||||
* per aggregate function. We want to use original target entries if this
|
|
||||||
* the case.
|
|
||||||
* If the original target expression is an avg aggref, we also want to use
|
|
||||||
* original target entry.
|
|
||||||
*/
|
|
||||||
if (!IsA(targetExpr, Aggref))
|
|
||||||
{
|
|
||||||
createNewTargetEntry = true;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
Aggref *aggNode = (Aggref *) targetExpr;
|
|
||||||
AggregateType aggregateType = GetAggregateType(aggNode);
|
|
||||||
if (aggregateType == AGGREGATE_AVERAGE)
|
|
||||||
{
|
|
||||||
createNewTargetEntry = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (createNewTargetEntry)
|
|
||||||
{
|
|
||||||
bool resJunk = true;
|
|
||||||
AttrNumber nextResNo = (*targetProjectionNumber);
|
|
||||||
Expr *newExpr = copyObject(targetExpr);
|
|
||||||
TargetEntry *newTargetEntry = makeTargetEntry(newExpr, nextResNo,
|
|
||||||
targetEntry->resname, resJunk);
|
|
||||||
newTargetEntry->ressortgroupref = *nextSortGroupRefIndex;
|
|
||||||
|
|
||||||
createdTargetList = lappend(createdTargetList, newTargetEntry);
|
|
||||||
|
|
||||||
sgClause->tleSortGroupRef = *nextSortGroupRefIndex;
|
|
||||||
|
|
||||||
(*nextSortGroupRefIndex)++;
|
|
||||||
(*targetProjectionNumber)++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return createdTargetList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CanPushDownLimitApproximate checks if we can push down the limit clause to
|
* CanPushDownLimitApproximate checks if we can push down the limit clause to
|
||||||
* the worker nodes, and get approximate and meaningful results. We can do this
|
* the worker nodes, and get approximate and meaningful results. We can do this
|
||||||
|
|
|
@ -95,6 +95,9 @@ ORDER BY
|
||||||
|
|
||||||
-- window function operates on the results of
|
-- window function operates on the results of
|
||||||
-- a join
|
-- a join
|
||||||
|
-- we also want to verify that this doesn't crash
|
||||||
|
-- when the logging level is DEBUG4
|
||||||
|
SET log_min_messages TO DEBUG4;
|
||||||
SELECT
|
SELECT
|
||||||
us.user_id,
|
us.user_id,
|
||||||
SUM(us.value_1) OVER (PARTITION BY us.user_id)
|
SUM(us.value_1) OVER (PARTITION BY us.user_id)
|
||||||
|
@ -1593,3 +1596,57 @@ from public.users_table as ut limit 1;
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- verify that this doesn't crash with DEBUG4
|
||||||
|
SET log_min_messages TO DEBUG4;
|
||||||
|
SELECT
|
||||||
|
user_id, max(value_1) OVER (PARTITION BY user_id, MIN(value_2))
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
DISTINCT us.user_id, us.value_2, value_1, random() as r1
|
||||||
|
FROM
|
||||||
|
users_table as us, events_table
|
||||||
|
WHERE
|
||||||
|
us.user_id = events_table.user_id AND event_type IN (1,2)
|
||||||
|
ORDER BY
|
||||||
|
user_id, value_2
|
||||||
|
) s
|
||||||
|
GROUP BY
|
||||||
|
1, value_1
|
||||||
|
ORDER BY
|
||||||
|
2 DESC, 1;
|
||||||
|
user_id | max
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 5
|
||||||
|
3 | 5
|
||||||
|
3 | 5
|
||||||
|
4 | 5
|
||||||
|
5 | 5
|
||||||
|
5 | 5
|
||||||
|
6 | 5
|
||||||
|
6 | 5
|
||||||
|
1 | 4
|
||||||
|
2 | 4
|
||||||
|
3 | 4
|
||||||
|
3 | 4
|
||||||
|
3 | 4
|
||||||
|
4 | 4
|
||||||
|
4 | 4
|
||||||
|
5 | 4
|
||||||
|
5 | 4
|
||||||
|
1 | 3
|
||||||
|
2 | 3
|
||||||
|
2 | 3
|
||||||
|
2 | 3
|
||||||
|
6 | 3
|
||||||
|
2 | 2
|
||||||
|
4 | 2
|
||||||
|
4 | 2
|
||||||
|
4 | 2
|
||||||
|
6 | 2
|
||||||
|
1 | 1
|
||||||
|
3 | 1
|
||||||
|
5 | 1
|
||||||
|
6 | 1
|
||||||
|
5 | 0
|
||||||
|
(32 rows)
|
||||||
|
|
||||||
|
|
|
@ -95,6 +95,9 @@ ORDER BY
|
||||||
|
|
||||||
-- window function operates on the results of
|
-- window function operates on the results of
|
||||||
-- a join
|
-- a join
|
||||||
|
-- we also want to verify that this doesn't crash
|
||||||
|
-- when the logging level is DEBUG4
|
||||||
|
SET log_min_messages TO DEBUG4;
|
||||||
SELECT
|
SELECT
|
||||||
us.user_id,
|
us.user_id,
|
||||||
SUM(us.value_1) OVER (PARTITION BY us.user_id)
|
SUM(us.value_1) OVER (PARTITION BY us.user_id)
|
||||||
|
@ -1589,3 +1592,57 @@ from public.users_table as ut limit 1;
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- verify that this doesn't crash with DEBUG4
|
||||||
|
SET log_min_messages TO DEBUG4;
|
||||||
|
SELECT
|
||||||
|
user_id, max(value_1) OVER (PARTITION BY user_id, MIN(value_2))
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
DISTINCT us.user_id, us.value_2, value_1, random() as r1
|
||||||
|
FROM
|
||||||
|
users_table as us, events_table
|
||||||
|
WHERE
|
||||||
|
us.user_id = events_table.user_id AND event_type IN (1,2)
|
||||||
|
ORDER BY
|
||||||
|
user_id, value_2
|
||||||
|
) s
|
||||||
|
GROUP BY
|
||||||
|
1, value_1
|
||||||
|
ORDER BY
|
||||||
|
2 DESC, 1;
|
||||||
|
user_id | max
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 5
|
||||||
|
3 | 5
|
||||||
|
3 | 5
|
||||||
|
4 | 5
|
||||||
|
5 | 5
|
||||||
|
5 | 5
|
||||||
|
6 | 5
|
||||||
|
6 | 5
|
||||||
|
1 | 4
|
||||||
|
2 | 4
|
||||||
|
3 | 4
|
||||||
|
3 | 4
|
||||||
|
3 | 4
|
||||||
|
4 | 4
|
||||||
|
4 | 4
|
||||||
|
5 | 4
|
||||||
|
5 | 4
|
||||||
|
1 | 3
|
||||||
|
2 | 3
|
||||||
|
2 | 3
|
||||||
|
2 | 3
|
||||||
|
6 | 3
|
||||||
|
2 | 2
|
||||||
|
4 | 2
|
||||||
|
4 | 2
|
||||||
|
4 | 2
|
||||||
|
6 | 2
|
||||||
|
1 | 1
|
||||||
|
3 | 1
|
||||||
|
5 | 1
|
||||||
|
6 | 1
|
||||||
|
5 | 0
|
||||||
|
(32 rows)
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,9 @@ ORDER BY
|
||||||
|
|
||||||
-- window function operates on the results of
|
-- window function operates on the results of
|
||||||
-- a join
|
-- a join
|
||||||
|
-- we also want to verify that this doesn't crash
|
||||||
|
-- when the logging level is DEBUG4
|
||||||
|
SET log_min_messages TO DEBUG4;
|
||||||
SELECT
|
SELECT
|
||||||
us.user_id,
|
us.user_id,
|
||||||
SUM(us.value_1) OVER (PARTITION BY us.user_id)
|
SUM(us.value_1) OVER (PARTITION BY us.user_id)
|
||||||
|
@ -616,3 +619,21 @@ GROUP BY 1 ORDER BY 1;
|
||||||
select null = sum(null::int2) over ()
|
select null = sum(null::int2) over ()
|
||||||
from public.users_table as ut limit 1;
|
from public.users_table as ut limit 1;
|
||||||
|
|
||||||
|
-- verify that this doesn't crash with DEBUG4
|
||||||
|
SET log_min_messages TO DEBUG4;
|
||||||
|
SELECT
|
||||||
|
user_id, max(value_1) OVER (PARTITION BY user_id, MIN(value_2))
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
DISTINCT us.user_id, us.value_2, value_1, random() as r1
|
||||||
|
FROM
|
||||||
|
users_table as us, events_table
|
||||||
|
WHERE
|
||||||
|
us.user_id = events_table.user_id AND event_type IN (1,2)
|
||||||
|
ORDER BY
|
||||||
|
user_id, value_2
|
||||||
|
) s
|
||||||
|
GROUP BY
|
||||||
|
1, value_1
|
||||||
|
ORDER BY
|
||||||
|
2 DESC, 1;
|
||||||
|
|
Loading…
Reference in New Issue