mirror of https://github.com/citusdata/citus.git
Fix task-tracker ignoring LIMIT/OFFSET with subqueries which pass single relation repartition test
parent
004bfca7b2
commit
176396460a
|
@ -579,6 +579,7 @@ TaskTrackerCreateScan(CustomScan *scan)
|
||||||
scanState->distributedPlan = GetDistributedPlan(scan);
|
scanState->distributedPlan = GetDistributedPlan(scan);
|
||||||
|
|
||||||
scanState->customScanState.methods = &TaskTrackerCustomExecMethods;
|
scanState->customScanState.methods = &TaskTrackerCustomExecMethods;
|
||||||
|
scanState->PreExecScan = &CitusPreExecScan;
|
||||||
|
|
||||||
return (Node *) scanState;
|
return (Node *) scanState;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2894,7 +2894,6 @@ TaskTrackerExecScan(CustomScanState *node)
|
||||||
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
||||||
Job *workerJob = distributedPlan->workerJob;
|
Job *workerJob = distributedPlan->workerJob;
|
||||||
Query *jobQuery = workerJob->jobQuery;
|
Query *jobQuery = workerJob->jobQuery;
|
||||||
elog(WARNING, "query %s", nodeToString(distributedPlan->masterQuery));
|
|
||||||
|
|
||||||
ErrorIfTransactionAccessedPlacementsLocally();
|
ErrorIfTransactionAccessedPlacementsLocally();
|
||||||
DisableLocalExecution();
|
DisableLocalExecution();
|
||||||
|
|
|
@ -258,14 +258,8 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
planContext.plan = standard_planner(planContext.query,
|
planContext.plan = standard_planner(planContext.query,
|
||||||
planContext.cursorOptions,
|
planContext.cursorOptions,
|
||||||
planContext.boundParams);
|
planContext.boundParams);
|
||||||
|
|
||||||
if (needsDistributedPlanning)
|
if (needsDistributedPlanning)
|
||||||
{
|
{
|
||||||
/*
|
|
||||||
* standard_planner rewrites simple queries like 'select 10' to PARAM_EXEC nodes,
|
|
||||||
* which we're unable to handle. Meanwhile we only optimize rewrites to Const.
|
|
||||||
* So deoptimize non-Const LIMIT/OFFSET, standard_planner will handle it again later.
|
|
||||||
*/
|
|
||||||
result = PlanDistributedStmt(&planContext, rteIdCounter);
|
result = PlanDistributedStmt(&planContext, rteIdCounter);
|
||||||
}
|
}
|
||||||
else if ((result = TryToDelegateFunctionCall(&planContext)) == NULL)
|
else if ((result = TryToDelegateFunctionCall(&planContext)) == NULL)
|
||||||
|
|
|
@ -331,6 +331,16 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context
|
||||||
RecursivelyPlanAllSubqueries(query->havingQual, context);
|
RecursivelyPlanAllSubqueries(query->havingQual, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (query->limitCount != NULL)
|
||||||
|
{
|
||||||
|
RecursivelyPlanAllSubqueries(query->limitCount, context);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (query->limitOffset != NULL)
|
||||||
|
{
|
||||||
|
RecursivelyPlanAllSubqueries(query->limitOffset, context);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If the query doesn't have distribution key equality,
|
* If the query doesn't have distribution key equality,
|
||||||
* recursively plan some of its subqueries.
|
* recursively plan some of its subqueries.
|
||||||
|
|
|
@ -552,8 +552,14 @@ SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT 10 OFFSET my_limi
|
||||||
(10 rows)
|
(10 rows)
|
||||||
|
|
||||||
DROP FUNCTION my_limit();
|
DROP FUNCTION my_limit();
|
||||||
-- subqueries should error out
|
SET citus.task_executor_type TO 'task-tracker';
|
||||||
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT min(l_linenumber) FROM lineitem_hash) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash);
|
WITH cte AS (SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4) SELECT * FROM cte ORDER BY 1 LIMIT (SELECT min(l_linenumber) FROM lineitem);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT min(l_linenumber) FROM lineitem) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash);
|
||||||
l_orderkey
|
l_orderkey
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
8997
|
8997
|
||||||
|
@ -589,4 +595,66 @@ SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT 10 OFFSET (SELECT
|
||||||
7
|
7
|
||||||
(10 rows)
|
(10 rows)
|
||||||
|
|
||||||
DROP TABLE lineitem_hash;
|
RESET citus.task_executor_type;
|
||||||
|
WITH cte AS (SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4) SELECT * FROM cte ORDER BY 1 LIMIT (SELECT min(l_linenumber) FROM lineitem);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT min(l_linenumber) FROM lineitem) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash);
|
||||||
|
l_orderkey
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
8997
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT 10);
|
||||||
|
l_orderkey
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
1
|
||||||
|
1
|
||||||
|
1
|
||||||
|
1
|
||||||
|
1
|
||||||
|
2
|
||||||
|
3
|
||||||
|
3
|
||||||
|
3
|
||||||
|
(10 rows)
|
||||||
|
|
||||||
|
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT 10 OFFSET (SELECT 10);
|
||||||
|
l_orderkey
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
3
|
||||||
|
3
|
||||||
|
3
|
||||||
|
4
|
||||||
|
5
|
||||||
|
5
|
||||||
|
5
|
||||||
|
6
|
||||||
|
7
|
||||||
|
7
|
||||||
|
(10 rows)
|
||||||
|
|
||||||
|
-- test insert/select executor with recursively planned subquery in LIMIT
|
||||||
|
CREATE TABLE insertselect_test (id int, key int);
|
||||||
|
SELECT create_distributed_table('insertselect_test', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO insertselect_test
|
||||||
|
SELECT l_orderkey, l_linenumber
|
||||||
|
FROM lineitem_hash
|
||||||
|
ORDER BY 1, 2
|
||||||
|
LIMIT (SELECT min(l_linenumber) FROM lineitem) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash);
|
||||||
|
SELECT * FROM insertselect_test;
|
||||||
|
id | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
8997 | 2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE lineitem_hash, insertselect_test;
|
||||||
|
|
|
@ -236,17 +236,25 @@ SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT 10 OFFSET my_limi
|
||||||
|
|
||||||
DROP FUNCTION my_limit();
|
DROP FUNCTION my_limit();
|
||||||
|
|
||||||
-- subqueries should error out
|
SET citus.task_executor_type TO 'task-tracker';
|
||||||
select count(*) from lineitem;
|
WITH cte AS (SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4) SELECT * FROM cte ORDER BY 1 LIMIT (SELECT min(l_linenumber) FROM lineitem);
|
||||||
set citus.task_executor_type to 'task-tracker';
|
|
||||||
set client_min_messages to debug4;
|
|
||||||
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT min(l_linenumber) FROM lineitem) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash);
|
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT min(l_linenumber) FROM lineitem) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash);
|
||||||
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT 10);
|
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT 10);
|
||||||
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT 10 OFFSET (SELECT 10);
|
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT 10 OFFSET (SELECT 10);
|
||||||
reset client_min_messages;
|
RESET citus.task_executor_type;
|
||||||
reset citus.task_executor_type;
|
WITH cte AS (SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4) SELECT * FROM cte ORDER BY 1 LIMIT (SELECT min(l_linenumber) FROM lineitem);
|
||||||
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT min(l_linenumber) FROM lineitem) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash);
|
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT min(l_linenumber) FROM lineitem) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash);
|
||||||
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT 10);
|
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT 10);
|
||||||
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT 10 OFFSET (SELECT 10);
|
SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT 10 OFFSET (SELECT 10);
|
||||||
|
|
||||||
DROP TABLE lineitem_hash;
|
-- test insert/select executor with recursively planned subquery in LIMIT
|
||||||
|
CREATE TABLE insertselect_test (id int, key int);
|
||||||
|
SELECT create_distributed_table('insertselect_test', 'id');
|
||||||
|
INSERT INTO insertselect_test
|
||||||
|
SELECT l_orderkey, l_linenumber
|
||||||
|
FROM lineitem_hash
|
||||||
|
ORDER BY 1, 2
|
||||||
|
LIMIT (SELECT min(l_linenumber) FROM lineitem) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash);
|
||||||
|
SELECT * FROM insertselect_test;
|
||||||
|
|
||||||
|
DROP TABLE lineitem_hash, insertselect_test;
|
||||||
|
|
Loading…
Reference in New Issue