From c433c66f2bc9a7dce27a50841a1e65b1704af84a Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Fri, 20 Nov 2020 10:29:59 +0100 Subject: [PATCH] Do not execute subplans multiple times with cursors Before this commit, we let AdaptiveExecutorPreExecutorRun() to be effective multiple times on every FETCH on cursors. That does not affect the correctness of the query results, but adds significant overhead. --- .../distributed/executor/adaptive_executor.c | 12 + .../distributed/executor/citus_custom_scan.c | 6 + src/include/distributed/citus_custom_scan.h | 1 + src/test/regress/expected/cursors.out | 218 ++++++++++++++++++ src/test/regress/multi_schedule | 2 +- src/test/regress/sql/cursors.sql | 112 +++++++++ 6 files changed, 350 insertions(+), 1 deletion(-) create mode 100644 src/test/regress/expected/cursors.out create mode 100644 src/test/regress/sql/cursors.sql diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index b5442dbab..37c7f65e2 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -660,6 +660,16 @@ static void SetAttributeInputMetadata(DistributedExecution *execution, void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState) { + if (scanState->finishedPreScan) + { + /* + * Cursors (and hence RETURN QUERY syntax in pl/pgsql functions) + * may trigger AdaptiveExecutorPreExecutorRun() on every fetch + * operation. Though, we should only execute PreScan once. + */ + return; + } + DistributedPlan *distributedPlan = scanState->distributedPlan; /* @@ -670,6 +680,8 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState) LockPartitionsForDistributedPlan(distributedPlan); ExecuteSubPlans(distributedPlan); + + scanState->finishedPreScan = true; } diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index c1abeeeaa..4a59512f2 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -558,6 +558,9 @@ AdaptiveExecutorCreateScan(CustomScan *scan) scanState->customScanState.methods = &AdaptiveExecutorCustomExecMethods; scanState->PreExecScan = &CitusPreExecScan; + scanState->finishedPreScan = false; + scanState->finishedRemoteScan = false; + return (Node *) scanState; } @@ -578,6 +581,9 @@ NonPushableInsertSelectCreateScan(CustomScan *scan) scanState->customScanState.methods = &NonPushableInsertSelectCustomExecMethods; + scanState->finishedPreScan = false; + scanState->finishedRemoteScan = false; + return (Node *) scanState; } diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index e6d5389f7..92301fceb 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -20,6 +20,7 @@ typedef struct CitusScanState CustomScanState customScanState; /* underlying custom scan node */ /* function that gets called before postgres starts its execution */ + bool finishedPreScan; /* flag to check if the pre scan is finished */ void (*PreExecScan)(struct CitusScanState *scanState); DistributedPlan *distributedPlan; /* distributed execution plan */ diff --git a/src/test/regress/expected/cursors.out b/src/test/regress/expected/cursors.out new file mode 100644 index 000000000..c95322668 --- /dev/null +++ b/src/test/regress/expected/cursors.out @@ -0,0 +1,218 @@ +CREATE SCHEMA cursors; +SET search_path TO cursors; +CREATE TABLE distributed_table (key int, value text); +SELECT create_distributed_table('distributed_table', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- load some data, but not very small amounts because RETURN QUERY in plpgsql +-- hard codes the cursor fetch to 50 rows on PG 12, though they might increase +-- it sometime in the future, so be mindful +INSERT INTO distributed_table SELECT i % 10, i::text FROM generate_series(0, 1000) i; +CREATE OR REPLACE FUNCTION simple_cursor_on_dist_table(cursor_name refcursor) RETURNS refcursor AS ' +BEGIN + OPEN $1 FOR SELECT DISTINCT key FROM distributed_table ORDER BY 1; + RETURN $1; +END; +' LANGUAGE plpgsql; +CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table(cursor_name refcursor) RETURNS refcursor AS ' +BEGIN + OPEN $1 FOR + WITH cte_1 AS (SELECT * FROM distributed_table OFFSET 0) + SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1; + RETURN $1; +END; +' LANGUAGE plpgsql; +CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table_with_param(cursor_name refcursor, filter text) RETURNS refcursor AS ' +BEGIN + OPEN $1 FOR + WITH cte_1 AS (SELECT * FROM distributed_table WHERE value < $2 OFFSET 0) + SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1; + RETURN $1; +END; +' LANGUAGE plpgsql; +-- pretty basic query with cursors +-- Citus should plan/execute once and pull +-- the results to coordinator, then serve it +-- from the coordinator +BEGIN; + SELECT simple_cursor_on_dist_table('cursor_1'); + simple_cursor_on_dist_table +--------------------------------------------------------------------- + cursor_1 +(1 row) + + SET LOCAL citus.log_intermediate_results TO ON; + SET LOCAL client_min_messages TO DEBUG1; + FETCH 5 IN cursor_1; + key +--------------------------------------------------------------------- + 0 + 1 + 2 + 3 + 4 +(5 rows) + + FETCH 50 IN cursor_1; + key +--------------------------------------------------------------------- + 5 + 6 + 7 + 8 + 9 +(5 rows) + + FETCH ALL IN cursor_1; + key +--------------------------------------------------------------------- +(0 rows) + +COMMIT; +BEGIN; + SELECT cursor_with_intermediate_result_on_dist_table('cursor_1'); + cursor_with_intermediate_result_on_dist_table +--------------------------------------------------------------------- + cursor_1 +(1 row) + + -- multiple FETCH commands should not trigger re-running the subplans + SET LOCAL citus.log_intermediate_results TO ON; + SET LOCAL client_min_messages TO DEBUG1; + FETCH 5 IN cursor_1; +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + key +--------------------------------------------------------------------- + 0 + 1 + 2 + 3 + 4 +(5 rows) + + FETCH 1 IN cursor_1; + key +--------------------------------------------------------------------- + 5 +(1 row) + + FETCH ALL IN cursor_1; + key +--------------------------------------------------------------------- + 6 + 7 + 8 + 9 +(4 rows) + + FETCH 5 IN cursor_1; + key +--------------------------------------------------------------------- +(0 rows) + +COMMIT; +BEGIN; + SELECT cursor_with_intermediate_result_on_dist_table_with_param('cursor_1', '600'); + cursor_with_intermediate_result_on_dist_table_with_param +--------------------------------------------------------------------- + cursor_1 +(1 row) + + -- multiple FETCH commands should not trigger re-running the subplans + -- also test with parameters + SET LOCAL citus.log_intermediate_results TO ON; + SET LOCAL client_min_messages TO DEBUG1; + FETCH 1 IN cursor_1; +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + key +--------------------------------------------------------------------- + 0 +(1 row) + + FETCH 1 IN cursor_1; + key +--------------------------------------------------------------------- + 1 +(1 row) + + FETCH 1 IN cursor_1; + key +--------------------------------------------------------------------- + 2 +(1 row) + + FETCH 1 IN cursor_1; + key +--------------------------------------------------------------------- + 3 +(1 row) + + FETCH 1 IN cursor_1; + key +--------------------------------------------------------------------- + 4 +(1 row) + + FETCH 1 IN cursor_1; + key +--------------------------------------------------------------------- + 5 +(1 row) + + FETCH ALL IN cursor_1; + key +--------------------------------------------------------------------- + 6 + 7 + 8 + 9 +(4 rows) + +COMMIT; + CREATE OR REPLACE FUNCTION value_counter() RETURNS TABLE(counter text) LANGUAGE PLPGSQL AS $function$ + BEGIN + return query +WITH cte AS + (SELECT dt.value + FROM distributed_table dt + WHERE dt.value in + (SELECT value + FROM distributed_table p + GROUP BY p.value + HAVING count(*) > 0)) + + SELECT * FROM cte; +END; +$function$ ; +SET citus.log_intermediate_results TO ON; +SET client_min_messages TO DEBUG1; +\set VERBOSITY terse +SELECT count(*) from (SELECT value_counter()) as foo; +DEBUG: CTE cte is going to be inlined via distributed planning +DEBUG: generating subplan XXX_1 for subquery SELECT value FROM cursors.distributed_table p GROUP BY value HAVING (count(*) OPERATOR(pg_catalog.>) 0) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT value FROM (SELECT dt.value FROM cursors.distributed_table dt WHERE (dt.value OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)))) cte +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- + 1001 +(1 row) + +BEGIN; + SELECT count(*) from (SELECT value_counter()) as foo; +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- + 1001 +(1 row) + +COMMIT; +-- suppress NOTICEs +SET client_min_messages TO ERROR; +DROP SCHEMA cursors CASCADE; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 4f6ed8ee4..241a4fd3e 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -108,7 +108,7 @@ test: ch_bench_subquery_repartition test: multi_agg_type_conversion multi_count_type_conversion test: multi_partition_pruning single_hash_repartition_join test: multi_join_pruning multi_hash_pruning intermediate_result_pruning -test: multi_null_minmax_value_pruning +test: multi_null_minmax_value_pruning cursors test: multi_query_directory_cleanup test: multi_task_assignment_policy multi_cross_shard test: multi_utility_statements diff --git a/src/test/regress/sql/cursors.sql b/src/test/regress/sql/cursors.sql new file mode 100644 index 000000000..5e2a4cca4 --- /dev/null +++ b/src/test/regress/sql/cursors.sql @@ -0,0 +1,112 @@ +CREATE SCHEMA cursors; +SET search_path TO cursors; + +CREATE TABLE distributed_table (key int, value text); +SELECT create_distributed_table('distributed_table', 'key'); + + +-- load some data, but not very small amounts because RETURN QUERY in plpgsql +-- hard codes the cursor fetch to 50 rows on PG 12, though they might increase +-- it sometime in the future, so be mindful +INSERT INTO distributed_table SELECT i % 10, i::text FROM generate_series(0, 1000) i; + + +CREATE OR REPLACE FUNCTION simple_cursor_on_dist_table(cursor_name refcursor) RETURNS refcursor AS ' +BEGIN + OPEN $1 FOR SELECT DISTINCT key FROM distributed_table ORDER BY 1; + RETURN $1; +END; +' LANGUAGE plpgsql; + + +CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table(cursor_name refcursor) RETURNS refcursor AS ' +BEGIN + OPEN $1 FOR + WITH cte_1 AS (SELECT * FROM distributed_table OFFSET 0) + SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1; + RETURN $1; +END; +' LANGUAGE plpgsql; + + +CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table_with_param(cursor_name refcursor, filter text) RETURNS refcursor AS ' +BEGIN + OPEN $1 FOR + WITH cte_1 AS (SELECT * FROM distributed_table WHERE value < $2 OFFSET 0) + SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1; + RETURN $1; +END; +' LANGUAGE plpgsql; + + +-- pretty basic query with cursors +-- Citus should plan/execute once and pull +-- the results to coordinator, then serve it +-- from the coordinator +BEGIN; + SELECT simple_cursor_on_dist_table('cursor_1'); + SET LOCAL citus.log_intermediate_results TO ON; + SET LOCAL client_min_messages TO DEBUG1; + FETCH 5 IN cursor_1; + FETCH 50 IN cursor_1; + FETCH ALL IN cursor_1; +COMMIT; + + +BEGIN; + SELECT cursor_with_intermediate_result_on_dist_table('cursor_1'); + + -- multiple FETCH commands should not trigger re-running the subplans + SET LOCAL citus.log_intermediate_results TO ON; + SET LOCAL client_min_messages TO DEBUG1; + FETCH 5 IN cursor_1; + FETCH 1 IN cursor_1; + FETCH ALL IN cursor_1; + FETCH 5 IN cursor_1; +COMMIT; + + +BEGIN; + SELECT cursor_with_intermediate_result_on_dist_table_with_param('cursor_1', '600'); + + -- multiple FETCH commands should not trigger re-running the subplans + -- also test with parameters + SET LOCAL citus.log_intermediate_results TO ON; + SET LOCAL client_min_messages TO DEBUG1; + FETCH 1 IN cursor_1; + FETCH 1 IN cursor_1; + FETCH 1 IN cursor_1; + FETCH 1 IN cursor_1; + FETCH 1 IN cursor_1; + FETCH 1 IN cursor_1; + FETCH ALL IN cursor_1; + +COMMIT; + + CREATE OR REPLACE FUNCTION value_counter() RETURNS TABLE(counter text) LANGUAGE PLPGSQL AS $function$ + BEGIN + return query +WITH cte AS + (SELECT dt.value + FROM distributed_table dt + WHERE dt.value in + (SELECT value + FROM distributed_table p + GROUP BY p.value + HAVING count(*) > 0)) + + SELECT * FROM cte; +END; +$function$ ; + +SET citus.log_intermediate_results TO ON; +SET client_min_messages TO DEBUG1; +\set VERBOSITY terse +SELECT count(*) from (SELECT value_counter()) as foo; +BEGIN; + SELECT count(*) from (SELECT value_counter()) as foo; +COMMIT; + +-- suppress NOTICEs +SET client_min_messages TO ERROR; +DROP SCHEMA cursors CASCADE;