From 5b54e28f937ffdd614aa44394758e4d771524cfa Mon Sep 17 00:00:00 2001 From: Eren Date: Fri, 10 Jun 2016 17:38:59 +0300 Subject: [PATCH] Add LIMIT/OFFSET Support Fixes #394 This change adds LIMIT/OFFSET support for non-router-plannable distributed queries. In cases where we can push the LIMIT down, we add the OFFSET value to that LIMIT in the worker queries. When a query with LIMIT x OFFSET y is issued, the query is propagated to the workers as LIMIT (x+y) OFFSET 0, and on the master table, the original LIMIT and OFFSET values are used. With this change, we can use OFFSET wherever we can use LIMIT in the outermost query; OFFSET inside subqueries is still rejected with an error. --- .../planner/multi_logical_optimizer.c | 108 +++++++++++++---- .../planner/multi_logical_planner.c | 12 +- .../planner/multi_master_planner.c | 2 +- .../regress/expected/multi_basic_queries.out | 1 - .../expected/multi_complex_expressions.out | 112 ++++++++++++++++++ .../expected/multi_verify_no_subquery.out | 1 - src/test/regress/sql/multi_basic_queries.sql | 1 - .../regress/sql/multi_complex_expressions.sql | 67 +++++++++++ .../regress/sql/multi_verify_no_subquery.sql | 1 - 9 files changed, 271 insertions(+), 34 deletions(-) diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 3b771ecb1..91b98e7b8 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -130,6 +130,7 @@ static Oid AggregateFunctionOid(const char *functionName, Oid inputType); static char * CountDistinctHashFunctionName(Oid argumentType); static int CountDistinctStorageSize(double approximationErrorRate); static Const * MakeIntegerConst(int32 integerValue); +static Const * MakeIntegerConstInt64(int64 integerValue); /* Local functions forward declarations for aggregate expression checks */ static void ErrorIfContainsUnsupportedAggregate(MultiNode *logicalPlanNode); @@ -1282,6 +1283,7 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode) 
masterExtendedOpNode->groupClauseList = originalOpNode->groupClauseList; masterExtendedOpNode->sortClauseList = originalOpNode->sortClauseList; masterExtendedOpNode->limitCount = originalOpNode->limitCount; + masterExtendedOpNode->limitOffset = originalOpNode->limitOffset; return masterExtendedOpNode; } @@ -2220,6 +2222,24 @@ MakeIntegerConst(int32 integerValue) } +/* Makes a 64-bit integer constant node from the given value, and returns that node. */ +static Const * +MakeIntegerConstInt64(int64 integerValue) +{ + const int typeCollationId = get_typcollation(INT8OID); + const int16 typeLength = get_typlen(INT8OID); + const int32 typeModifier = -1; + const bool typeIsNull = false; + const bool typePassByValue = true; + + Datum integer64Datum = Int64GetDatum(integerValue); + Const *integer64Const = makeConst(INT8OID, typeModifier, typeCollationId, typeLength, + integer64Datum, typeIsNull, typePassByValue); + + return integer64Const; +} + + /* * ErrorIfContainsUnsupportedAggregate extracts aggregate expressions from the * logical plan, walks over them and uses helper functions to check if we can @@ -2707,7 +2727,7 @@ ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHasLimit) if (subqueryTree->limitOffset) { preconditionsSatisfied = false; - errorDetail = "Limit Offset clause is currently unsupported"; + errorDetail = "Offset clause is currently unsupported"; } if (subqueryTree->limitCount && !outerQueryHasLimit) @@ -4080,16 +4100,35 @@ EqualOpExpressionLists(List *firstOpExpressionList, List *secondOpExpressionList * WorkerLimitCount checks if the given extended node contains a limit node, and * if that node can be pushed down. For this, the function checks if this limit * count or a meaningful approximation of it can be pushed down to worker nodes. - * If they can, the function returns the limit count. Otherwise, the function + * If they can, the function returns the limit count. + * + * The limit push-down decision tree is as follows: + * group by? 
+ * 1/ \0 + * order by? (exact pd) + * 1/ \0 + * has order by agg? (no pd) + * 1/ \0 + * can approximate? (exact pd) + * 1/ \0 + * (approx pd) (no pd) + * + * When an offset is present, the offset value is added to limit because for a query + * with LIMIT x OFFSET y, (x+y) records should be pulled from the workers. + * + * If no limit is present or can be pushed down, then WorkerLimitCount * returns null. */ static Node * WorkerLimitCount(MultiExtendedOp *originalOpNode) { - Node *workerLimitCount = NULL; + Node *workerLimitNode = NULL; List *groupClauseList = originalOpNode->groupClauseList; List *sortClauseList = originalOpNode->sortClauseList; List *targetList = originalOpNode->targetList; + bool hasOrderByAggregate = HasOrderByAggregate(sortClauseList, targetList); + bool canPushDownLimit = false; + bool canApproximate = false; /* no limit node to push down */ if (originalOpNode->limitCount == NULL) @@ -4104,38 +4143,61 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode) */ if (groupClauseList == NIL) { - workerLimitCount = originalOpNode->limitCount; + canPushDownLimit = true; } - else if (sortClauseList != NIL) + else if (sortClauseList == NIL) { - bool orderByNonAggregates = !(HasOrderByAggregate(sortClauseList, targetList)); - bool canApproximate = CanPushDownLimitApproximate(sortClauseList, targetList); + canPushDownLimit = false; + } + else if (!hasOrderByAggregate) + { + canPushDownLimit = true; + } + else + { + canApproximate = CanPushDownLimitApproximate(sortClauseList, targetList); + } - if (orderByNonAggregates) - { - workerLimitCount = originalOpNode->limitCount; - } - else if (canApproximate) - { - Const *workerLimitConst = (Const *) copyObject(originalOpNode->limitCount); - int64 workerLimitCountInt64 = (int64) LimitClauseRowFetchCount; - workerLimitConst->constvalue = Int64GetDatum(workerLimitCountInt64); + /* create the workerLimitNode according to the decisions above */ + if (canPushDownLimit) + { + workerLimitNode = (Node *) 
copyObject(originalOpNode->limitCount); + } + else if (canApproximate) + { + Const *workerLimitConst = (Const *) copyObject(originalOpNode->limitCount); + int64 workerLimitCount = (int64) LimitClauseRowFetchCount; + workerLimitConst->constvalue = Int64GetDatum(workerLimitCount); - workerLimitCount = (Node *) workerLimitConst; - } + workerLimitNode = (Node *) workerLimitConst; + } + + /* + * If offset clause is present and limit can be pushed down (whether exactly or + * approximately), add the offset value to limit on workers + */ + if (workerLimitNode != NULL && originalOpNode->limitOffset != NULL) + { + Const *workerLimitConst = (Const *) workerLimitNode; + Const *workerOffsetConst = (Const *) originalOpNode->limitOffset; + int64 workerLimitCount = DatumGetInt64(workerLimitConst->constvalue); + int64 workerOffsetCount = DatumGetInt64(workerOffsetConst->constvalue); + + workerLimitCount = workerLimitCount + workerOffsetCount; + workerLimitNode = (Node *) MakeIntegerConstInt64(workerLimitCount); } /* display debug message on limit push down */ - if (workerLimitCount != NULL) + if (workerLimitNode != NULL) { - Const *workerLimitConst = (Const *) workerLimitCount; - int64 workerLimitCountInt64 = DatumGetInt64(workerLimitConst->constvalue); + Const *workerLimitConst = (Const *) workerLimitNode; + int64 workerLimitCount = DatumGetInt64(workerLimitConst->constvalue); ereport(DEBUG1, (errmsg("push down of limit count: " INT64_FORMAT, - workerLimitCountInt64))); + workerLimitCount))); } - return workerLimitCount; + return workerLimitNode; } diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 79be0cbe3..76e21fe24 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -382,12 +382,6 @@ ErrorIfQueryNotSupported(Query *queryTree) errorDetail = "Window functions are currently unsupported"; } - if 
(queryTree->limitOffset) - { - preconditionsSatisfied = false; - errorDetail = "Limit Offset clause is currently unsupported"; - } - if (queryTree->setOperations) { preconditionsSatisfied = false; @@ -562,6 +556,12 @@ ErrorIfSubqueryNotSupported(Query *subqueryTree) errorDetail = "Subqueries with limit are not supported yet"; } + if (subqueryTree->limitOffset != NULL) + { + preconditionsSatisfied = false; + errorDetail = "Subqueries with offset are not supported yet"; + } + /* finally check and error out if not satisfied */ if (!preconditionsSatisfied) { diff --git a/src/backend/distributed/planner/multi_master_planner.c b/src/backend/distributed/planner/multi_master_planner.c index 6fea9712c..32c59774d 100644 --- a/src/backend/distributed/planner/multi_master_planner.c +++ b/src/backend/distributed/planner/multi_master_planner.c @@ -246,7 +246,7 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName, } /* (5) add a limit plan if needed */ - if (masterQuery->limitCount) + if (masterQuery->limitCount || masterQuery->limitOffset) { Node *limitCount = masterQuery->limitCount; Node *limitOffset = masterQuery->limitOffset; diff --git a/src/test/regress/expected/multi_basic_queries.out b/src/test/regress/expected/multi_basic_queries.out index 5a3eaaf04..c225ed6ef 100644 --- a/src/test/regress/expected/multi_basic_queries.out +++ b/src/test/regress/expected/multi_basic_queries.out @@ -2,7 +2,6 @@ -- MULTI_BASIC_QUERIES -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 440000; -ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 440000; -- Execute simple sum, average, and count queries on data recently uploaded to -- our partitioned table. 
SELECT count(*) FROM lineitem; diff --git a/src/test/regress/expected/multi_complex_expressions.out b/src/test/regress/expected/multi_complex_expressions.out index 3217e1681..4790b9753 100644 --- a/src/test/regress/expected/multi_complex_expressions.out +++ b/src/test/regress/expected/multi_complex_expressions.out @@ -347,3 +347,115 @@ SELECT count(*) FROM lineitem JOIN orders ON l_orderkey = o_orderkey SELECT count(*) FROM lineitem, orders WHERE l_orderkey + 1 = o_orderkey; ERROR: cannot perform local joins that involve expressions DETAIL: local joins can be performed between columns only +-- Check that we can issue limit/offset queries +-- OFFSET in subqueries are not supported +-- Error in the planner when subquery pushdown is off +SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq; +ERROR: cannot perform distributed planning on this query +DETAIL: Subqueries with offset are not supported yet +SET citus.subquery_pushdown TO true; +-- Error in the optimizer when subquery pushdown is on +SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq; +ERROR: cannot push down this subquery +DETAIL: Offset clause is currently unsupported +SET citus.subquery_pushdown TO false; +-- Simple LIMIT/OFFSET with ORDER BY +SELECT o_orderkey FROM orders ORDER BY o_orderkey LIMIT 10 OFFSET 20; + o_orderkey +------------ + 69 + 70 + 71 + 96 + 97 + 98 + 99 + 100 + 101 + 102 +(10 rows) + +-- LIMIT/OFFSET with a subquery +SET client_min_messages TO 'debug1'; +SET citus.task_executor_type TO 'task-tracker'; +SELECT + customer_keys.o_custkey, + SUM(order_count) AS total_order_count +FROM + (SELECT o_custkey, o_orderstatus, COUNT(*) AS order_count + FROM orders GROUP BY o_custkey, o_orderstatus ) customer_keys +GROUP BY + customer_keys.o_custkey +ORDER BY + customer_keys.o_custkey DESC +LIMIT 10 OFFSET 20; +DEBUG: push down of limit count: 30 +DEBUG: building index "pg_toast_16953_index" on table "pg_toast_16953" + o_custkey | 
total_order_count +-----------+------------------- + 1466 | 1 + 1465 | 2 + 1463 | 4 + 1462 | 10 + 1460 | 1 + 1459 | 6 + 1457 | 1 + 1456 | 3 + 1454 | 2 + 1453 | 5 +(10 rows) + +SET citus.task_executor_type TO 'real-time'; +-- Ensure that we push down LIMIT and OFFSET properly +-- No Group-By -> Push Down +CREATE TEMP TABLE temp_limit_test_1 AS +SELECT o_custkey FROM orders LIMIT 10 OFFSET 15; +DEBUG: push down of limit count: 25 +-- GROUP BY without ORDER BY -> No push-down +CREATE TEMP TABLE temp_limit_test_2 AS +SELECT o_custkey FROM orders GROUP BY o_custkey LIMIT 10 OFFSET 15; +-- GROUP BY and ORDER BY non-aggregate -> push-down +CREATE TEMP TABLE temp_limit_test_3 AS +SELECT o_custkey FROM orders GROUP BY o_custkey ORDER BY o_custkey LIMIT 10 OFFSET 15; +DEBUG: push down of limit count: 25 +-- GROUP BY and ORDER BY aggregate -> No push-down +CREATE TEMP TABLE temp_limit_test_4 AS +SELECT o_custkey, COUNT(*) AS ccnt FROM orders GROUP BY o_custkey ORDER BY ccnt DESC LIMIT 10 OFFSET 15; +-- OFFSET without LIMIT +SELECT o_custkey FROM orders ORDER BY o_custkey OFFSET 2980; + o_custkey +----------- + 1498 + 1499 + 1499 + 1499 +(4 rows) + +-- LIMIT/OFFSET with Joins +SELECT + li.l_partkey, + o.o_custkey, + li.l_quantity +FROM + lineitem li JOIN orders o ON li.l_orderkey = o.o_orderkey +WHERE + li.l_quantity > 25 +ORDER BY + li.l_quantity +LIMIT 10 OFFSET 20; +DEBUG: push down of limit count: 30 + l_partkey | o_custkey | l_quantity +-----------+-----------+------------ + 135912 | 509 | 26.00 + 75351 | 1261 | 26.00 + 199475 | 1052 | 26.00 + 91309 | 8 | 26.00 + 53624 | 400 | 26.00 + 182736 | 1048 | 26.00 + 59694 | 163 | 26.00 + 20481 | 173 | 26.00 + 78748 | 1499 | 26.00 + 7614 | 1397 | 26.00 +(10 rows) + +RESET client_min_messages; diff --git a/src/test/regress/expected/multi_verify_no_subquery.out b/src/test/regress/expected/multi_verify_no_subquery.out index 74cef3a26..0995761cf 100644 --- a/src/test/regress/expected/multi_verify_no_subquery.out +++ 
b/src/test/regress/expected/multi_verify_no_subquery.out @@ -4,7 +4,6 @@ -- This test checks that we simply emit an error message instead of trying to -- process a distributed unsupported SQL subquery. ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1030000; -ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1030000; SELECT * FROM lineitem WHERE l_orderkey IN (SELECT l_orderkey FROM lineitem WHERE l_quantity > 0); ERROR: cannot perform distributed planning on this query diff --git a/src/test/regress/sql/multi_basic_queries.sql b/src/test/regress/sql/multi_basic_queries.sql index f00d0a148..27160ea01 100644 --- a/src/test/regress/sql/multi_basic_queries.sql +++ b/src/test/regress/sql/multi_basic_queries.sql @@ -4,7 +4,6 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 440000; -ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 440000; -- Execute simple sum, average, and count queries on data recently uploaded to diff --git a/src/test/regress/sql/multi_complex_expressions.sql b/src/test/regress/sql/multi_complex_expressions.sql index 89205be85..842c6f6cb 100644 --- a/src/test/regress/sql/multi_complex_expressions.sql +++ b/src/test/regress/sql/multi_complex_expressions.sql @@ -158,3 +158,70 @@ SELECT count(*) FROM lineitem JOIN orders ON l_orderkey = o_orderkey -- Check that we make sure local joins are between columns only. 
SELECT count(*) FROM lineitem, orders WHERE l_orderkey + 1 = o_orderkey; + +-- Check that we can issue limit/offset queries + +-- OFFSET in subqueries are not supported +-- Error in the planner when subquery pushdown is off +SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq; +SET citus.subquery_pushdown TO true; + +-- Error in the optimizer when subquery pushdown is on +SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq; +SET citus.subquery_pushdown TO false; + +-- Simple LIMIT/OFFSET with ORDER BY +SELECT o_orderkey FROM orders ORDER BY o_orderkey LIMIT 10 OFFSET 20; + +-- LIMIT/OFFSET with a subquery +SET client_min_messages TO 'debug1'; +SET citus.task_executor_type TO 'task-tracker'; + +SELECT + customer_keys.o_custkey, + SUM(order_count) AS total_order_count +FROM + (SELECT o_custkey, o_orderstatus, COUNT(*) AS order_count + FROM orders GROUP BY o_custkey, o_orderstatus ) customer_keys +GROUP BY + customer_keys.o_custkey +ORDER BY + customer_keys.o_custkey DESC +LIMIT 10 OFFSET 20; + +SET citus.task_executor_type TO 'real-time'; + +-- Ensure that we push down LIMIT and OFFSET properly +-- No Group-By -> Push Down +CREATE TEMP TABLE temp_limit_test_1 AS +SELECT o_custkey FROM orders LIMIT 10 OFFSET 15; + +-- GROUP BY without ORDER BY -> No push-down +CREATE TEMP TABLE temp_limit_test_2 AS +SELECT o_custkey FROM orders GROUP BY o_custkey LIMIT 10 OFFSET 15; + +-- GROUP BY and ORDER BY non-aggregate -> push-down +CREATE TEMP TABLE temp_limit_test_3 AS +SELECT o_custkey FROM orders GROUP BY o_custkey ORDER BY o_custkey LIMIT 10 OFFSET 15; + +-- GROUP BY and ORDER BY aggregate -> No push-down +CREATE TEMP TABLE temp_limit_test_4 AS +SELECT o_custkey, COUNT(*) AS ccnt FROM orders GROUP BY o_custkey ORDER BY ccnt DESC LIMIT 10 OFFSET 15; + +-- OFFSET without LIMIT +SELECT o_custkey FROM orders ORDER BY o_custkey OFFSET 2980; + +-- LIMIT/OFFSET with Joins +SELECT + li.l_partkey, + o.o_custkey, + li.l_quantity 
+FROM + lineitem li JOIN orders o ON li.l_orderkey = o.o_orderkey +WHERE + li.l_quantity > 25 +ORDER BY + li.l_quantity +LIMIT 10 OFFSET 20; + +RESET client_min_messages; diff --git a/src/test/regress/sql/multi_verify_no_subquery.sql b/src/test/regress/sql/multi_verify_no_subquery.sql index b94eddcca..d475964bf 100644 --- a/src/test/regress/sql/multi_verify_no_subquery.sql +++ b/src/test/regress/sql/multi_verify_no_subquery.sql @@ -7,7 +7,6 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1030000; -ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1030000; SELECT * FROM lineitem WHERE l_orderkey IN