Merge pull request #4806 from citusdata/fix_2pc_local_exec

Do not trigger 2PC for reads on local execution
pull/4817/head
Önder Kalacı 2021-03-12 09:34:34 +01:00 committed by GitHub
commit 56245d232d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 553 additions and 36 deletions

View File

@ -2244,7 +2244,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC ||
MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
{
CoordinatedTransactionUse2PC();
CoordinatedTransactionShouldUse2PC();
}
/* define how tuples will be serialised */

View File

@ -1165,23 +1165,6 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList,
return xactProperties;
}
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
{
/*
* In case localExecutionHappened, we force the executor to use 2PC.
* The primary motivation is that at this point we're definitely expanding
* the nodes participated in the transaction. And, by re-generating the
* remote task lists during local query execution, we might prevent the adaptive
* executor to kick-in 2PC (or even start coordinated transaction, that's why
* we prefer adding this check here instead of
* Activate2PCIfModifyingTransactionExpandsToNewNode()).
*/
xactProperties.errorOnAnyFailure = true;
xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED;
xactProperties.requires2PC = true;
return xactProperties;
}
if (DistributedExecutionRequiresRollback(taskList))
{
/* transaction blocks are required if the task list needs to roll back */
@ -1240,7 +1223,7 @@ StartDistributedExecution(DistributedExecution *execution)
if (xactProperties->requires2PC)
{
CoordinatedTransactionUse2PC();
CoordinatedTransactionShouldUse2PC();
}
/*
@ -3197,7 +3180,7 @@ Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session)
* just opened, which means we're now going to make modifications
* over multiple connections. Activate 2PC!
*/
CoordinatedTransactionUse2PC();
CoordinatedTransactionShouldUse2PC();
}
}

View File

@ -236,6 +236,17 @@ ExecuteLocalTaskListExtended(List *taskList,
{
SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);
}
if (!ReadOnlyTask(task->taskType))
{
/*
* Any modification on the local execution should enable 2PC. If remote
* queries are also ReadOnly, our 2PC logic is smart enough to skip sending
* PREPARE to those connections.
*/
CoordinatedTransactionShouldUse2PC();
}
LogLocalCommand(task);
if (isUtilityCommand)

View File

@ -332,7 +332,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
*/
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
{
CoordinatedTransactionUse2PC();
CoordinatedTransactionShouldUse2PC();
}
List *dropTaskList = DropTaskList(relationId, schemaName, relationName,

View File

@ -18,9 +18,14 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "distributed/transaction_management.h"
static Size MemoryContextTotalSpace(MemoryContext context);
PG_FUNCTION_INFO_V1(top_transaction_context_size);
PG_FUNCTION_INFO_V1(coordinated_transaction_should_use_2PC);
/*
* top_transaction_context_size returns current size of TopTransactionContext.
@ -54,3 +59,20 @@ MemoryContextTotalSpace(MemoryContext context)
return totalSpace;
}
/*
* coordinated_transaction_should_use_2PC returns true if the transaction is in a
* coordinated transaction and uses 2PC. If the transaction is nott in a
* coordinated transaction, the function throws an error.
*/
Datum
coordinated_transaction_should_use_2PC(PG_FUNCTION_ARGS)
{
if (!InCoordinatedTransaction())
{
ereport(ERROR, (errmsg("The transaction is not a coordinated transaction")));
}
PG_RETURN_BOOL(GetCoordinatedTransactionShouldUse2PC());
}

View File

@ -96,9 +96,16 @@ MemoryContext CommitContext = NULL;
/*
* Should this coordinated transaction use 2PC? Set by
* CoordinatedTransactionUse2PC(), e.g. if DDL was issued and
* MultiShardCommitProtocol was set to 2PC.
* MultiShardCommitProtocol was set to 2PC. But, even if this
* flag is set, the transaction manager is smart enough to only
* do 2PC on the remote connections that did a modification.
*
* As a variable name ShouldCoordinatedTransactionUse2PC could
* be improved. We use CoordinatedTransactionShouldUse2PC() as the
* public API function, hence couldn't come up with a better name
* for the underlying variable at the moment.
*/
bool CoordinatedTransactionUses2PC = false;
bool ShouldCoordinatedTransactionUse2PC = false;
/* if disabled, distributed statements in a function may run as separate transactions */
bool FunctionOpensTransactionBlock = true;
@ -183,15 +190,29 @@ InCoordinatedTransaction(void)
/*
* CoordinatedTransactionUse2PC() signals that the current coordinated
* CoordinatedTransactionShouldUse2PC() signals that the current coordinated
* transaction should use 2PC to commit.
*
* Note that even if 2PC is enabled, it is only used for connections that make
* modification (DML or DDL).
*/
void
CoordinatedTransactionUse2PC(void)
CoordinatedTransactionShouldUse2PC(void)
{
Assert(InCoordinatedTransaction());
CoordinatedTransactionUses2PC = true;
ShouldCoordinatedTransactionUse2PC = true;
}
/*
* GetCoordinatedTransactionShouldUse2PC is a wrapper function to read the value
* of CoordinatedTransactionShouldUse2PCFlag.
*/
bool
GetCoordinatedTransactionShouldUse2PC(void)
{
return ShouldCoordinatedTransactionUse2PC;
}
@ -425,7 +446,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
*/
MarkFailedShardPlacements();
if (CoordinatedTransactionUses2PC)
if (ShouldCoordinatedTransactionUse2PC)
{
CoordinatedRemoteTransactionsPrepare();
CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
@ -453,7 +474,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* Check again whether shards/placement successfully
* committed. This handles failure at COMMIT/PREPARE time.
*/
PostCommitMarkFailedShardPlacements(CoordinatedTransactionUses2PC);
PostCommitMarkFailedShardPlacements(ShouldCoordinatedTransactionUse2PC);
break;
}
@ -485,7 +506,7 @@ ResetGlobalVariables()
FreeSavedExplainPlan();
dlist_init(&InProgressTransactions);
activeSetStmts = NULL;
CoordinatedTransactionUses2PC = false;
ShouldCoordinatedTransactionUse2PC = false;
TransactionModifiedNodeMetadata = false;
MetadataSyncOnCommit = false;
ResetWorkerErrorIndication();

View File

@ -96,7 +96,7 @@ SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, const char *node
uint32 connectionFlags = 0;
UseCoordinatedTransaction();
CoordinatedTransactionUse2PC();
CoordinatedTransactionShouldUse2PC();
MultiConnection *transactionConnection = GetNodeUserDatabaseConnection(
connectionFlags, nodeName,
@ -404,7 +404,7 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *
List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
UseCoordinatedTransaction();
CoordinatedTransactionUse2PC();
CoordinatedTransactionShouldUse2PC();
/* open connections in parallel */
WorkerNode *workerNode = NULL;

View File

@ -111,7 +111,8 @@ extern bool TransactionModifiedNodeMetadata;
*/
extern void UseCoordinatedTransaction(void);
extern bool InCoordinatedTransaction(void);
extern void CoordinatedTransactionUse2PC(void);
extern void CoordinatedTransactionShouldUse2PC(void);
extern bool GetCoordinatedTransactionShouldUse2PC(void);
extern bool IsMultiStatementTransaction(void);
extern void EnsureDistributedTransactionId(void);

View File

@ -895,6 +895,215 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in
0
(1 row)
-- a helper function which return true if the coordinated
-- trannsaction uses 2PC
CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC()
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
$$coordinated_transaction_should_use_2PC$$;
-- a local SELECT followed by remote SELECTs
-- does not trigger 2PC
BEGIN;
SELECT y FROM test WHERE x = 1;
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
y
---------------------------------------------------------------------
(0 rows)
WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM test;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
count
---------------------------------------------------------------------
0
(1 row)
WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
count
---------------------------------------------------------------------
0
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
-- remote SELECTs followed by local SELECTs
-- does not trigger 2PC
BEGIN;
SELECT count(*) FROM test;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
count
---------------------------------------------------------------------
0
(1 row)
WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
count
---------------------------------------------------------------------
0
(1 row)
SELECT y FROM test WHERE x = 1;
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
y
---------------------------------------------------------------------
(0 rows)
WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
count
---------------------------------------------------------------------
0
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
-- a local SELECT followed by a remote Modify
-- triggers 2PC
BEGIN;
SELECT y FROM test WHERE x = 1;
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
y
---------------------------------------------------------------------
(0 rows)
UPDATE test SET y = y +1;
NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1)
NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
COMMIT;
-- a local modify followed by a remote SELECT
-- triggers 2PC
BEGIN;
INSERT INTO test VALUES (1,1);
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
SELECT count(*) FROM test;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
count
---------------------------------------------------------------------
1
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
COMMIT;
-- a local modify followed by a remote MODIFY
-- triggers 2PC
BEGIN;
INSERT INTO test VALUES (1,1);
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
UPDATE test SET y = y +1;
NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1)
NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
COMMIT;
-- a local modify followed by a remote single shard MODIFY
-- triggers 2PC
BEGIN;
INSERT INTO test VALUES (1,1);
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
INSERT INTO test VALUES (3,3);
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
COMMIT;
-- a remote single shard modify followed by a local single
-- shard MODIFY triggers 2PC
BEGIN;
INSERT INTO test VALUES (3,3);
INSERT INTO test VALUES (1,1);
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
COMMIT;
-- a remote single shard select followed by a local single
-- shard MODIFY triggers 2PC. But, the transaction manager
-- is smart enough to skip sending 2PC as the remote
-- command is read only
BEGIN;
SELECT count(*) FROM test WHERE x = 3;
count
---------------------------------------------------------------------
2
(1 row)
INSERT INTO test VALUES (1,1);
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
SET LOCAL citus.log_remote_commands TO ON;
COMMIT;
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-- a local single shard select followed by a remote single
-- shard modify does not trigger 2PC
BEGIN;
SELECT count(*) FROM test WHERE x = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
count
---------------------------------------------------------------------
5
(1 row)
INSERT INTO test VALUES (3,3);
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
SET LOCAL citus.log_remote_commands TO ON;
COMMIT;
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
RESET client_min_messages;
\set VERBOSITY terse
DROP TABLE ref_table;
@ -906,7 +1115,7 @@ DROP TABLE ref;
NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE
DROP TABLE test_append_table;
DROP SCHEMA coordinator_shouldhaveshards CASCADE;
NOTICE: drop cascades to 19 other objects
NOTICE: drop cascades to 20 other objects
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
?column?
---------------------------------------------------------------------

View File

@ -705,7 +705,7 @@ USING HASH AS
operator 1 =,
function 2 part_hashint4_noop(int4, int8);
CREATE TABLE hash_parted (
a int,
a int,
b int
) PARTITION BY HASH (a part_test_int4_ops);
CREATE TABLE hpart0 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 0);
@ -1855,6 +1855,138 @@ NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and
(1 row)
RESET citus.local_copy_flush_threshold;
RESET citus.local_copy_flush_threshold;
CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC()
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
$$coordinated_transaction_should_use_2PC$$;
-- a multi-shard/single-shard select that is failed over to local
-- execution doesn't start a 2PC
BEGIN;
SELECT count(*) FROM another_schema_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630512 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630513 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630514 another_schema_table WHERE true
count
---------------------------------------------------------------------
10001
(1 row)
SELECT count(*) FROM another_schema_table WHERE a = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 1)
count
---------------------------------------------------------------------
1
(1 row)
WITH cte_1 as (SELECT * FROM another_schema_table LIMIT 10)
SELECT count(*) FROM cte_1;
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '10'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '10'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '10'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '10'::bigint
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1
count
---------------------------------------------------------------------
10
(1 row)
WITH cte_1 as (SELECT * FROM another_schema_table WHERE a = 1 LIMIT 10)
SELECT count(*) FROM cte_1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT another_schema_table.a, another_schema_table.b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) LIMIT 10) cte_1
count
---------------------------------------------------------------------
1
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
ROLLBACK;
-- same without a transaction block
WITH cte_1 AS (SELECT count(*) as cnt FROM another_schema_table LIMIT 1000),
cte_2 AS (SELECT coordinated_transaction_should_use_2PC() as enabled_2pc)
SELECT cnt, enabled_2pc FROM cte_1, cte_2;
NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT cte_1.cnt, cte_2.enabled_2pc FROM (SELECT intermediate_result.cnt FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(cnt bigint)) cte_1, (SELECT intermediate_result.enabled_2pc FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(enabled_2pc boolean)) cte_2
cnt | enabled_2pc
---------------------------------------------------------------------
10001 | f
(1 row)
-- a multi-shard modification that is failed over to local
-- execution starts a 2PC
BEGIN;
UPDATE another_schema_table SET b = b + 1;
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1)
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1)
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1)
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
-- a multi-shard modification that is failed over to local
-- execution starts a 2PC
BEGIN;
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *)
SELECT count(*) FROM cte_1;
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1
count
---------------------------------------------------------------------
10001
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
-- same without transaction block
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *)
SELECT coordinated_transaction_should_use_2PC();
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b
NOTICE: executing the command locally: SELECT single_node.coordinated_transaction_should_use_2pc() AS coordinated_transaction_should_use_2pc
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
-- a single-shard modification that is failed over to local
-- starts 2PC execution
BEGIN;
UPDATE another_schema_table SET b = b + 1 WHERE a = 1;
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) WHERE (a OPERATOR(pg_catalog.=) 1)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
-- same without transaction block
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (another_schema_table.b OPERATOR(pg_catalog.+) 1) WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) RETURNING another_schema_table.a, another_schema_table.b) SELECT single_node.coordinated_transaction_should_use_2pc() AS coordinated_transaction_should_use_2pc FROM cte_1
ERROR: The transaction is not a coordinated transaction
-- if the local execution is disabled, we cannot failover to
-- local execution and the queries would fail
SET citus.enable_local_execution TO false;

View File

@ -372,9 +372,93 @@ inserts AS (
RETURNING *
) SELECT count(*) FROM inserts;
-- a helper function which return true if the coordinated
-- trannsaction uses 2PC
CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC()
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
$$coordinated_transaction_should_use_2PC$$;
-- a local SELECT followed by remote SELECTs
-- does not trigger 2PC
BEGIN;
SELECT y FROM test WHERE x = 1;
WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test;
SELECT count(*) FROM test;
WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
-- remote SELECTs followed by local SELECTs
-- does not trigger 2PC
BEGIN;
SELECT count(*) FROM test;
WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test;
SELECT y FROM test WHERE x = 1;
WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
-- a local SELECT followed by a remote Modify
-- triggers 2PC
BEGIN;
SELECT y FROM test WHERE x = 1;
UPDATE test SET y = y +1;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
-- a local modify followed by a remote SELECT
-- triggers 2PC
BEGIN;
INSERT INTO test VALUES (1,1);
SELECT count(*) FROM test;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
-- a local modify followed by a remote MODIFY
-- triggers 2PC
BEGIN;
INSERT INTO test VALUES (1,1);
UPDATE test SET y = y +1;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
-- a local modify followed by a remote single shard MODIFY
-- triggers 2PC
BEGIN;
INSERT INTO test VALUES (1,1);
INSERT INTO test VALUES (3,3);
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
-- a remote single shard modify followed by a local single
-- shard MODIFY triggers 2PC
BEGIN;
INSERT INTO test VALUES (3,3);
INSERT INTO test VALUES (1,1);
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
-- a remote single shard select followed by a local single
-- shard MODIFY triggers 2PC. But, the transaction manager
-- is smart enough to skip sending 2PC as the remote
-- command is read only
BEGIN;
SELECT count(*) FROM test WHERE x = 3;
INSERT INTO test VALUES (1,1);
SELECT coordinated_transaction_should_use_2PC();
SET LOCAL citus.log_remote_commands TO ON;
COMMIT;
-- a local single shard select followed by a remote single
-- shard modify does not trigger 2PC
BEGIN;
SELECT count(*) FROM test WHERE x = 1;
INSERT INTO test VALUES (3,3);
SELECT coordinated_transaction_should_use_2PC();
SET LOCAL citus.log_remote_commands TO ON;
COMMIT;
RESET client_min_messages;
\set VERBOSITY terse
DROP TABLE ref_table;

View File

@ -906,6 +906,60 @@ WITH cte_1 AS
SELECT bool_and(z is null) FROM cte_1;
RESET citus.local_copy_flush_threshold;
RESET citus.local_copy_flush_threshold;
CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC()
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
$$coordinated_transaction_should_use_2PC$$;
-- a multi-shard/single-shard select that is failed over to local
-- execution doesn't start a 2PC
BEGIN;
SELECT count(*) FROM another_schema_table;
SELECT count(*) FROM another_schema_table WHERE a = 1;
WITH cte_1 as (SELECT * FROM another_schema_table LIMIT 10)
SELECT count(*) FROM cte_1;
WITH cte_1 as (SELECT * FROM another_schema_table WHERE a = 1 LIMIT 10)
SELECT count(*) FROM cte_1;
SELECT coordinated_transaction_should_use_2PC();
ROLLBACK;
-- same without a transaction block
WITH cte_1 AS (SELECT count(*) as cnt FROM another_schema_table LIMIT 1000),
cte_2 AS (SELECT coordinated_transaction_should_use_2PC() as enabled_2pc)
SELECT cnt, enabled_2pc FROM cte_1, cte_2;
-- a multi-shard modification that is failed over to local
-- execution starts a 2PC
BEGIN;
UPDATE another_schema_table SET b = b + 1;
SELECT coordinated_transaction_should_use_2PC();
ROLLBACK;
-- a multi-shard modification that is failed over to local
-- execution starts a 2PC
BEGIN;
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *)
SELECT count(*) FROM cte_1;
SELECT coordinated_transaction_should_use_2PC();
ROLLBACK;
-- same without transaction block
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *)
SELECT coordinated_transaction_should_use_2PC();
-- a single-shard modification that is failed over to local
-- starts 2PC execution
BEGIN;
UPDATE another_schema_table SET b = b + 1 WHERE a = 1;
SELECT coordinated_transaction_should_use_2PC();
ROLLBACK;
-- same without transaction block
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
-- if the local execution is disabled, we cannot failover to
-- local execution and the queries would fail
SET citus.enable_local_execution TO false;