diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index b5442dbab..37c7f65e2 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -660,6 +660,16 @@ static void SetAttributeInputMetadata(DistributedExecution *execution, void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState) { + if (scanState->finishedPreScan) + { + /* + * Cursors (and hence RETURN QUERY syntax in pl/pgsql functions) + * may trigger AdaptiveExecutorPreExecutorRun() on every fetch + * operation. Though, we should only execute PreScan once. + */ + return; + } + DistributedPlan *distributedPlan = scanState->distributedPlan; /* @@ -670,6 +680,8 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState) LockPartitionsForDistributedPlan(distributedPlan); ExecuteSubPlans(distributedPlan); + + scanState->finishedPreScan = true; } diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index c1abeeeaa..4a59512f2 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -558,6 +558,9 @@ AdaptiveExecutorCreateScan(CustomScan *scan) scanState->customScanState.methods = &AdaptiveExecutorCustomExecMethods; scanState->PreExecScan = &CitusPreExecScan; + scanState->finishedPreScan = false; + scanState->finishedRemoteScan = false; + return (Node *) scanState; } @@ -578,6 +581,9 @@ NonPushableInsertSelectCreateScan(CustomScan *scan) scanState->customScanState.methods = &NonPushableInsertSelectCustomExecMethods; + scanState->finishedPreScan = false; + scanState->finishedRemoteScan = false; + return (Node *) scanState; } diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index e6d5389f7..92301fceb 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -20,6 +20,7 @@ typedef struct CitusScanState CustomScanState customScanState; /* underlying custom scan node */ /* function that gets called before postgres starts its execution */ + bool finishedPreScan; /* flag to check if the pre scan is finished */ void (*PreExecScan)(struct CitusScanState *scanState); DistributedPlan *distributedPlan; /* distributed execution plan */ diff --git a/src/test/regress/expected/cursors.out b/src/test/regress/expected/cursors.out new file mode 100644 index 000000000..c95322668 --- /dev/null +++ b/src/test/regress/expected/cursors.out @@ -0,0 +1,218 @@ +CREATE SCHEMA cursors; +SET search_path TO cursors; +CREATE TABLE distributed_table (key int, value text); +SELECT create_distributed_table('distributed_table', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- load some data, but not very small amounts because RETURN QUERY in plpgsql +-- hard codes the cursor fetch to 50 rows on PG 12, though they might increase +-- it sometime in the future, so be mindful +INSERT INTO distributed_table SELECT i % 10, i::text FROM generate_series(0, 1000) i; +CREATE OR REPLACE FUNCTION simple_cursor_on_dist_table(cursor_name refcursor) RETURNS refcursor AS ' +BEGIN + OPEN $1 FOR SELECT DISTINCT key FROM distributed_table ORDER BY 1; + RETURN $1; +END; +' LANGUAGE plpgsql; +CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table(cursor_name refcursor) RETURNS refcursor AS ' +BEGIN + OPEN $1 FOR + WITH cte_1 AS (SELECT * FROM distributed_table OFFSET 0) + SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1; + RETURN $1; +END; +' LANGUAGE plpgsql; +CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table_with_param(cursor_name refcursor, filter text) RETURNS refcursor AS ' +BEGIN + OPEN $1 FOR + WITH cte_1 AS (SELECT * FROM distributed_table WHERE value < $2 OFFSET 0) + SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1; + RETURN $1; +END; +' LANGUAGE plpgsql; +-- pretty basic query with cursors +-- Citus should plan/execute once and pull +-- the results to coordinator, then serve it +-- from the coordinator +BEGIN; + SELECT simple_cursor_on_dist_table('cursor_1'); + simple_cursor_on_dist_table +--------------------------------------------------------------------- + cursor_1 +(1 row) + + SET LOCAL citus.log_intermediate_results TO ON; + SET LOCAL client_min_messages TO DEBUG1; + FETCH 5 IN cursor_1; + key +--------------------------------------------------------------------- + 0 + 1 + 2 + 3 + 4 +(5 rows) + + FETCH 50 IN cursor_1; + key +--------------------------------------------------------------------- + 5 + 6 + 7 + 8 + 9 +(5 rows) + + FETCH ALL IN cursor_1; + key +--------------------------------------------------------------------- +(0 rows) + +COMMIT; +BEGIN; + SELECT cursor_with_intermediate_result_on_dist_table('cursor_1'); + cursor_with_intermediate_result_on_dist_table +--------------------------------------------------------------------- + cursor_1 +(1 row) + + -- multiple FETCH commands should not trigger re-running the subplans + SET LOCAL citus.log_intermediate_results TO ON; + SET LOCAL client_min_messages TO DEBUG1; + FETCH 5 IN cursor_1; +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + key +--------------------------------------------------------------------- + 0 + 1 + 2 + 3 + 4 +(5 rows) + + FETCH 1 IN cursor_1; + key +--------------------------------------------------------------------- + 5 +(1 row) + + FETCH ALL IN cursor_1; + key +--------------------------------------------------------------------- + 6 + 7 + 8 + 9 +(4 rows) + + FETCH 5 IN cursor_1; + key +--------------------------------------------------------------------- +(0 rows) + +COMMIT; +BEGIN; + SELECT cursor_with_intermediate_result_on_dist_table_with_param('cursor_1', '600'); + cursor_with_intermediate_result_on_dist_table_with_param +--------------------------------------------------------------------- + cursor_1 +(1 row) + + -- multiple FETCH commands should not trigger re-running the subplans + -- also test with parameters + SET LOCAL citus.log_intermediate_results TO ON; + SET LOCAL client_min_messages TO DEBUG1; + FETCH 1 IN cursor_1; +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + key +--------------------------------------------------------------------- + 0 +(1 row) + + FETCH 1 IN cursor_1; + key +--------------------------------------------------------------------- + 1 +(1 row) + + FETCH 1 IN cursor_1; + key +--------------------------------------------------------------------- + 2 +(1 row) + + FETCH 1 IN cursor_1; + key +--------------------------------------------------------------------- + 3 +(1 row) + + FETCH 1 IN cursor_1; + key +--------------------------------------------------------------------- + 4 +(1 row) + + FETCH 1 IN cursor_1; + key +--------------------------------------------------------------------- + 5 +(1 row) + + FETCH ALL IN cursor_1; + key +--------------------------------------------------------------------- + 6 + 7 + 8 + 9 +(4 rows) + +COMMIT; + CREATE OR REPLACE FUNCTION value_counter() RETURNS TABLE(counter text) LANGUAGE PLPGSQL AS $function$ + BEGIN + return query +WITH cte AS + (SELECT dt.value + FROM distributed_table dt + WHERE dt.value in + (SELECT value + FROM distributed_table p + GROUP BY p.value + HAVING count(*) > 0)) + + SELECT * FROM cte; +END; +$function$ ; +SET citus.log_intermediate_results TO ON; +SET client_min_messages TO DEBUG1; +\set VERBOSITY terse +SELECT count(*) from (SELECT value_counter()) as foo; +DEBUG: CTE cte is going to be inlined via distributed planning +DEBUG: generating subplan XXX_1 for subquery SELECT value FROM cursors.distributed_table p GROUP BY value HAVING (count(*) OPERATOR(pg_catalog.>) 0) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT value FROM (SELECT dt.value FROM cursors.distributed_table dt WHERE (dt.value OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)))) cte +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- + 1001 +(1 row) + +BEGIN; + SELECT count(*) from (SELECT value_counter()) as foo; +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- + 1001 +(1 row) + +COMMIT; +-- suppress NOTICEs +SET client_min_messages TO ERROR; +DROP SCHEMA cursors CASCADE; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 4f6ed8ee4..241a4fd3e 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -108,7 +108,7 @@ test: ch_bench_subquery_repartition test: multi_agg_type_conversion multi_count_type_conversion test: multi_partition_pruning single_hash_repartition_join test: multi_join_pruning multi_hash_pruning intermediate_result_pruning -test: multi_null_minmax_value_pruning +test: multi_null_minmax_value_pruning cursors test: multi_query_directory_cleanup test: multi_task_assignment_policy multi_cross_shard test: multi_utility_statements diff --git a/src/test/regress/sql/cursors.sql b/src/test/regress/sql/cursors.sql new file mode 100644 index 000000000..5e2a4cca4 --- /dev/null +++ b/src/test/regress/sql/cursors.sql @@ -0,0 +1,112 @@ +CREATE SCHEMA cursors; +SET search_path TO cursors; + +CREATE TABLE distributed_table (key int, value text); +SELECT create_distributed_table('distributed_table', 'key'); + + +-- load some data, but not very small amounts because RETURN QUERY in plpgsql +-- hard codes the cursor fetch to 50 rows on PG 12, though they might increase +-- it sometime in the future, so be mindful +INSERT INTO distributed_table SELECT i % 10, i::text FROM generate_series(0, 1000) i; + + +CREATE OR REPLACE FUNCTION simple_cursor_on_dist_table(cursor_name refcursor) RETURNS refcursor AS ' +BEGIN + OPEN $1 FOR SELECT DISTINCT key FROM distributed_table ORDER BY 1; + RETURN $1; +END; +' LANGUAGE plpgsql; + + +CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table(cursor_name refcursor) RETURNS refcursor AS ' +BEGIN + OPEN $1 FOR + WITH cte_1 AS (SELECT * FROM distributed_table OFFSET 0) + SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1; + RETURN $1; +END; +' LANGUAGE plpgsql; + + +CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table_with_param(cursor_name refcursor, filter text) RETURNS refcursor AS ' +BEGIN + OPEN $1 FOR + WITH cte_1 AS (SELECT * FROM distributed_table WHERE value < $2 OFFSET 0) + SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1; + RETURN $1; +END; +' LANGUAGE plpgsql; + + +-- pretty basic query with cursors +-- Citus should plan/execute once and pull +-- the results to coordinator, then serve it +-- from the coordinator +BEGIN; + SELECT simple_cursor_on_dist_table('cursor_1'); + SET LOCAL citus.log_intermediate_results TO ON; + SET LOCAL client_min_messages TO DEBUG1; + FETCH 5 IN cursor_1; + FETCH 50 IN cursor_1; + FETCH ALL IN cursor_1; +COMMIT; + + +BEGIN; + SELECT cursor_with_intermediate_result_on_dist_table('cursor_1'); + + -- multiple FETCH commands should not trigger re-running the subplans + SET LOCAL citus.log_intermediate_results TO ON; + SET LOCAL client_min_messages TO DEBUG1; + FETCH 5 IN cursor_1; + FETCH 1 IN cursor_1; + FETCH ALL IN cursor_1; + FETCH 5 IN cursor_1; +COMMIT; + + +BEGIN; + SELECT cursor_with_intermediate_result_on_dist_table_with_param('cursor_1', '600'); + + -- multiple FETCH commands should not trigger re-running the subplans + -- also test with parameters + SET LOCAL citus.log_intermediate_results TO ON; + SET LOCAL client_min_messages TO DEBUG1; + FETCH 1 IN cursor_1; + FETCH 1 IN cursor_1; + FETCH 1 IN cursor_1; + FETCH 1 IN cursor_1; + FETCH 1 IN cursor_1; + FETCH 1 IN cursor_1; + FETCH ALL IN cursor_1; + +COMMIT; + + CREATE OR REPLACE FUNCTION value_counter() RETURNS TABLE(counter text) LANGUAGE PLPGSQL AS $function$ + BEGIN + return query +WITH cte AS + (SELECT dt.value + FROM distributed_table dt + WHERE dt.value in + (SELECT value + FROM distributed_table p + GROUP BY p.value + HAVING count(*) > 0)) + + SELECT * FROM cte; +END; +$function$ ; + +SET citus.log_intermediate_results TO ON; +SET client_min_messages TO DEBUG1; +\set VERBOSITY terse +SELECT count(*) from (SELECT value_counter()) as foo; +BEGIN; + SELECT count(*) from (SELECT value_counter()) as foo; +COMMIT; + +-- suppress NOTICEs +SET client_min_messages TO ERROR; +DROP SCHEMA cursors CASCADE;