diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 890b78e45..4dfeedbd3 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -63,6 +63,9 @@ static List *plannerRestrictionContextList = NIL; int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log level */ static uint64 NextPlanId = 1; +/* keep track of planner call stack levels */ +int PlannerLevel = 0; + static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool IsUpdateOrDelete(Query *query); @@ -188,6 +191,14 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) PG_TRY(); { + /* + * We keep track of how many times we've recursed into the planner, primarily + * to detect whether we are in a function call. We need to make sure that the + * PlannerLevel is decremented exactly once at the end of this PG_TRY block, + * both in the happy case and when an error occurs. + */ + PlannerLevel++; + /* * For trivial queries, we're skipping the standard_planner() in * order to eliminate its overhead. @@ -242,14 +253,20 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) result->planTree->total_cost = FLT_MAX / 100000000; } } + + PlannerLevel--; } PG_CATCH(); { PopPlannerRestrictionContext(); + + PlannerLevel--; + PG_RE_THROW(); } PG_END_TRY(); + /* remove the context from the context list */ PopPlannerRestrictionContext(); diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 642482427..8201abb32 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -229,6 +229,17 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) ereport(DEBUG4, (errmsg("function is distributed"))); } + /* + * This can be called while executing INSERT ... SELECT func(). insert_select_executor + * doesn't get the planned subquery and gets the actual struct Query, so the planning + * for these kinds of queries happens at the execution time. + */ + if (ExecutingInsertSelect()) + { + ereport(DEBUG1, (errmsg("not pushing down function calls in INSERT ... SELECT"))); + return NULL; + } + if (IsMultiStatementTransaction()) { /* cannot delegate function calls in a multi-statement transaction */ @@ -288,17 +299,6 @@ TryToDelegateFunctionCall(Query *query, bool *hasExternParam) return NULL; } - /* - * This can be called while executing INSERT ... SELECT func(). insert_select_executor - * doesn't get the planned subquery and gets the actual struct Query, so the planning - * for these kinds of queries happens at the execution time. - */ - if (ExecutingInsertSelect()) - { - ereport(DEBUG1, (errmsg("not pushing down function calls in INSERT ... SELECT"))); - return NULL; - } - /* * This can be called in queries like SELECT ... WHERE EXISTS(SELECT func()), or other * forms of CTEs or subqueries. We don't push-down in those cases. diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index e32e95e42..e99a493b3 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -21,6 +21,7 @@ #include "access/xact.h" #include "distributed/backend_data.h" #include "distributed/connection_management.h" +#include "distributed/distributed_planner.h" #include "distributed/hash_helpers.h" #include "distributed/intermediate_results.h" #include "distributed/local_executor.h" @@ -291,6 +292,14 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) CoordinatedTransactionUses2PC = false; ExecutorLevel = 0; + /* + * Getting here without PlannerLevel 0 is a bug, however it is such a big + * problem that will persist between reuse of the backend we still assign 0 in + * production deploys, but during development and tests we want to crash. + */ + Assert(PlannerLevel == 0); + PlannerLevel = 0; + /* * We should reset SubPlanLevel in case a transaction is aborted, * otherwise this variable would stay +ve if the transaction is @@ -683,9 +692,12 @@ IsMultiStatementTransaction(void) * MaybeExecutingUDF returns true if we are possibly executing a function call. * We use nested level of executor to check this, so this can return true for * CTEs, etc. which also start nested executors. + * + * If the planner is being called from the executor, then we may also be in + * a UDF. */ static bool MaybeExecutingUDF(void) { - return ExecutorLevel > 1; + return ExecutorLevel > 1 || (ExecutorLevel == 1 && PlannerLevel > 0); } diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 1d54a901b..d620e47e5 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -32,6 +32,11 @@ #define CURSOR_OPT_FORCE_DISTRIBUTED 0x080000 + +/* level of planner calls */ +extern int PlannerLevel; + + typedef struct RelationRestrictionContext { bool hasDistributedRelation; diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 4b9fd2794..b537b8fb2 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -519,7 +519,7 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$3'); ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid HINT: Either provide a valid function argument name or a valid "$paramIndex" to create_distributed_function() SELECT create_distributed_function('add_with_param_names(int, int)', '$1a'); -ERROR: invalid input syntax for type integer: "1a" +ERROR: invalid input syntax for integer: "1a" -- non existing column name SELECT create_distributed_function('add_with_param_names(int, int)', 'aaa'); ERROR: cannot distribute the function "add_with_param_names" since the distribution argument is not valid @@ -745,6 +745,61 @@ SELECT public.wait_until_metadata_sync(); (1 row) +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 4; +CREATE TABLE test (id int, name text); +SELECT create_distributed_table('test','id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO test VALUES (3,'three'); +CREATE OR REPLACE FUNCTION increment(int) +RETURNS NUMERIC AS $$ +DECLARE ret_val NUMERIC; +BEGIN + SELECT max(id)::numeric+1 INTO ret_val FROM test WHERE id = $1; + RETURN ret_val; +END; +$$ LANGUAGE plpgsql; +SELECT create_distributed_function('increment(int)', '$1', colocate_with := 'test'); + create_distributed_function +----------------------------- + +(1 row) + +-- call a distributed function inside a pl/pgsql function +CREATE OR REPLACE FUNCTION test_func_calls_dist_func() +RETURNS NUMERIC AS $$ +DECLARE incremented_val NUMERIC; +BEGIN + SELECT INTO incremented_val increment(1); + RETURN incremented_val; +END; +$$ LANGUAGE plpgsql; +SELECT test_func_calls_dist_func(); + test_func_calls_dist_func +--------------------------- + +(1 row) + +SELECT test_func_calls_dist_func(); + test_func_calls_dist_func +--------------------------- + +(1 row) + +-- test an INSERT..SELECT via the coordinator just because it is kind of funky +INSERT INTO test SELECT increment(3); +SELECT * FROM test ORDER BY id; + id | name +----+------- + 3 | three + 4 | +(2 rows) + +DROP TABLE test; SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests2 CASCADE; diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index d8486a3a8..040a05aa7 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -422,6 +422,43 @@ SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); SELECT public.wait_until_metadata_sync(); +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 4; +CREATE TABLE test (id int, name text); +SELECT create_distributed_table('test','id'); +INSERT INTO test VALUES (3,'three'); + +CREATE OR REPLACE FUNCTION increment(int) +RETURNS NUMERIC AS $$ +DECLARE ret_val NUMERIC; +BEGIN + SELECT max(id)::numeric+1 INTO ret_val FROM test WHERE id = $1; + RETURN ret_val; +END; +$$ LANGUAGE plpgsql; + +SELECT create_distributed_function('increment(int)', '$1', colocate_with := 'test'); + +-- call a distributed function inside a pl/pgsql function + +CREATE OR REPLACE FUNCTION test_func_calls_dist_func() +RETURNS NUMERIC AS $$ +DECLARE incremented_val NUMERIC; +BEGIN + SELECT INTO incremented_val increment(1); + RETURN incremented_val; +END; +$$ LANGUAGE plpgsql; + +SELECT test_func_calls_dist_func(); +SELECT test_func_calls_dist_func(); + +-- test an INSERT..SELECT via the coordinator just because it is kind of funky +INSERT INTO test SELECT increment(3); +SELECT * FROM test ORDER BY id; + +DROP TABLE test; + SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests2 CASCADE;