From 77fbcfaf14e68274ab8dfe0cb107a8c53ac875ed Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 10 Nov 2022 18:08:43 +0100 Subject: [PATCH] Propagate BEGIN properties to worker nodes (#6483) Co-authored-by: Marco Slot --- src/backend/distributed/commands/begin.c | 61 ++++++++ .../distributed/commands/utility_hook.c | 14 +- .../transaction/remote_transaction.c | 75 +++++++++- .../transaction/transaction_management.c | 9 ++ src/include/distributed/commands.h | 2 + .../distributed/transaction_management.h | 27 ++++ .../expected/propagate_set_commands.out | 136 ++++++++++++++++++ .../regress/sql/propagate_set_commands.sql | 72 ++++++++++ 8 files changed, 393 insertions(+), 3 deletions(-) create mode 100644 src/backend/distributed/commands/begin.c diff --git a/src/backend/distributed/commands/begin.c b/src/backend/distributed/commands/begin.c new file mode 100644 index 000000000..3ff28ac20 --- /dev/null +++ b/src/backend/distributed/commands/begin.c @@ -0,0 +1,61 @@ +/*------------------------------------------------------------------------- + * + * begin.c + * Processing of the BEGIN command. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "c.h" + +#include "distributed/commands.h" +#include "distributed/listutils.h" +#include "distributed/transaction_management.h" +#include "nodes/parsenodes.h" + + +/* + * SaveBeginCommandProperties stores the transaction properties passed + * via BEGIN. + */ +void +SaveBeginCommandProperties(TransactionStmt *transactionStmt) +{ + DefElem *item = NULL; + + /* + * This loop is similar to the one in standard_ProcessUtility. + * + * While BEGIN can be quite frequent it will rarely have options set. + */ + foreach_ptr(item, transactionStmt->options) + { + A_Const *constant = (A_Const *) item->arg; + + if (strcmp(item->defname, "transaction_read_only") == 0) + { + if (intVal(&constant->val) == 1) + { + BeginXactReadOnly = BeginXactReadOnly_Enabled; + } + else + { + BeginXactReadOnly = BeginXactReadOnly_Disabled; + } + } + else if (strcmp(item->defname, "transaction_deferrable") == 0) + { + if (intVal(&constant->val) == 1) + { + BeginXactDeferrable = BeginXactDeferrable_Enabled; + } + else + { + BeginXactDeferrable = BeginXactDeferrable_Disabled; + } + } + } +} diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 4cc635ca7..4aba31468 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -70,6 +70,7 @@ #include "distributed/reference_table_utils.h" #include "distributed/resource_lock.h" #include "distributed/string_utils.h" +#include "distributed/transaction_management.h" #include "distributed/version_compat.h" #include "distributed/worker_shard_visibility.h" #include "distributed/worker_transaction.h" @@ -93,7 +94,6 @@ static int activeAlterTables = 0; static int activeDropSchemaOrDBs = 0; static bool ConstraintDropped = false; - ProcessUtility_hook_type PrevProcessUtility = NULL; int UtilityHookLevel = 0; @@ -171,6 +171,18 @@ multi_ProcessUtility(PlannedStmt *pstmt, parsetree = pstmt->utilityStmt; + if (IsA(parsetree, TransactionStmt)) + { + TransactionStmt *transactionStmt = (TransactionStmt *) parsetree; + + if (context == PROCESS_UTILITY_TOPLEVEL && + (transactionStmt->kind == TRANS_STMT_BEGIN || + transactionStmt->kind == TRANS_STMT_START)) + { + SaveBeginCommandProperties(transactionStmt); + } + } + if (IsA(parsetree, TransactionStmt) || IsA(parsetree, ListenStmt) || IsA(parsetree, NotifyStmt) || diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index bf619ebc0..aff357fb3 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -39,6 +39,7 @@ #define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u" +static char * BeginTransactionCommand(void); static char * AssignDistributedTransactionIdCommand(void); static void StartRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactionId subId); @@ -56,6 +57,14 @@ static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection static void Assign2PCIdentifier(MultiConnection *connection); +static char *IsolationLevelName[] = { + "READ UNCOMMITTED", + "READ COMMITTED", + "REPEATABLE READ", + "SERIALIZABLE" +}; + + /* * StartRemoteTransactionBegin initiates beginning the remote transaction in * a non-blocking manner. The function sends "BEGIN" followed by @@ -82,7 +91,7 @@ StartRemoteTransactionBegin(struct MultiConnection *connection) * behaviour. */ appendStringInfoString(beginAndSetDistributedTransactionId, - "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;"); + BeginTransactionCommand()); /* append context for in-progress SAVEPOINTs for this transaction */ List *activeSubXacts = ActiveSubXactContexts(); @@ -151,7 +160,7 @@ BeginAndSetDistributedTransactionIdCommand(void) * behaviour. */ appendStringInfoString(beginAndSetDistributedTransactionId, - "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;"); + BeginTransactionCommand()); appendStringInfoString(beginAndSetDistributedTransactionId, AssignDistributedTransactionIdCommand()); @@ -160,6 +169,68 @@ BeginAndSetDistributedTransactionIdCommand(void) } +/* + * BeginTransactionCommand returns the BEGIN command to use for the current isolation + * level. + * + * Transactions have 3 properties that we care about here: + * - XactIsoLevel (isolation level) + * - XactDeferrable (deferrable) + * - XactReadOnly (read only) + * + * These properties can be set in several ways: + * - via BEGIN TRANSACTION ISOLATION LEVEL ... + * - via default_transaction_isolation, ... + * - via SET TRANSACTION .. (or plain SET transaction_isolation ...) + * + * We want to make sure that the properties that are passed to the worker nodes + * match the coordinator as much as possible. However, we do not want to waste + * bytes repeating the current values ad infinitum. + * + * The trade-off we make is that we send the isolation level in all cases, + * but only set deferrable and read-only if they were explicitly specified + * in the BEGIN by the user. The implication is that we may not follow the + * default_transaction_* settings on the coordinator if they differ on the + * worker. + */ +static char * +BeginTransactionCommand(void) +{ + StringInfo beginCommand = makeStringInfo(); + + /* + * XactIsoLevel can only be set at the start of the transaction, before the + * first query. Since Citus does not send BEGIN until the first query, we + * can simply use the current values, and they will match the values for the + * outer transaction after any BEGIN and SET TRANSACTION that may have occurred. + */ + appendStringInfo(beginCommand, "BEGIN TRANSACTION ISOLATION LEVEL %s", + IsolationLevelName[XactIsoLevel]); + + if (BeginXactDeferrable == BeginXactDeferrable_Enabled) + { + appendStringInfoString(beginCommand, " DEFERRABLE"); + } + else if (BeginXactDeferrable == BeginXactDeferrable_Disabled) + { + appendStringInfoString(beginCommand, " NOT DEFERRABLE"); + } + + if (BeginXactReadOnly == BeginXactReadOnly_Enabled) + { + appendStringInfoString(beginCommand, " READ ONLY"); + } + else if (BeginXactReadOnly == BeginXactReadOnly_Disabled) + { + appendStringInfoString(beginCommand, " READ WRITE"); + } + + appendStringInfoChar(beginCommand, ';'); + + return beginCommand->data; +} + + /* * AssignDistributedTransactionIdCommand returns a command to set the local * distributed transaction ID on a remote transaction. diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 332298e3a..0f4c3c80a 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -125,6 +125,13 @@ bool FunctionOpensTransactionBlock = true; /* if true, we should trigger node metadata sync on commit */ bool NodeMetadataSyncOnCommit = false; +/* + * In an explicit BEGIN ...; we keep track of top-level transaction characteristics + * specified by the user. + */ +BeginXactReadOnlyState BeginXactReadOnly = BeginXactReadOnly_NotSet; +BeginXactDeferrableState BeginXactDeferrable = BeginXactDeferrable_NotSet; + /* transaction management functions */ static void CoordinatedTransactionCallback(XactEvent event, void *arg); @@ -608,6 +615,8 @@ ResetGlobalVariables() InTopLevelDelegatedFunctionCall = false; InTableTypeConversionFunctionCall = false; CurrentOperationId = INVALID_OPERATION_ID; + BeginXactReadOnly = BeginXactReadOnly_NotSet; + BeginXactDeferrable = BeginXactDeferrable_NotSet; ResetWorkerErrorIndication(); memset(&AllowedDistributionColumnValue, 0, sizeof(AllowedDistributionColumn)); diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index d947955f0..c09f077dd 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -153,6 +153,8 @@ typedef enum SearchForeignKeyColumnFlags /* callers can also pass union of above flags */ } SearchForeignKeyColumnFlags; +/* begin.c - forward declarations */ +extern void SaveBeginCommandProperties(TransactionStmt *transactionStmt); /* cluster.c - forward declarations */ extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand, diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index d7a008054..e2d35048a 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -9,6 +9,7 @@ #ifndef TRANSACTION_MANAGMENT_H #define TRANSACTION_MANAGMENT_H +#include "access/xact.h" #include "lib/ilist.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" @@ -75,6 +76,28 @@ typedef struct AllowedDistributionColumn int executorLevel; } AllowedDistributionColumn; +/* + * BeginXactDeferrableState reflects the value of the DEFERRABLE property + * in the BEGIN of a transaction block. + */ +typedef enum BeginXactDeferrableState +{ + BeginXactDeferrable_NotSet, + BeginXactDeferrable_Disabled, + BeginXactDeferrable_Enabled, +} BeginXactDeferrableState; + +/* + * BeginXactReadOnlyState reflects the value of the READ ONLY property + * in the BEGIN of a transaction block. + */ +typedef enum BeginXactReadOnlyState +{ + BeginXactReadOnly_NotSet, + BeginXactReadOnly_Disabled, + BeginXactReadOnly_Enabled, +} BeginXactReadOnlyState; + /* * The current distribution column value passed as an argument to a forced * function call delegation. @@ -119,6 +142,10 @@ extern StringInfo activeSetStmts; /* did current transaction modify pg_dist_node? */ extern bool TransactionModifiedNodeMetadata; +/* after an explicit BEGIN, keep track of top-level transaction characteristics */ +extern BeginXactReadOnlyState BeginXactReadOnly; +extern BeginXactDeferrableState BeginXactDeferrable; + /* * Coordinated transaction management. */ diff --git a/src/test/regress/expected/propagate_set_commands.out b/src/test/regress/expected/propagate_set_commands.out index 4a3731f7d..fbfb370c1 100644 --- a/src/test/regress/expected/propagate_set_commands.out +++ b/src/test/regress/expected/propagate_set_commands.out @@ -31,6 +31,142 @@ SET TRANSACTION READ ONLY; INSERT INTO test VALUES (2,2); ERROR: cannot execute INSERT in a read-only transaction END; +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; +-- should reflect isolation level of current transaction +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + read committed +(1 row) + +END; +START TRANSACTION ISOLATION LEVEL REPEATABLE READ; +-- should reflect isolation level of current transaction +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + repeatable read +(1 row) + +END; +BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; +-- should reflect isolation level of current transaction +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + serializable +(1 row) + +END; +BEGIN READ ONLY; +-- should reflect read-only status of current transaction +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + on +(1 row) + +END; +BEGIN READ WRITE; +-- should reflect read-only status of current transaction +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + off +(1 row) + +SET TRANSACTION READ ONLY; +-- should reflect writable status of current transaction +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + on +(1 row) + +END; +BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE; +-- should reflect deferrable status of the current transaction +SELECT current_setting('transaction_deferrable') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + on +(1 row) + +END; +BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE READ ONLY; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + on +(1 row) + +SELECT current_setting('transaction_deferrable') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + on +(1 row) + +END; +-- postgres warns against, but does not disallow multiple BEGIN +BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; +WARNING: there is already a transaction in progress +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + read committed +(1 row) + +-- but not after a query (SET TRANSACTION error is consistent with postgres) +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; +WARNING: there is already a transaction in progress +ERROR: SET TRANSACTION ISOLATION LEVEL must be called before any query +END; +BEGIN READ WRITE; +SAVEPOINT goback; +SET TRANSACTION READ ONLY; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + on +(1 row) + +ROLLBACK TO SAVEPOINT goback; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + off +(1 row) + +END; +SET default_transaction_isolation TO 'repeatable read'; +BEGIN; +-- should reflect isolation level of local session +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + repeatable read +(1 row) + +END; +-- SET is not propagated and plain SELECT does not use transaction blocks +SELECT DISTINCT current_setting('transaction_isolation') FROM test; + current_setting +--------------------------------------------------------------------- + read committed +(1 row) + +-- the CTE will trigger transaction blocks +WITH cte AS MATERIALIZED ( + SELECT DISTINCT current_setting('transaction_isolation') iso FROM test +) +SELECT iso FROM cte; + iso +--------------------------------------------------------------------- + repeatable read +(1 row) + +RESET default_transaction_isolation; BEGIN; SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; -- should reflect new isolation level diff --git a/src/test/regress/sql/propagate_set_commands.sql b/src/test/regress/sql/propagate_set_commands.sql index 68b13547b..e30339d06 100644 --- a/src/test/regress/sql/propagate_set_commands.sql +++ b/src/test/regress/sql/propagate_set_commands.sql @@ -23,6 +23,78 @@ SET TRANSACTION READ ONLY; INSERT INTO test VALUES (2,2); END; +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; +-- should reflect isolation level of current transaction +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; +END; + +START TRANSACTION ISOLATION LEVEL REPEATABLE READ; +-- should reflect isolation level of current transaction +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; +END; + +BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; +-- should reflect isolation level of current transaction +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; +END; + +BEGIN READ ONLY; +-- should reflect read-only status of current transaction +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; +END; + +BEGIN READ WRITE; +-- should reflect read-only status of current transaction +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; +SET TRANSACTION READ ONLY; +-- should reflect writable status of current transaction +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; +END; + +BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE; +-- should reflect deferrable status of the current transaction +SELECT current_setting('transaction_deferrable') FROM test WHERE id = 1; +END; + +BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE READ ONLY; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; +SELECT current_setting('transaction_deferrable') FROM test WHERE id = 1; +END; + +-- postgres warns against, but does not disallow multiple BEGIN +BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; +-- but not after a query (SET TRANSACTION error is consistent with postgres) +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; +END; + +BEGIN READ WRITE; +SAVEPOINT goback; +SET TRANSACTION READ ONLY; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; +ROLLBACK TO SAVEPOINT goback; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; +END; + +SET default_transaction_isolation TO 'repeatable read'; + +BEGIN; +-- should reflect isolation level of local session +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; +END; + +-- SET is not propagated and plain SELECT does not use transaction blocks +SELECT DISTINCT current_setting('transaction_isolation') FROM test; + +-- the CTE will trigger transaction blocks +WITH cte AS MATERIALIZED ( + SELECT DISTINCT current_setting('transaction_isolation') iso FROM test +) +SELECT iso FROM cte; + +RESET default_transaction_isolation; + BEGIN; SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; -- should reflect new isolation level