From 0805ef9c79cbbc2d8a020442aa64d6bfe863e9a2 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 10 Mar 2021 11:23:27 +0100 Subject: [PATCH] Do not trigger 2PC for reads on local execution Before this commit, Citus used 2PC no matter what kind of local query execution happens. For example, if the coordinator has shards (and the workers as well), even a simple SELECT query could start 2PC: ```SQL WITH cte_1 AS (SELECT * FROM test LIMIT 10) SELECT count(*) FROM cte_1; ``` In this query, the local execution of the shards (and also intermediate result reads) triggers the 2PC. To prevent that, Citus now distinguishes local reads and local writes. And, Citus switches to 2PC only if a modification happens. This may still lead to unnecessary 2PCs when there is a local modification and remote SELECTs only. Though, we handle that separately via #4587. (cherry picked from commit 6a7ed7b309caca3688c51d28ca4fdf1f165398fd) --- .../distributed/executor/adaptive_executor.c | 17 -- .../distributed/executor/local_executor.c | 11 + src/backend/distributed/test/xact_stats.c | 22 ++ .../transaction/transaction_management.c | 11 + .../distributed/transaction_management.h | 1 + .../expected/coordinator_shouldhaveshards.out | 211 +++++++++++++++++- src/test/regress/expected/single_node.out | 132 +++++++++++ .../sql/coordinator_shouldhaveshards.sql | 88 +++++++- src/test/regress/sql/single_node.sql | 54 +++++ 9 files changed, 527 insertions(+), 20 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 4e4292f8f..a79520947 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1165,23 +1165,6 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, return xactProperties; } - if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) - { - /* - * In case localExecutionHappened, we force the executor to use 2PC. - * The primary motivation is that at this point we're definitely expanding - * the nodes participated in the transaction. And, by re-generating the - * remote task lists during local query execution, we might prevent the adaptive - * executor to kick-in 2PC (or even start coordinated transaction, that's why - * we prefer adding this check here instead of - * Activate2PCIfModifyingTransactionExpandsToNewNode()). - */ - xactProperties.errorOnAnyFailure = true; - xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED; - xactProperties.requires2PC = true; - return xactProperties; - } - if (DistributedExecutionRequiresRollback(taskList)) { /* transaction blocks are required if the task list needs to roll back */ diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index c1ace895f..c8214c886 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -236,6 +236,17 @@ ExecuteLocalTaskListExtended(List *taskList, { SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); } + + if (!ReadOnlyTask(task->taskType)) + { + /* + * Any modification on the local execution should enable 2PC. If remote + * queries are also ReadOnly, our 2PC logic is smart enough to skip sending + * PREPARE to those connections. + */ + CoordinatedTransactionUse2PC(); + } + LogLocalCommand(task); if (isUtilityCommand) diff --git a/src/backend/distributed/test/xact_stats.c b/src/backend/distributed/test/xact_stats.c index b157ea27a..e74b65b74 100644 --- a/src/backend/distributed/test/xact_stats.c +++ b/src/backend/distributed/test/xact_stats.c @@ -18,9 +18,14 @@ #include "miscadmin.h" #include "pgstat.h" +#include "distributed/transaction_management.h" + + static Size MemoryContextTotalSpace(MemoryContext context); PG_FUNCTION_INFO_V1(top_transaction_context_size); +PG_FUNCTION_INFO_V1(coordinated_transaction_uses_2PC); + /* * top_transaction_context_size returns current size of TopTransactionContext. @@ -54,3 +59,20 @@ MemoryContextTotalSpace(MemoryContext context) return totalSpace; } + + +/* + * coordinated_transaction_uses_2PC returns true if the transaction is in a + * coordinated transaction and uses 2PC. If the transaction is nott in a + * coordinated transaction, the function throws an error. + */ +Datum +coordinated_transaction_uses_2PC(PG_FUNCTION_ARGS) +{ + if (!InCoordinatedTransaction()) + { + ereport(ERROR, (errmsg("The transaction is not a coordinated transaction"))); + } + + PG_RETURN_BOOL(GetCoordinatedTransactionUses2PC()); +} diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 96a4180a4..3c74885ac 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -195,6 +195,17 @@ CoordinatedTransactionUse2PC(void) } +/* + * GetCoordinatedTransactionUses2PC is a wrapper function to read the value + * of CoordinatedTransactionUses2PC. + */ +bool +GetCoordinatedTransactionUses2PC(void) +{ + return CoordinatedTransactionUses2PC; +} + + void InitializeTransactionManagement(void) { diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index f6bac9100..9aac8b826 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -112,6 +112,7 @@ extern bool TransactionModifiedNodeMetadata; extern void UseCoordinatedTransaction(void); extern bool InCoordinatedTransaction(void); extern void CoordinatedTransactionUse2PC(void); +extern bool GetCoordinatedTransactionUses2PC(void); extern bool IsMultiStatementTransaction(void); extern void EnsureDistributedTransactionId(void); diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 1ba4570ba..e574493e6 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -895,6 +895,215 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in 0 (1 row) +-- a helper function which return true if the coordinated trannsaction uses +-- 2PC +CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_uses_2PC$$; +-- a local SELECT followed by remote SELECTs +-- does not trigger 2PC +BEGIN; + SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + y +--------------------------------------------------------------------- +(0 rows) + + WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +-- remote SELECTs followed by local SELECTs +-- does not trigger 2PC +BEGIN; + SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + y +--------------------------------------------------------------------- +(0 rows) + + WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +-- a local SELECT followed by a remote Modify +-- triggers 2PC +BEGIN; + SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + y +--------------------------------------------------------------------- +(0 rows) + + UPDATE test SET y = y +1; +NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1) + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a local modify followed by a remote SELECT +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 1 +(1 row) + + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a local modify followed by a remote MODIFY +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + UPDATE test SET y = y +1; +NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1) + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a local modify followed by a remote single shard MODIFY +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + INSERT INTO test VALUES (3,3); + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a remote single shard modify followed by a local single +-- shard MODIFY triggers 2PC +BEGIN; + INSERT INTO test VALUES (3,3); + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a remote single shard select followed by a local single +-- shard MODIFY triggers 2PC. But, the transaction manager +-- is smart enough to skip sending 2PC as the remote +-- command is read only +BEGIN; + SELECT count(*) FROM test WHERE x = 3; + count +--------------------------------------------------------------------- + 2 +(1 row) + + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + + SET LOCAL citus.log_remote_commands TO ON; +COMMIT; +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +-- a local single shard select followed by a remote single +-- shard modify does not trigger 2PC +BEGIN; + SELECT count(*) FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 5 +(1 row) + + INSERT INTO test VALUES (3,3); + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + f +(1 row) + + SET LOCAL citus.log_remote_commands TO ON; +COMMIT; +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx RESET client_min_messages; \set VERBOSITY terse DROP TABLE ref_table; @@ -906,7 +1115,7 @@ DROP TABLE ref; NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE DROP TABLE test_append_table; DROP SCHEMA coordinator_shouldhaveshards CASCADE; -NOTICE: drop cascades to 19 other objects +NOTICE: drop cascades to 20 other objects SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? --------------------------------------------------------------------- diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 3aea816bb..08f4b841c 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -1855,6 +1855,138 @@ NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and (1 row) RESET citus.local_copy_flush_threshold; +RESET citus.local_copy_flush_threshold; +CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_uses_2PC$$; +-- a multi-shard/single-shard select that is failed over to local +-- execution doesn't start a 2PC +BEGIN; + SELECT count(*) FROM another_schema_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630512 another_schema_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630513 another_schema_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630514 another_schema_table WHERE true + count +--------------------------------------------------------------------- + 10001 +(1 row) + + SELECT count(*) FROM another_schema_table WHERE a = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 1 +(1 row) + + WITH cte_1 as (SELECT * FROM another_schema_table LIMIT 10) + SELECT count(*) FROM cte_1; +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 + count +--------------------------------------------------------------------- + 10 +(1 row) + + WITH cte_1 as (SELECT * FROM another_schema_table WHERE a = 1 LIMIT 10) + SELECT count(*) FROM cte_1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT another_schema_table.a, another_schema_table.b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) LIMIT 10) cte_1 + count +--------------------------------------------------------------------- + 1 +(1 row) + + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + f +(1 row) + +ROLLBACK; +-- same without a transaction block +WITH cte_1 AS (SELECT count(*) as cnt FROM another_schema_table LIMIT 1000), + cte_2 AS (SELECT coordinated_transaction_uses_2PC() as enabled_2pc) +SELECT cnt, enabled_2pc FROM cte_1, cte_2; +NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT cte_1.cnt, cte_2.enabled_2pc FROM (SELECT intermediate_result.cnt FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(cnt bigint)) cte_1, (SELECT intermediate_result.enabled_2pc FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(enabled_2pc boolean)) cte_2 + cnt | enabled_2pc +--------------------------------------------------------------------- + 10001 | f +(1 row) + +-- a multi-shard modification that is failed over to local +-- execution starts a 2PC +BEGIN; + UPDATE another_schema_table SET b = b + 1; +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- a multi-shard modification that is failed over to local +-- execution starts a 2PC +BEGIN; + WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) + SELECT count(*) FROM cte_1; +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 + count +--------------------------------------------------------------------- + 10001 +(1 row) + + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same without transaction block +WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) +SELECT coordinated_transaction_uses_2PC(); +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: SELECT single_node.coordinated_transaction_uses_2pc() AS coordinated_transaction_uses_2pc + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +-- a single-shard modification that is failed over to local +-- starts 2PC execution +BEGIN; + UPDATE another_schema_table SET b = b + 1 WHERE a = 1; +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) WHERE (a OPERATOR(pg_catalog.=) 1) + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same without transaction block +WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *) +SELECT coordinated_transaction_uses_2PC() FROM cte_1; +NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (another_schema_table.b OPERATOR(pg_catalog.+) 1) WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) RETURNING another_schema_table.a, another_schema_table.b) SELECT single_node.coordinated_transaction_uses_2pc() AS coordinated_transaction_uses_2pc FROM cte_1 +ERROR: The transaction is not a coordinated transaction -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 8dd0b197c..cdd52304f 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -372,9 +372,93 @@ inserts AS ( RETURNING * ) SELECT count(*) FROM inserts; +-- a helper function which return true if the coordinated trannsaction uses +-- 2PC +CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_uses_2PC$$; + +-- a local SELECT followed by remote SELECTs +-- does not trigger 2PC +BEGIN; + SELECT y FROM test WHERE x = 1; + WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; + SELECT count(*) FROM test; + WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; + SELECT coordinated_transaction_uses_2PC(); +COMMIT; + +-- remote SELECTs followed by local SELECTs +-- does not trigger 2PC +BEGIN; + SELECT count(*) FROM test; + WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; + SELECT y FROM test WHERE x = 1; + WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; + SELECT coordinated_transaction_uses_2PC(); +COMMIT; + +-- a local SELECT followed by a remote Modify +-- triggers 2PC +BEGIN; + SELECT y FROM test WHERE x = 1; + UPDATE test SET y = y +1; + SELECT coordinated_transaction_uses_2PC(); +COMMIT; + +-- a local modify followed by a remote SELECT +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); + SELECT count(*) FROM test; + SELECT coordinated_transaction_uses_2PC(); +COMMIT; + +-- a local modify followed by a remote MODIFY +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); + UPDATE test SET y = y +1; + SELECT coordinated_transaction_uses_2PC(); +COMMIT; + +-- a local modify followed by a remote single shard MODIFY +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); + INSERT INTO test VALUES (3,3); + SELECT coordinated_transaction_uses_2PC(); +COMMIT; + +-- a remote single shard modify followed by a local single +-- shard MODIFY triggers 2PC +BEGIN; + INSERT INTO test VALUES (3,3); + INSERT INTO test VALUES (1,1); + SELECT coordinated_transaction_uses_2PC(); +COMMIT; + +-- a remote single shard select followed by a local single +-- shard MODIFY triggers 2PC. But, the transaction manager +-- is smart enough to skip sending 2PC as the remote +-- command is read only +BEGIN; + SELECT count(*) FROM test WHERE x = 3; + INSERT INTO test VALUES (1,1); + SELECT coordinated_transaction_uses_2PC(); + SET LOCAL citus.log_remote_commands TO ON; +COMMIT; + +-- a local single shard select followed by a remote single +-- shard modify does not trigger 2PC +BEGIN; + SELECT count(*) FROM test WHERE x = 1; + INSERT INTO test VALUES (3,3); + SELECT coordinated_transaction_uses_2PC(); + SET LOCAL citus.log_remote_commands TO ON; +COMMIT; + RESET client_min_messages; - - \set VERBOSITY terse DROP TABLE ref_table; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 6640c97b2..9089fb717 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -906,6 +906,60 @@ WITH cte_1 AS SELECT bool_and(z is null) FROM cte_1; RESET citus.local_copy_flush_threshold; + +RESET citus.local_copy_flush_threshold; + +CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_uses_2PC$$; + +-- a multi-shard/single-shard select that is failed over to local +-- execution doesn't start a 2PC +BEGIN; + SELECT count(*) FROM another_schema_table; + SELECT count(*) FROM another_schema_table WHERE a = 1; + WITH cte_1 as (SELECT * FROM another_schema_table LIMIT 10) + SELECT count(*) FROM cte_1; + WITH cte_1 as (SELECT * FROM another_schema_table WHERE a = 1 LIMIT 10) + SELECT count(*) FROM cte_1; + SELECT coordinated_transaction_uses_2PC(); +ROLLBACK; + +-- same without a transaction block +WITH cte_1 AS (SELECT count(*) as cnt FROM another_schema_table LIMIT 1000), + cte_2 AS (SELECT coordinated_transaction_uses_2PC() as enabled_2pc) +SELECT cnt, enabled_2pc FROM cte_1, cte_2; + +-- a multi-shard modification that is failed over to local +-- execution starts a 2PC +BEGIN; + UPDATE another_schema_table SET b = b + 1; + SELECT coordinated_transaction_uses_2PC(); +ROLLBACK; + +-- a multi-shard modification that is failed over to local +-- execution starts a 2PC +BEGIN; + WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) + SELECT count(*) FROM cte_1; + SELECT coordinated_transaction_uses_2PC(); +ROLLBACK; + +-- same without transaction block +WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) +SELECT coordinated_transaction_uses_2PC(); + +-- a single-shard modification that is failed over to local +-- starts 2PC execution +BEGIN; + UPDATE another_schema_table SET b = b + 1 WHERE a = 1; + SELECT coordinated_transaction_uses_2PC(); +ROLLBACK; + +-- same without transaction block +WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *) +SELECT coordinated_transaction_uses_2PC() FROM cte_1; + -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false;