From 11adb9b034fac55ef1a1a47764815d310dd49bb8 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 2 Oct 2017 20:17:51 -0400 Subject: [PATCH] Push down LIMIT and HAVING when grouped by partition key. (#1641) We can do this because all rows belonging to a group are in the same shard when grouping by distribution column on a range/hash distributed table. --- .../planner/multi_logical_optimizer.c | 114 ++++++++--- .../expected/multi_having_pushdown.out | 185 ++++++++++++++++++ .../regress/expected/multi_limit_clause.out | 95 +++++++++ src/test/regress/multi_schedule | 6 +- .../regress/multi_task_tracker_extra_schedule | 4 +- .../regress/sql/multi_having_pushdown.sql | 57 ++++++ src/test/regress/sql/multi_limit_clause.sql | 45 +++++ 7 files changed, 479 insertions(+), 27 deletions(-) create mode 100644 src/test/regress/expected/multi_having_pushdown.out create mode 100644 src/test/regress/sql/multi_having_pushdown.sql diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 63e0ad86d..e679da44c 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -109,12 +109,14 @@ static void RemoveUnaryNode(MultiUnaryNode *unaryNode); static void PullUpUnaryNode(MultiUnaryNode *unaryNode); static void ParentSetNewChild(MultiNode *parentNode, MultiNode *oldChildNode, MultiNode *newChildNode); +static bool GroupedByDisjointPartitionColumn(List *tableNodeList, + MultiExtendedOp *opNode); /* Local functions forward declarations for aggregate expressions */ static void ApplyExtendedOpNodes(MultiExtendedOp *originalNode, MultiExtendedOp *masterNode, MultiExtendedOp *workerNode); -static void TransformSubqueryNode(MultiTable *subqueryNode); +static void TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList); static MultiExtendedOp * MasterExtendedOpNode(MultiExtendedOp *originalOpNode); static Node * 
MasterAggregateMutator(Node *originalNode, MasterAggregateWalkerContext *walkerContext); @@ -123,7 +125,8 @@ static Expr * MasterAggregateExpression(Aggref *originalAggregate, static Expr * MasterAverageExpression(Oid sumAggregateType, Oid countAggregateType, AttrNumber *columnId); static Expr * AddTypeConversion(Node *originalAggregate, Node *newExpression); -static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode); +static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, + bool groupedByDisjointPartitionColumn); static bool WorkerAggregateWalker(Node *node, WorkerAggregateWalkerContext *walkerContext); static List * WorkerAggregateExpressionList(Aggref *originalAggregate, @@ -151,8 +154,10 @@ static bool TablePartitioningSupportsDistinct(List *tableNodeList, static bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column); /* Local functions forward declarations for limit clauses */ -static Node * WorkerLimitCount(MultiExtendedOp *originalOpNode); -static List * WorkerSortClauseList(MultiExtendedOp *originalOpNode); +static Node * WorkerLimitCount(MultiExtendedOp *originalOpNode, + bool groupedByDisjointPartitionColumn); +static List * WorkerSortClauseList(MultiExtendedOp *originalOpNode, + bool groupedByDisjointPartitionColumn); static bool CanPushDownLimitApproximate(List *sortClauseList, List *targetList); static bool HasOrderByAggregate(List *sortClauseList, List *targetList); static bool HasOrderByAverage(List *sortClauseList, List *targetList); @@ -177,6 +182,7 @@ void MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan) { bool hasOrderByHllType = false; + bool groupedByDisjointPartitionColumn = false; List *selectNodeList = NIL; List *projectNodeList = NIL; List *collectNodeList = NIL; @@ -251,19 +257,23 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan) extendedOpNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp); extendedOpNode = (MultiExtendedOp *) 
linitial(extendedOpNodeList); + tableNodeList = FindNodesOfType(logicalPlanNode, T_MultiTable); + groupedByDisjointPartitionColumn = GroupedByDisjointPartitionColumn(tableNodeList, + extendedOpNode); + masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode); - workerExtendedOpNode = WorkerExtendedOpNode(extendedOpNode); + workerExtendedOpNode = WorkerExtendedOpNode(extendedOpNode, + groupedByDisjointPartitionColumn); ApplyExtendedOpNodes(extendedOpNode, masterExtendedOpNode, workerExtendedOpNode); - tableNodeList = FindNodesOfType(logicalPlanNode, T_MultiTable); foreach(tableNodeCell, tableNodeList) { MultiTable *tableNode = (MultiTable *) lfirst(tableNodeCell); if (tableNode->relationId == SUBQUERY_RELATION_ID) { ErrorIfContainsUnsupportedAggregate((MultiNode *) tableNode); - TransformSubqueryNode(tableNode); + TransformSubqueryNode(tableNode, tableNodeList); } } @@ -1150,14 +1160,17 @@ ApplyExtendedOpNodes(MultiExtendedOp *originalNode, MultiExtendedOp *masterNode, * operator node. 
*/ static void -TransformSubqueryNode(MultiTable *subqueryNode) +TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList) { MultiExtendedOp *extendedOpNode = (MultiExtendedOp *) ChildNode((MultiUnaryNode *) subqueryNode); MultiNode *collectNode = ChildNode((MultiUnaryNode *) extendedOpNode); MultiNode *collectChildNode = ChildNode((MultiUnaryNode *) collectNode); + bool groupedByDisjointPartitionColumn = + GroupedByDisjointPartitionColumn(tableNodeList, extendedOpNode); MultiExtendedOp *masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode); - MultiExtendedOp *workerExtendedOpNode = WorkerExtendedOpNode(extendedOpNode); + MultiExtendedOp *workerExtendedOpNode = + WorkerExtendedOpNode(extendedOpNode, groupedByDisjointPartitionColumn); MultiPartition *partitionNode = CitusMakeNode(MultiPartition); List *groupClauseList = extendedOpNode->groupClauseList; List *targetEntryList = extendedOpNode->targetList; @@ -1759,7 +1772,8 @@ AddTypeConversion(Node *originalAggregate, Node *newExpression) * list of worker extended operator. 
*/ static MultiExtendedOp * -WorkerExtendedOpNode(MultiExtendedOp *originalOpNode) +WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, + bool groupedByDisjointPartitionColumn) { MultiExtendedOp *workerExtendedOpNode = NULL; MultiNode *parentNode = ParentNode((MultiNode *) originalOpNode); @@ -1913,8 +1927,19 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode) workerExtendedOpNode->groupClauseList = groupClauseList; /* if we can push down the limit, also set related fields */ - workerExtendedOpNode->limitCount = WorkerLimitCount(originalOpNode); - workerExtendedOpNode->sortClauseList = WorkerSortClauseList(originalOpNode); + workerExtendedOpNode->limitCount = WorkerLimitCount(originalOpNode, + groupedByDisjointPartitionColumn); + workerExtendedOpNode->sortClauseList = + WorkerSortClauseList(originalOpNode, groupedByDisjointPartitionColumn); + + /* + * If grouped by the partition column of a table whose shards have disjoint sets + * of partition values, we can push down the having qualifier. + */ + if (havingQual != NULL && groupedByDisjointPartitionColumn) + { + workerExtendedOpNode->havingQual = originalOpNode->havingQual; + } return workerExtendedOpNode; } @@ -2277,6 +2302,46 @@ TypeOid(Oid schemaId, const char *typeName) } +/* + * GroupedByDisjointPartitionColumn returns true if the query is grouped by the + * partition column of a table whose shards have disjoint sets of partition values.
+ */ +static bool +GroupedByDisjointPartitionColumn(List *tableNodeList, MultiExtendedOp *opNode) +{ + bool result = false; + ListCell *tableNodeCell = NULL; + + foreach(tableNodeCell, tableNodeList) + { + MultiTable *tableNode = (MultiTable *) lfirst(tableNodeCell); + Oid relationId = tableNode->relationId; + char partitionMethod = 0; + + if (relationId == SUBQUERY_RELATION_ID || !IsDistributedTable(relationId)) + { + continue; + } + + partitionMethod = PartitionMethod(relationId); + if (partitionMethod != DISTRIBUTE_BY_RANGE && + partitionMethod != DISTRIBUTE_BY_HASH) + { + continue; + } + + if (GroupedByColumn(opNode->groupClauseList, opNode->targetList, + tableNode->partitionColumn)) + { + result = true; + break; + } + } + + return result; +} + + /* * CountDistinctHashFunctionName resolves the hll_hash function name to use for * the given input type, and returns this function name. @@ -3178,9 +3243,11 @@ ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn) * If they can, the function returns the limit count. * * The limit push-down decision tree is as follows: - * group by? - * 1/ \0 - * order by? (exact pd) + * group by? + * 1/ \0 + * group by partition column? (exact pd) + * 0/ \1 + * order by? (exact pd) * 1/ \0 * has order by agg? (no pd) * 1/ \0 @@ -3195,7 +3262,8 @@ ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn) * returns null. */ static Node * -WorkerLimitCount(MultiExtendedOp *originalOpNode) +WorkerLimitCount(MultiExtendedOp *originalOpNode, + bool groupedByDisjointPartitionColumn) { Node *workerLimitNode = NULL; List *groupClauseList = originalOpNode->groupClauseList; @@ -3221,11 +3289,12 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode) IsA(originalOpNode->limitOffset, Const)); /* - * If we don't have group by clauses, or if we have order by clauses without - * aggregates, we can push down the original limit. 
Else if we have order by - * clauses with commutative aggregates, we can push down approximate limits. + * If we don't have group by clauses, or we have group by partition column, + * or if we have order by clauses without aggregates, we can push down the + * original limit. Else if we have order by clauses with commutative aggregates, + * we can push down approximate limits. */ - if (groupClauseList == NIL) + if (groupClauseList == NIL || groupedByDisjointPartitionColumn) { canPushDownLimit = true; } @@ -3293,7 +3362,8 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode) * the function returns null. */ static List * -WorkerSortClauseList(MultiExtendedOp *originalOpNode) +WorkerSortClauseList(MultiExtendedOp *originalOpNode, + bool groupedByDisjointPartitionColumn) { List *workerSortClauseList = NIL; List *groupClauseList = originalOpNode->groupClauseList; @@ -3314,7 +3384,7 @@ WorkerSortClauseList(MultiExtendedOp *originalOpNode) * in different task results. By ordering on the group by clause, we ensure * that query results are consistent. 
*/ - if (groupClauseList == NIL) + if (groupClauseList == NIL || groupedByDisjointPartitionColumn) { workerSortClauseList = originalOpNode->sortClauseList; } diff --git a/src/test/regress/expected/multi_having_pushdown.out b/src/test/regress/expected/multi_having_pushdown.out new file mode 100644 index 000000000..6ba07c4ad --- /dev/null +++ b/src/test/regress/expected/multi_having_pushdown.out @@ -0,0 +1,185 @@ +-- +-- MULTI_HAVING_PUSHDOWN +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 590000; +CREATE TABLE lineitem_hash (LIKE lineitem); +SELECT create_distributed_table('lineitem_hash', 'l_orderkey', 'hash'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE orders_hash (LIKE orders); +SELECT create_distributed_table('orders_hash', 'o_orderkey', 'hash'); + create_distributed_table +-------------------------- + +(1 row) + +-- push down when table is distributed by hash and grouped by partition column +EXPLAIN (COSTS FALSE) + SELECT l_orderkey, sum(l_extendedprice * l_discount) as revenue + FROM lineitem_hash + GROUP BY l_orderkey HAVING sum(l_quantity) > 24 + ORDER BY 2 DESC, 1 ASC LIMIT 3; + QUERY PLAN +-------------------------------------------------------------------------------------------------------- + Limit + -> Sort + Sort Key: sum((sum(remote_scan.revenue))) DESC, remote_scan.l_orderkey + -> HashAggregate + Group Key: remote_scan.l_orderkey + Filter: (sum(remote_scan.worker_column_3) > '24'::numeric) + -> Custom Scan (Citus Real-Time) + Task Count: 32 + Tasks Shown: One of 32 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Sort + Sort Key: (sum((l_extendedprice * l_discount))) DESC, l_orderkey + -> HashAggregate + Group Key: l_orderkey + Filter: (sum(l_quantity) > '24'::numeric) + -> Seq Scan on lineitem_hash_590000 lineitem_hash +(18 rows) + +-- but don't push down when table is distributed by append +EXPLAIN (COSTS FALSE) + SELECT l_orderkey, sum(l_extendedprice * l_discount) as 
revenue + FROM lineitem + GROUP BY l_orderkey HAVING sum(l_quantity) > 24 + ORDER BY 2 DESC, 1 ASC LIMIT 3; + QUERY PLAN +-------------------------------------------------------------------------------- + Limit + -> Sort + Sort Key: sum((sum(remote_scan.revenue))) DESC, remote_scan.l_orderkey + -> HashAggregate + Group Key: remote_scan.l_orderkey + Filter: (sum(remote_scan.worker_column_3) > '24'::numeric) + -> Custom Scan (Citus Real-Time) + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: l_orderkey + -> Seq Scan on lineitem_290001 lineitem +(14 rows) + +-- and don't push down when not grouped by partition column +EXPLAIN (COSTS FALSE) + SELECT l_shipmode, sum(l_extendedprice * l_discount) as revenue + FROM lineitem_hash + GROUP BY l_shipmode HAVING sum(l_quantity) > 24 + ORDER BY 2 DESC, 1 ASC LIMIT 3; + QUERY PLAN +------------------------------------------------------------------------------------ + Limit + -> Sort + Sort Key: sum((sum(remote_scan.revenue))) DESC, remote_scan.l_shipmode + -> HashAggregate + Group Key: remote_scan.l_shipmode + Filter: (sum(remote_scan.worker_column_3) > '24'::numeric) + -> Custom Scan (Citus Real-Time) + Task Count: 32 + Tasks Shown: One of 32 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: l_shipmode + -> Seq Scan on lineitem_hash_590000 lineitem_hash +(14 rows) + +-- push down if grouped by multiple rows one of which is partition column +EXPLAIN (COSTS FALSE) + SELECT l_shipmode, l_orderkey, sum(l_extendedprice * l_discount) as revenue + FROM lineitem_hash + GROUP BY l_shipmode, l_orderkey HAVING sum(l_quantity) > 24 + ORDER BY 3 DESC, 1, 2 LIMIT 3; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------- + Limit + -> Sort + Sort Key: sum((sum(remote_scan.revenue))) DESC, remote_scan.l_shipmode, remote_scan.l_orderkey + -> 
HashAggregate + Group Key: remote_scan.l_shipmode, remote_scan.l_orderkey + Filter: (sum(remote_scan.worker_column_4) > '24'::numeric) + -> Custom Scan (Citus Real-Time) + Task Count: 32 + Tasks Shown: One of 32 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Sort + Sort Key: (sum((l_extendedprice * l_discount))) DESC, l_shipmode, l_orderkey + -> HashAggregate + Group Key: l_shipmode, l_orderkey + Filter: (sum(l_quantity) > '24'::numeric) + -> Seq Scan on lineitem_hash_590000 lineitem_hash +(18 rows) + +-- couple more checks with joins +EXPLAIN (COSTS FALSE) + SELECT sum(l_extendedprice * l_discount) as revenue + FROM lineitem_hash, orders_hash + WHERE o_orderkey = l_orderkey + GROUP BY l_orderkey, o_orderkey, l_shipmode HAVING sum(l_quantity) > 24 + ORDER BY 1 DESC LIMIT 3; + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------- + Limit + -> Sort + Sort Key: sum((sum(remote_scan.revenue))) DESC + -> HashAggregate + Group Key: remote_scan.worker_column_2, remote_scan.worker_column_3, remote_scan.worker_column_4 + Filter: (sum(remote_scan.worker_column_5) > '24'::numeric) + -> Custom Scan (Citus Real-Time) + Task Count: 32 + Tasks Shown: One of 32 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Sort + Sort Key: (sum((lineitem_hash.l_extendedprice * lineitem_hash.l_discount))) DESC + -> HashAggregate + Group Key: lineitem_hash.l_orderkey, orders_hash.o_orderkey, lineitem_hash.l_shipmode + Filter: (sum(lineitem_hash.l_quantity) > '24'::numeric) + -> Hash Join + Hash Cond: (orders_hash.o_orderkey = lineitem_hash.l_orderkey) + -> Seq Scan on orders_hash_590032 orders_hash + -> Hash + -> Seq Scan on lineitem_hash_590000 lineitem_hash +(22 rows) + +EXPLAIN (COSTS FALSE) + SELECT sum(l_extendedprice * l_discount) as revenue + FROM lineitem_hash, orders_hash + WHERE o_orderkey = l_orderkey + GROUP BY l_shipmode, o_clerk HAVING 
sum(l_quantity) > 24 + ORDER BY 1 DESC LIMIT 3; + QUERY PLAN +------------------------------------------------------------------------------------------------------ + Limit + -> Sort + Sort Key: sum((sum(remote_scan.revenue))) DESC + -> HashAggregate + Group Key: remote_scan.worker_column_2, remote_scan.worker_column_3 + Filter: (sum(remote_scan.worker_column_4) > '24'::numeric) + -> Custom Scan (Citus Real-Time) + Task Count: 32 + Tasks Shown: One of 32 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: lineitem_hash.l_shipmode, orders_hash.o_clerk + -> Hash Join + Hash Cond: (orders_hash.o_orderkey = lineitem_hash.l_orderkey) + -> Seq Scan on orders_hash_590032 orders_hash + -> Hash + -> Seq Scan on lineitem_hash_590000 lineitem_hash +(18 rows) + +DROP TABLE lineitem_hash; +DROP TABLE orders_hash; diff --git a/src/test/regress/expected/multi_limit_clause.out b/src/test/regress/expected/multi_limit_clause.out index 18e1adf20..483ad107b 100644 --- a/src/test/regress/expected/multi_limit_clause.out +++ b/src/test/regress/expected/multi_limit_clause.out @@ -1,6 +1,14 @@ -- -- MULTI_LIMIT_CLAUSE -- +CREATE TABLE lineitem_hash (LIKE lineitem); +SELECT create_distributed_table('lineitem_hash', 'l_orderkey', 'hash'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO lineitem_hash SELECT * FROM lineitem; -- Display debug messages on limit clause push down. SET client_min_messages TO DEBUG1; -- Check that we can correctly handle the Limit clause in distributed queries. @@ -203,4 +211,91 @@ DEBUG: push down of limit count: 1 1.00 | 0.00 | 99167.304347826087 (1 row) +-- We can push down LIMIT clause when we group by partition column of a hash +-- partitioned table. 
+SELECT l_orderkey, count(DISTINCT l_partkey) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 2 DESC, 1 DESC LIMIT 5; +DEBUG: push down of limit count: 5 + l_orderkey | count +------------+------- + 14885 | 7 + 14884 | 7 + 14821 | 7 + 14790 | 7 + 14785 | 7 +(5 rows) + +SELECT l_orderkey + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY l_orderkey LIMIT 5; +DEBUG: push down of limit count: 5 + l_orderkey +------------ + 1 + 2 + 3 + 4 + 5 +(5 rows) + +-- Don't push down if not grouped by partition column. +SELECT max(l_orderkey) + FROM lineitem_hash + GROUP BY l_linestatus + ORDER BY 1 DESC LIMIT 2; + max +------- + 14947 + 14916 +(2 rows) + +-- Don't push down if table is distributed by append +SELECT l_orderkey, max(l_shipdate) + FROM lineitem + GROUP BY l_orderkey + ORDER BY 2 DESC, 1 LIMIT 5; + l_orderkey | max +------------+------------ + 4678 | 11-27-1998 + 12384 | 11-26-1998 + 1124 | 11-25-1998 + 11523 | 11-22-1998 + 14694 | 11-21-1998 +(5 rows) + +-- Push down if grouped by multiple columns one of which is partition column. +SELECT + l_linestatus, l_orderkey, max(l_shipdate) + FROM lineitem_hash + GROUP BY l_linestatus, l_orderkey + ORDER BY 3 DESC, 1, 2 LIMIT 5; +DEBUG: push down of limit count: 5 + l_linestatus | l_orderkey | max +--------------+------------+------------ + O | 4678 | 11-27-1998 + O | 12384 | 11-26-1998 + O | 1124 | 11-25-1998 + O | 11523 | 11-22-1998 + O | 14694 | 11-21-1998 +(5 rows) + +-- Don't push down if grouped by multiple columns none of which is partition column. 
+SELECT + l_linestatus, l_shipmode, max(l_shipdate) + FROM lineitem_hash + GROUP BY l_linestatus, l_shipmode + ORDER BY 3 DESC, 1, 2 LIMIT 5; + l_linestatus | l_shipmode | max +--------------+------------+------------ + O | AIR | 11-27-1998 + O | RAIL | 11-26-1998 + O | SHIP | 11-21-1998 + O | REG AIR | 11-19-1998 + O | TRUCK | 11-17-1998 +(5 rows) + SET client_min_messages TO NOTICE; +DROP TABLE lineitem_hash; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 1b42d6602..b24532b17 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -57,9 +57,9 @@ test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc test: multi_reference_table test: multi_outer_join_reference test: multi_single_relation_subquery -test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause multi_limit_clause_approximate -test: multi_average_expression multi_working_columns -test: multi_array_agg +test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate +test: multi_average_expression multi_working_columns multi_having_pushdown +test: multi_array_agg multi_limit_clause test: multi_agg_type_conversion multi_count_type_conversion test: multi_partition_pruning test: multi_join_pruning multi_hash_pruning diff --git a/src/test/regress/multi_task_tracker_extra_schedule b/src/test/regress/multi_task_tracker_extra_schedule index a581f4336..585d19ab6 100644 --- a/src/test/regress/multi_task_tracker_extra_schedule +++ b/src/test/regress/multi_task_tracker_extra_schedule @@ -30,9 +30,9 @@ test: multi_load_data # Miscellaneous tests to check our query planning behavior # ---------- test: multi_basic_queries multi_complex_expressions -test: multi_agg_distinct multi_limit_clause multi_limit_clause_approximate +test: multi_agg_distinct multi_limit_clause_approximate test: multi_average_expression multi_working_columns -test: multi_array_agg +test: multi_array_agg 
multi_limit_clause test: multi_agg_type_conversion multi_count_type_conversion test: multi_hash_pruning test: multi_query_directory_cleanup diff --git a/src/test/regress/sql/multi_having_pushdown.sql b/src/test/regress/sql/multi_having_pushdown.sql new file mode 100644 index 000000000..99f3c2971 --- /dev/null +++ b/src/test/regress/sql/multi_having_pushdown.sql @@ -0,0 +1,57 @@ +-- +-- MULTI_HAVING_PUSHDOWN +-- + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 590000; + +CREATE TABLE lineitem_hash (LIKE lineitem); +SELECT create_distributed_table('lineitem_hash', 'l_orderkey', 'hash'); + +CREATE TABLE orders_hash (LIKE orders); +SELECT create_distributed_table('orders_hash', 'o_orderkey', 'hash'); + +-- push down when table is distributed by hash and grouped by partition column +EXPLAIN (COSTS FALSE) + SELECT l_orderkey, sum(l_extendedprice * l_discount) as revenue + FROM lineitem_hash + GROUP BY l_orderkey HAVING sum(l_quantity) > 24 + ORDER BY 2 DESC, 1 ASC LIMIT 3; + +-- but don't push down when table is distributed by append +EXPLAIN (COSTS FALSE) + SELECT l_orderkey, sum(l_extendedprice * l_discount) as revenue + FROM lineitem + GROUP BY l_orderkey HAVING sum(l_quantity) > 24 + ORDER BY 2 DESC, 1 ASC LIMIT 3; + +-- and don't push down when not grouped by partition column +EXPLAIN (COSTS FALSE) + SELECT l_shipmode, sum(l_extendedprice * l_discount) as revenue + FROM lineitem_hash + GROUP BY l_shipmode HAVING sum(l_quantity) > 24 + ORDER BY 2 DESC, 1 ASC LIMIT 3; + +-- push down if grouped by multiple rows one of which is partition column +EXPLAIN (COSTS FALSE) + SELECT l_shipmode, l_orderkey, sum(l_extendedprice * l_discount) as revenue + FROM lineitem_hash + GROUP BY l_shipmode, l_orderkey HAVING sum(l_quantity) > 24 + ORDER BY 3 DESC, 1, 2 LIMIT 3; + +-- couple more checks with joins +EXPLAIN (COSTS FALSE) + SELECT sum(l_extendedprice * l_discount) as revenue + FROM lineitem_hash, orders_hash + WHERE o_orderkey = l_orderkey + GROUP BY l_orderkey, 
o_orderkey, l_shipmode HAVING sum(l_quantity) > 24 + ORDER BY 1 DESC LIMIT 3; + +EXPLAIN (COSTS FALSE) + SELECT sum(l_extendedprice * l_discount) as revenue + FROM lineitem_hash, orders_hash + WHERE o_orderkey = l_orderkey + GROUP BY l_shipmode, o_clerk HAVING sum(l_quantity) > 24 + ORDER BY 1 DESC LIMIT 3; + +DROP TABLE lineitem_hash; +DROP TABLE orders_hash; diff --git a/src/test/regress/sql/multi_limit_clause.sql b/src/test/regress/sql/multi_limit_clause.sql index 6ddc9f770..47a8bd3e3 100644 --- a/src/test/regress/sql/multi_limit_clause.sql +++ b/src/test/regress/sql/multi_limit_clause.sql @@ -3,6 +3,10 @@ -- +CREATE TABLE lineitem_hash (LIKE lineitem); +SELECT create_distributed_table('lineitem_hash', 'l_orderkey', 'hash'); +INSERT INTO lineitem_hash SELECT * FROM lineitem; + -- Display debug messages on limit clause push down. SET client_min_messages TO DEBUG1; @@ -59,4 +63,45 @@ SELECT l_quantity, l_discount, avg(l_partkey) FROM lineitem GROUP BY l_quantity, l_discount ORDER BY l_quantity, l_discount LIMIT 1; + +-- We can push down LIMIT clause when we group by partition column of a hash +-- partitioned table. +SELECT l_orderkey, count(DISTINCT l_partkey) + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY 2 DESC, 1 DESC LIMIT 5; + +SELECT l_orderkey + FROM lineitem_hash + GROUP BY l_orderkey + ORDER BY l_orderkey LIMIT 5; + +-- Don't push down if not grouped by partition column. +SELECT max(l_orderkey) + FROM lineitem_hash + GROUP BY l_linestatus + ORDER BY 1 DESC LIMIT 2; + +-- Don't push down if table is distributed by append +SELECT l_orderkey, max(l_shipdate) + FROM lineitem + GROUP BY l_orderkey + ORDER BY 2 DESC, 1 LIMIT 5; + +-- Push down if grouped by multiple columns one of which is partition column. +SELECT + l_linestatus, l_orderkey, max(l_shipdate) + FROM lineitem_hash + GROUP BY l_linestatus, l_orderkey + ORDER BY 3 DESC, 1, 2 LIMIT 5; + +-- Don't push down if grouped by multiple columns none of which is partition column. 
+SELECT + l_linestatus, l_shipmode, max(l_shipdate) + FROM lineitem_hash + GROUP BY l_linestatus, l_shipmode + ORDER BY 3 DESC, 1, 2 LIMIT 5; + + SET client_min_messages TO NOTICE; +DROP TABLE lineitem_hash;