From 176396460a3ba763b0fc09597005a6453724f3bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 14 Apr 2020 01:53:28 +0000 Subject: [PATCH] Fix task-tracker ignoring LIMIT/OFFSET with subqueries which pass single relation repartition test --- .../distributed/executor/citus_custom_scan.c | 1 + .../executor/multi_task_tracker_executor.c | 1 - .../distributed/planner/distributed_planner.c | 6 -- .../distributed/planner/recursive_planning.c | 10 +++ .../regress/expected/multi_limit_clause.out | 74 ++++++++++++++++++- src/test/regress/sql/multi_limit_clause.sql | 22 ++++-- 6 files changed, 97 insertions(+), 17 deletions(-) diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 947c8778f..fca63860e 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -579,6 +579,7 @@ TaskTrackerCreateScan(CustomScan *scan) scanState->distributedPlan = GetDistributedPlan(scan); scanState->customScanState.methods = &TaskTrackerCustomExecMethods; + scanState->PreExecScan = &CitusPreExecScan; return (Node *) scanState; } diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 1421b1ab9..4ece5b34d 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -2894,7 +2894,6 @@ TaskTrackerExecScan(CustomScanState *node) DistributedPlan *distributedPlan = scanState->distributedPlan; Job *workerJob = distributedPlan->workerJob; Query *jobQuery = workerJob->jobQuery; - elog(WARNING, "query %s", nodeToString(distributedPlan->masterQuery)); ErrorIfTransactionAccessedPlacementsLocally(); DisableLocalExecution(); diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index dfd1cd5f6..e5dfe1cc8 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -258,14 +258,8 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) planContext.plan = standard_planner(planContext.query, planContext.cursorOptions, planContext.boundParams); - if (needsDistributedPlanning) { - /* - * standard_planner rewrites simple queries like 'select 10' to PARAM_EXEC nodes, - * which we're unable to handle. Meanwhile we only optimize rewrites to Const. - * So deoptimize non-Const LIMIT/OFFSET, standard_planner will handle it again later. - */ result = PlanDistributedStmt(&planContext, rteIdCounter); } else if ((result = TryToDelegateFunctionCall(&planContext)) == NULL) diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index d673b1545..701a52c6f 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -331,6 +331,16 @@ RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context RecursivelyPlanAllSubqueries(query->havingQual, context); } + if (query->limitCount != NULL) + { + RecursivelyPlanAllSubqueries(query->limitCount, context); + } + + if (query->limitOffset != NULL) + { + RecursivelyPlanAllSubqueries(query->limitOffset, context); + } + /* * If the query doesn't have distribution key equality, * recursively plan some of its subqueries. diff --git a/src/test/regress/expected/multi_limit_clause.out b/src/test/regress/expected/multi_limit_clause.out index 121f16992..4159743b5 100644 --- a/src/test/regress/expected/multi_limit_clause.out +++ b/src/test/regress/expected/multi_limit_clause.out @@ -552,8 +552,14 @@ SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT 10 OFFSET my_limi (10 rows) DROP FUNCTION my_limit(); --- subqueries should error out -SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT min(l_linenumber) FROM lineitem_hash) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash); +SET citus.task_executor_type TO 'task-tracker'; +WITH cte AS (SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4) SELECT * FROM cte ORDER BY 1 LIMIT (SELECT min(l_linenumber) FROM lineitem); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT min(l_linenumber) FROM lineitem) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash); l_orderkey --------------------------------------------------------------------- 8997 @@ -589,4 +595,66 @@ SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT 10 OFFSET (SELECT 7 (10 rows) -DROP TABLE lineitem_hash; +RESET citus.task_executor_type; +WITH cte AS (SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4) SELECT * FROM cte ORDER BY 1 LIMIT (SELECT min(l_linenumber) FROM lineitem); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT min(l_linenumber) FROM lineitem) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash); + l_orderkey +--------------------------------------------------------------------- + 8997 +(1 row) + +SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT 10); + l_orderkey +--------------------------------------------------------------------- + 1 + 1 + 1 + 1 + 1 + 1 + 2 + 3 + 3 + 3 +(10 rows) + +SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT 10 OFFSET (SELECT 10); + l_orderkey +--------------------------------------------------------------------- + 3 + 3 + 3 + 4 + 5 + 5 + 5 + 6 + 7 + 7 +(10 rows) + +-- test insert/select executor with recursively planned subquery in LIMIT +CREATE TABLE insertselect_test (id int, key int); +SELECT create_distributed_table('insertselect_test', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO insertselect_test +SELECT l_orderkey, l_linenumber +FROM lineitem_hash +ORDER BY 1, 2 +LIMIT (SELECT min(l_linenumber) FROM lineitem) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash); +SELECT * FROM insertselect_test; + id | key +--------------------------------------------------------------------- + 8997 | 2 +(1 row) + +DROP TABLE lineitem_hash, insertselect_test; diff --git a/src/test/regress/sql/multi_limit_clause.sql b/src/test/regress/sql/multi_limit_clause.sql index 7226ccefa..b55218da5 100644 --- a/src/test/regress/sql/multi_limit_clause.sql +++ b/src/test/regress/sql/multi_limit_clause.sql @@ -236,17 +236,25 @@ SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT 10 OFFSET my_limi DROP FUNCTION my_limit(); --- subqueries should error out -select count(*) from lineitem; -set citus.task_executor_type to 'task-tracker'; -set client_min_messages to debug4; +SET citus.task_executor_type TO 'task-tracker'; +WITH cte AS (SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4) SELECT * FROM cte ORDER BY 1 LIMIT (SELECT min(l_linenumber) FROM lineitem); SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT min(l_linenumber) FROM lineitem) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash); SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT 10); SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT 10 OFFSET (SELECT 10); -reset client_min_messages; -reset citus.task_executor_type; +RESET citus.task_executor_type; +WITH cte AS (SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4) SELECT * FROM cte ORDER BY 1 LIMIT (SELECT min(l_linenumber) FROM lineitem); SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT min(l_linenumber) FROM lineitem) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash); SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT (SELECT 10); SELECT l_orderkey FROM lineitem_hash ORDER BY l_orderkey LIMIT 10 OFFSET (SELECT 10); -DROP TABLE lineitem_hash; +-- test insert/select executor with recursively planned subquery in LIMIT +CREATE TABLE insertselect_test (id int, key int); +SELECT create_distributed_table('insertselect_test', 'id'); +INSERT INTO insertselect_test +SELECT l_orderkey, l_linenumber +FROM lineitem_hash +ORDER BY 1, 2 +LIMIT (SELECT min(l_linenumber) FROM lineitem) OFFSET (SELECT (count(*)/2)::int FROM lineitem_hash); +SELECT * FROM insertselect_test; + +DROP TABLE lineitem_hash, insertselect_test;