mirror of https://github.com/citusdata/citus.git
Propagate SET TRANSACTION commands
parent
2108410a40
commit
2eef71ccab
|
@ -387,7 +387,10 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
|||
parsetree = ProcessCreateSubscriptionStmt(createSubStmt);
|
||||
}
|
||||
|
||||
/* process SET LOCAL stmts of allowed GUCs in multi-stmt xacts */
|
||||
/*
|
||||
* Process SET LOCAL and SET TRANSACTION statements in multi-statement
|
||||
* transactions.
|
||||
*/
|
||||
if (IsA(parsetree, VariableSetStmt))
|
||||
{
|
||||
VariableSetStmt *setStmt = (VariableSetStmt *) parsetree;
|
||||
|
|
|
@ -35,6 +35,7 @@ static bool IsSettingSafeToPropagate(char *name);
|
|||
*
|
||||
* We currently propagate:
|
||||
* - SET LOCAL (for allowed settings)
|
||||
* - SET TRANSACTION
|
||||
* - RESET (for allowed settings)
|
||||
* - RESET ALL
|
||||
*/
|
||||
|
@ -72,8 +73,8 @@ ShouldPropagateSetCommand(VariableSetStmt *setStmt)
|
|||
case VAR_SET_MULTI:
|
||||
default:
|
||||
{
|
||||
/* SET (LOCAL) TRANSACTION should be handled locally */
|
||||
return false;
|
||||
/* SET TRANSACTION is similar to SET LOCAL */
|
||||
return strcmp(setStmt->name, "TRANSACTION") == 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -121,7 +122,7 @@ PostprocessVariableSetStmt(VariableSetStmt *setStmt, const char *setStmtString)
|
|||
const bool raiseInterrupts = true;
|
||||
List *connectionList = NIL;
|
||||
|
||||
/* at present we only support SET LOCAL */
|
||||
/* at present we only support SET LOCAL and SET TRANSACTION */
|
||||
AssertArg(ShouldPropagateSetCommand(setStmt));
|
||||
|
||||
/* haven't seen any SET stmts so far in this (sub-)xact: initialize StringInfo */
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
#define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u"
|
||||
|
||||
|
||||
static char * AssignDistributedTransactionIdCommand(void);
|
||||
static void StartRemoteTransactionSavepointBegin(MultiConnection *connection,
|
||||
SubTransactionId subId);
|
||||
static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection,
|
||||
|
@ -68,8 +69,15 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
|
|||
|
||||
transaction->transactionState = REMOTE_TRANS_STARTING;
|
||||
|
||||
StringInfo beginAndSetDistributedTransactionId =
|
||||
BeginAndSetDistributedTransactionIdCommand();
|
||||
StringInfo beginAndSetDistributedTransactionId = makeStringInfo();
|
||||
|
||||
/*
|
||||
* Explicitly specify READ COMMITTED, the default on the remote
|
||||
* side might have been changed, and that would cause problematic
|
||||
* behaviour.
|
||||
*/
|
||||
appendStringInfoString(beginAndSetDistributedTransactionId,
|
||||
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
|
||||
|
||||
/* append context for in-progress SAVEPOINTs for this transaction */
|
||||
List *activeSubXacts = ActiveSubXactContexts();
|
||||
|
@ -98,6 +106,10 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
|
|||
appendStringInfoString(beginAndSetDistributedTransactionId, activeSetStmts->data);
|
||||
}
|
||||
|
||||
/* add SELECT assign_distributed_transaction_id ... */
|
||||
appendStringInfoString(beginAndSetDistributedTransactionId,
|
||||
AssignDistributedTransactionIdCommand());
|
||||
|
||||
if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data))
|
||||
{
|
||||
const bool raiseErrors = true;
|
||||
|
@ -126,6 +138,22 @@ BeginAndSetDistributedTransactionIdCommand(void)
|
|||
appendStringInfoString(beginAndSetDistributedTransactionId,
|
||||
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
|
||||
|
||||
appendStringInfoString(beginAndSetDistributedTransactionId,
|
||||
AssignDistributedTransactionIdCommand());
|
||||
|
||||
return beginAndSetDistributedTransactionId;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AssignDistributedTransactionIdCommand returns a command to set the local
|
||||
* distributed transaction ID on a remote transaction.
|
||||
*/
|
||||
static char *
|
||||
AssignDistributedTransactionIdCommand(void)
|
||||
{
|
||||
StringInfo assignDistributedTransactionId = makeStringInfo();
|
||||
|
||||
/*
|
||||
* Append BEGIN and assign_distributed_transaction_id() statements into a single command
|
||||
* and send both in one step. The reason is purely performance, we don't want
|
||||
|
@ -134,14 +162,14 @@ BeginAndSetDistributedTransactionIdCommand(void)
|
|||
DistributedTransactionId *distributedTransactionId =
|
||||
GetCurrentDistributedTransactionId();
|
||||
const char *timestamp = timestamptz_to_str(distributedTransactionId->timestamp);
|
||||
appendStringInfo(beginAndSetDistributedTransactionId,
|
||||
appendStringInfo(assignDistributedTransactionId,
|
||||
"SELECT assign_distributed_transaction_id(%d, " UINT64_FORMAT
|
||||
", '%s');",
|
||||
distributedTransactionId->initiatorNodeIdentifier,
|
||||
distributedTransactionId->transactionNumber,
|
||||
timestamp);
|
||||
|
||||
return beginAndSetDistributedTransactionId;
|
||||
return assignDistributedTransactionId->data;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -19,9 +19,94 @@ SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
|||
on
|
||||
(1 row)
|
||||
|
||||
-- should not be propagated, error should be coming from coordinator
|
||||
-- triggers an error on the worker
|
||||
SET LOCAL TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
ERROR: SET TRANSACTION ISOLATION LEVEL must be called before any query
|
||||
WARNING: SET TRANSACTION ISOLATION LEVEL must be called before any query
|
||||
CONTEXT: while executing command on localhost:xxxxx
|
||||
ERROR: failure on connection marked as essential: localhost:xxxxx
|
||||
END;
|
||||
BEGIN;
|
||||
SET TRANSACTION READ ONLY;
|
||||
-- should fail after setting transaction to read only
|
||||
INSERT INTO test VALUES (2,2);
|
||||
ERROR: cannot execute INSERT in a read-only transaction
|
||||
END;
|
||||
BEGIN;
|
||||
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
-- should reflect new isolation level
|
||||
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
---------------------------------------------------------------------
|
||||
repeatable read
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
BEGIN;
|
||||
SET TRANSACTION READ ONLY;
|
||||
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
---------------------------------------------------------------------
|
||||
on
|
||||
(1 row)
|
||||
|
||||
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
---------------------------------------------------------------------
|
||||
repeatable read
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
BEGIN;
|
||||
SET LOCAL transaction_read_only TO on;
|
||||
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
---------------------------------------------------------------------
|
||||
on
|
||||
(1 row)
|
||||
|
||||
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
---------------------------------------------------------------------
|
||||
repeatable read
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
BEGIN;
|
||||
SET TRANSACTION READ ONLY;
|
||||
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
|
||||
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
---------------------------------------------------------------------
|
||||
on
|
||||
(1 row)
|
||||
|
||||
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
---------------------------------------------------------------------
|
||||
repeatable read
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
BEGIN;
|
||||
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
SAVEPOINT goback;
|
||||
SET TRANSACTION READ ONLY;
|
||||
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
---------------------------------------------------------------------
|
||||
on
|
||||
(1 row)
|
||||
|
||||
ROLLBACK TO SAVEPOINT goback;
|
||||
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||
current_setting
|
||||
---------------------------------------------------------------------
|
||||
off
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
BEGIN;
|
||||
-- set session commands are not propagated
|
||||
|
|
|
@ -13,10 +13,53 @@ SET citus.select_opens_transaction_block TO on;
|
|||
|
||||
BEGIN;
|
||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||
-- should not be propagated, error should be coming from coordinator
|
||||
-- triggers an error on the worker
|
||||
SET LOCAL TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
SET TRANSACTION READ ONLY;
|
||||
-- should fail after setting transaction to read only
|
||||
INSERT INTO test VALUES (2,2);
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
-- should reflect new isolation level
|
||||
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
SET TRANSACTION READ ONLY;
|
||||
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
SET LOCAL transaction_read_only TO on;
|
||||
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
SET TRANSACTION READ ONLY;
|
||||
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
|
||||
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||
SAVEPOINT goback;
|
||||
SET TRANSACTION READ ONLY;
|
||||
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||
ROLLBACK TO SAVEPOINT goback;
|
||||
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||
END;
|
||||
|
||||
BEGIN;
|
||||
-- set session commands are not propagated
|
||||
SET enable_hashagg TO false;
|
||||
|
|
Loading…
Reference in New Issue