mirror of https://github.com/citusdata/citus.git
Push down LIMIT and HAVING when grouped by partition key. (#1641)
We can do this because all rows belonging to a group are in the same shard when grouping by distribution column on a range/hash distributed table.pull/1682/head
parent
03bddcbfab
commit
11adb9b034
|
@ -109,12 +109,14 @@ static void RemoveUnaryNode(MultiUnaryNode *unaryNode);
|
|||
static void PullUpUnaryNode(MultiUnaryNode *unaryNode);
|
||||
static void ParentSetNewChild(MultiNode *parentNode, MultiNode *oldChildNode,
|
||||
MultiNode *newChildNode);
|
||||
static bool GroupedByDisjointPartitionColumn(List *tableNodeList,
|
||||
MultiExtendedOp *opNode);
|
||||
|
||||
/* Local functions forward declarations for aggregate expressions */
|
||||
static void ApplyExtendedOpNodes(MultiExtendedOp *originalNode,
|
||||
MultiExtendedOp *masterNode,
|
||||
MultiExtendedOp *workerNode);
|
||||
static void TransformSubqueryNode(MultiTable *subqueryNode);
|
||||
static void TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList);
|
||||
static MultiExtendedOp * MasterExtendedOpNode(MultiExtendedOp *originalOpNode);
|
||||
static Node * MasterAggregateMutator(Node *originalNode,
|
||||
MasterAggregateWalkerContext *walkerContext);
|
||||
|
@ -123,7 +125,8 @@ static Expr * MasterAggregateExpression(Aggref *originalAggregate,
|
|||
static Expr * MasterAverageExpression(Oid sumAggregateType, Oid countAggregateType,
|
||||
AttrNumber *columnId);
|
||||
static Expr * AddTypeConversion(Node *originalAggregate, Node *newExpression);
|
||||
static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode);
|
||||
static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
||||
bool groupedByDisjointPartitionColumn);
|
||||
static bool WorkerAggregateWalker(Node *node,
|
||||
WorkerAggregateWalkerContext *walkerContext);
|
||||
static List * WorkerAggregateExpressionList(Aggref *originalAggregate,
|
||||
|
@ -151,8 +154,10 @@ static bool TablePartitioningSupportsDistinct(List *tableNodeList,
|
|||
static bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column);
|
||||
|
||||
/* Local functions forward declarations for limit clauses */
|
||||
static Node * WorkerLimitCount(MultiExtendedOp *originalOpNode);
|
||||
static List * WorkerSortClauseList(MultiExtendedOp *originalOpNode);
|
||||
static Node * WorkerLimitCount(MultiExtendedOp *originalOpNode,
|
||||
bool groupedByDisjointPartitionColumn);
|
||||
static List * WorkerSortClauseList(MultiExtendedOp *originalOpNode,
|
||||
bool groupedByDisjointPartitionColumn);
|
||||
static bool CanPushDownLimitApproximate(List *sortClauseList, List *targetList);
|
||||
static bool HasOrderByAggregate(List *sortClauseList, List *targetList);
|
||||
static bool HasOrderByAverage(List *sortClauseList, List *targetList);
|
||||
|
@ -177,6 +182,7 @@ void
|
|||
MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan)
|
||||
{
|
||||
bool hasOrderByHllType = false;
|
||||
bool groupedByDisjointPartitionColumn = false;
|
||||
List *selectNodeList = NIL;
|
||||
List *projectNodeList = NIL;
|
||||
List *collectNodeList = NIL;
|
||||
|
@ -251,19 +257,23 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan)
|
|||
extendedOpNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp);
|
||||
extendedOpNode = (MultiExtendedOp *) linitial(extendedOpNodeList);
|
||||
|
||||
tableNodeList = FindNodesOfType(logicalPlanNode, T_MultiTable);
|
||||
groupedByDisjointPartitionColumn = GroupedByDisjointPartitionColumn(tableNodeList,
|
||||
extendedOpNode);
|
||||
|
||||
masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode);
|
||||
workerExtendedOpNode = WorkerExtendedOpNode(extendedOpNode);
|
||||
workerExtendedOpNode = WorkerExtendedOpNode(extendedOpNode,
|
||||
groupedByDisjointPartitionColumn);
|
||||
|
||||
ApplyExtendedOpNodes(extendedOpNode, masterExtendedOpNode, workerExtendedOpNode);
|
||||
|
||||
tableNodeList = FindNodesOfType(logicalPlanNode, T_MultiTable);
|
||||
foreach(tableNodeCell, tableNodeList)
|
||||
{
|
||||
MultiTable *tableNode = (MultiTable *) lfirst(tableNodeCell);
|
||||
if (tableNode->relationId == SUBQUERY_RELATION_ID)
|
||||
{
|
||||
ErrorIfContainsUnsupportedAggregate((MultiNode *) tableNode);
|
||||
TransformSubqueryNode(tableNode);
|
||||
TransformSubqueryNode(tableNode, tableNodeList);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1150,14 +1160,17 @@ ApplyExtendedOpNodes(MultiExtendedOp *originalNode, MultiExtendedOp *masterNode,
|
|||
* operator node.
|
||||
*/
|
||||
static void
|
||||
TransformSubqueryNode(MultiTable *subqueryNode)
|
||||
TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList)
|
||||
{
|
||||
MultiExtendedOp *extendedOpNode =
|
||||
(MultiExtendedOp *) ChildNode((MultiUnaryNode *) subqueryNode);
|
||||
MultiNode *collectNode = ChildNode((MultiUnaryNode *) extendedOpNode);
|
||||
MultiNode *collectChildNode = ChildNode((MultiUnaryNode *) collectNode);
|
||||
bool groupedByDisjointPartitionColumn =
|
||||
GroupedByDisjointPartitionColumn(tableNodeList, extendedOpNode);
|
||||
MultiExtendedOp *masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode);
|
||||
MultiExtendedOp *workerExtendedOpNode = WorkerExtendedOpNode(extendedOpNode);
|
||||
MultiExtendedOp *workerExtendedOpNode =
|
||||
WorkerExtendedOpNode(extendedOpNode, groupedByDisjointPartitionColumn);
|
||||
MultiPartition *partitionNode = CitusMakeNode(MultiPartition);
|
||||
List *groupClauseList = extendedOpNode->groupClauseList;
|
||||
List *targetEntryList = extendedOpNode->targetList;
|
||||
|
@ -1759,7 +1772,8 @@ AddTypeConversion(Node *originalAggregate, Node *newExpression)
|
|||
* list of worker extended operator.
|
||||
*/
|
||||
static MultiExtendedOp *
|
||||
WorkerExtendedOpNode(MultiExtendedOp *originalOpNode)
|
||||
WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
|
||||
bool groupedByDisjointPartitionColumn)
|
||||
{
|
||||
MultiExtendedOp *workerExtendedOpNode = NULL;
|
||||
MultiNode *parentNode = ParentNode((MultiNode *) originalOpNode);
|
||||
|
@ -1913,8 +1927,19 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode)
|
|||
workerExtendedOpNode->groupClauseList = groupClauseList;
|
||||
|
||||
/* if we can push down the limit, also set related fields */
|
||||
workerExtendedOpNode->limitCount = WorkerLimitCount(originalOpNode);
|
||||
workerExtendedOpNode->sortClauseList = WorkerSortClauseList(originalOpNode);
|
||||
workerExtendedOpNode->limitCount = WorkerLimitCount(originalOpNode,
|
||||
groupedByDisjointPartitionColumn);
|
||||
workerExtendedOpNode->sortClauseList =
|
||||
WorkerSortClauseList(originalOpNode, groupedByDisjointPartitionColumn);
|
||||
|
||||
/*
|
||||
* If grouped by a partition column whose values are shards have disjoint sets
|
||||
* of partition values, we can push down the having qualifier.
|
||||
*/
|
||||
if (havingQual != NULL && groupedByDisjointPartitionColumn)
|
||||
{
|
||||
workerExtendedOpNode->havingQual = originalOpNode->havingQual;
|
||||
}
|
||||
|
||||
return workerExtendedOpNode;
|
||||
}
|
||||
|
@ -2277,6 +2302,46 @@ TypeOid(Oid schemaId, const char *typeName)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GroupedByDisjointPartitionColumn returns true if the query is grouped by the
|
||||
* partition column of a table whose shards have disjoint sets of partition values.
|
||||
*/
|
||||
static bool
|
||||
GroupedByDisjointPartitionColumn(List *tableNodeList, MultiExtendedOp *opNode)
|
||||
{
|
||||
bool result = false;
|
||||
ListCell *tableNodeCell = NULL;
|
||||
|
||||
foreach(tableNodeCell, tableNodeList)
|
||||
{
|
||||
MultiTable *tableNode = (MultiTable *) lfirst(tableNodeCell);
|
||||
Oid relationId = tableNode->relationId;
|
||||
char partitionMethod = 0;
|
||||
|
||||
if (relationId == SUBQUERY_RELATION_ID || !IsDistributedTable(relationId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
partitionMethod = PartitionMethod(relationId);
|
||||
if (partitionMethod != DISTRIBUTE_BY_RANGE &&
|
||||
partitionMethod != DISTRIBUTE_BY_HASH)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (GroupedByColumn(opNode->groupClauseList, opNode->targetList,
|
||||
tableNode->partitionColumn))
|
||||
{
|
||||
result = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CountDistinctHashFunctionName resolves the hll_hash function name to use for
|
||||
* the given input type, and returns this function name.
|
||||
|
@ -3180,6 +3245,8 @@ ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn)
|
|||
* The limit push-down decision tree is as follows:
|
||||
* group by?
|
||||
* 1/ \0
|
||||
* group by partition column? (exact pd)
|
||||
* 0/ \1
|
||||
* order by? (exact pd)
|
||||
* 1/ \0
|
||||
* has order by agg? (no pd)
|
||||
|
@ -3195,7 +3262,8 @@ ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn)
|
|||
* returns null.
|
||||
*/
|
||||
static Node *
|
||||
WorkerLimitCount(MultiExtendedOp *originalOpNode)
|
||||
WorkerLimitCount(MultiExtendedOp *originalOpNode,
|
||||
bool groupedByDisjointPartitionColumn)
|
||||
{
|
||||
Node *workerLimitNode = NULL;
|
||||
List *groupClauseList = originalOpNode->groupClauseList;
|
||||
|
@ -3221,11 +3289,12 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode)
|
|||
IsA(originalOpNode->limitOffset, Const));
|
||||
|
||||
/*
|
||||
* If we don't have group by clauses, or if we have order by clauses without
|
||||
* aggregates, we can push down the original limit. Else if we have order by
|
||||
* clauses with commutative aggregates, we can push down approximate limits.
|
||||
* If we don't have group by clauses, or we have group by partition column,
|
||||
* or if we have order by clauses without aggregates, we can push down the
|
||||
* original limit. Else if we have order by clauses with commutative aggregates,
|
||||
* we can push down approximate limits.
|
||||
*/
|
||||
if (groupClauseList == NIL)
|
||||
if (groupClauseList == NIL || groupedByDisjointPartitionColumn)
|
||||
{
|
||||
canPushDownLimit = true;
|
||||
}
|
||||
|
@ -3293,7 +3362,8 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode)
|
|||
* the function returns null.
|
||||
*/
|
||||
static List *
|
||||
WorkerSortClauseList(MultiExtendedOp *originalOpNode)
|
||||
WorkerSortClauseList(MultiExtendedOp *originalOpNode,
|
||||
bool groupedByDisjointPartitionColumn)
|
||||
{
|
||||
List *workerSortClauseList = NIL;
|
||||
List *groupClauseList = originalOpNode->groupClauseList;
|
||||
|
@ -3314,7 +3384,7 @@ WorkerSortClauseList(MultiExtendedOp *originalOpNode)
|
|||
* in different task results. By ordering on the group by clause, we ensure
|
||||
* that query results are consistent.
|
||||
*/
|
||||
if (groupClauseList == NIL)
|
||||
if (groupClauseList == NIL || groupedByDisjointPartitionColumn)
|
||||
{
|
||||
workerSortClauseList = originalOpNode->sortClauseList;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,185 @@
|
|||
--
|
||||
-- MULTI_HAVING_PUSHDOWN
|
||||
--
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 590000;
|
||||
CREATE TABLE lineitem_hash (LIKE lineitem);
|
||||
SELECT create_distributed_table('lineitem_hash', 'l_orderkey', 'hash');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE orders_hash (LIKE orders);
|
||||
SELECT create_distributed_table('orders_hash', 'o_orderkey', 'hash');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- push down when table is distributed by hash and grouped by partition column
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT l_orderkey, sum(l_extendedprice * l_discount) as revenue
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_orderkey HAVING sum(l_quantity) > 24
|
||||
ORDER BY 2 DESC, 1 ASC LIMIT 3;
|
||||
QUERY PLAN
|
||||
--------------------------------------------------------------------------------------------------------
|
||||
Limit
|
||||
-> Sort
|
||||
Sort Key: sum((sum(remote_scan.revenue))) DESC, remote_scan.l_orderkey
|
||||
-> HashAggregate
|
||||
Group Key: remote_scan.l_orderkey
|
||||
Filter: (sum(remote_scan.worker_column_3) > '24'::numeric)
|
||||
-> Custom Scan (Citus Real-Time)
|
||||
Task Count: 32
|
||||
Tasks Shown: One of 32
|
||||
-> Task
|
||||
Node: host=localhost port=57637 dbname=regression
|
||||
-> Limit
|
||||
-> Sort
|
||||
Sort Key: (sum((l_extendedprice * l_discount))) DESC, l_orderkey
|
||||
-> HashAggregate
|
||||
Group Key: l_orderkey
|
||||
Filter: (sum(l_quantity) > '24'::numeric)
|
||||
-> Seq Scan on lineitem_hash_590000 lineitem_hash
|
||||
(18 rows)
|
||||
|
||||
-- but don't push down when table is distributed by append
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT l_orderkey, sum(l_extendedprice * l_discount) as revenue
|
||||
FROM lineitem
|
||||
GROUP BY l_orderkey HAVING sum(l_quantity) > 24
|
||||
ORDER BY 2 DESC, 1 ASC LIMIT 3;
|
||||
QUERY PLAN
|
||||
--------------------------------------------------------------------------------
|
||||
Limit
|
||||
-> Sort
|
||||
Sort Key: sum((sum(remote_scan.revenue))) DESC, remote_scan.l_orderkey
|
||||
-> HashAggregate
|
||||
Group Key: remote_scan.l_orderkey
|
||||
Filter: (sum(remote_scan.worker_column_3) > '24'::numeric)
|
||||
-> Custom Scan (Citus Real-Time)
|
||||
Task Count: 8
|
||||
Tasks Shown: One of 8
|
||||
-> Task
|
||||
Node: host=localhost port=57637 dbname=regression
|
||||
-> HashAggregate
|
||||
Group Key: l_orderkey
|
||||
-> Seq Scan on lineitem_290001 lineitem
|
||||
(14 rows)
|
||||
|
||||
-- and don't push down when not grouped by partition column
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT l_shipmode, sum(l_extendedprice * l_discount) as revenue
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_shipmode HAVING sum(l_quantity) > 24
|
||||
ORDER BY 2 DESC, 1 ASC LIMIT 3;
|
||||
QUERY PLAN
|
||||
------------------------------------------------------------------------------------
|
||||
Limit
|
||||
-> Sort
|
||||
Sort Key: sum((sum(remote_scan.revenue))) DESC, remote_scan.l_shipmode
|
||||
-> HashAggregate
|
||||
Group Key: remote_scan.l_shipmode
|
||||
Filter: (sum(remote_scan.worker_column_3) > '24'::numeric)
|
||||
-> Custom Scan (Citus Real-Time)
|
||||
Task Count: 32
|
||||
Tasks Shown: One of 32
|
||||
-> Task
|
||||
Node: host=localhost port=57637 dbname=regression
|
||||
-> HashAggregate
|
||||
Group Key: l_shipmode
|
||||
-> Seq Scan on lineitem_hash_590000 lineitem_hash
|
||||
(14 rows)
|
||||
|
||||
-- push down if grouped by multiple rows one of which is partition column
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT l_shipmode, l_orderkey, sum(l_extendedprice * l_discount) as revenue
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_shipmode, l_orderkey HAVING sum(l_quantity) > 24
|
||||
ORDER BY 3 DESC, 1, 2 LIMIT 3;
|
||||
QUERY PLAN
|
||||
--------------------------------------------------------------------------------------------------------------------
|
||||
Limit
|
||||
-> Sort
|
||||
Sort Key: sum((sum(remote_scan.revenue))) DESC, remote_scan.l_shipmode, remote_scan.l_orderkey
|
||||
-> HashAggregate
|
||||
Group Key: remote_scan.l_shipmode, remote_scan.l_orderkey
|
||||
Filter: (sum(remote_scan.worker_column_4) > '24'::numeric)
|
||||
-> Custom Scan (Citus Real-Time)
|
||||
Task Count: 32
|
||||
Tasks Shown: One of 32
|
||||
-> Task
|
||||
Node: host=localhost port=57637 dbname=regression
|
||||
-> Limit
|
||||
-> Sort
|
||||
Sort Key: (sum((l_extendedprice * l_discount))) DESC, l_shipmode, l_orderkey
|
||||
-> HashAggregate
|
||||
Group Key: l_shipmode, l_orderkey
|
||||
Filter: (sum(l_quantity) > '24'::numeric)
|
||||
-> Seq Scan on lineitem_hash_590000 lineitem_hash
|
||||
(18 rows)
|
||||
|
||||
-- couple more checks with joins
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT sum(l_extendedprice * l_discount) as revenue
|
||||
FROM lineitem_hash, orders_hash
|
||||
WHERE o_orderkey = l_orderkey
|
||||
GROUP BY l_orderkey, o_orderkey, l_shipmode HAVING sum(l_quantity) > 24
|
||||
ORDER BY 1 DESC LIMIT 3;
|
||||
QUERY PLAN
|
||||
-----------------------------------------------------------------------------------------------------------------------------------
|
||||
Limit
|
||||
-> Sort
|
||||
Sort Key: sum((sum(remote_scan.revenue))) DESC
|
||||
-> HashAggregate
|
||||
Group Key: remote_scan.worker_column_2, remote_scan.worker_column_3, remote_scan.worker_column_4
|
||||
Filter: (sum(remote_scan.worker_column_5) > '24'::numeric)
|
||||
-> Custom Scan (Citus Real-Time)
|
||||
Task Count: 32
|
||||
Tasks Shown: One of 32
|
||||
-> Task
|
||||
Node: host=localhost port=57637 dbname=regression
|
||||
-> Limit
|
||||
-> Sort
|
||||
Sort Key: (sum((lineitem_hash.l_extendedprice * lineitem_hash.l_discount))) DESC
|
||||
-> HashAggregate
|
||||
Group Key: lineitem_hash.l_orderkey, orders_hash.o_orderkey, lineitem_hash.l_shipmode
|
||||
Filter: (sum(lineitem_hash.l_quantity) > '24'::numeric)
|
||||
-> Hash Join
|
||||
Hash Cond: (orders_hash.o_orderkey = lineitem_hash.l_orderkey)
|
||||
-> Seq Scan on orders_hash_590032 orders_hash
|
||||
-> Hash
|
||||
-> Seq Scan on lineitem_hash_590000 lineitem_hash
|
||||
(22 rows)
|
||||
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT sum(l_extendedprice * l_discount) as revenue
|
||||
FROM lineitem_hash, orders_hash
|
||||
WHERE o_orderkey = l_orderkey
|
||||
GROUP BY l_shipmode, o_clerk HAVING sum(l_quantity) > 24
|
||||
ORDER BY 1 DESC LIMIT 3;
|
||||
QUERY PLAN
|
||||
------------------------------------------------------------------------------------------------------
|
||||
Limit
|
||||
-> Sort
|
||||
Sort Key: sum((sum(remote_scan.revenue))) DESC
|
||||
-> HashAggregate
|
||||
Group Key: remote_scan.worker_column_2, remote_scan.worker_column_3
|
||||
Filter: (sum(remote_scan.worker_column_4) > '24'::numeric)
|
||||
-> Custom Scan (Citus Real-Time)
|
||||
Task Count: 32
|
||||
Tasks Shown: One of 32
|
||||
-> Task
|
||||
Node: host=localhost port=57637 dbname=regression
|
||||
-> HashAggregate
|
||||
Group Key: lineitem_hash.l_shipmode, orders_hash.o_clerk
|
||||
-> Hash Join
|
||||
Hash Cond: (orders_hash.o_orderkey = lineitem_hash.l_orderkey)
|
||||
-> Seq Scan on orders_hash_590032 orders_hash
|
||||
-> Hash
|
||||
-> Seq Scan on lineitem_hash_590000 lineitem_hash
|
||||
(18 rows)
|
||||
|
||||
DROP TABLE lineitem_hash;
|
||||
DROP TABLE orders_hash;
|
|
@ -1,6 +1,14 @@
|
|||
--
|
||||
-- MULTI_LIMIT_CLAUSE
|
||||
--
|
||||
CREATE TABLE lineitem_hash (LIKE lineitem);
|
||||
SELECT create_distributed_table('lineitem_hash', 'l_orderkey', 'hash');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO lineitem_hash SELECT * FROM lineitem;
|
||||
-- Display debug messages on limit clause push down.
|
||||
SET client_min_messages TO DEBUG1;
|
||||
-- Check that we can correctly handle the Limit clause in distributed queries.
|
||||
|
@ -203,4 +211,91 @@ DEBUG: push down of limit count: 1
|
|||
1.00 | 0.00 | 99167.304347826087
|
||||
(1 row)
|
||||
|
||||
-- We can push down LIMIT clause when we group by partition column of a hash
|
||||
-- partitioned table.
|
||||
SELECT l_orderkey, count(DISTINCT l_partkey)
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_orderkey
|
||||
ORDER BY 2 DESC, 1 DESC LIMIT 5;
|
||||
DEBUG: push down of limit count: 5
|
||||
l_orderkey | count
|
||||
------------+-------
|
||||
14885 | 7
|
||||
14884 | 7
|
||||
14821 | 7
|
||||
14790 | 7
|
||||
14785 | 7
|
||||
(5 rows)
|
||||
|
||||
SELECT l_orderkey
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_orderkey
|
||||
ORDER BY l_orderkey LIMIT 5;
|
||||
DEBUG: push down of limit count: 5
|
||||
l_orderkey
|
||||
------------
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
(5 rows)
|
||||
|
||||
-- Don't push down if not grouped by partition column.
|
||||
SELECT max(l_orderkey)
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_linestatus
|
||||
ORDER BY 1 DESC LIMIT 2;
|
||||
max
|
||||
-------
|
||||
14947
|
||||
14916
|
||||
(2 rows)
|
||||
|
||||
-- Don't push down if table is distributed by append
|
||||
SELECT l_orderkey, max(l_shipdate)
|
||||
FROM lineitem
|
||||
GROUP BY l_orderkey
|
||||
ORDER BY 2 DESC, 1 LIMIT 5;
|
||||
l_orderkey | max
|
||||
------------+------------
|
||||
4678 | 11-27-1998
|
||||
12384 | 11-26-1998
|
||||
1124 | 11-25-1998
|
||||
11523 | 11-22-1998
|
||||
14694 | 11-21-1998
|
||||
(5 rows)
|
||||
|
||||
-- Push down if grouped by multiple columns one of which is partition column.
|
||||
SELECT
|
||||
l_linestatus, l_orderkey, max(l_shipdate)
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_linestatus, l_orderkey
|
||||
ORDER BY 3 DESC, 1, 2 LIMIT 5;
|
||||
DEBUG: push down of limit count: 5
|
||||
l_linestatus | l_orderkey | max
|
||||
--------------+------------+------------
|
||||
O | 4678 | 11-27-1998
|
||||
O | 12384 | 11-26-1998
|
||||
O | 1124 | 11-25-1998
|
||||
O | 11523 | 11-22-1998
|
||||
O | 14694 | 11-21-1998
|
||||
(5 rows)
|
||||
|
||||
-- Don't push down if grouped by multiple columns none of which is partition column.
|
||||
SELECT
|
||||
l_linestatus, l_shipmode, max(l_shipdate)
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_linestatus, l_shipmode
|
||||
ORDER BY 3 DESC, 1, 2 LIMIT 5;
|
||||
l_linestatus | l_shipmode | max
|
||||
--------------+------------+------------
|
||||
O | AIR | 11-27-1998
|
||||
O | RAIL | 11-26-1998
|
||||
O | SHIP | 11-21-1998
|
||||
O | REG AIR | 11-19-1998
|
||||
O | TRUCK | 11-17-1998
|
||||
(5 rows)
|
||||
|
||||
SET client_min_messages TO NOTICE;
|
||||
DROP TABLE lineitem_hash;
|
||||
|
|
|
@ -57,9 +57,9 @@ test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
|
|||
test: multi_reference_table
|
||||
test: multi_outer_join_reference
|
||||
test: multi_single_relation_subquery
|
||||
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause multi_limit_clause_approximate
|
||||
test: multi_average_expression multi_working_columns
|
||||
test: multi_array_agg
|
||||
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate
|
||||
test: multi_average_expression multi_working_columns multi_having_pushdown
|
||||
test: multi_array_agg multi_limit_clause
|
||||
test: multi_agg_type_conversion multi_count_type_conversion
|
||||
test: multi_partition_pruning
|
||||
test: multi_join_pruning multi_hash_pruning
|
||||
|
|
|
@ -30,9 +30,9 @@ test: multi_load_data
|
|||
# Miscellaneous tests to check our query planning behavior
|
||||
# ----------
|
||||
test: multi_basic_queries multi_complex_expressions
|
||||
test: multi_agg_distinct multi_limit_clause multi_limit_clause_approximate
|
||||
test: multi_agg_distinct multi_limit_clause_approximate
|
||||
test: multi_average_expression multi_working_columns
|
||||
test: multi_array_agg
|
||||
test: multi_array_agg multi_limit_clause
|
||||
test: multi_agg_type_conversion multi_count_type_conversion
|
||||
test: multi_hash_pruning
|
||||
test: multi_query_directory_cleanup
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
--
|
||||
-- MULTI_HAVING_PUSHDOWN
|
||||
--
|
||||
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 590000;
|
||||
|
||||
CREATE TABLE lineitem_hash (LIKE lineitem);
|
||||
SELECT create_distributed_table('lineitem_hash', 'l_orderkey', 'hash');
|
||||
|
||||
CREATE TABLE orders_hash (LIKE orders);
|
||||
SELECT create_distributed_table('orders_hash', 'o_orderkey', 'hash');
|
||||
|
||||
-- push down when table is distributed by hash and grouped by partition column
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT l_orderkey, sum(l_extendedprice * l_discount) as revenue
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_orderkey HAVING sum(l_quantity) > 24
|
||||
ORDER BY 2 DESC, 1 ASC LIMIT 3;
|
||||
|
||||
-- but don't push down when table is distributed by append
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT l_orderkey, sum(l_extendedprice * l_discount) as revenue
|
||||
FROM lineitem
|
||||
GROUP BY l_orderkey HAVING sum(l_quantity) > 24
|
||||
ORDER BY 2 DESC, 1 ASC LIMIT 3;
|
||||
|
||||
-- and don't push down when not grouped by partition column
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT l_shipmode, sum(l_extendedprice * l_discount) as revenue
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_shipmode HAVING sum(l_quantity) > 24
|
||||
ORDER BY 2 DESC, 1 ASC LIMIT 3;
|
||||
|
||||
-- push down if grouped by multiple rows one of which is partition column
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT l_shipmode, l_orderkey, sum(l_extendedprice * l_discount) as revenue
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_shipmode, l_orderkey HAVING sum(l_quantity) > 24
|
||||
ORDER BY 3 DESC, 1, 2 LIMIT 3;
|
||||
|
||||
-- couple more checks with joins
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT sum(l_extendedprice * l_discount) as revenue
|
||||
FROM lineitem_hash, orders_hash
|
||||
WHERE o_orderkey = l_orderkey
|
||||
GROUP BY l_orderkey, o_orderkey, l_shipmode HAVING sum(l_quantity) > 24
|
||||
ORDER BY 1 DESC LIMIT 3;
|
||||
|
||||
EXPLAIN (COSTS FALSE)
|
||||
SELECT sum(l_extendedprice * l_discount) as revenue
|
||||
FROM lineitem_hash, orders_hash
|
||||
WHERE o_orderkey = l_orderkey
|
||||
GROUP BY l_shipmode, o_clerk HAVING sum(l_quantity) > 24
|
||||
ORDER BY 1 DESC LIMIT 3;
|
||||
|
||||
DROP TABLE lineitem_hash;
|
||||
DROP TABLE orders_hash;
|
|
@ -3,6 +3,10 @@
|
|||
--
|
||||
|
||||
|
||||
CREATE TABLE lineitem_hash (LIKE lineitem);
|
||||
SELECT create_distributed_table('lineitem_hash', 'l_orderkey', 'hash');
|
||||
INSERT INTO lineitem_hash SELECT * FROM lineitem;
|
||||
|
||||
-- Display debug messages on limit clause push down.
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
|
@ -59,4 +63,45 @@ SELECT l_quantity, l_discount, avg(l_partkey) FROM lineitem
|
|||
GROUP BY l_quantity, l_discount
|
||||
ORDER BY l_quantity, l_discount LIMIT 1;
|
||||
|
||||
|
||||
-- We can push down LIMIT clause when we group by partition column of a hash
|
||||
-- partitioned table.
|
||||
SELECT l_orderkey, count(DISTINCT l_partkey)
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_orderkey
|
||||
ORDER BY 2 DESC, 1 DESC LIMIT 5;
|
||||
|
||||
SELECT l_orderkey
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_orderkey
|
||||
ORDER BY l_orderkey LIMIT 5;
|
||||
|
||||
-- Don't push down if not grouped by partition column.
|
||||
SELECT max(l_orderkey)
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_linestatus
|
||||
ORDER BY 1 DESC LIMIT 2;
|
||||
|
||||
-- Don't push down if table is distributed by append
|
||||
SELECT l_orderkey, max(l_shipdate)
|
||||
FROM lineitem
|
||||
GROUP BY l_orderkey
|
||||
ORDER BY 2 DESC, 1 LIMIT 5;
|
||||
|
||||
-- Push down if grouped by multiple columns one of which is partition column.
|
||||
SELECT
|
||||
l_linestatus, l_orderkey, max(l_shipdate)
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_linestatus, l_orderkey
|
||||
ORDER BY 3 DESC, 1, 2 LIMIT 5;
|
||||
|
||||
-- Don't push down if grouped by multiple columns none of which is partition column.
|
||||
SELECT
|
||||
l_linestatus, l_shipmode, max(l_shipdate)
|
||||
FROM lineitem_hash
|
||||
GROUP BY l_linestatus, l_shipmode
|
||||
ORDER BY 3 DESC, 1, 2 LIMIT 5;
|
||||
|
||||
|
||||
SET client_min_messages TO NOTICE;
|
||||
DROP TABLE lineitem_hash;
|
||||
|
|
Loading…
Reference in New Issue