mirror of https://github.com/citusdata/citus.git
Merge pull request #4331 from citusdata/pre_executor_run
Do not execute subplans multiple times with cursorspull/4334/head
commit
856e5c85cf
|
@ -660,6 +660,16 @@ static void SetAttributeInputMetadata(DistributedExecution *execution,
|
||||||
void
|
void
|
||||||
AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
|
AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
|
||||||
{
|
{
|
||||||
|
if (scanState->finishedPreScan)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Cursors (and hence RETURN QUERY syntax in pl/pgsql functions)
|
||||||
|
* may trigger AdaptiveExecutorPreExecutorRun() on every fetch
|
||||||
|
* operation. Though, we should only execute PreScan once.
|
||||||
|
*/
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -670,6 +680,8 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
|
||||||
LockPartitionsForDistributedPlan(distributedPlan);
|
LockPartitionsForDistributedPlan(distributedPlan);
|
||||||
|
|
||||||
ExecuteSubPlans(distributedPlan);
|
ExecuteSubPlans(distributedPlan);
|
||||||
|
|
||||||
|
scanState->finishedPreScan = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -558,6 +558,9 @@ AdaptiveExecutorCreateScan(CustomScan *scan)
|
||||||
scanState->customScanState.methods = &AdaptiveExecutorCustomExecMethods;
|
scanState->customScanState.methods = &AdaptiveExecutorCustomExecMethods;
|
||||||
scanState->PreExecScan = &CitusPreExecScan;
|
scanState->PreExecScan = &CitusPreExecScan;
|
||||||
|
|
||||||
|
scanState->finishedPreScan = false;
|
||||||
|
scanState->finishedRemoteScan = false;
|
||||||
|
|
||||||
return (Node *) scanState;
|
return (Node *) scanState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -578,6 +581,9 @@ NonPushableInsertSelectCreateScan(CustomScan *scan)
|
||||||
scanState->customScanState.methods =
|
scanState->customScanState.methods =
|
||||||
&NonPushableInsertSelectCustomExecMethods;
|
&NonPushableInsertSelectCustomExecMethods;
|
||||||
|
|
||||||
|
scanState->finishedPreScan = false;
|
||||||
|
scanState->finishedRemoteScan = false;
|
||||||
|
|
||||||
return (Node *) scanState;
|
return (Node *) scanState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@ typedef struct CitusScanState
|
||||||
CustomScanState customScanState; /* underlying custom scan node */
|
CustomScanState customScanState; /* underlying custom scan node */
|
||||||
|
|
||||||
/* function that gets called before postgres starts its execution */
|
/* function that gets called before postgres starts its execution */
|
||||||
|
bool finishedPreScan; /* flag to check if the pre scan is finished */
|
||||||
void (*PreExecScan)(struct CitusScanState *scanState);
|
void (*PreExecScan)(struct CitusScanState *scanState);
|
||||||
|
|
||||||
DistributedPlan *distributedPlan; /* distributed execution plan */
|
DistributedPlan *distributedPlan; /* distributed execution plan */
|
||||||
|
|
|
@ -0,0 +1,218 @@
|
||||||
|
CREATE SCHEMA cursors;
|
||||||
|
SET search_path TO cursors;
|
||||||
|
CREATE TABLE distributed_table (key int, value text);
|
||||||
|
SELECT create_distributed_table('distributed_table', 'key');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- load some data, but not very small amounts because RETURN QUERY in plpgsql
|
||||||
|
-- hard codes the cursor fetch to 50 rows on PG 12, though they might increase
|
||||||
|
-- it sometime in the future, so be mindful
|
||||||
|
INSERT INTO distributed_table SELECT i % 10, i::text FROM generate_series(0, 1000) i;
|
||||||
|
CREATE OR REPLACE FUNCTION simple_cursor_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
|
||||||
|
BEGIN
|
||||||
|
OPEN $1 FOR SELECT DISTINCT key FROM distributed_table ORDER BY 1;
|
||||||
|
RETURN $1;
|
||||||
|
END;
|
||||||
|
' LANGUAGE plpgsql;
|
||||||
|
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
|
||||||
|
BEGIN
|
||||||
|
OPEN $1 FOR
|
||||||
|
WITH cte_1 AS (SELECT * FROM distributed_table OFFSET 0)
|
||||||
|
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
|
||||||
|
RETURN $1;
|
||||||
|
END;
|
||||||
|
' LANGUAGE plpgsql;
|
||||||
|
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table_with_param(cursor_name refcursor, filter text) RETURNS refcursor AS '
|
||||||
|
BEGIN
|
||||||
|
OPEN $1 FOR
|
||||||
|
WITH cte_1 AS (SELECT * FROM distributed_table WHERE value < $2 OFFSET 0)
|
||||||
|
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
|
||||||
|
RETURN $1;
|
||||||
|
END;
|
||||||
|
' LANGUAGE plpgsql;
|
||||||
|
-- pretty basic query with cursors
|
||||||
|
-- Citus should plan/execute once and pull
|
||||||
|
-- the results to coordinator, then serve it
|
||||||
|
-- from the coordinator
|
||||||
|
BEGIN;
|
||||||
|
SELECT simple_cursor_on_dist_table('cursor_1');
|
||||||
|
simple_cursor_on_dist_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
cursor_1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET LOCAL citus.log_intermediate_results TO ON;
|
||||||
|
SET LOCAL client_min_messages TO DEBUG1;
|
||||||
|
FETCH 5 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
1
|
||||||
|
2
|
||||||
|
3
|
||||||
|
4
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
FETCH 50 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
5
|
||||||
|
6
|
||||||
|
7
|
||||||
|
8
|
||||||
|
9
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
FETCH ALL IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
BEGIN;
|
||||||
|
SELECT cursor_with_intermediate_result_on_dist_table('cursor_1');
|
||||||
|
cursor_with_intermediate_result_on_dist_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
cursor_1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- multiple FETCH commands should not trigger re-running the subplans
|
||||||
|
SET LOCAL citus.log_intermediate_results TO ON;
|
||||||
|
SET LOCAL client_min_messages TO DEBUG1;
|
||||||
|
FETCH 5 IN cursor_1;
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
1
|
||||||
|
2
|
||||||
|
3
|
||||||
|
4
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
FETCH ALL IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
6
|
||||||
|
7
|
||||||
|
8
|
||||||
|
9
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
FETCH 5 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
BEGIN;
|
||||||
|
SELECT cursor_with_intermediate_result_on_dist_table_with_param('cursor_1', '600');
|
||||||
|
cursor_with_intermediate_result_on_dist_table_with_param
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
cursor_1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- multiple FETCH commands should not trigger re-running the subplans
|
||||||
|
-- also test with parameters
|
||||||
|
SET LOCAL citus.log_intermediate_results TO ON;
|
||||||
|
SET LOCAL client_min_messages TO DEBUG1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
FETCH ALL IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
6
|
||||||
|
7
|
||||||
|
8
|
||||||
|
9
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
CREATE OR REPLACE FUNCTION value_counter() RETURNS TABLE(counter text) LANGUAGE PLPGSQL AS $function$
|
||||||
|
BEGIN
|
||||||
|
return query
|
||||||
|
WITH cte AS
|
||||||
|
(SELECT dt.value
|
||||||
|
FROM distributed_table dt
|
||||||
|
WHERE dt.value in
|
||||||
|
(SELECT value
|
||||||
|
FROM distributed_table p
|
||||||
|
GROUP BY p.value
|
||||||
|
HAVING count(*) > 0))
|
||||||
|
|
||||||
|
SELECT * FROM cte;
|
||||||
|
END;
|
||||||
|
$function$ ;
|
||||||
|
SET citus.log_intermediate_results TO ON;
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
\set VERBOSITY terse
|
||||||
|
SELECT count(*) from (SELECT value_counter()) as foo;
|
||||||
|
DEBUG: CTE cte is going to be inlined via distributed planning
|
||||||
|
DEBUG: generating subplan XXX_1 for subquery SELECT value FROM cursors.distributed_table p GROUP BY value HAVING (count(*) OPERATOR(pg_catalog.>) 0)
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT value FROM (SELECT dt.value FROM cursors.distributed_table dt WHERE (dt.value OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)))) cte
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1001
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT count(*) from (SELECT value_counter()) as foo;
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1001
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- suppress NOTICEs
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA cursors CASCADE;
|
|
@ -108,7 +108,7 @@ test: ch_bench_subquery_repartition
|
||||||
test: multi_agg_type_conversion multi_count_type_conversion
|
test: multi_agg_type_conversion multi_count_type_conversion
|
||||||
test: multi_partition_pruning single_hash_repartition_join
|
test: multi_partition_pruning single_hash_repartition_join
|
||||||
test: multi_join_pruning multi_hash_pruning intermediate_result_pruning
|
test: multi_join_pruning multi_hash_pruning intermediate_result_pruning
|
||||||
test: multi_null_minmax_value_pruning
|
test: multi_null_minmax_value_pruning cursors
|
||||||
test: multi_query_directory_cleanup
|
test: multi_query_directory_cleanup
|
||||||
test: multi_task_assignment_policy multi_cross_shard
|
test: multi_task_assignment_policy multi_cross_shard
|
||||||
test: multi_utility_statements
|
test: multi_utility_statements
|
||||||
|
|
|
@ -0,0 +1,112 @@
|
||||||
|
CREATE SCHEMA cursors;
|
||||||
|
SET search_path TO cursors;
|
||||||
|
|
||||||
|
CREATE TABLE distributed_table (key int, value text);
|
||||||
|
SELECT create_distributed_table('distributed_table', 'key');
|
||||||
|
|
||||||
|
|
||||||
|
-- load some data, but not very small amounts because RETURN QUERY in plpgsql
|
||||||
|
-- hard codes the cursor fetch to 50 rows on PG 12, though they might increase
|
||||||
|
-- it sometime in the future, so be mindful
|
||||||
|
INSERT INTO distributed_table SELECT i % 10, i::text FROM generate_series(0, 1000) i;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION simple_cursor_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
|
||||||
|
BEGIN
|
||||||
|
OPEN $1 FOR SELECT DISTINCT key FROM distributed_table ORDER BY 1;
|
||||||
|
RETURN $1;
|
||||||
|
END;
|
||||||
|
' LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
|
||||||
|
BEGIN
|
||||||
|
OPEN $1 FOR
|
||||||
|
WITH cte_1 AS (SELECT * FROM distributed_table OFFSET 0)
|
||||||
|
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
|
||||||
|
RETURN $1;
|
||||||
|
END;
|
||||||
|
' LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table_with_param(cursor_name refcursor, filter text) RETURNS refcursor AS '
|
||||||
|
BEGIN
|
||||||
|
OPEN $1 FOR
|
||||||
|
WITH cte_1 AS (SELECT * FROM distributed_table WHERE value < $2 OFFSET 0)
|
||||||
|
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
|
||||||
|
RETURN $1;
|
||||||
|
END;
|
||||||
|
' LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
|
||||||
|
-- pretty basic query with cursors
|
||||||
|
-- Citus should plan/execute once and pull
|
||||||
|
-- the results to coordinator, then serve it
|
||||||
|
-- from the coordinator
|
||||||
|
BEGIN;
|
||||||
|
SELECT simple_cursor_on_dist_table('cursor_1');
|
||||||
|
SET LOCAL citus.log_intermediate_results TO ON;
|
||||||
|
SET LOCAL client_min_messages TO DEBUG1;
|
||||||
|
FETCH 5 IN cursor_1;
|
||||||
|
FETCH 50 IN cursor_1;
|
||||||
|
FETCH ALL IN cursor_1;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT cursor_with_intermediate_result_on_dist_table('cursor_1');
|
||||||
|
|
||||||
|
-- multiple FETCH commands should not trigger re-running the subplans
|
||||||
|
SET LOCAL citus.log_intermediate_results TO ON;
|
||||||
|
SET LOCAL client_min_messages TO DEBUG1;
|
||||||
|
FETCH 5 IN cursor_1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
FETCH ALL IN cursor_1;
|
||||||
|
FETCH 5 IN cursor_1;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT cursor_with_intermediate_result_on_dist_table_with_param('cursor_1', '600');
|
||||||
|
|
||||||
|
-- multiple FETCH commands should not trigger re-running the subplans
|
||||||
|
-- also test with parameters
|
||||||
|
SET LOCAL citus.log_intermediate_results TO ON;
|
||||||
|
SET LOCAL client_min_messages TO DEBUG1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
FETCH ALL IN cursor_1;
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION value_counter() RETURNS TABLE(counter text) LANGUAGE PLPGSQL AS $function$
|
||||||
|
BEGIN
|
||||||
|
return query
|
||||||
|
WITH cte AS
|
||||||
|
(SELECT dt.value
|
||||||
|
FROM distributed_table dt
|
||||||
|
WHERE dt.value in
|
||||||
|
(SELECT value
|
||||||
|
FROM distributed_table p
|
||||||
|
GROUP BY p.value
|
||||||
|
HAVING count(*) > 0))
|
||||||
|
|
||||||
|
SELECT * FROM cte;
|
||||||
|
END;
|
||||||
|
$function$ ;
|
||||||
|
|
||||||
|
SET citus.log_intermediate_results TO ON;
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
\set VERBOSITY terse
|
||||||
|
SELECT count(*) from (SELECT value_counter()) as foo;
|
||||||
|
BEGIN;
|
||||||
|
SELECT count(*) from (SELECT value_counter()) as foo;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- suppress NOTICEs
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA cursors CASCADE;
|
Loading…
Reference in New Issue