mirror of https://github.com/citusdata/citus.git
Merge pull request #4945 from citusdata/marcocitus/set-transaction
commit
80f41e94c0
|
@ -387,7 +387,10 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
||||||
parsetree = ProcessCreateSubscriptionStmt(createSubStmt);
|
parsetree = ProcessCreateSubscriptionStmt(createSubStmt);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* process SET LOCAL stmts of allowed GUCs in multi-stmt xacts */
|
/*
|
||||||
|
* Process SET LOCAL and SET TRANSACTION statements in multi-statement
|
||||||
|
* transactions.
|
||||||
|
*/
|
||||||
if (IsA(parsetree, VariableSetStmt))
|
if (IsA(parsetree, VariableSetStmt))
|
||||||
{
|
{
|
||||||
VariableSetStmt *setStmt = (VariableSetStmt *) parsetree;
|
VariableSetStmt *setStmt = (VariableSetStmt *) parsetree;
|
||||||
|
|
|
@ -35,6 +35,7 @@ static bool IsSettingSafeToPropagate(char *name);
|
||||||
*
|
*
|
||||||
* We currently propagate:
|
* We currently propagate:
|
||||||
* - SET LOCAL (for allowed settings)
|
* - SET LOCAL (for allowed settings)
|
||||||
|
* - SET TRANSACTION
|
||||||
* - RESET (for allowed settings)
|
* - RESET (for allowed settings)
|
||||||
* - RESET ALL
|
* - RESET ALL
|
||||||
*/
|
*/
|
||||||
|
@ -72,8 +73,8 @@ ShouldPropagateSetCommand(VariableSetStmt *setStmt)
|
||||||
case VAR_SET_MULTI:
|
case VAR_SET_MULTI:
|
||||||
default:
|
default:
|
||||||
{
|
{
|
||||||
/* SET (LOCAL) TRANSACTION should be handled locally */
|
/* SET TRANSACTION is similar to SET LOCAL */
|
||||||
return false;
|
return strcmp(setStmt->name, "TRANSACTION") == 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -121,7 +122,7 @@ PostprocessVariableSetStmt(VariableSetStmt *setStmt, const char *setStmtString)
|
||||||
const bool raiseInterrupts = true;
|
const bool raiseInterrupts = true;
|
||||||
List *connectionList = NIL;
|
List *connectionList = NIL;
|
||||||
|
|
||||||
/* at present we only support SET LOCAL */
|
/* at present we only support SET LOCAL and SET TRANSACTION */
|
||||||
AssertArg(ShouldPropagateSetCommand(setStmt));
|
AssertArg(ShouldPropagateSetCommand(setStmt));
|
||||||
|
|
||||||
/* haven't seen any SET stmts so far in this (sub-)xact: initialize StringInfo */
|
/* haven't seen any SET stmts so far in this (sub-)xact: initialize StringInfo */
|
||||||
|
|
|
@ -34,6 +34,7 @@
|
||||||
#define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u"
|
#define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u"
|
||||||
|
|
||||||
|
|
||||||
|
static char * AssignDistributedTransactionIdCommand(void);
|
||||||
static void StartRemoteTransactionSavepointBegin(MultiConnection *connection,
|
static void StartRemoteTransactionSavepointBegin(MultiConnection *connection,
|
||||||
SubTransactionId subId);
|
SubTransactionId subId);
|
||||||
static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection,
|
static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection,
|
||||||
|
@ -68,8 +69,15 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
|
||||||
|
|
||||||
transaction->transactionState = REMOTE_TRANS_STARTING;
|
transaction->transactionState = REMOTE_TRANS_STARTING;
|
||||||
|
|
||||||
StringInfo beginAndSetDistributedTransactionId =
|
StringInfo beginAndSetDistributedTransactionId = makeStringInfo();
|
||||||
BeginAndSetDistributedTransactionIdCommand();
|
|
||||||
|
/*
|
||||||
|
* Explicitly specify READ COMMITTED, the default on the remote
|
||||||
|
* side might have been changed, and that would cause problematic
|
||||||
|
* behaviour.
|
||||||
|
*/
|
||||||
|
appendStringInfoString(beginAndSetDistributedTransactionId,
|
||||||
|
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
|
||||||
|
|
||||||
/* append context for in-progress SAVEPOINTs for this transaction */
|
/* append context for in-progress SAVEPOINTs for this transaction */
|
||||||
List *activeSubXacts = ActiveSubXactContexts();
|
List *activeSubXacts = ActiveSubXactContexts();
|
||||||
|
@ -98,6 +106,10 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
|
||||||
appendStringInfoString(beginAndSetDistributedTransactionId, activeSetStmts->data);
|
appendStringInfoString(beginAndSetDistributedTransactionId, activeSetStmts->data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* add SELECT assign_distributed_transaction_id ... */
|
||||||
|
appendStringInfoString(beginAndSetDistributedTransactionId,
|
||||||
|
AssignDistributedTransactionIdCommand());
|
||||||
|
|
||||||
if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data))
|
if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data))
|
||||||
{
|
{
|
||||||
const bool raiseErrors = true;
|
const bool raiseErrors = true;
|
||||||
|
@ -126,6 +138,22 @@ BeginAndSetDistributedTransactionIdCommand(void)
|
||||||
appendStringInfoString(beginAndSetDistributedTransactionId,
|
appendStringInfoString(beginAndSetDistributedTransactionId,
|
||||||
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
|
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
|
||||||
|
|
||||||
|
appendStringInfoString(beginAndSetDistributedTransactionId,
|
||||||
|
AssignDistributedTransactionIdCommand());
|
||||||
|
|
||||||
|
return beginAndSetDistributedTransactionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AssignDistributedTransactionIdCommand returns a command to set the local
|
||||||
|
* distributed transaction ID on a remote transaction.
|
||||||
|
*/
|
||||||
|
static char *
|
||||||
|
AssignDistributedTransactionIdCommand(void)
|
||||||
|
{
|
||||||
|
StringInfo assignDistributedTransactionId = makeStringInfo();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Append BEGIN and assign_distributed_transaction_id() statements into a single command
|
* Append BEGIN and assign_distributed_transaction_id() statements into a single command
|
||||||
* and send both in one step. The reason is purely performance, we don't want
|
* and send both in one step. The reason is purely performance, we don't want
|
||||||
|
@ -134,14 +162,14 @@ BeginAndSetDistributedTransactionIdCommand(void)
|
||||||
DistributedTransactionId *distributedTransactionId =
|
DistributedTransactionId *distributedTransactionId =
|
||||||
GetCurrentDistributedTransactionId();
|
GetCurrentDistributedTransactionId();
|
||||||
const char *timestamp = timestamptz_to_str(distributedTransactionId->timestamp);
|
const char *timestamp = timestamptz_to_str(distributedTransactionId->timestamp);
|
||||||
appendStringInfo(beginAndSetDistributedTransactionId,
|
appendStringInfo(assignDistributedTransactionId,
|
||||||
"SELECT assign_distributed_transaction_id(%d, " UINT64_FORMAT
|
"SELECT assign_distributed_transaction_id(%d, " UINT64_FORMAT
|
||||||
", '%s');",
|
", '%s');",
|
||||||
distributedTransactionId->initiatorNodeIdentifier,
|
distributedTransactionId->initiatorNodeIdentifier,
|
||||||
distributedTransactionId->transactionNumber,
|
distributedTransactionId->transactionNumber,
|
||||||
timestamp);
|
timestamp);
|
||||||
|
|
||||||
return beginAndSetDistributedTransactionId;
|
return assignDistributedTransactionId->data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -19,9 +19,94 @@ SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||||
on
|
on
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- should not be propagated, error should be coming from coordinator
|
-- triggers an error on the worker
|
||||||
SET LOCAL TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
SET LOCAL TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
ERROR: SET TRANSACTION ISOLATION LEVEL must be called before any query
|
WARNING: SET TRANSACTION ISOLATION LEVEL must be called before any query
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
ERROR: failure on connection marked as essential: localhost:xxxxx
|
||||||
|
END;
|
||||||
|
BEGIN;
|
||||||
|
SET TRANSACTION READ ONLY;
|
||||||
|
-- should fail after setting transaction to read only
|
||||||
|
INSERT INTO test VALUES (2,2);
|
||||||
|
ERROR: cannot execute INSERT in a read-only transaction
|
||||||
|
END;
|
||||||
|
BEGIN;
|
||||||
|
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
-- should reflect new isolation level
|
||||||
|
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||||
|
current_setting
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
repeatable read
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
END;
|
||||||
|
BEGIN;
|
||||||
|
SET TRANSACTION READ ONLY;
|
||||||
|
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||||
|
current_setting
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
on
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||||
|
current_setting
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
repeatable read
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
END;
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL transaction_read_only TO on;
|
||||||
|
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||||
|
current_setting
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
on
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||||
|
current_setting
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
repeatable read
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
END;
|
||||||
|
BEGIN;
|
||||||
|
SET TRANSACTION READ ONLY;
|
||||||
|
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
|
||||||
|
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||||
|
current_setting
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
on
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||||
|
current_setting
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
repeatable read
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
END;
|
||||||
|
BEGIN;
|
||||||
|
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
SAVEPOINT goback;
|
||||||
|
SET TRANSACTION READ ONLY;
|
||||||
|
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||||
|
current_setting
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
on
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK TO SAVEPOINT goback;
|
||||||
|
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||||
|
current_setting
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
off
|
||||||
|
(1 row)
|
||||||
|
|
||||||
END;
|
END;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
-- set session commands are not propagated
|
-- set session commands are not propagated
|
||||||
|
|
|
@ -13,10 +13,53 @@ SET citus.select_opens_transaction_block TO on;
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
|
||||||
-- should not be propagated, error should be coming from coordinator
|
-- triggers an error on the worker
|
||||||
SET LOCAL TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
SET LOCAL TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
END;
|
END;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET TRANSACTION READ ONLY;
|
||||||
|
-- should fail after setting transaction to read only
|
||||||
|
INSERT INTO test VALUES (2,2);
|
||||||
|
END;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
-- should reflect new isolation level
|
||||||
|
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||||
|
END;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET TRANSACTION READ ONLY;
|
||||||
|
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||||
|
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||||
|
END;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL transaction_read_only TO on;
|
||||||
|
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||||
|
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||||
|
END;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET TRANSACTION READ ONLY;
|
||||||
|
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
|
||||||
|
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||||
|
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
|
||||||
|
END;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
|
||||||
|
SAVEPOINT goback;
|
||||||
|
SET TRANSACTION READ ONLY;
|
||||||
|
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||||
|
ROLLBACK TO SAVEPOINT goback;
|
||||||
|
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
|
||||||
|
END;
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
-- set session commands are not propagated
|
-- set session commands are not propagated
|
||||||
SET enable_hashagg TO false;
|
SET enable_hashagg TO false;
|
||||||
|
|
Loading…
Reference in New Issue