diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index bc0017191..436ce34be 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -57,6 +57,13 @@ bool WritableStandbyCoordinator = false; /* sort the returning to get consistent outputs, used only for testing */ bool SortReturning = false; +/* + * How many nested executors have we started? This can happen for SQL + * UDF calls. The outer query starts an executor, then postgres opens + * another executor to run the SQL UDF. + */ +int ExecutorLevel = 0; + /* local function forward declarations */ static Relation StubRelation(TupleDesc tupleDescriptor); @@ -72,23 +79,6 @@ CitusExecutorStart(QueryDesc *queryDesc, int eflags) { PlannedStmt *plannedStmt = queryDesc->plannedstmt; - if (CitusHasBeenLoaded()) - { - if (IsLocalReferenceTableJoinPlan(plannedStmt) && - IsMultiStatementTransaction()) - { - /* - * Currently we don't support this to avoid problems with tuple - * visibility, locking, etc. For example, change to the reference - * table can go through a MultiConnection, which won't be visible - * to the locally planned queries. - */ - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot join local tables and reference tables in " - "a transaction block"))); - } - } - /* * We cannot modify XactReadOnly on Windows because it is not * declared with PGDLLIMPORT. @@ -132,16 +122,25 @@ CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) { DestReceiver *dest = queryDesc->dest; - int originalLevel = FunctionCallLevel; + int originalLevel = ExecutorLevel; - if (dest->mydest == DestSPI) + ExecutorLevel++; + if (CitusHasBeenLoaded()) { - /* - * If the query runs via SPI, we assume we're in a function call - * and we should treat statements as part of a bigger transaction. - * We reset this counter to 0 in the abort handler. - */ - FunctionCallLevel++; + if (IsLocalReferenceTableJoinPlan(queryDesc->plannedstmt) && + IsMultiStatementTransaction()) + { + /* + * Currently we don't support this to avoid problems with tuple + * visibility, locking, etc. For example, change to the reference + * table can go through a MultiConnection, which won't be visible + * to the locally planned queries. + */ + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot join local tables and reference tables in " + "a transaction block, udf block, or distributed " + "CTE subquery"))); + } } /* @@ -176,15 +175,11 @@ CitusExecutorRun(QueryDesc *queryDesc, standard_ExecutorRun(queryDesc, direction, count, execute_once); } - if (dest->mydest == DestSPI) - { - /* - * Restore the original value. It is not sufficient to decrease - * the value because exceptions might cause us to go back a few - * levels at once. - */ - FunctionCallLevel = originalLevel; - } + /* + * Restore the original value. It is not sufficient to decrease the value + * because exceptions might cause us to go back a few levels at once. + */ + ExecutorLevel = originalLevel; } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 3e63492bb..0ca2b9d61 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -94,9 +94,6 @@ bool CoordinatedTransactionUses2PC = false; /* if disabled, distributed statements in a function may run as separate transactions */ bool FunctionOpensTransactionBlock = true; -/* stack depth of UDF calls */ -int FunctionCallLevel = 0; - /* transaction management functions */ static void BeginCoordinatedTransaction(void); @@ -110,6 +107,7 @@ static void AdjustMaxPreparedTransactions(void); static void PushSubXact(SubTransactionId subId); static void PopSubXact(SubTransactionId subId); static void SwallowErrors(void (*func)()); +static bool MaybeExecutingUDF(void); /* @@ -303,7 +301,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) dlist_init(&InProgressTransactions); activeSetStmts = NULL; CoordinatedTransactionUses2PC = false; - FunctionCallLevel = 0; + ExecutorLevel = 0; /* * We should reset SubPlanLevel in case a transaction is aborted, @@ -681,7 +679,7 @@ IsMultiStatementTransaction(void) /* in (a transaction within) a stored procedure */ return true; } - else if (FunctionCallLevel > 0 && FunctionOpensTransactionBlock) + else if (MaybeExecutingUDF() && FunctionOpensTransactionBlock) { /* in a language-handler function call, open a transaction if configured to do so */ return true; @@ -691,3 +689,15 @@ IsMultiStatementTransaction(void) return false; } } + + +/* + * MaybeExecutingUDF returns true if we are possibly executing a function call. + * We use nested level of executor to check this, so this can return true for + * CTEs, etc. which also start nested executors. + */ +static bool +MaybeExecutingUDF(void) +{ + return ExecutorLevel > 1; +} diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 9ea5c4cbd..37ef2d55b 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -32,6 +32,7 @@ extern bool ForceMaxQueryParallelization; extern int MaxAdaptiveExecutorPoolSize; extern int ExecutorSlowStartInterval; extern bool SortReturning; +extern int ExecutorLevel; extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags); diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index ccf63c315..0c382c161 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -100,9 +100,6 @@ extern int StoredProcedureLevel; /* number of nested DO block levels we are currently in */ extern int DoBlockLevel; -/* number of nested function call levels we are currently in */ -extern int FunctionCallLevel; - /* SET LOCAL statements active in the current (sub-)transaction. */ extern StringInfo activeSetStmts; diff --git a/src/test/regress/expected/multi_sql_function.out b/src/test/regress/expected/multi_sql_function.out index 771e836ab..dc4c2547f 100644 --- a/src/test/regress/expected/multi_sql_function.out +++ b/src/test/regress/expected/multi_sql_function.out @@ -314,7 +314,7 @@ $$ LANGUAGE SQL STABLE; CREATE OR REPLACE FUNCTION test_parameterized_sql_function_in_subquery_where(org_id_val integer) RETURNS TABLE (a bigint) AS $$ - SELECT count(*) AS count_val from test_parameterized_sql as t1 where + SELECT count(*) AS count_val from test_parameterized_sql as t1 where org_id IN (SELECT org_id FROM test_parameterized_sql as t2 WHERE t2.org_id = t1.org_id AND org_id = org_id_val); $$ LANGUAGE SQL STABLE; INSERT INTO test_parameterized_sql VALUES(1, 1); @@ -332,8 +332,36 @@ ERROR: could not create distributed plan DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus. HINT: Consider using PL/pgSQL functions instead. CONTEXT: SQL function "test_parameterized_sql_function_in_subquery_where" statement 1 -DROP TABLE temp_table; -DROP TABLE test_parameterized_sql; +-- test that sql function calls are treated as multi-statement transactions +-- and are rolled back properly. Single-row inserts for not-replicated tables +-- don't go over 2PC if they are not part of a bigger transaction. +CREATE TABLE table_with_unique_constraint (a int UNIQUE); +SELECT create_distributed_table('table_with_unique_constraint', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO table_with_unique_constraint VALUES (1), (2), (3); +CREATE OR REPLACE FUNCTION insert_twice() RETURNS VOID +AS $$ + INSERT INTO table_with_unique_constraint VALUES (4); + INSERT INTO table_with_unique_constraint VALUES (4); +$$ LANGUAGE SQL; +SELECT insert_twice(); +ERROR: duplicate key value violates unique constraint "table_with_unique_constraint_a_key_1230009" +DETAIL: Key (a)=(4) already exists. +CONTEXT: while executing command on localhost:57638 +SQL function "insert_twice" statement 2 +SELECT * FROM table_with_unique_constraint ORDER BY a; + a +--- + 1 + 2 + 3 +(3 rows) + +DROP TABLE temp_table, test_parameterized_sql, table_with_unique_constraint; -- clean-up functions DROP FUNCTION sql_test_no_1(); DROP FUNCTION sql_test_no_2(); diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out index bd4c9a8ee..e6a81dea1 100644 --- a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out @@ -116,7 +116,7 @@ SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1 -- error if in transaction block BEGIN; SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; -ERROR: cannot join local tables and reference tables in a transaction block +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery ROLLBACK; -- error if in a transaction block even if reference table is not in search path CREATE SCHEMA s1; @@ -129,34 +129,44 @@ SELECT create_reference_table('s1.ref'); BEGIN; SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1; -ERROR: cannot join local tables and reference tables in a transaction block +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery ROLLBACK; BEGIN; WITH t1 AS ( SELECT random() r, a FROM local_table ) SELECT count(*) FROM t1, numbers WHERE t1.a = numbers.a AND r < 0.5; -ERROR: cannot join local tables and reference tables in a transaction block +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery END; BEGIN; WITH t1 AS ( SELECT random() r, a FROM numbers ) SELECT count(*) FROM t1, local_table WHERE t1.a = local_table.a AND r < 0.5; -ERROR: cannot join local tables and reference tables in a transaction block +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery END; BEGIN; SELECT count(*) FROM local_table WHERE EXISTS(SELECT random() FROM numbers WHERE local_table.a = numbers.a); -ERROR: cannot join local tables and reference tables in a transaction block +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery END; BEGIN; SELECT count(*) FROM numbers WHERE EXISTS(SELECT random() FROM local_table WHERE local_table.a = numbers.a); -ERROR: cannot join local tables and reference tables in a transaction block +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery END; DROP SCHEMA s1 CASCADE; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to table s1.ref drop cascades to table s1.ref_8000002 +-- error if inside a SQL UDF call +CREATE or replace FUNCTION test_reference_local_join_func() +RETURNS SETOF RECORD AS $$ +SET LOCAL citus.enable_local_execution to false; +INSERT INTO numbers VALUES (2); +SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; +$$ LANGUAGE sql; +SELECT test_reference_local_join_func(); +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery +CONTEXT: SQL function "test_reference_local_join_func" statement 3 -- shouldn't plan locally if modifications happen in CTEs, ... WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) SELECT * FROM numbers, local_table; ERROR: relation local_table is not distributed @@ -210,7 +220,7 @@ $Q$); coordinator_plan ------------------------------------------------ Custom Scan (Citus Adaptive) - -> Distributed Subplan 19_1 + -> Distributed Subplan 20_1 -> Seq Scan on local_table Filter: ((a >= 1) AND (a <= 10)) Task Count: 1 @@ -235,7 +245,7 @@ $Q$); BEGIN; SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a; -ERROR: cannot join local tables and reference tables in a transaction block +ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery END; -- -- Joins between reference tables, local tables, and function calls shouldn't diff --git a/src/test/regress/sql/multi_sql_function.sql b/src/test/regress/sql/multi_sql_function.sql index 83fb7c231..fc191b228 100644 --- a/src/test/regress/sql/multi_sql_function.sql +++ b/src/test/regress/sql/multi_sql_function.sql @@ -149,8 +149,25 @@ SELECT * FROM test_parameterized_sql_function(1); SELECT test_parameterized_sql_function(1); SELECT test_parameterized_sql_function_in_subquery_where(1); -DROP TABLE temp_table; -DROP TABLE test_parameterized_sql; +-- test that sql function calls are treated as multi-statement transactions +-- and are rolled back properly. Single-row inserts for not-replicated tables +-- don't go over 2PC if they are not part of a bigger transaction. +CREATE TABLE table_with_unique_constraint (a int UNIQUE); +SELECT create_distributed_table('table_with_unique_constraint', 'a'); + +INSERT INTO table_with_unique_constraint VALUES (1), (2), (3); + +CREATE OR REPLACE FUNCTION insert_twice() RETURNS VOID +AS $$ + INSERT INTO table_with_unique_constraint VALUES (4); + INSERT INTO table_with_unique_constraint VALUES (4); +$$ LANGUAGE SQL; + +SELECT insert_twice(); + +SELECT * FROM table_with_unique_constraint ORDER BY a; + +DROP TABLE temp_table, test_parameterized_sql, table_with_unique_constraint; -- clean-up functions DROP FUNCTION sql_test_no_1(); diff --git a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql index 1557233f7..44ed93b10 100644 --- a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql +++ b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql @@ -86,6 +86,16 @@ END; DROP SCHEMA s1 CASCADE; +-- error if inside a SQL UDF call +CREATE or replace FUNCTION test_reference_local_join_func() +RETURNS SETOF RECORD AS $$ +SET LOCAL citus.enable_local_execution to false; +INSERT INTO numbers VALUES (2); +SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1; +$$ LANGUAGE sql; + +SELECT test_reference_local_join_func(); + -- shouldn't plan locally if modifications happen in CTEs, ... WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) SELECT * FROM numbers, local_table; WITH t AS (SELECT *, random() x FROM numbers FOR UPDATE) SELECT * FROM numbers, local_table