Merge pull request #3257 from citusdata/fix_sql_udf_calls

Detect SQL UDF Calls.
pull/3269/head^2
Hadi Moshayedi 2019-12-05 14:40:00 -08:00 committed by GitHub
commit 9c254859bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 108 additions and 44 deletions

View File

@ -56,6 +56,13 @@ bool WritableStandbyCoordinator = false;
/* sort the returning to get consistent outputs, used only for testing */
bool SortReturning = false;
/*
* How many nested executors have we started? This can happen for SQL
* UDF calls. The outer query starts an executor, then postgres opens
* another executor to run the SQL UDF.
*/
int ExecutorLevel = 0;
/* local function forward declarations */
static Relation StubRelation(TupleDesc tupleDescriptor);
@ -114,18 +121,9 @@ CitusExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count, bool execute_once)
{
DestReceiver *dest = queryDesc->dest;
int originalLevel = FunctionCallLevel;
if (dest->mydest == DestSPI)
{
/*
* If the query runs via SPI, we assume we're in a function call
* and we should treat statements as part of a bigger transaction.
* We reset this counter to 0 in the abort handler.
*/
FunctionCallLevel++;
}
int originalLevel = ExecutorLevel;
ExecutorLevel++;
if (CitusHasBeenLoaded())
{
if (IsLocalReferenceTableJoinPlan(queryDesc->plannedstmt) &&
@ -139,7 +137,8 @@ CitusExecutorRun(QueryDesc *queryDesc,
*/
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot join local tables and reference tables in "
"a transaction block")));
"a transaction block, udf block, or distributed "
"CTE subquery")));
}
}
@ -175,15 +174,11 @@ CitusExecutorRun(QueryDesc *queryDesc,
standard_ExecutorRun(queryDesc, direction, count, execute_once);
}
if (dest->mydest == DestSPI)
{
/*
* Restore the original value. It is not sufficient to decrease
* the value because exceptions might cause us to go back a few
* levels at once.
*/
FunctionCallLevel = originalLevel;
}
/*
* Restore the original value. It is not sufficient to decrease the value
* because exceptions might cause us to go back a few levels at once.
*/
ExecutorLevel = originalLevel;
}

View File

@ -94,9 +94,6 @@ bool CoordinatedTransactionUses2PC = false;
/* if disabled, distributed statements in a function may run as separate transactions */
bool FunctionOpensTransactionBlock = true;
/* stack depth of UDF calls */
int FunctionCallLevel = 0;
/* transaction management functions */
static void BeginCoordinatedTransaction(void);
@ -110,6 +107,7 @@ static void AdjustMaxPreparedTransactions(void);
static void PushSubXact(SubTransactionId subId);
static void PopSubXact(SubTransactionId subId);
static void SwallowErrors(void (*func)());
static bool MaybeExecutingUDF(void);
/*
@ -303,7 +301,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
dlist_init(&InProgressTransactions);
activeSetStmts = NULL;
CoordinatedTransactionUses2PC = false;
FunctionCallLevel = 0;
ExecutorLevel = 0;
/*
* We should reset SubPlanLevel in case a transaction is aborted,
@ -681,7 +679,7 @@ IsMultiStatementTransaction(void)
/* in (a transaction within) a stored procedure */
return true;
}
else if (FunctionCallLevel > 0 && FunctionOpensTransactionBlock)
else if (MaybeExecutingUDF() && FunctionOpensTransactionBlock)
{
/* in a language-handler function call, open a transaction if configured to do so */
return true;
@ -691,3 +689,15 @@ IsMultiStatementTransaction(void)
return false;
}
}
/*
* MaybeExecutingUDF returns true if we are possibly executing a function call.
* We use nested level of executor to check this, so this can return true for
* CTEs, etc. which also start nested executors.
*/
static bool
MaybeExecutingUDF(void)
{
return ExecutorLevel > 1;
}

View File

@ -32,6 +32,7 @@ extern bool ForceMaxQueryParallelization;
extern int MaxAdaptiveExecutorPoolSize;
extern int ExecutorSlowStartInterval;
extern bool SortReturning;
extern int ExecutorLevel;
extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags);

View File

@ -100,9 +100,6 @@ extern int StoredProcedureLevel;
/* number of nested DO block levels we are currently in */
extern int DoBlockLevel;
/* number of nested function call levels we are currently in */
extern int FunctionCallLevel;
/* SET LOCAL statements active in the current (sub-)transaction. */
extern StringInfo activeSetStmts;

View File

@ -332,8 +332,36 @@ ERROR: could not create distributed plan
DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus.
HINT: Consider using PL/pgSQL functions instead.
CONTEXT: SQL function "test_parameterized_sql_function_in_subquery_where" statement 1
DROP TABLE temp_table;
DROP TABLE test_parameterized_sql;
-- test that sql function calls are treated as multi-statement transactions
-- and are rolled back properly. Single-row inserts for not-replicated tables
-- don't go over 2PC if they are not part of a bigger transaction.
CREATE TABLE table_with_unique_constraint (a int UNIQUE);
SELECT create_distributed_table('table_with_unique_constraint', 'a');
create_distributed_table
--------------------------
(1 row)
INSERT INTO table_with_unique_constraint VALUES (1), (2), (3);
CREATE OR REPLACE FUNCTION insert_twice() RETURNS VOID
AS $$
INSERT INTO table_with_unique_constraint VALUES (4);
INSERT INTO table_with_unique_constraint VALUES (4);
$$ LANGUAGE SQL;
SELECT insert_twice();
ERROR: duplicate key value violates unique constraint "table_with_unique_constraint_a_key_1230009"
DETAIL: Key (a)=(4) already exists.
CONTEXT: while executing command on localhost:57638
SQL function "insert_twice" statement 2
SELECT * FROM table_with_unique_constraint ORDER BY a;
a
---
1
2
3
(3 rows)
DROP TABLE temp_table, test_parameterized_sql, table_with_unique_constraint;
-- clean-up functions
DROP FUNCTION sql_test_no_1();
DROP FUNCTION sql_test_no_2();

View File

@ -128,7 +128,7 @@ ORDER BY 1,2,3;
-- error if in transaction block
BEGIN;
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
ERROR: cannot join local tables and reference tables in a transaction block
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
ROLLBACK;
-- error if in a DO block
DO $$
@ -136,7 +136,7 @@ BEGIN
PERFORM local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;
END;
$$;
ERROR: cannot join local tables and reference tables in a transaction block
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
CONTEXT: SQL statement "SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers"
PL/pgSQL function inline_code_block line 3 at PERFORM
-- test plpgsql function
@ -154,7 +154,7 @@ SELECT test_reference_local_join_plpgsql_func();
LOG: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 (a) VALUES (4)
CONTEXT: SQL statement "INSERT INTO numbers VALUES (4)"
PL/pgSQL function test_reference_local_join_plpgsql_func() line 4 at SQL statement
ERROR: cannot join local tables and reference tables in a transaction block
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
CONTEXT: SQL statement "SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1"
PL/pgSQL function test_reference_local_join_plpgsql_func() line 5 at PERFORM
SELECT sum(a) FROM local_table;
@ -175,7 +175,7 @@ CREATE PROCEDURE test_reference_local_join_proc() AS $$
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
$$ LANGUAGE sql;
CALL test_reference_local_join_proc();
ERROR: cannot join local tables and reference tables in a transaction block
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
CONTEXT: SQL function "test_reference_local_join_proc" statement 1
-- error if in a transaction block even if reference table is not in search path
CREATE SCHEMA s1;
@ -188,12 +188,22 @@ SELECT create_reference_table('s1.ref');
BEGIN;
SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1;
ERROR: cannot join local tables and reference tables in a transaction block
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
ROLLBACK;
DROP SCHEMA s1 CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table s1.ref
drop cascades to table s1.ref_8000002
-- error if inside a SQL UDF call
CREATE or replace FUNCTION test_reference_local_join_func()
RETURNS SETOF RECORD AS $$
SET LOCAL citus.enable_local_execution to false;
INSERT INTO numbers VALUES (2);
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
$$ LANGUAGE sql;
SELECT test_reference_local_join_func();
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
CONTEXT: SQL function "test_reference_local_join_func" statement 3
-- shouldn't plan locally if modifications happen in CTEs, ...
WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *)
SELECT * FROM numbers, local_table;
@ -226,11 +236,7 @@ ERROR: relation local_table is not distributed
-- test CTE being reference/local join for distributed query
WITH t as (SELECT n.a, random() x FROM numbers n NATURAL JOIN local_table l)
SELECT a FROM t NATURAL JOIN dist;
a
----
20
(1 row)
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
-- error if FOR UPDATE/FOR SHARE
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE;
ERROR: could not run distributed query with FOR UPDATE/SHARE commands

View File

@ -149,8 +149,25 @@ SELECT * FROM test_parameterized_sql_function(1);
SELECT test_parameterized_sql_function(1);
SELECT test_parameterized_sql_function_in_subquery_where(1);
DROP TABLE temp_table;
DROP TABLE test_parameterized_sql;
-- test that sql function calls are treated as multi-statement transactions
-- and are rolled back properly. Single-row inserts for not-replicated tables
-- don't go over 2PC if they are not part of a bigger transaction.
CREATE TABLE table_with_unique_constraint (a int UNIQUE);
SELECT create_distributed_table('table_with_unique_constraint', 'a');
INSERT INTO table_with_unique_constraint VALUES (1), (2), (3);
CREATE OR REPLACE FUNCTION insert_twice() RETURNS VOID
AS $$
INSERT INTO table_with_unique_constraint VALUES (4);
INSERT INTO table_with_unique_constraint VALUES (4);
$$ LANGUAGE SQL;
SELECT insert_twice();
SELECT * FROM table_with_unique_constraint ORDER BY a;
DROP TABLE temp_table, test_parameterized_sql, table_with_unique_constraint;
-- clean-up functions
DROP FUNCTION sql_test_no_1();

View File

@ -98,6 +98,16 @@ ROLLBACK;
DROP SCHEMA s1 CASCADE;
-- error if inside a SQL UDF call
CREATE or replace FUNCTION test_reference_local_join_func()
RETURNS SETOF RECORD AS $$
SET LOCAL citus.enable_local_execution to false;
INSERT INTO numbers VALUES (2);
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
$$ LANGUAGE sql;
SELECT test_reference_local_join_func();
-- shouldn't plan locally if modifications happen in CTEs, ...
WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *)
SELECT * FROM numbers, local_table;