Merge pull request #596 from citusdata/limit_offset_support

Add LIMIT/OFFSET Support
pull/651/head
Eren Başak 2016-07-18 12:38:08 +03:00 committed by GitHub
commit 654a4ac71d
9 changed files with 271 additions and 34 deletions

View File

@ -130,6 +130,7 @@ static Oid AggregateFunctionOid(const char *functionName, Oid inputType);
static char * CountDistinctHashFunctionName(Oid argumentType); static char * CountDistinctHashFunctionName(Oid argumentType);
static int CountDistinctStorageSize(double approximationErrorRate); static int CountDistinctStorageSize(double approximationErrorRate);
static Const * MakeIntegerConst(int32 integerValue); static Const * MakeIntegerConst(int32 integerValue);
static Const * MakeIntegerConstInt64(int64 integerValue);
/* Local functions forward declarations for aggregate expression checks */ /* Local functions forward declarations for aggregate expression checks */
static void ErrorIfContainsUnsupportedAggregate(MultiNode *logicalPlanNode); static void ErrorIfContainsUnsupportedAggregate(MultiNode *logicalPlanNode);
@ -1282,6 +1283,7 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode)
masterExtendedOpNode->groupClauseList = originalOpNode->groupClauseList; masterExtendedOpNode->groupClauseList = originalOpNode->groupClauseList;
masterExtendedOpNode->sortClauseList = originalOpNode->sortClauseList; masterExtendedOpNode->sortClauseList = originalOpNode->sortClauseList;
masterExtendedOpNode->limitCount = originalOpNode->limitCount; masterExtendedOpNode->limitCount = originalOpNode->limitCount;
masterExtendedOpNode->limitOffset = originalOpNode->limitOffset;
return masterExtendedOpNode; return masterExtendedOpNode;
} }
@ -2220,6 +2222,24 @@ MakeIntegerConst(int32 integerValue)
} }
/* Makes a 64-bit integer constant node from the given value, and returns that node. */
static Const *
MakeIntegerConstInt64(int64 integerValue)
{
const int typeCollationId = get_typcollation(INT8OID);
const int16 typeLength = get_typlen(INT8OID);
const int32 typeModifier = -1;
const bool typeIsNull = false;
const bool typePassByValue = true;
Datum integer64Datum = Int64GetDatum(integerValue);
Const *integer64Const = makeConst(INT8OID, typeModifier, typeCollationId, typeLength,
integer64Datum, typeIsNull, typePassByValue);
return integer64Const;
}
/* /*
* ErrorIfContainsUnsupportedAggregate extracts aggregate expressions from the * ErrorIfContainsUnsupportedAggregate extracts aggregate expressions from the
* logical plan, walks over them and uses helper functions to check if we can * logical plan, walks over them and uses helper functions to check if we can
@ -2707,7 +2727,7 @@ ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHasLimit)
if (subqueryTree->limitOffset) if (subqueryTree->limitOffset)
{ {
preconditionsSatisfied = false; preconditionsSatisfied = false;
errorDetail = "Limit Offset clause is currently unsupported"; errorDetail = "Offset clause is currently unsupported";
} }
if (subqueryTree->limitCount && !outerQueryHasLimit) if (subqueryTree->limitCount && !outerQueryHasLimit)
@ -4080,16 +4100,35 @@ EqualOpExpressionLists(List *firstOpExpressionList, List *secondOpExpressionList
* WorkerLimitCount checks if the given extended node contains a limit node, and * WorkerLimitCount checks if the given extended node contains a limit node, and
* if that node can be pushed down. For this, the function checks if this limit * if that node can be pushed down. For this, the function checks if this limit
* count or a meaningful approximation of it can be pushed down to worker nodes. * count or a meaningful approximation of it can be pushed down to worker nodes.
* If they can, the function returns the limit count. Otherwise, the function * If they can, the function returns the limit count.
*
* The limit push-down decision tree is as follows:
* group by?
* 1/ \0
* order by? (exact pd)
* 1/ \0
* has order by agg? (no pd)
* 1/ \0
* can approximate? (exact pd)
* 1/ \0
* (approx pd) (no pd)
*
* When an offset is present, the offset value is added to limit because for a query
* with LIMIT x OFFSET y, (x+y) records should be pulled from the workers.
*
* If no limit is present or can be pushed down, then WorkerLimitCount
* returns null. * returns null.
*/ */
static Node * static Node *
WorkerLimitCount(MultiExtendedOp *originalOpNode) WorkerLimitCount(MultiExtendedOp *originalOpNode)
{ {
Node *workerLimitCount = NULL; Node *workerLimitNode = NULL;
List *groupClauseList = originalOpNode->groupClauseList; List *groupClauseList = originalOpNode->groupClauseList;
List *sortClauseList = originalOpNode->sortClauseList; List *sortClauseList = originalOpNode->sortClauseList;
List *targetList = originalOpNode->targetList; List *targetList = originalOpNode->targetList;
bool hasOrderByAggregate = HasOrderByAggregate(sortClauseList, targetList);
bool canPushDownLimit = false;
bool canApproximate = false;
/* no limit node to push down */ /* no limit node to push down */
if (originalOpNode->limitCount == NULL) if (originalOpNode->limitCount == NULL)
@ -4104,38 +4143,61 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode)
*/ */
if (groupClauseList == NIL) if (groupClauseList == NIL)
{ {
workerLimitCount = originalOpNode->limitCount; canPushDownLimit = true;
} }
else if (sortClauseList != NIL) else if (sortClauseList == NIL)
{ {
bool orderByNonAggregates = !(HasOrderByAggregate(sortClauseList, targetList)); canPushDownLimit = false;
bool canApproximate = CanPushDownLimitApproximate(sortClauseList, targetList); }
else if (!hasOrderByAggregate)
{
canPushDownLimit = true;
}
else
{
canApproximate = CanPushDownLimitApproximate(sortClauseList, targetList);
}
if (orderByNonAggregates) /* create the workerLimitNode according to the decisions above */
if (canPushDownLimit)
{ {
workerLimitCount = originalOpNode->limitCount; workerLimitNode = (Node *) copyObject(originalOpNode->limitCount);
} }
else if (canApproximate) else if (canApproximate)
{ {
Const *workerLimitConst = (Const *) copyObject(originalOpNode->limitCount); Const *workerLimitConst = (Const *) copyObject(originalOpNode->limitCount);
int64 workerLimitCountInt64 = (int64) LimitClauseRowFetchCount; int64 workerLimitCount = (int64) LimitClauseRowFetchCount;
workerLimitConst->constvalue = Int64GetDatum(workerLimitCountInt64); workerLimitConst->constvalue = Int64GetDatum(workerLimitCount);
workerLimitCount = (Node *) workerLimitConst; workerLimitNode = (Node *) workerLimitConst;
} }
/*
* If offset clause is present and limit can be pushed down (whether exactly or
* approximately), add the offset value to limit on workers
*/
if (workerLimitNode != NULL && originalOpNode->limitOffset != NULL)
{
Const *workerLimitConst = (Const *) workerLimitNode;
Const *workerOffsetConst = (Const *) originalOpNode->limitOffset;
int64 workerLimitCount = DatumGetInt64(workerLimitConst->constvalue);
int64 workerOffsetCount = DatumGetInt64(workerOffsetConst->constvalue);
workerLimitCount = workerLimitCount + workerOffsetCount;
workerLimitNode = (Node *) MakeIntegerConstInt64(workerLimitCount);
} }
/* display debug message on limit push down */ /* display debug message on limit push down */
if (workerLimitCount != NULL) if (workerLimitNode != NULL)
{ {
Const *workerLimitConst = (Const *) workerLimitCount; Const *workerLimitConst = (Const *) workerLimitNode;
int64 workerLimitCountInt64 = DatumGetInt64(workerLimitConst->constvalue); int64 workerLimitCount = DatumGetInt64(workerLimitConst->constvalue);
ereport(DEBUG1, (errmsg("push down of limit count: " INT64_FORMAT, ereport(DEBUG1, (errmsg("push down of limit count: " INT64_FORMAT,
workerLimitCountInt64))); workerLimitCount)));
} }
return workerLimitCount; return workerLimitNode;
} }

View File

@ -382,12 +382,6 @@ ErrorIfQueryNotSupported(Query *queryTree)
errorDetail = "Window functions are currently unsupported"; errorDetail = "Window functions are currently unsupported";
} }
if (queryTree->limitOffset)
{
preconditionsSatisfied = false;
errorDetail = "Limit Offset clause is currently unsupported";
}
if (queryTree->setOperations) if (queryTree->setOperations)
{ {
preconditionsSatisfied = false; preconditionsSatisfied = false;
@ -562,6 +556,12 @@ ErrorIfSubqueryNotSupported(Query *subqueryTree)
errorDetail = "Subqueries with limit are not supported yet"; errorDetail = "Subqueries with limit are not supported yet";
} }
if (subqueryTree->limitOffset != NULL)
{
preconditionsSatisfied = false;
errorDetail = "Subqueries with offset are not supported yet";
}
/* finally check and error out if not satisfied */ /* finally check and error out if not satisfied */
if (!preconditionsSatisfied) if (!preconditionsSatisfied)
{ {

View File

@ -246,7 +246,7 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName,
} }
/* (5) add a limit plan if needed */ /* (5) add a limit plan if needed */
if (masterQuery->limitCount) if (masterQuery->limitCount || masterQuery->limitOffset)
{ {
Node *limitCount = masterQuery->limitCount; Node *limitCount = masterQuery->limitCount;
Node *limitOffset = masterQuery->limitOffset; Node *limitOffset = masterQuery->limitOffset;

View File

@ -2,7 +2,6 @@
-- MULTI_BASIC_QUERIES -- MULTI_BASIC_QUERIES
-- --
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 440000; ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 440000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 440000;
-- Execute simple sum, average, and count queries on data recently uploaded to -- Execute simple sum, average, and count queries on data recently uploaded to
-- our partitioned table. -- our partitioned table.
SELECT count(*) FROM lineitem; SELECT count(*) FROM lineitem;

View File

@ -347,3 +347,115 @@ SELECT count(*) FROM lineitem JOIN orders ON l_orderkey = o_orderkey
SELECT count(*) FROM lineitem, orders WHERE l_orderkey + 1 = o_orderkey; SELECT count(*) FROM lineitem, orders WHERE l_orderkey + 1 = o_orderkey;
ERROR: cannot perform local joins that involve expressions ERROR: cannot perform local joins that involve expressions
DETAIL: local joins can be performed between columns only DETAIL: local joins can be performed between columns only
-- Check that we can issue limit/offset queries
-- OFFSET in subqueries are not supported
-- Error in the planner when subquery pushdown is off
SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq;
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries with offset are not supported yet
SET citus.subquery_pushdown TO true;
-- Error in the optimizer when subquery pushdown is on
SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq;
ERROR: cannot push down this subquery
DETAIL: Offset clause is currently unsupported
SET citus.subquery_pushdown TO false;
-- Simple LIMIT/OFFSET with ORDER BY
SELECT o_orderkey FROM orders ORDER BY o_orderkey LIMIT 10 OFFSET 20;
o_orderkey
------------
69
70
71
96
97
98
99
100
101
102
(10 rows)
-- LIMIT/OFFSET with a subquery
SET client_min_messages TO 'debug1';
SET citus.task_executor_type TO 'task-tracker';
SELECT
customer_keys.o_custkey,
SUM(order_count) AS total_order_count
FROM
(SELECT o_custkey, o_orderstatus, COUNT(*) AS order_count
FROM orders GROUP BY o_custkey, o_orderstatus ) customer_keys
GROUP BY
customer_keys.o_custkey
ORDER BY
customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20;
DEBUG: push down of limit count: 30
DEBUG: building index "pg_toast_16953_index" on table "pg_toast_16953"
o_custkey | total_order_count
-----------+-------------------
1466 | 1
1465 | 2
1463 | 4
1462 | 10
1460 | 1
1459 | 6
1457 | 1
1456 | 3
1454 | 2
1453 | 5
(10 rows)
SET citus.task_executor_type TO 'real-time';
-- Ensure that we push down LIMIT and OFFSET properly
-- No Group-By -> Push Down
CREATE TEMP TABLE temp_limit_test_1 AS
SELECT o_custkey FROM orders LIMIT 10 OFFSET 15;
DEBUG: push down of limit count: 25
-- GROUP BY without ORDER BY -> No push-down
CREATE TEMP TABLE temp_limit_test_2 AS
SELECT o_custkey FROM orders GROUP BY o_custkey LIMIT 10 OFFSET 15;
-- GROUP BY and ORDER BY non-aggregate -> push-down
CREATE TEMP TABLE temp_limit_test_3 AS
SELECT o_custkey FROM orders GROUP BY o_custkey ORDER BY o_custkey LIMIT 10 OFFSET 15;
DEBUG: push down of limit count: 25
-- GROUP BY and ORDER BY aggregate -> No push-down
CREATE TEMP TABLE temp_limit_test_4 AS
SELECT o_custkey, COUNT(*) AS ccnt FROM orders GROUP BY o_custkey ORDER BY ccnt DESC LIMIT 10 OFFSET 15;
-- OFFSET without LIMIT
SELECT o_custkey FROM orders ORDER BY o_custkey OFFSET 2980;
o_custkey
-----------
1498
1499
1499
1499
(4 rows)
-- LIMIT/OFFSET with Joins
SELECT
li.l_partkey,
o.o_custkey,
li.l_quantity
FROM
lineitem li JOIN orders o ON li.l_orderkey = o.o_orderkey
WHERE
li.l_quantity > 25
ORDER BY
li.l_quantity
LIMIT 10 OFFSET 20;
DEBUG: push down of limit count: 30
l_partkey | o_custkey | l_quantity
-----------+-----------+------------
135912 | 509 | 26.00
75351 | 1261 | 26.00
199475 | 1052 | 26.00
91309 | 8 | 26.00
53624 | 400 | 26.00
182736 | 1048 | 26.00
59694 | 163 | 26.00
20481 | 173 | 26.00
78748 | 1499 | 26.00
7614 | 1397 | 26.00
(10 rows)
RESET client_min_messages;

View File

@ -4,7 +4,6 @@
-- This test checks that we simply emit an error message instead of trying to -- This test checks that we simply emit an error message instead of trying to
-- process a distributed unsupported SQL subquery. -- process a distributed unsupported SQL subquery.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1030000; ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1030000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1030000;
SELECT * FROM lineitem WHERE l_orderkey IN SELECT * FROM lineitem WHERE l_orderkey IN
(SELECT l_orderkey FROM lineitem WHERE l_quantity > 0); (SELECT l_orderkey FROM lineitem WHERE l_quantity > 0);
ERROR: cannot perform distributed planning on this query ERROR: cannot perform distributed planning on this query

View File

@ -4,7 +4,6 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 440000; ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 440000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 440000;
-- Execute simple sum, average, and count queries on data recently uploaded to -- Execute simple sum, average, and count queries on data recently uploaded to

View File

@ -158,3 +158,70 @@ SELECT count(*) FROM lineitem JOIN orders ON l_orderkey = o_orderkey
-- Check that we make sure local joins are between columns only. -- Check that we make sure local joins are between columns only.
SELECT count(*) FROM lineitem, orders WHERE l_orderkey + 1 = o_orderkey; SELECT count(*) FROM lineitem, orders WHERE l_orderkey + 1 = o_orderkey;
-- Check that we can issue limit/offset queries
-- OFFSET in subqueries are not supported
-- Error in the planner when subquery pushdown is off
SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq;
SET citus.subquery_pushdown TO true;
-- Error in the optimizer when subquery pushdown is on
SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq;
SET citus.subquery_pushdown TO false;
-- Simple LIMIT/OFFSET with ORDER BY
SELECT o_orderkey FROM orders ORDER BY o_orderkey LIMIT 10 OFFSET 20;
-- LIMIT/OFFSET with a subquery
SET client_min_messages TO 'debug1';
SET citus.task_executor_type TO 'task-tracker';
SELECT
customer_keys.o_custkey,
SUM(order_count) AS total_order_count
FROM
(SELECT o_custkey, o_orderstatus, COUNT(*) AS order_count
FROM orders GROUP BY o_custkey, o_orderstatus ) customer_keys
GROUP BY
customer_keys.o_custkey
ORDER BY
customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20;
SET citus.task_executor_type TO 'real-time';
-- Ensure that we push down LIMIT and OFFSET properly
-- No Group-By -> Push Down
CREATE TEMP TABLE temp_limit_test_1 AS
SELECT o_custkey FROM orders LIMIT 10 OFFSET 15;
-- GROUP BY without ORDER BY -> No push-down
CREATE TEMP TABLE temp_limit_test_2 AS
SELECT o_custkey FROM orders GROUP BY o_custkey LIMIT 10 OFFSET 15;
-- GROUP BY and ORDER BY non-aggregate -> push-down
CREATE TEMP TABLE temp_limit_test_3 AS
SELECT o_custkey FROM orders GROUP BY o_custkey ORDER BY o_custkey LIMIT 10 OFFSET 15;
-- GROUP BY and ORDER BY aggregate -> No push-down
CREATE TEMP TABLE temp_limit_test_4 AS
SELECT o_custkey, COUNT(*) AS ccnt FROM orders GROUP BY o_custkey ORDER BY ccnt DESC LIMIT 10 OFFSET 15;
-- OFFSET without LIMIT
SELECT o_custkey FROM orders ORDER BY o_custkey OFFSET 2980;
-- LIMIT/OFFSET with Joins
SELECT
li.l_partkey,
o.o_custkey,
li.l_quantity
FROM
lineitem li JOIN orders o ON li.l_orderkey = o.o_orderkey
WHERE
li.l_quantity > 25
ORDER BY
li.l_quantity
LIMIT 10 OFFSET 20;
RESET client_min_messages;

View File

@ -7,7 +7,6 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1030000; ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1030000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1030000;
SELECT * FROM lineitem WHERE l_orderkey IN SELECT * FROM lineitem WHERE l_orderkey IN