diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index c584ac96f..8ba97f14a 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2244,7 +2244,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC || MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) { - CoordinatedTransactionUse2PC(); + CoordinatedTransactionShouldUse2PC(); } /* define how tuples will be serialised */ diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 780b0201e..5d2908bd8 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1165,23 +1165,6 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, return xactProperties; } - if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) - { - /* - * In case localExecutionHappened, we force the executor to use 2PC. - * The primary motivation is that at this point we're definitely expanding - * the nodes participated in the transaction. And, by re-generating the - * remote task lists during local query execution, we might prevent the adaptive - * executor to kick-in 2PC (or even start coordinated transaction, that's why - * we prefer adding this check here instead of - * Activate2PCIfModifyingTransactionExpandsToNewNode()). - */ - xactProperties.errorOnAnyFailure = true; - xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED; - xactProperties.requires2PC = true; - return xactProperties; - } - if (DistributedExecutionRequiresRollback(taskList)) { /* transaction blocks are required if the task list needs to roll back */ @@ -1240,7 +1223,7 @@ StartDistributedExecution(DistributedExecution *execution) if (xactProperties->requires2PC) { - CoordinatedTransactionUse2PC(); + CoordinatedTransactionShouldUse2PC(); } /* @@ -3197,7 +3180,7 @@ Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session) * just opened, which means we're now going to make modifications * over multiple connections. Activate 2PC! */ - CoordinatedTransactionUse2PC(); + CoordinatedTransactionShouldUse2PC(); } } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index c1ace895f..0a6cfbbd4 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -236,6 +236,17 @@ ExecuteLocalTaskListExtended(List *taskList, { SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); } + + if (!ReadOnlyTask(task->taskType)) + { + /* + * Any modification on the local execution should enable 2PC. If remote + * queries are also ReadOnly, our 2PC logic is smart enough to skip sending + * PREPARE to those connections. + */ + CoordinatedTransactionShouldUse2PC(); + } + LogLocalCommand(task); if (isUtilityCommand) diff --git a/src/backend/distributed/operations/delete_protocol.c b/src/backend/distributed/operations/delete_protocol.c index eda26ec3d..84a0bc9a9 100644 --- a/src/backend/distributed/operations/delete_protocol.c +++ b/src/backend/distributed/operations/delete_protocol.c @@ -332,7 +332,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName, */ if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) { - CoordinatedTransactionUse2PC(); + CoordinatedTransactionShouldUse2PC(); } List *dropTaskList = DropTaskList(relationId, schemaName, relationName, diff --git a/src/backend/distributed/test/xact_stats.c b/src/backend/distributed/test/xact_stats.c index b157ea27a..05723aa35 100644 --- a/src/backend/distributed/test/xact_stats.c +++ b/src/backend/distributed/test/xact_stats.c @@ -18,9 +18,14 @@ #include "miscadmin.h" #include "pgstat.h" +#include "distributed/transaction_management.h" + + static Size MemoryContextTotalSpace(MemoryContext context); PG_FUNCTION_INFO_V1(top_transaction_context_size); +PG_FUNCTION_INFO_V1(coordinated_transaction_should_use_2PC); + /* * top_transaction_context_size returns current size of TopTransactionContext. @@ -54,3 +59,20 @@ MemoryContextTotalSpace(MemoryContext context) return totalSpace; } + + +/* + * coordinated_transaction_should_use_2PC returns true if the transaction is in a + * coordinated transaction and uses 2PC. If the transaction is nott in a + * coordinated transaction, the function throws an error. + */ +Datum +coordinated_transaction_should_use_2PC(PG_FUNCTION_ARGS) +{ + if (!InCoordinatedTransaction()) + { + ereport(ERROR, (errmsg("The transaction is not a coordinated transaction"))); + } + + PG_RETURN_BOOL(GetCoordinatedTransactionShouldUse2PC()); +} diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 96a4180a4..7bb16e441 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -96,9 +96,16 @@ MemoryContext CommitContext = NULL; /* * Should this coordinated transaction use 2PC? Set by * CoordinatedTransactionUse2PC(), e.g. if DDL was issued and - * MultiShardCommitProtocol was set to 2PC. + * MultiShardCommitProtocol was set to 2PC. But, even if this + * flag is set, the transaction manager is smart enough to only + * do 2PC on the remote connections that did a modification. + * + * As a variable name ShouldCoordinatedTransactionUse2PC could + * be improved. We use CoordinatedTransactionShouldUse2PC() as the + * public API function, hence couldn't come up with a better name + * for the underlying variable at the moment. */ -bool CoordinatedTransactionUses2PC = false; +bool ShouldCoordinatedTransactionUse2PC = false; /* if disabled, distributed statements in a function may run as separate transactions */ bool FunctionOpensTransactionBlock = true; @@ -183,15 +190,29 @@ InCoordinatedTransaction(void) /* - * CoordinatedTransactionUse2PC() signals that the current coordinated + * CoordinatedTransactionShouldUse2PC() signals that the current coordinated * transaction should use 2PC to commit. + * + * Note that even if 2PC is enabled, it is only used for connections that make + * modification (DML or DDL). */ void -CoordinatedTransactionUse2PC(void) +CoordinatedTransactionShouldUse2PC(void) { Assert(InCoordinatedTransaction()); - CoordinatedTransactionUses2PC = true; + ShouldCoordinatedTransactionUse2PC = true; +} + + +/* + * GetCoordinatedTransactionShouldUse2PC is a wrapper function to read the value + * of CoordinatedTransactionShouldUse2PCFlag. + */ +bool +GetCoordinatedTransactionShouldUse2PC(void) +{ + return ShouldCoordinatedTransactionUse2PC; } @@ -425,7 +446,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) */ MarkFailedShardPlacements(); - if (CoordinatedTransactionUses2PC) + if (ShouldCoordinatedTransactionUse2PC) { CoordinatedRemoteTransactionsPrepare(); CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED; @@ -453,7 +474,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * Check again whether shards/placement successfully * committed. This handles failure at COMMIT/PREPARE time. */ - PostCommitMarkFailedShardPlacements(CoordinatedTransactionUses2PC); + PostCommitMarkFailedShardPlacements(ShouldCoordinatedTransactionUse2PC); break; } @@ -485,7 +506,7 @@ ResetGlobalVariables() FreeSavedExplainPlan(); dlist_init(&InProgressTransactions); activeSetStmts = NULL; - CoordinatedTransactionUses2PC = false; + ShouldCoordinatedTransactionUse2PC = false; TransactionModifiedNodeMetadata = false; MetadataSyncOnCommit = false; ResetWorkerErrorIndication(); diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index bf65d1e04..0f3c3222d 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -96,7 +96,7 @@ SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, const char *node uint32 connectionFlags = 0; UseCoordinatedTransaction(); - CoordinatedTransactionUse2PC(); + CoordinatedTransactionShouldUse2PC(); MultiConnection *transactionConnection = GetNodeUserDatabaseConnection( connectionFlags, nodeName, @@ -404,7 +404,7 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char * List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock); UseCoordinatedTransaction(); - CoordinatedTransactionUse2PC(); + CoordinatedTransactionShouldUse2PC(); /* open connections in parallel */ WorkerNode *workerNode = NULL; diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index f6bac9100..ecee5eeee 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -111,7 +111,8 @@ extern bool TransactionModifiedNodeMetadata; */ extern void UseCoordinatedTransaction(void); extern bool InCoordinatedTransaction(void); -extern void CoordinatedTransactionUse2PC(void); +extern void CoordinatedTransactionShouldUse2PC(void); +extern bool GetCoordinatedTransactionShouldUse2PC(void); extern bool IsMultiStatementTransaction(void); extern void EnsureDistributedTransactionId(void); diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 1ba4570ba..6a25138c6 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -895,6 +895,215 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in 0 (1 row) +-- a helper function which return true if the coordinated +-- trannsaction uses 2PC +CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_should_use_2PC$$; +-- a local SELECT followed by remote SELECTs +-- does not trigger 2PC +BEGIN; + SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + y +--------------------------------------------------------------------- +(0 rows) + + WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +-- remote SELECTs followed by local SELECTs +-- does not trigger 2PC +BEGIN; + SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + y +--------------------------------------------------------------------- +(0 rows) + + WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +-- a local SELECT followed by a remote Modify +-- triggers 2PC +BEGIN; + SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + y +--------------------------------------------------------------------- +(0 rows) + + UPDATE test SET y = y +1; +NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1) + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a local modify followed by a remote SELECT +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 1 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a local modify followed by a remote MODIFY +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + UPDATE test SET y = y +1; +NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1) + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a local modify followed by a remote single shard MODIFY +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + INSERT INTO test VALUES (3,3); + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a remote single shard modify followed by a local single +-- shard MODIFY triggers 2PC +BEGIN; + INSERT INTO test VALUES (3,3); + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a remote single shard select followed by a local single +-- shard MODIFY triggers 2PC. But, the transaction manager +-- is smart enough to skip sending 2PC as the remote +-- command is read only +BEGIN; + SELECT count(*) FROM test WHERE x = 3; + count +--------------------------------------------------------------------- + 2 +(1 row) + + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + + SET LOCAL citus.log_remote_commands TO ON; +COMMIT; +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +-- a local single shard select followed by a remote single +-- shard modify does not trigger 2PC +BEGIN; + SELECT count(*) FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 5 +(1 row) + + INSERT INTO test VALUES (3,3); + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + + SET LOCAL citus.log_remote_commands TO ON; +COMMIT; +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx RESET client_min_messages; \set VERBOSITY terse DROP TABLE ref_table; @@ -906,7 +1115,7 @@ DROP TABLE ref; NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE DROP TABLE test_append_table; DROP SCHEMA coordinator_shouldhaveshards CASCADE; -NOTICE: drop cascades to 19 other objects +NOTICE: drop cascades to 20 other objects SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? --------------------------------------------------------------------- diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 3aea816bb..599743550 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -705,7 +705,7 @@ USING HASH AS operator 1 =, function 2 part_hashint4_noop(int4, int8); CREATE TABLE hash_parted ( - a int, + a int, b int ) PARTITION BY HASH (a part_test_int4_ops); CREATE TABLE hpart0 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 0); @@ -1855,6 +1855,138 @@ NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and (1 row) RESET citus.local_copy_flush_threshold; +RESET citus.local_copy_flush_threshold; +CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_should_use_2PC$$; +-- a multi-shard/single-shard select that is failed over to local +-- execution doesn't start a 2PC +BEGIN; + SELECT count(*) FROM another_schema_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630512 another_schema_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630513 another_schema_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630514 another_schema_table WHERE true + count +--------------------------------------------------------------------- + 10001 +(1 row) + + SELECT count(*) FROM another_schema_table WHERE a = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 1 +(1 row) + + WITH cte_1 as (SELECT * FROM another_schema_table LIMIT 10) + SELECT count(*) FROM cte_1; +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 + count +--------------------------------------------------------------------- + 10 +(1 row) + + WITH cte_1 as (SELECT * FROM another_schema_table WHERE a = 1 LIMIT 10) + SELECT count(*) FROM cte_1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT another_schema_table.a, another_schema_table.b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) LIMIT 10) cte_1 + count +--------------------------------------------------------------------- + 1 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +ROLLBACK; +-- same without a transaction block +WITH cte_1 AS (SELECT count(*) as cnt FROM another_schema_table LIMIT 1000), + cte_2 AS (SELECT coordinated_transaction_should_use_2PC() as enabled_2pc) +SELECT cnt, enabled_2pc FROM cte_1, cte_2; +NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT cte_1.cnt, cte_2.enabled_2pc FROM (SELECT intermediate_result.cnt FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(cnt bigint)) cte_1, (SELECT intermediate_result.enabled_2pc FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(enabled_2pc boolean)) cte_2 + cnt | enabled_2pc +--------------------------------------------------------------------- + 10001 | f +(1 row) + +-- a multi-shard modification that is failed over to local +-- execution starts a 2PC +BEGIN; + UPDATE another_schema_table SET b = b + 1; +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- a multi-shard modification that is failed over to local +-- execution starts a 2PC +BEGIN; + WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) + SELECT count(*) FROM cte_1; +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 + count +--------------------------------------------------------------------- + 10001 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same without transaction block +WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) +SELECT coordinated_transaction_should_use_2PC(); +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: SELECT single_node.coordinated_transaction_should_use_2pc() AS coordinated_transaction_should_use_2pc + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +-- a single-shard modification that is failed over to local +-- starts 2PC execution +BEGIN; + UPDATE another_schema_table SET b = b + 1 WHERE a = 1; +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) WHERE (a OPERATOR(pg_catalog.=) 1) + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same without transaction block +WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; +NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (another_schema_table.b OPERATOR(pg_catalog.+) 1) WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) RETURNING another_schema_table.a, another_schema_table.b) SELECT single_node.coordinated_transaction_should_use_2pc() AS coordinated_transaction_should_use_2pc FROM cte_1 +ERROR: The transaction is not a coordinated transaction -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 8dd0b197c..aeb2ccb24 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -372,9 +372,93 @@ inserts AS ( RETURNING * ) SELECT count(*) FROM inserts; +-- a helper function which return true if the coordinated +-- trannsaction uses 2PC +CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_should_use_2PC$$; + +-- a local SELECT followed by remote SELECTs +-- does not trigger 2PC +BEGIN; + SELECT y FROM test WHERE x = 1; + WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; + SELECT count(*) FROM test; + WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; + +-- remote SELECTs followed by local SELECTs +-- does not trigger 2PC +BEGIN; + SELECT count(*) FROM test; + WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; + SELECT y FROM test WHERE x = 1; + WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; + +-- a local SELECT followed by a remote Modify +-- triggers 2PC +BEGIN; + SELECT y FROM test WHERE x = 1; + UPDATE test SET y = y +1; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; + +-- a local modify followed by a remote SELECT +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); + SELECT count(*) FROM test; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; + +-- a local modify followed by a remote MODIFY +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); + UPDATE test SET y = y +1; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; + +-- a local modify followed by a remote single shard MODIFY +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); + INSERT INTO test VALUES (3,3); + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; + +-- a remote single shard modify followed by a local single +-- shard MODIFY triggers 2PC +BEGIN; + INSERT INTO test VALUES (3,3); + INSERT INTO test VALUES (1,1); + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; + +-- a remote single shard select followed by a local single +-- shard MODIFY triggers 2PC. But, the transaction manager +-- is smart enough to skip sending 2PC as the remote +-- command is read only +BEGIN; + SELECT count(*) FROM test WHERE x = 3; + INSERT INTO test VALUES (1,1); + SELECT coordinated_transaction_should_use_2PC(); + SET LOCAL citus.log_remote_commands TO ON; +COMMIT; + +-- a local single shard select followed by a remote single +-- shard modify does not trigger 2PC +BEGIN; + SELECT count(*) FROM test WHERE x = 1; + INSERT INTO test VALUES (3,3); + SELECT coordinated_transaction_should_use_2PC(); + SET LOCAL citus.log_remote_commands TO ON; +COMMIT; + RESET client_min_messages; - - \set VERBOSITY terse DROP TABLE ref_table; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 6640c97b2..a1d498d44 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -906,6 +906,60 @@ WITH cte_1 AS SELECT bool_and(z is null) FROM cte_1; RESET citus.local_copy_flush_threshold; + +RESET citus.local_copy_flush_threshold; + +CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_should_use_2PC$$; + +-- a multi-shard/single-shard select that is failed over to local +-- execution doesn't start a 2PC +BEGIN; + SELECT count(*) FROM another_schema_table; + SELECT count(*) FROM another_schema_table WHERE a = 1; + WITH cte_1 as (SELECT * FROM another_schema_table LIMIT 10) + SELECT count(*) FROM cte_1; + WITH cte_1 as (SELECT * FROM another_schema_table WHERE a = 1 LIMIT 10) + SELECT count(*) FROM cte_1; + SELECT coordinated_transaction_should_use_2PC(); +ROLLBACK; + +-- same without a transaction block +WITH cte_1 AS (SELECT count(*) as cnt FROM another_schema_table LIMIT 1000), + cte_2 AS (SELECT coordinated_transaction_should_use_2PC() as enabled_2pc) +SELECT cnt, enabled_2pc FROM cte_1, cte_2; + +-- a multi-shard modification that is failed over to local +-- execution starts a 2PC +BEGIN; + UPDATE another_schema_table SET b = b + 1; + SELECT coordinated_transaction_should_use_2PC(); +ROLLBACK; + +-- a multi-shard modification that is failed over to local +-- execution starts a 2PC +BEGIN; + WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) + SELECT count(*) FROM cte_1; + SELECT coordinated_transaction_should_use_2PC(); +ROLLBACK; + +-- same without transaction block +WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) +SELECT coordinated_transaction_should_use_2PC(); + +-- a single-shard modification that is failed over to local +-- starts 2PC execution +BEGIN; + UPDATE another_schema_table SET b = b + 1 WHERE a = 1; + SELECT coordinated_transaction_should_use_2PC(); +ROLLBACK; + +-- same without transaction block +WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; + -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false;