From 5e97e5c98e8256a6b3175b828e186adfde96002c Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 30 Sep 2019 13:07:26 -0700 Subject: [PATCH] Don't push down queries when in subqueries/ctes --- .../executor/insert_select_executor.c | 44 +++++++++- .../planner/function_call_delegation.c | 25 ++++++ .../distributed/planner/recursive_planning.c | 19 ++++ .../distributed/insert_select_executor.h | 1 + src/include/distributed/recursive_planning.h | 3 +- .../multi_mx_function_call_delegation.out | 88 ++++++++++++++++++- .../sql/multi_mx_function_call_delegation.sql | 45 ++++++++++ 7 files changed, 218 insertions(+), 7 deletions(-) diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 8a30f1f5e..523c3852d 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -38,6 +38,11 @@ #include "utils/snapmgr.h" +/* depth of current insert/select executor. */ +static int insertSelectExecutorLevel = 0; + + +static TupleTableSlot * CoordinatorInsertSelectExecScanInternal(CustomScanState *node); static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, Query *selectQuery, EState *executorState); static HTAB * ExecuteSelectIntoColocatedIntermediateResults(Oid targetRelationId, @@ -51,14 +56,41 @@ static List * BuildColumnNameListFromTargetList(Oid targetRelationId, static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList); +/* + * CoordinatorInsertSelectExecScan is a wrapper around + * CoordinatorInsertSelectExecScanInternal which also properly increments + * or decrements insertSelectExecutorLevel. 
+ */ +TupleTableSlot * +CoordinatorInsertSelectExecScan(CustomScanState *node) +{ + TupleTableSlot *result = NULL; + insertSelectExecutorLevel++; + + PG_TRY(); + { + result = CoordinatorInsertSelectExecScanInternal(node); + } + PG_CATCH(); + { + insertSelectExecutorLevel--; + PG_RE_THROW(); + } + PG_END_TRY(); + + insertSelectExecutorLevel--; + return result; +} + + /* * CoordinatorInsertSelectExecScan executes an INSERT INTO distributed_table * SELECT .. query by setting up a DestReceiver that copies tuples into the * distributed table and then executing the SELECT query using that DestReceiver * as the tuple destination. */ -TupleTableSlot * -CoordinatorInsertSelectExecScan(CustomScanState *node) +static TupleTableSlot * +CoordinatorInsertSelectExecScanInternal(CustomScanState *node) { CitusScanState *scanState = (CitusScanState *) node; TupleTableSlot *resultSlot = NULL; @@ -343,3 +375,11 @@ PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList) return -1; } + + +/* ExecutingInsertSelect returns true if we are executing an INSERT ... SELECT query */ +bool +ExecutingInsertSelect(void) +{ + return insertSelectExecutorLevel > 0; +} diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index b9bdbb253..5f92d39d6 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -22,6 +22,8 @@ #include "distributed/commands/multi_copy.h" #include "distributed/connection_management.h" #include "distributed/function_call_delegation.h" +#include "distributed/insert_select_planner.h" +#include "distributed/insert_select_executor.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" @@ -29,6 +31,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/remote_commands.h" #include "distributed/shard_pruning.h"
+#include "distributed/recursive_planning.h" #include "distributed/version_compat.h" #include "distributed/worker_manager.h" #include "nodes/makefuncs.h" @@ -220,6 +223,28 @@ TryToDelegateFunctionCall(Query *query) return NULL; } + /* + * This can be called while executing INSERT ... SELECT func(). insert_select_executor + * doesn't get the planned subquery and gets the actual struct Query, so the planning + * for these kinds of queries happens at execution time. + */ + if (ExecutingInsertSelect()) + { + ereport(DEBUG1, (errmsg("not pushing down function calls in INSERT ... SELECT"))); + return NULL; + } + + /* + * This can be called in queries like SELECT ... WHERE EXISTS(SELECT func()), or other + * forms of CTEs or subqueries. We don't push down in those cases. + */ + if (GeneratingSubplans()) + { + ereport(DEBUG1, (errmsg( + "not pushing down function calls in CTEs or Subqueries"))); + return NULL; + } + partitionValueDatum = partitionValue->constvalue; if (partitionValue->consttype != partitionColumn->vartype) diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 8bb474638..1413b2087 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -87,6 +87,9 @@ #include "utils/guc.h" +/* depth of active subplan generation; NOTE(review): verify reset on ereport(ERROR) */ +static int recursivePlanningDepth = 0; + /* * RecursivePlanningContext is used to recursively plan subqueries * and CTEs, pull results to the coordinator, and push it back into @@ -186,6 +189,8 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, RecursivePlanningContext context; DeferredErrorMessage *error = NULL; + recursivePlanningDepth++; + /* * Plan subqueries and CTEs that cannot be pushed down by recursively * calling the planner and add the resulting plans to subPlanList.
@@ -213,6 +218,7 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, error = RecursivelyPlanSubqueriesAndCTEs(originalQuery, &context); if (error != NULL) { + recursivePlanningDepth--; RaiseDeferredError(error, ERROR); } @@ -227,6 +233,8 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, ApplyLogRedaction(subPlanString->data)))); } + recursivePlanningDepth--; + return context.subPlanList; } @@ -1734,3 +1742,14 @@ GenerateResultId(uint64 planId, uint32 subPlanId) return resultId->data; } + + +/* + * GeneratingSubplans returns true if we are currently in the process of + * generating subplans, i.e. a GenerateSubplansForSubqueriesAndCTEs call is in progress. + */ +bool +GeneratingSubplans(void) +{ + return recursivePlanningDepth > 0; +} diff --git a/src/include/distributed/insert_select_executor.h b/src/include/distributed/insert_select_executor.h index bbbac7725..a7b32854c 100644 --- a/src/include/distributed/insert_select_executor.h +++ b/src/include/distributed/insert_select_executor.h @@ -18,6 +18,7 @@ extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node); +extern bool ExecutingInsertSelect(void); #endif /* INSERT_SELECT_EXECUTOR_H */ diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index f0c8ca2ed..182358164 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -21,13 +21,12 @@ #include "nodes/relation.h" #endif - extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); extern char * GenerateResultId(uint64 planId, uint32 subPlanId); extern Query * BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList, char *resultId); - +extern bool GeneratingSubplans(void); #endif /* RECURSIVE_PLANNING_H */ diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index
5c54674c3..848ff9061 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -371,6 +371,88 @@ DETAIL: WARNING from localhost:57638 ERROR: error CONTEXT: while executing command on localhost:57638 PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_raise(integer) line 4 at RAISE +-- Don't push-down when doing INSERT INTO ... SELECT func(); +SET client_min_messages TO ERROR; +CREATE TABLE test (x int primary key); +SELECT create_distributed_table('test','x'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION delegated_function(a int) +RETURNS int +LANGUAGE plpgsql +AS $function$ +DECLARE +BEGIN + INSERT INTO multi_mx_function_call_delegation.test VALUES (a); + INSERT INTO multi_mx_function_call_delegation.test VALUES (a + 1); + RETURN a+2; +END; +$function$; +SELECT create_distributed_function('delegated_function(int)', 'a'); + create_distributed_function +----------------------------- + +(1 row) + +SET client_min_messages TO DEBUG1; +INSERT INTO test SELECT delegated_function(1); +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: not pushing down function calls in INSERT ... SELECT +-- Don't push down in subqueries or CTEs. 
+SELECT * FROM test WHERE not exists( + SELECT delegated_function(4) +); +DEBUG: not pushing down function calls in CTEs or Subqueries +DEBUG: generating subplan 30_1 for subquery SELECT multi_mx_function_call_delegation.delegated_function(4) AS delegated_function +DEBUG: Plan 30 query after replacing subqueries and CTEs: SELECT x FROM multi_mx_function_call_delegation.test WHERE (NOT (EXISTS (SELECT intermediate_result.delegated_function FROM read_intermediate_result('30_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)))) + x +--- +(0 rows) + +WITH r AS ( + SELECT delegated_function(7) +) SELECT * FROM test WHERE (SELECT count(*)=0 FROM r); +DEBUG: generating subplan 33_1 for CTE r: SELECT multi_mx_function_call_delegation.delegated_function(7) AS delegated_function +DEBUG: not pushing down function calls in CTEs or Subqueries +DEBUG: generating subplan 33_2 for subquery SELECT (count(*) OPERATOR(pg_catalog.=) 0) FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('33_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) r +DEBUG: Plan 33 query after replacing subqueries and CTEs: SELECT x FROM multi_mx_function_call_delegation.test WHERE (SELECT intermediate_result."?column?" FROM read_intermediate_result('33_2'::text, 'binary'::citus_copy_format) intermediate_result("?column?" 
boolean)) + x +--- +(0 rows) + +WITH r AS ( + SELECT delegated_function(10) +), t AS ( + SELECT count(*) c FROM r +) SELECT * FROM test, t WHERE t.c=0; +DEBUG: generating subplan 37_1 for CTE r: SELECT multi_mx_function_call_delegation.delegated_function(10) AS delegated_function +DEBUG: not pushing down function calls in CTEs or Subqueries +DEBUG: generating subplan 37_2 for CTE t: SELECT count(*) AS c FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('37_1'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) r +DEBUG: Plan 37 query after replacing subqueries and CTEs: SELECT test.x, t.c FROM multi_mx_function_call_delegation.test, (SELECT intermediate_result.c FROM read_intermediate_result('37_2'::text, 'binary'::citus_copy_format) intermediate_result(c bigint)) t WHERE (t.c OPERATOR(pg_catalog.=) 0) + x | c +---+--- +(0 rows) + +WITH r AS ( + SELECT count(*) FROM test +), s AS ( + SELECT delegated_function(13) +), t AS ( + SELECT count(*) c FROM s +) SELECT * FROM test, r, t WHERE t.c=0; +DEBUG: generating subplan 41_1 for CTE r: SELECT count(*) AS count FROM multi_mx_function_call_delegation.test +DEBUG: generating subplan 41_2 for CTE s: SELECT multi_mx_function_call_delegation.delegated_function(13) AS delegated_function +DEBUG: not pushing down function calls in CTEs or Subqueries +DEBUG: generating subplan 41_3 for CTE t: SELECT count(*) AS c FROM (SELECT intermediate_result.delegated_function FROM read_intermediate_result('41_2'::text, 'binary'::citus_copy_format) intermediate_result(delegated_function integer)) s +DEBUG: Plan 41 query after replacing subqueries and CTEs: SELECT test.x, r.count, t.c FROM multi_mx_function_call_delegation.test, (SELECT intermediate_result.count FROM read_intermediate_result('41_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) r, (SELECT intermediate_result.c FROM read_intermediate_result('41_3'::text, 'binary'::citus_copy_format) 
intermediate_result(c bigint)) t WHERE (t.c OPERATOR(pg_catalog.=) 0) + x | count | c +---+-------+--- +(0 rows) + -- Test that we don't propagate to non-metadata worker nodes select stop_metadata_sync_to_node('localhost', :worker_1_port); stop_metadata_sync_to_node @@ -386,10 +468,10 @@ select stop_metadata_sync_to_node('localhost', :worker_2_port); select mx_call_func(2, 0); DEBUG: the worker node does not have metadata -DEBUG: generating subplan 27_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +DEBUG: generating subplan 46_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -DEBUG: Plan 27 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('27_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +DEBUG: Plan 46 query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('46_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment 
mx_call_func @@ -536,4 +618,4 @@ DEBUG: pushing down the function call RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA multi_mx_function_call_delegation CASCADE; -NOTICE: drop cascades to 10 other objects +NOTICE: drop cascades to 12 other objects diff --git a/src/test/regress/sql/multi_mx_function_call_delegation.sql b/src/test/regress/sql/multi_mx_function_call_delegation.sql index ddf92f9bf..3f41b80ce 100644 --- a/src/test/regress/sql/multi_mx_function_call_delegation.sql +++ b/src/test/regress/sql/multi_mx_function_call_delegation.sql @@ -152,6 +152,51 @@ END;$$; select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1'); select mx_call_func_raise(2); +-- Don't push-down when doing INSERT INTO ... SELECT func(); +SET client_min_messages TO ERROR; +CREATE TABLE test (x int primary key); +SELECT create_distributed_table('test','x'); + +CREATE OR REPLACE FUNCTION delegated_function(a int) +RETURNS int +LANGUAGE plpgsql +AS $function$ +DECLARE +BEGIN + INSERT INTO multi_mx_function_call_delegation.test VALUES (a); + INSERT INTO multi_mx_function_call_delegation.test VALUES (a + 1); + RETURN a+2; +END; +$function$; + +SELECT create_distributed_function('delegated_function(int)', 'a'); + +SET client_min_messages TO DEBUG1; +INSERT INTO test SELECT delegated_function(1); + +-- Don't push down in subqueries or CTEs. 
+SELECT * FROM test WHERE not exists( + SELECT delegated_function(4) +); + +WITH r AS ( + SELECT delegated_function(7) +) SELECT * FROM test WHERE (SELECT count(*)=0 FROM r); + +WITH r AS ( + SELECT delegated_function(10) +), t AS ( + SELECT count(*) c FROM r +) SELECT * FROM test, t WHERE t.c=0; + +WITH r AS ( + SELECT count(*) FROM test +), s AS ( + SELECT delegated_function(13) +), t AS ( + SELECT count(*) c FROM s +) SELECT * FROM test, r, t WHERE t.c=0; + -- Test that we don't propagate to non-metadata worker nodes select stop_metadata_sync_to_node('localhost', :worker_1_port); select stop_metadata_sync_to_node('localhost', :worker_2_port);