mirror of https://github.com/citusdata/citus.git
Expand the set of aggregates which cannot have LIMIT approximated
Previously we only prevented AVG from being pushed down, but this is incorrect: - array_agg, while somewhat non sensical to order by, will potentially be missing values - combinefunc aggregation will raise errors about cstrings not being comparable (while we also can't know if the aggregate is commutative) This commit limits approximating LIMIT pushdown when ordering by aggregates to: min, max, sum, count, bit_and, bit_or, every, any Which means of those we previously supported, we now exclude: avg, array_agg, jsonb_agg, jsonb_object_agg, json_agg, json_object_agg, hll_add, hll_union, topn_add, topn_unionpull/3406/head
parent
8584cb005b
commit
5fccc56d3e
|
@ -136,6 +136,18 @@ typedef struct QueryOrderByLimit
|
|||
} QueryOrderByLimit;
|
||||
|
||||
|
||||
/*
|
||||
* LimitPushdownable tells us how a limit can be pushed down.
|
||||
* See WorkerLimitCount for details.
|
||||
*/
|
||||
typedef enum LimitPushdownable
|
||||
{
|
||||
LIMIT_CANNOT_PUSHDOWN,
|
||||
LIMIT_CAN_PUSHDOWN,
|
||||
LIMIT_CAN_APPROXIMATE,
|
||||
} LimitPushdownable;
|
||||
|
||||
|
||||
/*
|
||||
* OrderByLimitReference a structure that is used commonly while
|
||||
* processing sort and limit clauses.
|
||||
|
@ -300,7 +312,7 @@ static List * GenerateNewTargetEntriesForSortClauses(List *originalTargetList,
|
|||
Index *nextSortGroupRefIndex);
|
||||
static bool CanPushDownLimitApproximate(List *sortClauseList, List *targetList);
|
||||
static bool HasOrderByAggregate(List *sortClauseList, List *targetList);
|
||||
static bool HasOrderByAverage(List *sortClauseList, List *targetList);
|
||||
static bool HasOrderByNonCommutativeAggregate(List *sortClauseList, List *targetList);
|
||||
static bool HasOrderByComplexExpression(List *sortClauseList, List *targetList);
|
||||
static bool HasOrderByHllType(List *sortClauseList, List *targetList);
|
||||
|
||||
|
@ -4213,8 +4225,7 @@ WorkerLimitCount(Node *limitCount, Node *limitOffset, OrderByLimitReference
|
|||
orderByLimitReference)
|
||||
{
|
||||
Node *workerLimitNode = NULL;
|
||||
bool canPushDownLimit = false;
|
||||
bool canApproximate = false;
|
||||
LimitPushdownable canPushDownLimit = LIMIT_CANNOT_PUSHDOWN;
|
||||
|
||||
/* no limit node to push down */
|
||||
if (limitCount == NULL)
|
||||
|
@ -4239,27 +4250,27 @@ WorkerLimitCount(Node *limitCount, Node *limitOffset, OrderByLimitReference
|
|||
if (orderByLimitReference.groupClauseIsEmpty ||
|
||||
orderByLimitReference.groupedByDisjointPartitionColumn)
|
||||
{
|
||||
canPushDownLimit = true;
|
||||
canPushDownLimit = LIMIT_CAN_PUSHDOWN;
|
||||
}
|
||||
else if (orderByLimitReference.sortClauseIsEmpty)
|
||||
{
|
||||
canPushDownLimit = false;
|
||||
canPushDownLimit = LIMIT_CANNOT_PUSHDOWN;
|
||||
}
|
||||
else if (!orderByLimitReference.hasOrderByAggregate)
|
||||
{
|
||||
canPushDownLimit = true;
|
||||
canPushDownLimit = LIMIT_CAN_PUSHDOWN;
|
||||
}
|
||||
else
|
||||
else if (orderByLimitReference.canApproximate)
|
||||
{
|
||||
canApproximate = orderByLimitReference.canApproximate;
|
||||
canPushDownLimit = LIMIT_CAN_APPROXIMATE;
|
||||
}
|
||||
|
||||
/* create the workerLimitNode according to the decisions above */
|
||||
if (canPushDownLimit)
|
||||
if (canPushDownLimit == LIMIT_CAN_PUSHDOWN)
|
||||
{
|
||||
workerLimitNode = (Node *) copyObject(limitCount);
|
||||
}
|
||||
else if (canApproximate)
|
||||
else if (canPushDownLimit == LIMIT_CAN_APPROXIMATE)
|
||||
{
|
||||
Const *workerLimitConst = (Const *) copyObject(limitCount);
|
||||
int64 workerLimitCount = (int64) LimitClauseRowFetchCount;
|
||||
|
@ -4452,14 +4463,11 @@ CanPushDownLimitApproximate(List *sortClauseList, List *targetList)
|
|||
|
||||
if (sortClauseList != NIL)
|
||||
{
|
||||
bool orderByAverage = HasOrderByAverage(sortClauseList, targetList);
|
||||
bool orderByNonCommutativeAggregate =
|
||||
HasOrderByNonCommutativeAggregate(sortClauseList, targetList);
|
||||
bool orderByComplex = HasOrderByComplexExpression(sortClauseList, targetList);
|
||||
|
||||
/*
|
||||
* If we don't have any order by average or any complex expressions with
|
||||
* aggregates in them, we can meaningfully approximate.
|
||||
*/
|
||||
if (!orderByAverage && !orderByComplex)
|
||||
if (!orderByNonCommutativeAggregate && !orderByComplex)
|
||||
{
|
||||
canApproximate = true;
|
||||
}
|
||||
|
@ -4497,13 +4505,13 @@ HasOrderByAggregate(List *sortClauseList, List *targetList)
|
|||
|
||||
|
||||
/*
|
||||
* HasOrderByAverage walks over the given order by clauses, and checks if we
|
||||
* have an order by an average. If we do, the function returns true.
|
||||
* HasOrderByNonCommutativeAggregate walks over the given order by clauses,
|
||||
* and checks if we have an order by an aggregate which is not commutative.
|
||||
*/
|
||||
static bool
|
||||
HasOrderByAverage(List *sortClauseList, List *targetList)
|
||||
HasOrderByNonCommutativeAggregate(List *sortClauseList, List *targetList)
|
||||
{
|
||||
bool hasOrderByAverage = false;
|
||||
bool hasOrderByNonCommutativeAggregate = false;
|
||||
ListCell *sortClauseCell = NULL;
|
||||
|
||||
foreach(sortClauseCell, sortClauseList)
|
||||
|
@ -4517,15 +4525,22 @@ HasOrderByAverage(List *sortClauseList, List *targetList)
|
|||
Aggref *aggregate = (Aggref *) sortExpression;
|
||||
|
||||
AggregateType aggregateType = GetAggregateType(aggregate);
|
||||
if (aggregateType == AGGREGATE_AVERAGE)
|
||||
if (aggregateType != AGGREGATE_MIN &&
|
||||
aggregateType != AGGREGATE_MAX &&
|
||||
aggregateType != AGGREGATE_SUM &&
|
||||
aggregateType != AGGREGATE_COUNT &&
|
||||
aggregateType != AGGREGATE_BIT_AND &&
|
||||
aggregateType != AGGREGATE_BIT_OR &&
|
||||
aggregateType != AGGREGATE_EVERY &&
|
||||
aggregateType != AGGREGATE_ANY_VALUE)
|
||||
{
|
||||
hasOrderByAverage = true;
|
||||
hasOrderByNonCommutativeAggregate = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return hasOrderByAverage;
|
||||
return hasOrderByNonCommutativeAggregate;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -90,10 +90,45 @@ DEBUG: push down of limit count: 150
|
|||
-- We now test scenarios where applying the limit optimization wouldn't produce
|
||||
-- meaningful results. First, we check that we don't push down the limit clause
|
||||
-- for non-commutative aggregates.
|
||||
SELECT l_partkey, avg(l_suppkey) AS average FROM lineitem
|
||||
SELECT l_partkey, avg(l_suppkey) FROM lineitem
|
||||
GROUP BY l_partkey
|
||||
ORDER BY average DESC, l_partkey LIMIT 10;
|
||||
l_partkey | average
|
||||
ORDER BY 2 DESC, l_partkey LIMIT 10;
|
||||
l_partkey | avg
|
||||
---------------------------------------------------------------------
|
||||
9998 | 9999.0000000000000000
|
||||
102466 | 9997.0000000000000000
|
||||
184959 | 9996.0000000000000000
|
||||
17492 | 9994.0000000000000000
|
||||
124966 | 9991.0000000000000000
|
||||
89989 | 9990.0000000000000000
|
||||
32479 | 9989.0000000000000000
|
||||
144960 | 9989.0000000000000000
|
||||
147473 | 9988.0000000000000000
|
||||
37481 | 9985.0000000000000000
|
||||
(10 rows)
|
||||
|
||||
SELECT l_partkey, stddev(l_suppkey::float8) FROM lineitem
|
||||
GROUP BY l_partkey
|
||||
ORDER BY 2 DESC NULLS LAST, l_partkey LIMIT 10;
|
||||
l_partkey | stddev
|
||||
---------------------------------------------------------------------
|
||||
192434 | 5343.60594542674
|
||||
160226 | 5337.24198439606
|
||||
151174 | 5335.1206640525
|
||||
60844 | 5316.02878096046
|
||||
62405 | 5316.02878096046
|
||||
50168 | 5313.9074606169
|
||||
52148 | 5313.9074606169
|
||||
52398 | 5313.9074606169
|
||||
10259 | 5305.42217924267
|
||||
3496 | 5303.30085889911
|
||||
(10 rows)
|
||||
|
||||
-- also test that we handle execution on coordinator properly
|
||||
SELECT l_partkey, avg(distinct l_suppkey) FROM lineitem
|
||||
GROUP BY l_partkey
|
||||
ORDER BY 2 DESC, l_partkey LIMIT 10;
|
||||
l_partkey | avg
|
||||
---------------------------------------------------------------------
|
||||
9998 | 9999.0000000000000000
|
||||
102466 | 9997.0000000000000000
|
||||
|
|
|
@ -48,9 +48,17 @@ SELECT c_custkey, c_name, count(*) as lineitem_count
|
|||
-- meaningful results. First, we check that we don't push down the limit clause
|
||||
-- for non-commutative aggregates.
|
||||
|
||||
SELECT l_partkey, avg(l_suppkey) AS average FROM lineitem
|
||||
SELECT l_partkey, avg(l_suppkey) FROM lineitem
|
||||
GROUP BY l_partkey
|
||||
ORDER BY average DESC, l_partkey LIMIT 10;
|
||||
ORDER BY 2 DESC, l_partkey LIMIT 10;
|
||||
SELECT l_partkey, stddev(l_suppkey::float8) FROM lineitem
|
||||
GROUP BY l_partkey
|
||||
ORDER BY 2 DESC NULLS LAST, l_partkey LIMIT 10;
|
||||
|
||||
-- also test that we handle execution on coordinator properly
|
||||
SELECT l_partkey, avg(distinct l_suppkey) FROM lineitem
|
||||
GROUP BY l_partkey
|
||||
ORDER BY 2 DESC, l_partkey LIMIT 10;
|
||||
|
||||
-- Next, check that we don't apply the limit optimization for expressions that
|
||||
-- have aggregates within them
|
||||
|
|
Loading…
Reference in New Issue