Merge pull request #3406 from citusdata/fix-limit-approx

Expand the set of aggregates which cannot have LIMIT approximated
pull/3423/head
Philip Dubé 2020-01-30 18:00:40 +00:00 committed by GitHub
commit 6b43fab325
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 86 additions and 28 deletions

View File

@ -136,6 +136,18 @@ typedef struct QueryOrderByLimit
} QueryOrderByLimit;
/*
* LimitPushdownable tells us how a limit can be pushed down.
* See WorkerLimitCount for details.
*/
typedef enum LimitPushdownable
{
LIMIT_CANNOT_PUSHDOWN,
LIMIT_CAN_PUSHDOWN,
LIMIT_CAN_APPROXIMATE,
} LimitPushdownable;
/*
* OrderByLimitReference a structure that is used commonly while
* processing sort and limit clauses.
@ -300,7 +312,7 @@ static List * GenerateNewTargetEntriesForSortClauses(List *originalTargetList,
Index *nextSortGroupRefIndex);
static bool CanPushDownLimitApproximate(List *sortClauseList, List *targetList);
static bool HasOrderByAggregate(List *sortClauseList, List *targetList);
static bool HasOrderByAverage(List *sortClauseList, List *targetList);
static bool HasOrderByNonCommutativeAggregate(List *sortClauseList, List *targetList);
static bool HasOrderByComplexExpression(List *sortClauseList, List *targetList);
static bool HasOrderByHllType(List *sortClauseList, List *targetList);
@ -4213,8 +4225,7 @@ WorkerLimitCount(Node *limitCount, Node *limitOffset, OrderByLimitReference
orderByLimitReference)
{
Node *workerLimitNode = NULL;
bool canPushDownLimit = false;
bool canApproximate = false;
LimitPushdownable canPushDownLimit = LIMIT_CANNOT_PUSHDOWN;
/* no limit node to push down */
if (limitCount == NULL)
@ -4239,27 +4250,27 @@ WorkerLimitCount(Node *limitCount, Node *limitOffset, OrderByLimitReference
if (orderByLimitReference.groupClauseIsEmpty ||
orderByLimitReference.groupedByDisjointPartitionColumn)
{
canPushDownLimit = true;
canPushDownLimit = LIMIT_CAN_PUSHDOWN;
}
else if (orderByLimitReference.sortClauseIsEmpty)
{
canPushDownLimit = false;
canPushDownLimit = LIMIT_CANNOT_PUSHDOWN;
}
else if (!orderByLimitReference.hasOrderByAggregate)
{
canPushDownLimit = true;
canPushDownLimit = LIMIT_CAN_PUSHDOWN;
}
else
else if (orderByLimitReference.canApproximate)
{
canApproximate = orderByLimitReference.canApproximate;
canPushDownLimit = LIMIT_CAN_APPROXIMATE;
}
/* create the workerLimitNode according to the decisions above */
if (canPushDownLimit)
if (canPushDownLimit == LIMIT_CAN_PUSHDOWN)
{
workerLimitNode = (Node *) copyObject(limitCount);
}
else if (canApproximate)
else if (canPushDownLimit == LIMIT_CAN_APPROXIMATE)
{
Const *workerLimitConst = (Const *) copyObject(limitCount);
int64 workerLimitCount = (int64) LimitClauseRowFetchCount;
@ -4452,14 +4463,11 @@ CanPushDownLimitApproximate(List *sortClauseList, List *targetList)
if (sortClauseList != NIL)
{
bool orderByAverage = HasOrderByAverage(sortClauseList, targetList);
bool orderByNonCommutativeAggregate =
HasOrderByNonCommutativeAggregate(sortClauseList, targetList);
bool orderByComplex = HasOrderByComplexExpression(sortClauseList, targetList);
/*
* If we don't have any order by average or any complex expressions with
* aggregates in them, we can meaningfully approximate.
*/
if (!orderByAverage && !orderByComplex)
if (!orderByNonCommutativeAggregate && !orderByComplex)
{
canApproximate = true;
}
@ -4497,13 +4505,13 @@ HasOrderByAggregate(List *sortClauseList, List *targetList)
/*
* HasOrderByAverage walks over the given order by clauses, and checks if we
* have an order by an average. If we do, the function returns true.
* HasOrderByNonCommutativeAggregate walks over the given order by clauses,
* and checks if we have an order by an aggregate which is not commutative.
*/
static bool
HasOrderByAverage(List *sortClauseList, List *targetList)
HasOrderByNonCommutativeAggregate(List *sortClauseList, List *targetList)
{
bool hasOrderByAverage = false;
bool hasOrderByNonCommutativeAggregate = false;
ListCell *sortClauseCell = NULL;
foreach(sortClauseCell, sortClauseList)
@ -4517,15 +4525,22 @@ HasOrderByAverage(List *sortClauseList, List *targetList)
Aggref *aggregate = (Aggref *) sortExpression;
AggregateType aggregateType = GetAggregateType(aggregate);
if (aggregateType == AGGREGATE_AVERAGE)
if (aggregateType != AGGREGATE_MIN &&
aggregateType != AGGREGATE_MAX &&
aggregateType != AGGREGATE_SUM &&
aggregateType != AGGREGATE_COUNT &&
aggregateType != AGGREGATE_BIT_AND &&
aggregateType != AGGREGATE_BIT_OR &&
aggregateType != AGGREGATE_EVERY &&
aggregateType != AGGREGATE_ANY_VALUE)
{
hasOrderByAverage = true;
hasOrderByNonCommutativeAggregate = true;
break;
}
}
}
return hasOrderByAverage;
return hasOrderByNonCommutativeAggregate;
}

View File

@ -90,10 +90,45 @@ DEBUG: push down of limit count: 150
-- We now test scenarios where applying the limit optimization wouldn't produce
-- meaningful results. First, we check that we don't push down the limit clause
-- for non-commutative aggregates.
SELECT l_partkey, avg(l_suppkey) AS average FROM lineitem
SELECT l_partkey, avg(l_suppkey) FROM lineitem
GROUP BY l_partkey
ORDER BY average DESC, l_partkey LIMIT 10;
l_partkey | average
ORDER BY 2 DESC, l_partkey LIMIT 10;
l_partkey | avg
---------------------------------------------------------------------
9998 | 9999.0000000000000000
102466 | 9997.0000000000000000
184959 | 9996.0000000000000000
17492 | 9994.0000000000000000
124966 | 9991.0000000000000000
89989 | 9990.0000000000000000
32479 | 9989.0000000000000000
144960 | 9989.0000000000000000
147473 | 9988.0000000000000000
37481 | 9985.0000000000000000
(10 rows)
SELECT l_partkey, stddev(l_suppkey::float8) FROM lineitem
GROUP BY l_partkey
ORDER BY 2 DESC NULLS LAST, l_partkey LIMIT 10;
l_partkey | stddev
---------------------------------------------------------------------
192434 | 5343.60594542674
160226 | 5337.24198439606
151174 | 5335.1206640525
60844 | 5316.02878096046
62405 | 5316.02878096046
50168 | 5313.9074606169
52148 | 5313.9074606169
52398 | 5313.9074606169
10259 | 5305.42217924267
3496 | 5303.30085889911
(10 rows)
-- also test that we handle execution on coordinator properly
SELECT l_partkey, avg(distinct l_suppkey) FROM lineitem
GROUP BY l_partkey
ORDER BY 2 DESC, l_partkey LIMIT 10;
l_partkey | avg
---------------------------------------------------------------------
9998 | 9999.0000000000000000
102466 | 9997.0000000000000000

View File

@ -48,9 +48,17 @@ SELECT c_custkey, c_name, count(*) as lineitem_count
-- meaningful results. First, we check that we don't push down the limit clause
-- for non-commutative aggregates.
SELECT l_partkey, avg(l_suppkey) AS average FROM lineitem
SELECT l_partkey, avg(l_suppkey) FROM lineitem
GROUP BY l_partkey
ORDER BY average DESC, l_partkey LIMIT 10;
ORDER BY 2 DESC, l_partkey LIMIT 10;
SELECT l_partkey, stddev(l_suppkey::float8) FROM lineitem
GROUP BY l_partkey
ORDER BY 2 DESC NULLS LAST, l_partkey LIMIT 10;
-- also test that we handle execution on coordinator properly
SELECT l_partkey, avg(distinct l_suppkey) FROM lineitem
GROUP BY l_partkey
ORDER BY 2 DESC, l_partkey LIMIT 10;
-- Next, check that we don't apply the limit optimization for expressions that
-- have aggregates within them