Detect SQL UDF Calls.

release-9.1
Hadi Moshayedi 2019-12-02 17:40:37 -08:00 committed by Marco Slot
parent 671e06feb0
commit 37b3ac3773
8 changed files with 123 additions and 55 deletions

View File

@ -57,6 +57,13 @@ bool WritableStandbyCoordinator = false;
/* sort the returning to get consistent outputs, used only for testing */
bool SortReturning = false;
/*
* How many nested executors have we started? This can happen for SQL
* UDF calls. The outer query starts an executor, then postgres opens
* another executor to run the SQL UDF.
*/
int ExecutorLevel = 0;
/* local function forward declarations */
static Relation StubRelation(TupleDesc tupleDescriptor);
@ -72,23 +79,6 @@ CitusExecutorStart(QueryDesc *queryDesc, int eflags)
{
PlannedStmt *plannedStmt = queryDesc->plannedstmt;
if (CitusHasBeenLoaded())
{
if (IsLocalReferenceTableJoinPlan(plannedStmt) &&
IsMultiStatementTransaction())
{
/*
* Currently we don't support this to avoid problems with tuple
* visibility, locking, etc. For example, change to the reference
* table can go through a MultiConnection, which won't be visible
* to the locally planned queries.
*/
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot join local tables and reference tables in "
"a transaction block")));
}
}
/*
* We cannot modify XactReadOnly on Windows because it is not
* declared with PGDLLIMPORT.
@ -132,16 +122,25 @@ CitusExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count, bool execute_once)
{
DestReceiver *dest = queryDesc->dest;
int originalLevel = FunctionCallLevel;
int originalLevel = ExecutorLevel;
if (dest->mydest == DestSPI)
ExecutorLevel++;
if (CitusHasBeenLoaded())
{
/*
* If the query runs via SPI, we assume we're in a function call
* and we should treat statements as part of a bigger transaction.
* We reset this counter to 0 in the abort handler.
*/
FunctionCallLevel++;
if (IsLocalReferenceTableJoinPlan(queryDesc->plannedstmt) &&
IsMultiStatementTransaction())
{
/*
* Currently we don't support this to avoid problems with tuple
* visibility, locking, etc. For example, change to the reference
* table can go through a MultiConnection, which won't be visible
* to the locally planned queries.
*/
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot join local tables and reference tables in "
"a transaction block, udf block, or distributed "
"CTE subquery")));
}
}
/*
@ -176,15 +175,11 @@ CitusExecutorRun(QueryDesc *queryDesc,
standard_ExecutorRun(queryDesc, direction, count, execute_once);
}
if (dest->mydest == DestSPI)
{
/*
* Restore the original value. It is not sufficient to decrease
* the value because exceptions might cause us to go back a few
* levels at once.
*/
FunctionCallLevel = originalLevel;
}
/*
* Restore the original value. It is not sufficient to decrease the value
* because exceptions might cause us to go back a few levels at once.
*/
ExecutorLevel = originalLevel;
}

View File

@ -94,9 +94,6 @@ bool CoordinatedTransactionUses2PC = false;
/* if disabled, distributed statements in a function may run as separate transactions */
bool FunctionOpensTransactionBlock = true;
/* stack depth of UDF calls */
int FunctionCallLevel = 0;
/* transaction management functions */
static void BeginCoordinatedTransaction(void);
@ -110,6 +107,7 @@ static void AdjustMaxPreparedTransactions(void);
static void PushSubXact(SubTransactionId subId);
static void PopSubXact(SubTransactionId subId);
static void SwallowErrors(void (*func)());
static bool MaybeExecutingUDF(void);
/*
@ -303,7 +301,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
dlist_init(&InProgressTransactions);
activeSetStmts = NULL;
CoordinatedTransactionUses2PC = false;
FunctionCallLevel = 0;
ExecutorLevel = 0;
/*
* We should reset SubPlanLevel in case a transaction is aborted,
@ -681,7 +679,7 @@ IsMultiStatementTransaction(void)
/* in (a transaction within) a stored procedure */
return true;
}
else if (FunctionCallLevel > 0 && FunctionOpensTransactionBlock)
else if (MaybeExecutingUDF() && FunctionOpensTransactionBlock)
{
/* in a language-handler function call, open a transaction if configured to do so */
return true;
@ -691,3 +689,15 @@ IsMultiStatementTransaction(void)
return false;
}
}
/*
* MaybeExecutingUDF returns true if we are possibly executing a function call.
* We use nested level of executor to check this, so this can return true for
* CTEs, etc. which also start nested executors.
*/
static bool
MaybeExecutingUDF(void)
{
return ExecutorLevel > 1;
}

View File

@ -32,6 +32,7 @@ extern bool ForceMaxQueryParallelization;
extern int MaxAdaptiveExecutorPoolSize;
extern int ExecutorSlowStartInterval;
extern bool SortReturning;
extern int ExecutorLevel;
extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags);

View File

@ -100,9 +100,6 @@ extern int StoredProcedureLevel;
/* number of nested DO block levels we are currently in */
extern int DoBlockLevel;
/* number of nested function call levels we are currently in */
extern int FunctionCallLevel;
/* SET LOCAL statements active in the current (sub-)transaction. */
extern StringInfo activeSetStmts;

View File

@ -314,7 +314,7 @@ $$ LANGUAGE SQL STABLE;
CREATE OR REPLACE FUNCTION test_parameterized_sql_function_in_subquery_where(org_id_val integer)
RETURNS TABLE (a bigint)
AS $$
SELECT count(*) AS count_val from test_parameterized_sql as t1 where
SELECT count(*) AS count_val from test_parameterized_sql as t1 where
org_id IN (SELECT org_id FROM test_parameterized_sql as t2 WHERE t2.org_id = t1.org_id AND org_id = org_id_val);
$$ LANGUAGE SQL STABLE;
INSERT INTO test_parameterized_sql VALUES(1, 1);
@ -332,8 +332,36 @@ ERROR: could not create distributed plan
DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus.
HINT: Consider using PL/pgSQL functions instead.
CONTEXT: SQL function "test_parameterized_sql_function_in_subquery_where" statement 1
DROP TABLE temp_table;
DROP TABLE test_parameterized_sql;
-- test that sql function calls are treated as multi-statement transactions
-- and are rolled back properly. Single-row inserts for not-replicated tables
-- don't go over 2PC if they are not part of a bigger transaction.
CREATE TABLE table_with_unique_constraint (a int UNIQUE);
SELECT create_distributed_table('table_with_unique_constraint', 'a');
create_distributed_table
--------------------------
(1 row)
INSERT INTO table_with_unique_constraint VALUES (1), (2), (3);
CREATE OR REPLACE FUNCTION insert_twice() RETURNS VOID
AS $$
INSERT INTO table_with_unique_constraint VALUES (4);
INSERT INTO table_with_unique_constraint VALUES (4);
$$ LANGUAGE SQL;
SELECT insert_twice();
ERROR: duplicate key value violates unique constraint "table_with_unique_constraint_a_key_1230009"
DETAIL: Key (a)=(4) already exists.
CONTEXT: while executing command on localhost:57638
SQL function "insert_twice" statement 2
SELECT * FROM table_with_unique_constraint ORDER BY a;
a
---
1
2
3
(3 rows)
DROP TABLE temp_table, test_parameterized_sql, table_with_unique_constraint;
-- clean-up functions
DROP FUNCTION sql_test_no_1();
DROP FUNCTION sql_test_no_2();

View File

@ -116,7 +116,7 @@ SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1
-- error if in transaction block
BEGIN;
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
ERROR: cannot join local tables and reference tables in a transaction block
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
ROLLBACK;
-- error if in a transaction block even if reference table is not in search path
CREATE SCHEMA s1;
@ -129,34 +129,44 @@ SELECT create_reference_table('s1.ref');
BEGIN;
SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1;
ERROR: cannot join local tables and reference tables in a transaction block
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
ROLLBACK;
BEGIN;
WITH t1 AS (
SELECT random() r, a FROM local_table
) SELECT count(*) FROM t1, numbers WHERE t1.a = numbers.a AND r < 0.5;
ERROR: cannot join local tables and reference tables in a transaction block
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
END;
BEGIN;
WITH t1 AS (
SELECT random() r, a FROM numbers
) SELECT count(*) FROM t1, local_table WHERE t1.a = local_table.a AND r < 0.5;
ERROR: cannot join local tables and reference tables in a transaction block
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
END;
BEGIN;
SELECT count(*) FROM local_table
WHERE EXISTS(SELECT random() FROM numbers WHERE local_table.a = numbers.a);
ERROR: cannot join local tables and reference tables in a transaction block
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
END;
BEGIN;
SELECT count(*) FROM numbers
WHERE EXISTS(SELECT random() FROM local_table WHERE local_table.a = numbers.a);
ERROR: cannot join local tables and reference tables in a transaction block
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
END;
DROP SCHEMA s1 CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table s1.ref
drop cascades to table s1.ref_8000002
-- error if inside a SQL UDF call
CREATE or replace FUNCTION test_reference_local_join_func()
RETURNS SETOF RECORD AS $$
SET LOCAL citus.enable_local_execution to false;
INSERT INTO numbers VALUES (2);
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
$$ LANGUAGE sql;
SELECT test_reference_local_join_func();
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
CONTEXT: SQL function "test_reference_local_join_func" statement 3
-- shouldn't plan locally if modifications happen in CTEs, ...
WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) SELECT * FROM numbers, local_table;
ERROR: relation local_table is not distributed
@ -210,7 +220,7 @@ $Q$);
coordinator_plan
------------------------------------------------
Custom Scan (Citus Adaptive)
-> Distributed Subplan 19_1
-> Distributed Subplan 20_1
-> Seq Scan on local_table
Filter: ((a >= 1) AND (a <= 10))
Task Count: 1
@ -235,7 +245,7 @@ $Q$);
BEGIN;
SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
ERROR: cannot join local tables and reference tables in a transaction block
ERROR: cannot join local tables and reference tables in a transaction block, udf block, or distributed CTE subquery
END;
--
-- Joins between reference tables, local tables, and function calls shouldn't

View File

@ -149,8 +149,25 @@ SELECT * FROM test_parameterized_sql_function(1);
SELECT test_parameterized_sql_function(1);
SELECT test_parameterized_sql_function_in_subquery_where(1);
DROP TABLE temp_table;
DROP TABLE test_parameterized_sql;
-- test that sql function calls are treated as multi-statement transactions
-- and are rolled back properly. Single-row inserts for not-replicated tables
-- don't go over 2PC if they are not part of a bigger transaction.
CREATE TABLE table_with_unique_constraint (a int UNIQUE);
SELECT create_distributed_table('table_with_unique_constraint', 'a');
INSERT INTO table_with_unique_constraint VALUES (1), (2), (3);
CREATE OR REPLACE FUNCTION insert_twice() RETURNS VOID
AS $$
INSERT INTO table_with_unique_constraint VALUES (4);
INSERT INTO table_with_unique_constraint VALUES (4);
$$ LANGUAGE SQL;
SELECT insert_twice();
SELECT * FROM table_with_unique_constraint ORDER BY a;
DROP TABLE temp_table, test_parameterized_sql, table_with_unique_constraint;
-- clean-up functions
DROP FUNCTION sql_test_no_1();

View File

@ -86,6 +86,16 @@ END;
DROP SCHEMA s1 CASCADE;
-- error if inside a SQL UDF call
CREATE or replace FUNCTION test_reference_local_join_func()
RETURNS SETOF RECORD AS $$
SET LOCAL citus.enable_local_execution to false;
INSERT INTO numbers VALUES (2);
SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
$$ LANGUAGE sql;
SELECT test_reference_local_join_func();
-- shouldn't plan locally if modifications happen in CTEs, ...
WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *) SELECT * FROM numbers, local_table;
WITH t AS (SELECT *, random() x FROM numbers FOR UPDATE) SELECT * FROM numbers, local_table