mirror of https://github.com/citusdata/citus.git
Add a select_opens_transaction_block GUC
parent
a54f9a6d2c
commit
89870e76ce
|
@ -42,6 +42,13 @@
|
||||||
#include "utils/timestamp.h"
|
#include "utils/timestamp.h"
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GUC that determines whether a SELECT in a transaction block should also run in
|
||||||
|
* a transaction block on the worker even if no writes have occurred yet.
|
||||||
|
*/
|
||||||
|
bool SelectOpensTransactionBlock;
|
||||||
|
|
||||||
|
|
||||||
/* Local functions forward declarations */
|
/* Local functions forward declarations */
|
||||||
static ConnectAction ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
static ConnectAction ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
||||||
TaskExecutionStatus *executionStatus,
|
TaskExecutionStatus *executionStatus,
|
||||||
|
@ -97,7 +104,7 @@ MultiRealTimeExecute(Job *job)
|
||||||
workerNodeList = ActiveReadableNodeList();
|
workerNodeList = ActiveReadableNodeList();
|
||||||
workerHash = WorkerHash(workerHashName, workerNodeList);
|
workerHash = WorkerHash(workerHashName, workerNodeList);
|
||||||
|
|
||||||
if (IsTransactionBlock())
|
if (IsTransactionBlock() && SelectOpensTransactionBlock)
|
||||||
{
|
{
|
||||||
BeginOrContinueCoordinatedTransaction();
|
BeginOrContinueCoordinatedTransaction();
|
||||||
}
|
}
|
||||||
|
|
|
@ -497,6 +497,23 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_UNIT_MS,
|
GUC_UNIT_MS,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomBoolVariable(
|
||||||
|
"citus.select_opens_transaction_block",
|
||||||
|
gettext_noop("Open transaction blocks for SELECT commands"),
|
||||||
|
gettext_noop("When enabled, Citus will always send a BEGIN to workers when "
|
||||||
|
"running a distributed SELECT in a transaction block (the "
|
||||||
|
"default). When disabled, Citus will only send BEGIN before "
|
||||||
|
"the first write or other operation that requires a distributed "
|
||||||
|
"transaction, meaning the SELECT on the worker commits "
|
||||||
|
"immediately, releasing any locks and apply any changes made "
|
||||||
|
"through function calls even if the distributed transaction "
|
||||||
|
"aborts."),
|
||||||
|
&SelectOpensTransactionBlock,
|
||||||
|
true,
|
||||||
|
PGC_USERSET,
|
||||||
|
GUC_NO_SHOW_ALL,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomBoolVariable(
|
DefineCustomBoolVariable(
|
||||||
"citus.enable_deadlock_prevention",
|
"citus.enable_deadlock_prevention",
|
||||||
gettext_noop("Prevents transactions from expanding to multiple nodes"),
|
gettext_noop("Prevents transactions from expanding to multiple nodes"),
|
||||||
|
|
|
@ -53,6 +53,12 @@ typedef enum
|
||||||
COMMIT_PROTOCOL_2PC = 2
|
COMMIT_PROTOCOL_2PC = 2
|
||||||
} CommitProtocolType;
|
} CommitProtocolType;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GUC that determines whether a SELECT in a transaction block should also run in
|
||||||
|
* a transaction block on the worker.
|
||||||
|
*/
|
||||||
|
extern bool SelectOpensTransactionBlock;
|
||||||
|
|
||||||
/* config variable managed via guc.c */
|
/* config variable managed via guc.c */
|
||||||
extern int MultiShardCommitProtocol;
|
extern int MultiShardCommitProtocol;
|
||||||
|
|
||||||
|
|
|
@ -365,6 +365,21 @@ SELECT pg_reload_conf();
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET citus.select_opens_transaction_block TO off;
|
||||||
|
-- This query would self-deadlock if it ran in a distributed transaction
|
||||||
|
SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY id;
|
||||||
|
id | pg_advisory_lock
|
||||||
|
----+------------------
|
||||||
|
1 |
|
||||||
|
2 |
|
||||||
|
3 |
|
||||||
|
4 |
|
||||||
|
5 |
|
||||||
|
6 |
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
END;
|
||||||
DROP SCHEMA multi_real_time_transaction CASCADE;
|
DROP SCHEMA multi_real_time_transaction CASCADE;
|
||||||
NOTICE: drop cascades to 4 other objects
|
NOTICE: drop cascades to 4 other objects
|
||||||
DETAIL: drop cascades to table test_table
|
DETAIL: drop cascades to table test_table
|
||||||
|
|
|
@ -373,6 +373,21 @@ SELECT pg_reload_conf();
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET citus.select_opens_transaction_block TO off;
|
||||||
|
-- This query would self-deadlock if it ran in a distributed transaction
|
||||||
|
SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY id;
|
||||||
|
id | pg_advisory_lock
|
||||||
|
----+------------------
|
||||||
|
1 |
|
||||||
|
2 |
|
||||||
|
3 |
|
||||||
|
4 |
|
||||||
|
5 |
|
||||||
|
6 |
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
END;
|
||||||
DROP SCHEMA multi_real_time_transaction CASCADE;
|
DROP SCHEMA multi_real_time_transaction CASCADE;
|
||||||
NOTICE: drop cascades to 4 other objects
|
NOTICE: drop cascades to 4 other objects
|
||||||
DETAIL: drop cascades to table test_table
|
DETAIL: drop cascades to table test_table
|
||||||
|
|
|
@ -224,5 +224,10 @@ SET client_min_messages TO DEFAULT;
|
||||||
alter system set deadlock_timeout TO DEFAULT;
|
alter system set deadlock_timeout TO DEFAULT;
|
||||||
SELECT pg_reload_conf();
|
SELECT pg_reload_conf();
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET citus.select_opens_transaction_block TO off;
|
||||||
|
-- This query would self-deadlock if it ran in a distributed transaction
|
||||||
|
SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY id;
|
||||||
|
END;
|
||||||
|
|
||||||
DROP SCHEMA multi_real_time_transaction CASCADE;
|
DROP SCHEMA multi_real_time_transaction CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue