Propagate BEGIN properties to worker nodes (#6483)

Co-authored-by: Marco Slot <marco.slot@gmail.com>
pull/6481/head
Marco Slot 2022-11-10 18:08:43 +01:00 committed by GitHub
parent d7d5f0df3e
commit 77fbcfaf14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 393 additions and 3 deletions

View File

@ -0,0 +1,61 @@
/*-------------------------------------------------------------------------
*
* begin.c
* Processing of the BEGIN command.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "c.h"
#include "distributed/commands.h"
#include "distributed/listutils.h"
#include "distributed/transaction_management.h"
#include "nodes/parsenodes.h"
/*
* SaveBeginCommandProperties stores the transaction properties passed
* via BEGIN.
*/
void
SaveBeginCommandProperties(TransactionStmt *transactionStmt)
{
DefElem *item = NULL;
/*
* This loop is similar to the one in standard_ProcessUtility.
*
* While BEGIN can be quite frequent it will rarely have options set.
*/
foreach_ptr(item, transactionStmt->options)
{
A_Const *constant = (A_Const *) item->arg;
if (strcmp(item->defname, "transaction_read_only") == 0)
{
if (intVal(&constant->val) == 1)
{
BeginXactReadOnly = BeginXactReadOnly_Enabled;
}
else
{
BeginXactReadOnly = BeginXactReadOnly_Disabled;
}
}
else if (strcmp(item->defname, "transaction_deferrable") == 0)
{
if (intVal(&constant->val) == 1)
{
BeginXactDeferrable = BeginXactDeferrable_Enabled;
}
else
{
BeginXactDeferrable = BeginXactDeferrable_Disabled;
}
}
}
}

View File

@ -70,6 +70,7 @@
#include "distributed/reference_table_utils.h"
#include "distributed/resource_lock.h"
#include "distributed/string_utils.h"
#include "distributed/transaction_management.h"
#include "distributed/version_compat.h"
#include "distributed/worker_shard_visibility.h"
#include "distributed/worker_transaction.h"
@ -93,7 +94,6 @@ static int activeAlterTables = 0;
static int activeDropSchemaOrDBs = 0;
static bool ConstraintDropped = false;
ProcessUtility_hook_type PrevProcessUtility = NULL;
int UtilityHookLevel = 0;
@ -171,6 +171,18 @@ multi_ProcessUtility(PlannedStmt *pstmt,
parsetree = pstmt->utilityStmt;
if (IsA(parsetree, TransactionStmt))
{
TransactionStmt *transactionStmt = (TransactionStmt *) parsetree;
if (context == PROCESS_UTILITY_TOPLEVEL &&
(transactionStmt->kind == TRANS_STMT_BEGIN ||
transactionStmt->kind == TRANS_STMT_START))
{
SaveBeginCommandProperties(transactionStmt);
}
}
if (IsA(parsetree, TransactionStmt) ||
IsA(parsetree, ListenStmt) ||
IsA(parsetree, NotifyStmt) ||

View File

@ -39,6 +39,7 @@
#define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u"
static char * BeginTransactionCommand(void);
static char * AssignDistributedTransactionIdCommand(void);
static void StartRemoteTransactionSavepointBegin(MultiConnection *connection,
SubTransactionId subId);
@ -56,6 +57,14 @@ static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection
static void Assign2PCIdentifier(MultiConnection *connection);
static char *IsolationLevelName[] = {
"READ UNCOMMITTED",
"READ COMMITTED",
"REPEATABLE READ",
"SERIALIZABLE"
};
/*
* StartRemoteTransactionBegin initiates beginning the remote transaction in
* a non-blocking manner. The function sends "BEGIN" followed by
@ -82,7 +91,7 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
* behaviour.
*/
appendStringInfoString(beginAndSetDistributedTransactionId,
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
BeginTransactionCommand());
/* append context for in-progress SAVEPOINTs for this transaction */
List *activeSubXacts = ActiveSubXactContexts();
@ -151,7 +160,7 @@ BeginAndSetDistributedTransactionIdCommand(void)
* behaviour.
*/
appendStringInfoString(beginAndSetDistributedTransactionId,
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
BeginTransactionCommand());
appendStringInfoString(beginAndSetDistributedTransactionId,
AssignDistributedTransactionIdCommand());
@ -160,6 +169,68 @@ BeginAndSetDistributedTransactionIdCommand(void)
}
/*
* BeginTransactionCommand returns the BEGIN command to use for the current isolation
* level.
*
* Transactions have 3 properties that we care about here:
* - XactIsoLevel (isolation level)
* - XactDeferrable (deferrable)
* - XactReadOnly (read only)
*
* These properties can be set in several ways:
* - via BEGIN TRANSACTION ISOLATION LEVEL ...
* - via default_transaction_isolation, ...
* - via SET TRANSACTION .. (or plain SET transaction_isolation ...)
*
* We want to make sure that the properties that are passed to the worker nodes
* match the coordinator as much as possible. However, we do not want to waste
* bytes repeating the current values ad infinitum.
*
* The trade-off we make is that we send the isolation level in all cases,
* but only set deferrable and read-only if they were explicitly specified
* in the BEGIN by the user. The implication is that we may not follow the
* default_transaction_* settings on the coordinator if they differ on the
* worker.
*/
static char *
BeginTransactionCommand(void)
{
StringInfo beginCommand = makeStringInfo();
/*
* XactIsoLevel can only be set at the start of the transaction, before the
* first query. Since Citus does not send BEGIN until the first query, we
* can simply use the current values, and they will match the values for the
* outer transaction after any BEGIN and SET TRANSACTION that may have occurred.
*/
appendStringInfo(beginCommand, "BEGIN TRANSACTION ISOLATION LEVEL %s",
IsolationLevelName[XactIsoLevel]);
if (BeginXactDeferrable == BeginXactDeferrable_Enabled)
{
appendStringInfoString(beginCommand, " DEFERRABLE");
}
else if (BeginXactDeferrable == BeginXactDeferrable_Disabled)
{
appendStringInfoString(beginCommand, " NOT DEFERRABLE");
}
if (BeginXactReadOnly == BeginXactReadOnly_Enabled)
{
appendStringInfoString(beginCommand, " READ ONLY");
}
else if (BeginXactReadOnly == BeginXactReadOnly_Disabled)
{
appendStringInfoString(beginCommand, " READ WRITE");
}
appendStringInfoChar(beginCommand, ';');
return beginCommand->data;
}
/*
* AssignDistributedTransactionIdCommand returns a command to set the local
* distributed transaction ID on a remote transaction.

View File

@ -125,6 +125,13 @@ bool FunctionOpensTransactionBlock = true;
/* if true, we should trigger node metadata sync on commit */
bool NodeMetadataSyncOnCommit = false;
/*
* In an explicit BEGIN ...; we keep track of top-level transaction characteristics
* specified by the user.
*/
BeginXactReadOnlyState BeginXactReadOnly = BeginXactReadOnly_NotSet;
BeginXactDeferrableState BeginXactDeferrable = BeginXactDeferrable_NotSet;
/* transaction management functions */
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
@ -608,6 +615,8 @@ ResetGlobalVariables()
InTopLevelDelegatedFunctionCall = false;
InTableTypeConversionFunctionCall = false;
CurrentOperationId = INVALID_OPERATION_ID;
BeginXactReadOnly = BeginXactReadOnly_NotSet;
BeginXactDeferrable = BeginXactDeferrable_NotSet;
ResetWorkerErrorIndication();
memset(&AllowedDistributionColumnValue, 0,
sizeof(AllowedDistributionColumn));

View File

@ -153,6 +153,8 @@ typedef enum SearchForeignKeyColumnFlags
/* callers can also pass union of above flags */
} SearchForeignKeyColumnFlags;
/* begin.c - forward declarations */
extern void SaveBeginCommandProperties(TransactionStmt *transactionStmt);
/* cluster.c - forward declarations */
extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand,

View File

@ -9,6 +9,7 @@
#ifndef TRANSACTION_MANAGMENT_H
#define TRANSACTION_MANAGMENT_H
#include "access/xact.h"
#include "lib/ilist.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
@ -75,6 +76,28 @@ typedef struct AllowedDistributionColumn
int executorLevel;
} AllowedDistributionColumn;
/*
* BeginXactDeferrableState reflects the value of the DEFERRABLE property
* in the BEGIN of a transaction block.
*/
typedef enum BeginXactDeferrableState
{
BeginXactDeferrable_NotSet,
BeginXactDeferrable_Disabled,
BeginXactDeferrable_Enabled,
} BeginXactDeferrableState;
/*
* BeginXactReadOnlyState reflects the value of the READ ONLY property
* in the BEGIN of a transaction block.
*/
typedef enum BeginXactReadOnlyState
{
BeginXactReadOnly_NotSet,
BeginXactReadOnly_Disabled,
BeginXactReadOnly_Enabled,
} BeginXactReadOnlyState;
/*
* The current distribution column value passed as an argument to a forced
* function call delegation.
@ -119,6 +142,10 @@ extern StringInfo activeSetStmts;
/* did current transaction modify pg_dist_node? */
extern bool TransactionModifiedNodeMetadata;
/* after an explicit BEGIN, keep track of top-level transaction characteristics */
extern BeginXactReadOnlyState BeginXactReadOnly;
extern BeginXactDeferrableState BeginXactDeferrable;
/*
* Coordinated transaction management.
*/

View File

@ -31,6 +31,142 @@ SET TRANSACTION READ ONLY;
INSERT INTO test VALUES (2,2);
ERROR: cannot execute INSERT in a read-only transaction
END;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- should reflect isolation level of current transaction
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
read committed
(1 row)
END;
START TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- should reflect isolation level of current transaction
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
repeatable read
(1 row)
END;
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- should reflect isolation level of current transaction
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
serializable
(1 row)
END;
BEGIN READ ONLY;
-- should reflect read-only status of current transaction
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
on
(1 row)
END;
BEGIN READ WRITE;
-- should reflect read-only status of current transaction
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
off
(1 row)
SET TRANSACTION READ ONLY;
-- should reflect writable status of current transaction
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
on
(1 row)
END;
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE;
-- should reflect deferrable status of the current transaction
SELECT current_setting('transaction_deferrable') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
on
(1 row)
END;
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE READ ONLY;
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
on
(1 row)
SELECT current_setting('transaction_deferrable') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
on
(1 row)
END;
-- postgres warns against, but does not disallow multiple BEGIN
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
WARNING: there is already a transaction in progress
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
read committed
(1 row)
-- but not after a query (SET TRANSACTION error is consistent with postgres)
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
WARNING: there is already a transaction in progress
ERROR: SET TRANSACTION ISOLATION LEVEL must be called before any query
END;
BEGIN READ WRITE;
SAVEPOINT goback;
SET TRANSACTION READ ONLY;
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
on
(1 row)
ROLLBACK TO SAVEPOINT goback;
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
off
(1 row)
END;
SET default_transaction_isolation TO 'repeatable read';
BEGIN;
-- should reflect isolation level of local session
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
repeatable read
(1 row)
END;
-- SET is not propagated and plain SELECT does not use transaction blocks
SELECT DISTINCT current_setting('transaction_isolation') FROM test;
current_setting
---------------------------------------------------------------------
read committed
(1 row)
-- the CTE will trigger transaction blocks
WITH cte AS MATERIALIZED (
SELECT DISTINCT current_setting('transaction_isolation') iso FROM test
)
SELECT iso FROM cte;
iso
---------------------------------------------------------------------
repeatable read
(1 row)
RESET default_transaction_isolation;
BEGIN;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- should reflect new isolation level

View File

@ -23,6 +23,78 @@ SET TRANSACTION READ ONLY;
INSERT INTO test VALUES (2,2);
END;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- should reflect isolation level of current transaction
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
END;
START TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- should reflect isolation level of current transaction
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
END;
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- should reflect isolation level of current transaction
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
END;
BEGIN READ ONLY;
-- should reflect read-only status of current transaction
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
END;
BEGIN READ WRITE;
-- should reflect read-only status of current transaction
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
SET TRANSACTION READ ONLY;
-- should reflect writable status of current transaction
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
END;
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE;
-- should reflect deferrable status of the current transaction
SELECT current_setting('transaction_deferrable') FROM test WHERE id = 1;
END;
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE DEFERRABLE READ ONLY;
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
SELECT current_setting('transaction_deferrable') FROM test WHERE id = 1;
END;
-- postgres warns against, but does not disallow multiple BEGIN
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
-- but not after a query (SET TRANSACTION error is consistent with postgres)
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
END;
BEGIN READ WRITE;
SAVEPOINT goback;
SET TRANSACTION READ ONLY;
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
ROLLBACK TO SAVEPOINT goback;
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
END;
SET default_transaction_isolation TO 'repeatable read';
BEGIN;
-- should reflect isolation level of local session
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
END;
-- SET is not propagated and plain SELECT does not use transaction blocks
SELECT DISTINCT current_setting('transaction_isolation') FROM test;
-- the CTE will trigger transaction blocks
WITH cte AS MATERIALIZED (
SELECT DISTINCT current_setting('transaction_isolation') iso FROM test
)
SELECT iso FROM cte;
RESET default_transaction_isolation;
BEGIN;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- should reflect new isolation level