From 6a7ed7b309caca3688c51d28ca4fdf1f165398fd Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 10 Mar 2021 11:23:27 +0100 Subject: [PATCH 1/2] Do not trigger 2PC for reads on local execution Before this commit, Citus used 2PC no matter what kind of local query execution happens. For example, if the coordinator has shards (and the workers as well), even a simple SELECT query could start 2PC: ```SQL WITH cte_1 AS (SELECT * FROM test LIMIT 10) SELECT count(*) FROM cte_1; ``` In this query, the local execution of the shards (and also intermediate result reads) triggers the 2PC. To prevent that, Citus now distinguishes local reads and local writes. And, Citus switches to 2PC only if a modification happens. This may still lead to unnecessary 2PCs when there is a local modification and remote SELECTs only. Though, we handle that separately via #4587. --- .../distributed/executor/adaptive_executor.c | 17 -- .../distributed/executor/local_executor.c | 11 + src/backend/distributed/test/xact_stats.c | 22 ++ .../transaction/transaction_management.c | 11 + .../distributed/transaction_management.h | 1 + .../expected/coordinator_shouldhaveshards.out | 211 +++++++++++++++++- src/test/regress/expected/single_node.out | 132 +++++++++++ .../sql/coordinator_shouldhaveshards.sql | 88 +++++++- src/test/regress/sql/single_node.sql | 54 +++++ 9 files changed, 527 insertions(+), 20 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 780b0201e..0ea04b3e2 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1165,23 +1165,6 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, return xactProperties; } - if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) - { - /* - * In case localExecutionHappened, we force the executor to use 2PC. - * The primary motivation is that at this point we're definitely expanding - * the nodes participated in the transaction. And, by re-generating the - * remote task lists during local query execution, we might prevent the adaptive - * executor to kick-in 2PC (or even start coordinated transaction, that's why - * we prefer adding this check here instead of - * Activate2PCIfModifyingTransactionExpandsToNewNode()). - */ - xactProperties.errorOnAnyFailure = true; - xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED; - xactProperties.requires2PC = true; - return xactProperties; - } - if (DistributedExecutionRequiresRollback(taskList)) { /* transaction blocks are required if the task list needs to roll back */ diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index c1ace895f..c8214c886 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -236,6 +236,17 @@ ExecuteLocalTaskListExtended(List *taskList, { SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); } + + if (!ReadOnlyTask(task->taskType)) + { + /* + * Any modification on the local execution should enable 2PC. If remote + * queries are also ReadOnly, our 2PC logic is smart enough to skip sending + * PREPARE to those connections. + */ + CoordinatedTransactionUse2PC(); + } + LogLocalCommand(task); if (isUtilityCommand) diff --git a/src/backend/distributed/test/xact_stats.c b/src/backend/distributed/test/xact_stats.c index b157ea27a..e74b65b74 100644 --- a/src/backend/distributed/test/xact_stats.c +++ b/src/backend/distributed/test/xact_stats.c @@ -18,9 +18,14 @@ #include "miscadmin.h" #include "pgstat.h" +#include "distributed/transaction_management.h" + + static Size MemoryContextTotalSpace(MemoryContext context); PG_FUNCTION_INFO_V1(top_transaction_context_size); +PG_FUNCTION_INFO_V1(coordinated_transaction_uses_2PC); + /* * top_transaction_context_size returns current size of TopTransactionContext. @@ -54,3 +59,20 @@ MemoryContextTotalSpace(MemoryContext context) return totalSpace; } + + +/* + * coordinated_transaction_uses_2PC returns true if the transaction is in a + * coordinated transaction and uses 2PC. If the transaction is nott in a + * coordinated transaction, the function throws an error. + */ +Datum +coordinated_transaction_uses_2PC(PG_FUNCTION_ARGS) +{ + if (!InCoordinatedTransaction()) + { + ereport(ERROR, (errmsg("The transaction is not a coordinated transaction"))); + } + + PG_RETURN_BOOL(GetCoordinatedTransactionUses2PC()); +} diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 96a4180a4..3c74885ac 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -195,6 +195,17 @@ CoordinatedTransactionUse2PC(void) } +/* + * GetCoordinatedTransactionUses2PC is a wrapper function to read the value + * of CoordinatedTransactionUses2PC. + */ +bool +GetCoordinatedTransactionUses2PC(void) +{ + return CoordinatedTransactionUses2PC; +} + + void InitializeTransactionManagement(void) { diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index f6bac9100..9aac8b826 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -112,6 +112,7 @@ extern bool TransactionModifiedNodeMetadata; extern void UseCoordinatedTransaction(void); extern bool InCoordinatedTransaction(void); extern void CoordinatedTransactionUse2PC(void); +extern bool GetCoordinatedTransactionUses2PC(void); extern bool IsMultiStatementTransaction(void); extern void EnsureDistributedTransactionId(void); diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 1ba4570ba..e574493e6 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -895,6 +895,215 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in 0 (1 row) +-- a helper function which return true if the coordinated trannsaction uses +-- 2PC +CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_uses_2PC$$; +-- a local SELECT followed by remote SELECTs +-- does not trigger 2PC +BEGIN; + SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + y +--------------------------------------------------------------------- +(0 rows) + + WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +-- remote SELECTs followed by local SELECTs +-- does not trigger 2PC +BEGIN; + SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + y +--------------------------------------------------------------------- +(0 rows) + + WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +-- a local SELECT followed by a remote Modify +-- triggers 2PC +BEGIN; + SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + y +--------------------------------------------------------------------- +(0 rows) + + UPDATE test SET y = y +1; +NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1) + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a local modify followed by a remote SELECT +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true + count +--------------------------------------------------------------------- + 1 +(1 row) + + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a local modify followed by a remote MODIFY +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + UPDATE test SET y = y +1; +NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1) + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a local modify followed by a remote single shard MODIFY +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + INSERT INTO test VALUES (3,3); + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a remote single shard modify followed by a local single +-- shard MODIFY triggers 2PC +BEGIN; + INSERT INTO test VALUES (3,3); + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +-- a remote single shard select followed by a local single +-- shard MODIFY triggers 2PC. But, the transaction manager +-- is smart enough to skip sending 2PC as the remote +-- command is read only +BEGIN; + SELECT count(*) FROM test WHERE x = 3; + count +--------------------------------------------------------------------- + 2 +(1 row) + + INSERT INTO test VALUES (1,1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + + SET LOCAL citus.log_remote_commands TO ON; +COMMIT; +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +-- a local single shard select followed by a remote single +-- shard modify does not trigger 2PC +BEGIN; + SELECT count(*) FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 5 +(1 row) + + INSERT INTO test VALUES (3,3); + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + f +(1 row) + + SET LOCAL citus.log_remote_commands TO ON; +COMMIT; +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx RESET client_min_messages; \set VERBOSITY terse DROP TABLE ref_table; @@ -906,7 +1115,7 @@ DROP TABLE ref; NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE DROP TABLE test_append_table; DROP SCHEMA coordinator_shouldhaveshards CASCADE; -NOTICE: drop cascades to 19 other objects +NOTICE: drop cascades to 20 other objects SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? --------------------------------------------------------------------- diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 3aea816bb..08f4b841c 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -1855,6 +1855,138 @@ NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and (1 row) RESET citus.local_copy_flush_threshold; +RESET citus.local_copy_flush_threshold; +CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_uses_2PC$$; +-- a multi-shard/single-shard select that is failed over to local +-- execution doesn't start a 2PC +BEGIN; + SELECT count(*) FROM another_schema_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630512 another_schema_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630513 another_schema_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630514 another_schema_table WHERE true + count +--------------------------------------------------------------------- + 10001 +(1 row) + + SELECT count(*) FROM another_schema_table WHERE a = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 1 +(1 row) + + WITH cte_1 as (SELECT * FROM another_schema_table LIMIT 10) + SELECT count(*) FROM cte_1; +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 + count +--------------------------------------------------------------------- + 10 +(1 row) + + WITH cte_1 as (SELECT * FROM another_schema_table WHERE a = 1 LIMIT 10) + SELECT count(*) FROM cte_1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT another_schema_table.a, another_schema_table.b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) LIMIT 10) cte_1 + count +--------------------------------------------------------------------- + 1 +(1 row) + + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + f +(1 row) + +ROLLBACK; +-- same without a transaction block +WITH cte_1 AS (SELECT count(*) as cnt FROM another_schema_table LIMIT 1000), + cte_2 AS (SELECT coordinated_transaction_uses_2PC() as enabled_2pc) +SELECT cnt, enabled_2pc FROM cte_1, cte_2; +NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT cte_1.cnt, cte_2.enabled_2pc FROM (SELECT intermediate_result.cnt FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(cnt bigint)) cte_1, (SELECT intermediate_result.enabled_2pc FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(enabled_2pc boolean)) cte_2 + cnt | enabled_2pc +--------------------------------------------------------------------- + 10001 | f +(1 row) + +-- a multi-shard modification that is failed over to local +-- execution starts a 2PC +BEGIN; + UPDATE another_schema_table SET b = b + 1; +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- a multi-shard modification that is failed over to local +-- execution starts a 2PC +BEGIN; + WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) + SELECT count(*) FROM cte_1; +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 + count +--------------------------------------------------------------------- + 10001 +(1 row) + + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same without transaction block +WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) +SELECT coordinated_transaction_uses_2PC(); +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b +NOTICE: executing the command locally: SELECT single_node.coordinated_transaction_uses_2pc() AS coordinated_transaction_uses_2pc + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +-- a single-shard modification that is failed over to local +-- starts 2PC execution +BEGIN; + UPDATE another_schema_table SET b = b + 1 WHERE a = 1; +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) WHERE (a OPERATOR(pg_catalog.=) 1) + SELECT coordinated_transaction_uses_2PC(); + coordinated_transaction_uses_2pc +--------------------------------------------------------------------- + t +(1 row) + +ROLLBACK; +-- same without transaction block +WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *) +SELECT coordinated_transaction_uses_2PC() FROM cte_1; +NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (another_schema_table.b OPERATOR(pg_catalog.+) 1) WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) RETURNING another_schema_table.a, another_schema_table.b) SELECT single_node.coordinated_transaction_uses_2pc() AS coordinated_transaction_uses_2pc FROM cte_1 +ERROR: The transaction is not a coordinated transaction -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 8dd0b197c..cdd52304f 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -372,9 +372,93 @@ inserts AS ( RETURNING * ) SELECT count(*) FROM inserts; +-- a helper function which return true if the coordinated trannsaction uses +-- 2PC +CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_uses_2PC$$; + +-- a local SELECT followed by remote SELECTs +-- does not trigger 2PC +BEGIN; + SELECT y FROM test WHERE x = 1; + WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; + SELECT count(*) FROM test; + WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; + SELECT coordinated_transaction_uses_2PC(); +COMMIT; + +-- remote SELECTs followed by local SELECTs +-- does not trigger 2PC +BEGIN; + SELECT count(*) FROM test; + WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; + SELECT y FROM test WHERE x = 1; + WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; + SELECT coordinated_transaction_uses_2PC(); +COMMIT; + +-- a local SELECT followed by a remote Modify +-- triggers 2PC +BEGIN; + SELECT y FROM test WHERE x = 1; + UPDATE test SET y = y +1; + SELECT coordinated_transaction_uses_2PC(); +COMMIT; + +-- a local modify followed by a remote SELECT +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); + SELECT count(*) FROM test; + SELECT coordinated_transaction_uses_2PC(); +COMMIT; + +-- a local modify followed by a remote MODIFY +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); + UPDATE test SET y = y +1; + SELECT coordinated_transaction_uses_2PC(); +COMMIT; + +-- a local modify followed by a remote single shard MODIFY +-- triggers 2PC +BEGIN; + INSERT INTO test VALUES (1,1); + INSERT INTO test VALUES (3,3); + SELECT coordinated_transaction_uses_2PC(); +COMMIT; + +-- a remote single shard modify followed by a local single +-- shard MODIFY triggers 2PC +BEGIN; + INSERT INTO test VALUES (3,3); + INSERT INTO test VALUES (1,1); + SELECT coordinated_transaction_uses_2PC(); +COMMIT; + +-- a remote single shard select followed by a local single +-- shard MODIFY triggers 2PC. But, the transaction manager +-- is smart enough to skip sending 2PC as the remote +-- command is read only +BEGIN; + SELECT count(*) FROM test WHERE x = 3; + INSERT INTO test VALUES (1,1); + SELECT coordinated_transaction_uses_2PC(); + SET LOCAL citus.log_remote_commands TO ON; +COMMIT; + +-- a local single shard select followed by a remote single +-- shard modify does not trigger 2PC +BEGIN; + SELECT count(*) FROM test WHERE x = 1; + INSERT INTO test VALUES (3,3); + SELECT coordinated_transaction_uses_2PC(); + SET LOCAL citus.log_remote_commands TO ON; +COMMIT; + RESET client_min_messages; - - \set VERBOSITY terse DROP TABLE ref_table; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 6640c97b2..9089fb717 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -906,6 +906,60 @@ WITH cte_1 AS SELECT bool_and(z is null) FROM cte_1; RESET citus.local_copy_flush_threshold; + +RESET citus.local_copy_flush_threshold; + +CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_uses_2PC$$; + +-- a multi-shard/single-shard select that is failed over to local +-- execution doesn't start a 2PC +BEGIN; + SELECT count(*) FROM another_schema_table; + SELECT count(*) FROM another_schema_table WHERE a = 1; + WITH cte_1 as (SELECT * FROM another_schema_table LIMIT 10) + SELECT count(*) FROM cte_1; + WITH cte_1 as (SELECT * FROM another_schema_table WHERE a = 1 LIMIT 10) + SELECT count(*) FROM cte_1; + SELECT coordinated_transaction_uses_2PC(); +ROLLBACK; + +-- same without a transaction block +WITH cte_1 AS (SELECT count(*) as cnt FROM another_schema_table LIMIT 1000), + cte_2 AS (SELECT coordinated_transaction_uses_2PC() as enabled_2pc) +SELECT cnt, enabled_2pc FROM cte_1, cte_2; + +-- a multi-shard modification that is failed over to local +-- execution starts a 2PC +BEGIN; + UPDATE another_schema_table SET b = b + 1; + SELECT coordinated_transaction_uses_2PC(); +ROLLBACK; + +-- a multi-shard modification that is failed over to local +-- execution starts a 2PC +BEGIN; + WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) + SELECT count(*) FROM cte_1; + SELECT coordinated_transaction_uses_2PC(); +ROLLBACK; + +-- same without transaction block +WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) +SELECT coordinated_transaction_uses_2PC(); + +-- a single-shard modification that is failed over to local +-- starts 2PC execution +BEGIN; + UPDATE another_schema_table SET b = b + 1 WHERE a = 1; + SELECT coordinated_transaction_uses_2PC(); +ROLLBACK; + +-- same without transaction block +WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *) +SELECT coordinated_transaction_uses_2PC() FROM cte_1; + -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; From e65e72130d119ade5b118dbb28e77959ede1be0e Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 10 Mar 2021 19:52:05 +0100 Subject: [PATCH 2/2] Rename use -> shouldUse Because setting the flag doesn't necessarily mean that we'll use 2PC. If connections are read-only, we will not use 2PC. In other words, we'll use 2PC only for connections that modified any placements. --- src/backend/distributed/commands/multi_copy.c | 2 +- .../distributed/executor/adaptive_executor.c | 4 +- .../distributed/executor/local_executor.c | 2 +- .../distributed/operations/delete_protocol.c | 2 +- src/backend/distributed/test/xact_stats.c | 8 ++-- .../transaction/transaction_management.c | 34 +++++++++----- .../transaction/worker_transaction.c | 4 +- .../distributed/transaction_management.h | 4 +- .../expected/coordinator_shouldhaveshards.out | 44 +++++++++---------- src/test/regress/expected/single_node.out | 34 +++++++------- .../sql/coordinator_shouldhaveshards.sql | 26 +++++------ src/test/regress/sql/single_node.sql | 18 ++++---- 12 files changed, 96 insertions(+), 86 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index c584ac96f..8ba97f14a 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2244,7 +2244,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC || MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) { - CoordinatedTransactionUse2PC(); + CoordinatedTransactionShouldUse2PC(); } /* define how tuples will be serialised */ diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 0ea04b3e2..5d2908bd8 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1223,7 +1223,7 @@ StartDistributedExecution(DistributedExecution *execution) if (xactProperties->requires2PC) { - CoordinatedTransactionUse2PC(); + CoordinatedTransactionShouldUse2PC(); } /* @@ -3180,7 +3180,7 @@ Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session) * just opened, which means we're now going to make modifications * over multiple connections. Activate 2PC! */ - CoordinatedTransactionUse2PC(); + CoordinatedTransactionShouldUse2PC(); } } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index c8214c886..0a6cfbbd4 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -244,7 +244,7 @@ ExecuteLocalTaskListExtended(List *taskList, * queries are also ReadOnly, our 2PC logic is smart enough to skip sending * PREPARE to those connections. */ - CoordinatedTransactionUse2PC(); + CoordinatedTransactionShouldUse2PC(); } LogLocalCommand(task); diff --git a/src/backend/distributed/operations/delete_protocol.c b/src/backend/distributed/operations/delete_protocol.c index eda26ec3d..84a0bc9a9 100644 --- a/src/backend/distributed/operations/delete_protocol.c +++ b/src/backend/distributed/operations/delete_protocol.c @@ -332,7 +332,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName, */ if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) { - CoordinatedTransactionUse2PC(); + CoordinatedTransactionShouldUse2PC(); } List *dropTaskList = DropTaskList(relationId, schemaName, relationName, diff --git a/src/backend/distributed/test/xact_stats.c b/src/backend/distributed/test/xact_stats.c index e74b65b74..05723aa35 100644 --- a/src/backend/distributed/test/xact_stats.c +++ b/src/backend/distributed/test/xact_stats.c @@ -24,7 +24,7 @@ static Size MemoryContextTotalSpace(MemoryContext context); PG_FUNCTION_INFO_V1(top_transaction_context_size); -PG_FUNCTION_INFO_V1(coordinated_transaction_uses_2PC); +PG_FUNCTION_INFO_V1(coordinated_transaction_should_use_2PC); /* @@ -62,17 +62,17 @@ MemoryContextTotalSpace(MemoryContext context) /* - * coordinated_transaction_uses_2PC returns true if the transaction is in a + * coordinated_transaction_should_use_2PC returns true if the transaction is in a * coordinated transaction and uses 2PC. If the transaction is nott in a * coordinated transaction, the function throws an error. */ Datum -coordinated_transaction_uses_2PC(PG_FUNCTION_ARGS) +coordinated_transaction_should_use_2PC(PG_FUNCTION_ARGS) { if (!InCoordinatedTransaction()) { ereport(ERROR, (errmsg("The transaction is not a coordinated transaction"))); } - PG_RETURN_BOOL(GetCoordinatedTransactionUses2PC()); + PG_RETURN_BOOL(GetCoordinatedTransactionShouldUse2PC()); } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 3c74885ac..7bb16e441 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -96,9 +96,16 @@ MemoryContext CommitContext = NULL; /* * Should this coordinated transaction use 2PC? Set by * CoordinatedTransactionUse2PC(), e.g. if DDL was issued and - * MultiShardCommitProtocol was set to 2PC. + * MultiShardCommitProtocol was set to 2PC. But, even if this + * flag is set, the transaction manager is smart enough to only + * do 2PC on the remote connections that did a modification. + * + * As a variable name ShouldCoordinatedTransactionUse2PC could + * be improved. We use CoordinatedTransactionShouldUse2PC() as the + * public API function, hence couldn't come up with a better name + * for the underlying variable at the moment. */ -bool CoordinatedTransactionUses2PC = false; +bool ShouldCoordinatedTransactionUse2PC = false; /* if disabled, distributed statements in a function may run as separate transactions */ bool FunctionOpensTransactionBlock = true; @@ -183,26 +190,29 @@ InCoordinatedTransaction(void) /* - * CoordinatedTransactionUse2PC() signals that the current coordinated + * CoordinatedTransactionShouldUse2PC() signals that the current coordinated * transaction should use 2PC to commit. + * + * Note that even if 2PC is enabled, it is only used for connections that make + * modification (DML or DDL). */ void -CoordinatedTransactionUse2PC(void) +CoordinatedTransactionShouldUse2PC(void) { Assert(InCoordinatedTransaction()); - CoordinatedTransactionUses2PC = true; + ShouldCoordinatedTransactionUse2PC = true; } /* - * GetCoordinatedTransactionUses2PC is a wrapper function to read the value - * of CoordinatedTransactionUses2PC. + * GetCoordinatedTransactionShouldUse2PC is a wrapper function to read the value + * of CoordinatedTransactionShouldUse2PCFlag. */ bool -GetCoordinatedTransactionUses2PC(void) +GetCoordinatedTransactionShouldUse2PC(void) { - return CoordinatedTransactionUses2PC; + return ShouldCoordinatedTransactionUse2PC; } @@ -436,7 +446,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) */ MarkFailedShardPlacements(); - if (CoordinatedTransactionUses2PC) + if (ShouldCoordinatedTransactionUse2PC) { CoordinatedRemoteTransactionsPrepare(); CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED; @@ -464,7 +474,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * Check again whether shards/placement successfully * committed. This handles failure at COMMIT/PREPARE time. */ - PostCommitMarkFailedShardPlacements(CoordinatedTransactionUses2PC); + PostCommitMarkFailedShardPlacements(ShouldCoordinatedTransactionUse2PC); break; } @@ -496,7 +506,7 @@ ResetGlobalVariables() FreeSavedExplainPlan(); dlist_init(&InProgressTransactions); activeSetStmts = NULL; - CoordinatedTransactionUses2PC = false; + ShouldCoordinatedTransactionUse2PC = false; TransactionModifiedNodeMetadata = false; MetadataSyncOnCommit = false; ResetWorkerErrorIndication(); diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index bf65d1e04..0f3c3222d 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -96,7 +96,7 @@ SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, const char *node uint32 connectionFlags = 0; UseCoordinatedTransaction(); - CoordinatedTransactionUse2PC(); + CoordinatedTransactionShouldUse2PC(); MultiConnection *transactionConnection = GetNodeUserDatabaseConnection( connectionFlags, nodeName, @@ -404,7 +404,7 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char * List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock); UseCoordinatedTransaction(); - CoordinatedTransactionUse2PC(); + CoordinatedTransactionShouldUse2PC(); /* open connections in parallel */ WorkerNode *workerNode = NULL; diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index 9aac8b826..ecee5eeee 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -111,8 +111,8 @@ extern bool TransactionModifiedNodeMetadata; */ extern void UseCoordinatedTransaction(void); extern bool InCoordinatedTransaction(void); -extern void CoordinatedTransactionUse2PC(void); -extern bool GetCoordinatedTransactionUses2PC(void); +extern void CoordinatedTransactionShouldUse2PC(void); +extern bool GetCoordinatedTransactionShouldUse2PC(void); extern bool IsMultiStatementTransaction(void); extern void EnsureDistributedTransactionId(void); diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index e574493e6..6a25138c6 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -895,11 +895,11 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in 0 (1 row) --- a helper function which return true if the coordinated trannsaction uses --- 2PC -CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC() +-- a helper function which return true if the coordinated +-- trannsaction uses 2PC +CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', -$$coordinated_transaction_uses_2PC$$; +$$coordinated_transaction_should_use_2PC$$; -- a local SELECT followed by remote SELECTs -- does not trigger 2PC BEGIN; @@ -933,8 +933,8 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato 0 (1 row) - SELECT coordinated_transaction_uses_2PC(); - coordinated_transaction_uses_2pc + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc --------------------------------------------------------------------- f (1 row) @@ -973,8 +973,8 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato 0 (1 row) - SELECT coordinated_transaction_uses_2PC(); - coordinated_transaction_uses_2pc + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc --------------------------------------------------------------------- f (1 row) @@ -992,8 +992,8 @@ NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshar UPDATE test SET y = y +1; NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1) NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1) - SELECT coordinated_transaction_uses_2PC(); - coordinated_transaction_uses_2pc + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) @@ -1012,8 +1012,8 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato 1 (1 row) - SELECT coordinated_transaction_uses_2PC(); - coordinated_transaction_uses_2pc + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) @@ -1027,8 +1027,8 @@ NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards UPDATE test SET y = y +1; NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503000 test SET y = (y OPERATOR(pg_catalog.+) 1) NOTICE: executing the command locally: UPDATE coordinator_shouldhaveshards.test_1503003 test SET y = (y OPERATOR(pg_catalog.+) 1) - SELECT coordinated_transaction_uses_2PC(); - coordinated_transaction_uses_2pc + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) @@ -1040,8 +1040,8 @@ BEGIN; INSERT INTO test VALUES (1,1); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) INSERT INTO test VALUES (3,3); - SELECT coordinated_transaction_uses_2PC(); - coordinated_transaction_uses_2pc + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) @@ -1053,8 +1053,8 @@ BEGIN; INSERT INTO test VALUES (3,3); INSERT INTO test VALUES (1,1); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) - SELECT coordinated_transaction_uses_2PC(); - coordinated_transaction_uses_2pc + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) @@ -1073,8 +1073,8 @@ BEGIN; INSERT INTO test VALUES (1,1); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) - SELECT coordinated_transaction_uses_2PC(); - coordinated_transaction_uses_2pc + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) @@ -1094,8 +1094,8 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato (1 row) INSERT INTO test VALUES (3,3); - SELECT coordinated_transaction_uses_2PC(); - coordinated_transaction_uses_2pc + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc --------------------------------------------------------------------- f (1 row) diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 08f4b841c..599743550 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -705,7 +705,7 @@ USING HASH AS operator 1 =, function 2 part_hashint4_noop(int4, int8); CREATE TABLE hash_parted ( - a int, + a int, b int ) PARTITION BY HASH (a part_test_int4_ops); CREATE TABLE hpart0 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 0); @@ -1856,9 +1856,9 @@ NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and RESET citus.local_copy_flush_threshold; RESET citus.local_copy_flush_threshold; -CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC() +CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', -$$coordinated_transaction_uses_2PC$$; +$$coordinated_transaction_should_use_2PC$$; -- a multi-shard/single-shard select that is failed over to local -- execution doesn't start a 2PC BEGIN; @@ -1899,8 +1899,8 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT an 1 (1 row) - SELECT coordinated_transaction_uses_2PC(); - coordinated_transaction_uses_2pc + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc --------------------------------------------------------------------- f (1 row) @@ -1908,7 +1908,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT an ROLLBACK; -- same without a transaction block WITH cte_1 AS (SELECT count(*) as cnt FROM another_schema_table LIMIT 1000), - cte_2 AS (SELECT coordinated_transaction_uses_2PC() as enabled_2pc) + cte_2 AS (SELECT coordinated_transaction_should_use_2PC() as enabled_2pc) SELECT cnt, enabled_2pc FROM cte_1, cte_2; NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '1000'::bigint NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '1000'::bigint @@ -1928,8 +1928,8 @@ NOTICE: executing the command locally: UPDATE single_node.another_schema_table_ NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) - SELECT coordinated_transaction_uses_2PC(); - coordinated_transaction_uses_2pc + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) @@ -1950,8 +1950,8 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in 10001 (1 row) - SELECT coordinated_transaction_uses_2PC(); - coordinated_transaction_uses_2pc + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) @@ -1959,13 +1959,13 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in ROLLBACK; -- same without transaction block WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) -SELECT coordinated_transaction_uses_2PC(); +SELECT coordinated_transaction_should_use_2PC(); NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b -NOTICE: executing the command locally: SELECT single_node.coordinated_transaction_uses_2pc() AS coordinated_transaction_uses_2pc - coordinated_transaction_uses_2pc +NOTICE: executing the command locally: SELECT single_node.coordinated_transaction_should_use_2pc() AS coordinated_transaction_should_use_2pc + coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) @@ -1975,8 +1975,8 @@ NOTICE: executing the command locally: SELECT single_node.coordinated_transacti BEGIN; UPDATE another_schema_table SET b = b + 1 WHERE a = 1; NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) WHERE (a OPERATOR(pg_catalog.=) 1) - SELECT coordinated_transaction_uses_2PC(); - coordinated_transaction_uses_2pc + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) @@ -1984,8 +1984,8 @@ NOTICE: executing the command locally: UPDATE single_node.another_schema_table_ ROLLBACK; -- same without transaction block WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *) -SELECT coordinated_transaction_uses_2PC() FROM cte_1; -NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (another_schema_table.b OPERATOR(pg_catalog.+) 1) WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) RETURNING another_schema_table.a, another_schema_table.b) SELECT single_node.coordinated_transaction_uses_2pc() AS coordinated_transaction_uses_2pc FROM cte_1 +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; +NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = (another_schema_table.b OPERATOR(pg_catalog.+) 1) WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) RETURNING another_schema_table.a, another_schema_table.b) SELECT single_node.coordinated_transaction_should_use_2pc() AS coordinated_transaction_should_use_2pc FROM cte_1 ERROR: The transaction is not a coordinated transaction -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index cdd52304f..aeb2ccb24 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -372,11 +372,11 @@ inserts AS ( RETURNING * ) SELECT count(*) FROM inserts; --- a helper function which return true if the coordinated trannsaction uses --- 2PC -CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC() +-- a helper function which return true if the coordinated +-- trannsaction uses 2PC +CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', -$$coordinated_transaction_uses_2PC$$; +$$coordinated_transaction_should_use_2PC$$; -- a local SELECT followed by remote SELECTs -- does not trigger 2PC @@ -385,7 +385,7 @@ BEGIN; WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; SELECT count(*) FROM test; WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; - SELECT coordinated_transaction_uses_2PC(); + SELECT coordinated_transaction_should_use_2PC(); COMMIT; -- remote SELECTs followed by local SELECTs @@ -395,7 +395,7 @@ BEGIN; WITH cte_1 as (SELECT * FROM test LIMIT 5) SELECT count(*) FROM test; SELECT y FROM test WHERE x = 1; WITH cte_1 AS (SELECT y FROM test WHERE x = 1 LIMIT 5) SELECT count(*) FROM test; - SELECT coordinated_transaction_uses_2PC(); + SELECT coordinated_transaction_should_use_2PC(); COMMIT; -- a local SELECT followed by a remote Modify @@ -403,7 +403,7 @@ COMMIT; BEGIN; SELECT y FROM test WHERE x = 1; UPDATE test SET y = y +1; - SELECT coordinated_transaction_uses_2PC(); + SELECT coordinated_transaction_should_use_2PC(); COMMIT; -- a local modify followed by a remote SELECT @@ -411,7 +411,7 @@ COMMIT; BEGIN; INSERT INTO test VALUES (1,1); SELECT count(*) FROM test; - SELECT coordinated_transaction_uses_2PC(); + SELECT coordinated_transaction_should_use_2PC(); COMMIT; -- a local modify followed by a remote MODIFY @@ -419,7 +419,7 @@ COMMIT; BEGIN; INSERT INTO test VALUES (1,1); UPDATE test SET y = y +1; - SELECT coordinated_transaction_uses_2PC(); + SELECT coordinated_transaction_should_use_2PC(); COMMIT; -- a local modify followed by a remote single shard MODIFY @@ -427,7 +427,7 @@ COMMIT; BEGIN; INSERT INTO test VALUES (1,1); INSERT INTO test VALUES (3,3); - SELECT coordinated_transaction_uses_2PC(); + SELECT coordinated_transaction_should_use_2PC(); COMMIT; -- a remote single shard modify followed by a local single @@ -435,7 +435,7 @@ COMMIT; BEGIN; INSERT INTO test VALUES (3,3); INSERT INTO test VALUES (1,1); - SELECT coordinated_transaction_uses_2PC(); + SELECT coordinated_transaction_should_use_2PC(); COMMIT; -- a remote single shard select followed by a local single @@ -445,7 +445,7 @@ COMMIT; BEGIN; SELECT count(*) FROM test WHERE x = 3; INSERT INTO test VALUES (1,1); - SELECT coordinated_transaction_uses_2PC(); + SELECT coordinated_transaction_should_use_2PC(); SET LOCAL citus.log_remote_commands TO ON; COMMIT; @@ -454,7 +454,7 @@ COMMIT; BEGIN; SELECT count(*) FROM test WHERE x = 1; INSERT INTO test VALUES (3,3); - SELECT coordinated_transaction_uses_2PC(); + SELECT coordinated_transaction_should_use_2PC(); SET LOCAL citus.log_remote_commands TO ON; COMMIT; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 9089fb717..a1d498d44 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -909,9 +909,9 @@ RESET citus.local_copy_flush_threshold; RESET citus.local_copy_flush_threshold; -CREATE OR REPLACE FUNCTION coordinated_transaction_uses_2PC() +CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', -$$coordinated_transaction_uses_2PC$$; +$$coordinated_transaction_should_use_2PC$$; -- a multi-shard/single-shard select that is failed over to local -- execution doesn't start a 2PC @@ -922,19 +922,19 @@ BEGIN; SELECT count(*) FROM cte_1; WITH cte_1 as (SELECT * FROM another_schema_table WHERE a = 1 LIMIT 10) SELECT count(*) FROM cte_1; - SELECT coordinated_transaction_uses_2PC(); + SELECT coordinated_transaction_should_use_2PC(); ROLLBACK; -- same without a transaction block WITH cte_1 AS (SELECT count(*) as cnt FROM another_schema_table LIMIT 1000), - cte_2 AS (SELECT coordinated_transaction_uses_2PC() as enabled_2pc) + cte_2 AS (SELECT coordinated_transaction_should_use_2PC() as enabled_2pc) SELECT cnt, enabled_2pc FROM cte_1, cte_2; -- a multi-shard modification that is failed over to local -- execution starts a 2PC BEGIN; UPDATE another_schema_table SET b = b + 1; - SELECT coordinated_transaction_uses_2PC(); + SELECT coordinated_transaction_should_use_2PC(); ROLLBACK; -- a multi-shard modification that is failed over to local @@ -942,23 +942,23 @@ ROLLBACK; BEGIN; WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) SELECT count(*) FROM cte_1; - SELECT coordinated_transaction_uses_2PC(); + SELECT coordinated_transaction_should_use_2PC(); ROLLBACK; -- same without transaction block WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) -SELECT coordinated_transaction_uses_2PC(); +SELECT coordinated_transaction_should_use_2PC(); -- a single-shard modification that is failed over to local -- starts 2PC execution BEGIN; UPDATE another_schema_table SET b = b + 1 WHERE a = 1; - SELECT coordinated_transaction_uses_2PC(); + SELECT coordinated_transaction_should_use_2PC(); ROLLBACK; -- same without transaction block WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *) -SELECT coordinated_transaction_uses_2PC() FROM cte_1; +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail