From d28beb371116ba6257fe4f3991c54151d1af4256 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 2 Dec 2019 17:40:37 -0800 Subject: [PATCH] Detect SQL UDF Calls. --- .../distributed/executor/multi_executor.c | 37 ++++++++----------- .../transaction/transaction_management.c | 20 +++++++--- src/include/distributed/multi_executor.h | 1 + .../distributed/transaction_management.h | 3 -- .../regress/expected/multi_sql_function.out | 34 +++++++++++++++-- ...licate_reference_tables_to_coordinator.out | 26 ++++++++----- src/test/regress/sql/multi_sql_function.sql | 21 ++++++++++- ...licate_reference_tables_to_coordinator.sql | 10 +++++ 8 files changed, 108 insertions(+), 44 deletions(-) diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index ea33cd898..466c1c023 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -56,6 +56,13 @@ bool WritableStandbyCoordinator = false; /* sort the returning to get consistent outputs, used only for testing */ bool SortReturning = false; +/* + * How many nested executors have we started? This can happen for SQL + * UDF calls. The outer query starts an executor, then postgres opens + * another executor to run the SQL UDF. + */ +int ExecutorLevel = 0; + /* local function forward declarations */ static Relation StubRelation(TupleDesc tupleDescriptor); @@ -114,18 +121,9 @@ CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) { DestReceiver *dest = queryDesc->dest; - int originalLevel = FunctionCallLevel; - - if (dest->mydest == DestSPI) - { - /* - * If the query runs via SPI, we assume we're in a function call - * and we should treat statements as part of a bigger transaction. - * We reset this counter to 0 in the abort handler. - */ - FunctionCallLevel++; - } + int originalLevel = ExecutorLevel; + ExecutorLevel++; if (CitusHasBeenLoaded()) { if (IsLocalReferenceTableJoinPlan(queryDesc->plannedstmt) && @@ -139,7 +137,8 @@ CitusExecutorRun(QueryDesc *queryDesc, */ ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot join local tables and reference tables in " - "a transaction block"))); + "a transaction block, udf block, or distributed " + "CTE subquery"))); } } @@ -175,15 +174,11 @@ CitusExecutorRun(QueryDesc *queryDesc, standard_ExecutorRun(queryDesc, direction, count, execute_once); } - if (dest->mydest == DestSPI) - { - /* - * Restore the original value. It is not sufficient to decrease - * the value because exceptions might cause us to go back a few - * levels at once. - */ - FunctionCallLevel = originalLevel; - } + /* + * Restore the original value. It is not sufficient to decrease the value + * because exceptions might cause us to go back a few levels at once. + */ + ExecutorLevel = originalLevel; } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 3e63492bb..0ca2b9d61 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -94,9 +94,6 @@ bool CoordinatedTransactionUses2PC = false; /* if disabled, distributed statements in a function may run as separate transactions */ bool FunctionOpensTransactionBlock = true; -/* stack depth of UDF calls */ -int FunctionCallLevel = 0; - /* transaction management functions */ static void BeginCoordinatedTransaction(void); @@ -110,6 +107,7 @@ static void AdjustMaxPreparedTransactions(void); static void PushSubXact(SubTransactionId subId); static void PopSubXact(SubTransactionId subId); static void SwallowErrors(void (*func)()); +static bool MaybeExecutingUDF(void); /* @@ -303,7 +301,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) dlist_init(&InProgressTransactions); activeSetStmts = NULL; CoordinatedTransactionUses2PC = false; - FunctionCallLevel = 0; + ExecutorLevel = 0; /* * We should reset SubPlanLevel in case a transaction is aborted, @@ -681,7 +679,7 @@ IsMultiStatementTransaction(void) /* in (a transaction within) a stored procedure */ return true; } - else if (FunctionCallLevel > 0 && FunctionOpensTransactionBlock) + else if (MaybeExecutingUDF() && FunctionOpensTransactionBlock) { /* in a language-handler function call, open a transaction if configured to do so */ return true; @@ -691,3 +689,15 @@ IsMultiStatementTransaction(void) return false; } } + + +/* + * MaybeExecutingUDF returns true if we are possibly executing a function call. + * We use nested level of executor to check this, so this can return true for + * CTEs, etc. which also start nested executors. + */ +static bool +MaybeExecutingUDF(void) +{ + return ExecutorLevel > 1; +} diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 9ea5c4cbd..37ef2d55b 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -32,6 +32,7 @@ extern bool ForceMaxQueryParallelization; extern int MaxAdaptiveExecutorPoolSize; extern int ExecutorSlowStartInterval; extern bool SortReturning; +extern int ExecutorLevel; extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags); diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index ccf63c315..0c382c161 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -100,9 +100,6 @@ extern int StoredProcedureLevel; /* number of nested DO block levels we are currently in */ extern int DoBlockLevel; -/* number of nested function call levels we are currently in */ -extern int FunctionCallLevel; - /* SET LOCAL statements active in the current (sub-)transaction. */ extern StringInfo activeSetStmts; diff --git a/src/test/regress/expected/multi_sql_function.out b/src/test/regress/expected/multi_sql_function.out index 771e836ab..dc4c2547f 100644 --- a/src/test/regress/expected/multi_sql_function.out +++ b/src/test/regress/expected/multi_sql_function.out @@ -314,7 +314,7 @@ $$ LANGUAGE SQL STABLE; CREATE OR REPLACE FUNCTION test_parameterized_sql_function_in_subquery_where(org_id_val integer) RETURNS TABLE (a bigint) AS $$ - SELECT count(*) AS count_val from test_parameterized_sql as t1 where + SELECT count(*) AS count_val from test_parameterized_sql as t1 where org_id IN (SELECT org_id FROM test_parameterized_sql as t2 WHERE t2.org_id = t1.org_id AND org_id = org_id_val); $$ LANGUAGE SQL STABLE; INSERT INTO test_parameterized_sql VALUES(1, 1); @@ -332,8 +332,36 @@ ERROR: could not create distributed plan DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus. HINT: Consider using PL/pgSQL functions instead. CONTEXT: SQL function "test_parameterized_sql_function_in_subquery_where" statement 1 -DROP TABLE temp_table; -DROP TABLE test_parameterized_sql; +-- test that sql function calls are treated as multi-statement transactions +-- and are rolled back properly. Single-row inserts for not-replicated tables +-- don't go over 2PC if they are not part of a bigger transaction. +CREATE TABLE table_with_unique_constraint (a int UNIQUE); +SELECT create_distributed_table('table_with_unique_constraint', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO table_with_unique_constraint VALUES (1), (2), (3); +CREATE OR REPLACE FUNCTION insert_twice() RETURNS VOID +AS $$ + INSERT INTO table_with_unique_constraint VALUES (4); + INSERT INTO table_with_unique_constraint VALUES (4); +$$ LANGUAGE SQL; +SELECT insert_twice(); +ERROR: duplicate key value violates unique constraint "table_with_unique_constraint_a_key_1230009" +DETAIL: Key (a)=(4) already exists. +CONTEXT: while executing command on localhost:57638 +SQL function "insert_twice" statement 2 +SELECT * FROM table_with_unique_constraint ORDER BY a; + a +--- + 1 + 2 + 3 +(3 rows) + +DROP TABLE temp_table, test_parameterized_sql, table_with_unique_constraint; -- clean-up functions DROP FUNCTION sql_test_no_1(); DROP FUNCTION sql_test_no_2(); diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out index 08b9323d1..f825e5a50 100644 --- a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out @@ -128,7 +128,7 @@ ORDER BY 1,2,3; -- error if in transaction block BEGIN; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; -ERROR: cannot join local tables and reference tables in a transaction block +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery ROLLBACK; -- error if in a DO block DO $$ @@ -136,7 +136,7 @@ BEGIN PERFORM local_table.a, numbers.a FROM local_table NATURAL JOIN numbers; END; $$; -ERROR: cannot join local tables and reference tables in a transaction block +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery CONTEXT: SQL statement "SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers" PL/pgSQL function inline_code_block line 3 at PERFORM -- test plpgsql function @@ -154,7 +154,7 @@ SELECT test_reference_local_join_plpgsql_func(); LOG: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 (a) VALUES (4) CONTEXT: SQL statement "INSERT INTO numbers VALUES (4)" PL/pgSQL function test_reference_local_join_plpgsql_func() line 4 at SQL statement -ERROR: cannot join local tables and reference tables in a transaction block +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery CONTEXT: SQL statement "SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1" PL/pgSQL function test_reference_local_join_plpgsql_func() line 5 at PERFORM SELECT sum(a) FROM local_table; @@ -175,7 +175,7 @@ CREATE PROCEDURE test_reference_local_join_proc() AS $$ SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; $$ LANGUAGE sql; CALL test_reference_local_join_proc(); -ERROR: cannot join local tables and reference tables in a transaction block +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery CONTEXT: SQL function "test_reference_local_join_proc" statement 1 -- error if in a transaction block even if reference table is not in search path CREATE SCHEMA s1; @@ -188,12 +188,22 @@ SELECT create_reference_table('s1.ref'); BEGIN; SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1; -ERROR: cannot join local tables and reference tables in a transaction block +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery ROLLBACK; DROP SCHEMA s1 CASCADE; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to table s1.ref drop cascades to table s1.ref_8000002 +-- error if inside a SQL UDF call +CREATE or replace FUNCTION test_reference_local_join_func() +RETURNS SETOF RECORD AS $$ +SET LOCAL citus.enable_local_execution to false; +INSERT INTO numbers VALUES (2); +SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; +$$ LANGUAGE sql; +SELECT test_reference_local_join_func(); +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery +CONTEXT: SQL function "test_reference_local_join_func" statement 3 -- shouldn't plan locally if modifications happen in CTEs, ... WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) SELECT * FROM numbers, local_table; @@ -226,11 +236,7 @@ ERROR: relation local_table is not distributed -- test CTE being reference/local join for distributed query WITH t as (SELECT n.a, random() x FROM numbers n NATURAL JOIN local_table l) SELECT a FROM t NATURAL JOIN dist; - a ----- - 20 -(1 row) - +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery -- error if FOR UPDATE/FOR SHARE SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE; ERROR: could not run distributed query with FOR UPDATE/SHARE commands diff --git a/src/test/regress/sql/multi_sql_function.sql b/src/test/regress/sql/multi_sql_function.sql index 83fb7c231..fc191b228 100644 --- a/src/test/regress/sql/multi_sql_function.sql +++ b/src/test/regress/sql/multi_sql_function.sql @@ -149,8 +149,25 @@ SELECT * FROM test_parameterized_sql_function(1); SELECT test_parameterized_sql_function(1); SELECT test_parameterized_sql_function_in_subquery_where(1); -DROP TABLE temp_table; -DROP TABLE test_parameterized_sql; +-- test that sql function calls are treated as multi-statement transactions +-- and are rolled back properly. Single-row inserts for not-replicated tables +-- don't go over 2PC if they are not part of a bigger transaction. +CREATE TABLE table_with_unique_constraint (a int UNIQUE); +SELECT create_distributed_table('table_with_unique_constraint', 'a'); + +INSERT INTO table_with_unique_constraint VALUES (1), (2), (3); + +CREATE OR REPLACE FUNCTION insert_twice() RETURNS VOID +AS $$ + INSERT INTO table_with_unique_constraint VALUES (4); + INSERT INTO table_with_unique_constraint VALUES (4); +$$ LANGUAGE SQL; + +SELECT insert_twice(); + +SELECT * FROM table_with_unique_constraint ORDER BY a; + +DROP TABLE temp_table, test_parameterized_sql, table_with_unique_constraint; -- clean-up functions DROP FUNCTION sql_test_no_1(); diff --git a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql index 9804cd9aa..b2e3667e0 100644 --- a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql +++ b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql @@ -98,6 +98,16 @@ ROLLBACK; DROP SCHEMA s1 CASCADE; +-- error if inside a SQL UDF call +CREATE or replace FUNCTION test_reference_local_join_func() +RETURNS SETOF RECORD AS $$ +SET LOCAL citus.enable_local_execution to false; +INSERT INTO numbers VALUES (2); +SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; +$$ LANGUAGE sql; + +SELECT test_reference_local_join_func(); + -- shouldn't plan locally if modifications happen in CTEs, ... WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) SELECT * FROM numbers, local_table;