diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 7c0426c0a..306b7f3e1 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -207,19 +207,6 @@ ExecuteLocalTaskListExtended(List *taskList, uint64 totalRowsProcessed = 0; ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); - /* - * Even if we are executing local tasks, we still enable - * coordinated transaction. This is because - * (a) we might be in a transaction, and the next commands may - * require coordinated transaction - * (b) we might be executing some tasks locally and the others - * via remote execution - * - * Also, there is no harm enabling coordinated transaction even if - * we only deal with local tasks in the transaction. - */ - UseCoordinatedTransaction(); - LocalExecutorLevel++; PG_TRY(); { diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 0337411cb..5b6c8d116 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -205,8 +205,17 @@ InCoordinatedTransaction(void) void Use2PCForCoordinatedTransaction(void) { - Assert(InCoordinatedTransaction()); - + /* + * If this transaction is also a coordinated + * transaction, use 2PC. Otherwise, this + * state change does nothing. + * + * In other words, when this flag is set, + * we "should" use 2PC when needed (e.g., + * we are in a coordinated transaction and + * the coordinated transaction does a remote + * modification). + */ ShouldCoordinatedTransactionUse2PC = true; } diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 5d29a4e71..98639a953 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -1983,6 +1983,17 @@ SELECT create_distributed_table('event_responses', 'event_id'); (1 row) INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no'); +CREATE TABLE event_responses_no_pkey ( + event_id int, + user_id int, + response invite_resp +); +SELECT create_distributed_table('event_responses_no_pkey', 'event_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + CREATE OR REPLACE FUNCTION regular_func(p invite_resp) RETURNS int AS $$ DECLARE @@ -2432,6 +2443,755 @@ DEBUG: Creating router plan 17 | 777 | no (2 rows) +-- set back to sane settings +RESET citus.enable_local_execution; +RESET citus.enable_fast_path_router_planner; +-- we'll test some 2PC states +SET citus.enable_metadata_sync TO OFF; +-- coordinated_transaction_should_use_2PC prints the internal +-- state for 2PC decision on Citus. However, even if 2PC is decided, +-- we may not necessarily use 2PC over a connection unless it does +-- a modification +CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_should_use_2PC$$; +-- make tests consistent +SET citus.max_adaptive_executor_pool_size TO 1; +RESET citus.enable_metadata_sync; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +SET citus.log_remote_commands TO ON; +-- we use event_id = 2 for local execution and event_id = 1 for reemote execution +--show it here, if anything changes here, all the tests below might be broken +-- we prefer this to avoid excessive logging below +SELECT * FROM event_responses_no_pkey WHERE event_id = 2; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 2 +NOTICE: executing the command locally: SELECT event_id, user_id, response FROM public.event_responses_no_pkey_1480007 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 2) + event_id | user_id | response +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM event_responses_no_pkey WHERE event_id = 1; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 1 +NOTICE: issuing SELECT event_id, user_id, response FROM public.event_responses_no_pkey_1480004 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 1) + event_id | user_id | response +--------------------------------------------------------------------- +(0 rows) + +RESET citus.log_remote_commands; +RESET citus.log_local_commands; +RESET client_min_messages; +-- single shard local command without transaction block does set the +-- internal state for 2PC, but does not require any actual entries +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two local commands without transaction block set the internal 2PC state +-- but does not use remotely +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local modification followed by another single shard +-- local modification sets the 2PC state, but does not use remotely +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local modification followed by a single shard +-- remote modification uses 2PC because multiple nodes involved +-- in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local modification followed by a single shard +-- remote modification uses 2PC even if it is not in an explicit +-- tx block as multiple nodes involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a single shard +-- local modification uses 2PC as multiple nodes involved +-- in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a single shard +-- local modification uses 2PC even if it is not in an explicit +-- tx block +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local SELECT command without transaction block does not set the +-- internal state for 2PC +WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; +ERROR: The transaction is not a coordinated transaction +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two local SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2) +SELECT count(*) FROM cte_1, cte_2; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two local SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 9 +(1 row) + + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 9 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- a local SELECT followed by a remote SELECT does not require to +-- use actual 2PC +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 9 +(1 row) + + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 13 +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + SELECT * FROM event_responses_no_pkey WHERE event_id = 2; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes +(9 rows) + + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a single shard +-- local SELECT does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 9 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a single shard +-- local SELECT does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- multi shard local SELECT command without transaction block does not set the +-- internal state for 2PC +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +BEGIN; + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 17 +(1 row) + + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 17 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- multi-shard shard SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 17 +(1 row) + + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- multi shard SELECT followed by a single shard +-- remote single shard modification does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a multi shard +-- SELECT does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 20 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a multi shard +-- SELECT does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local modification followed by remote multi-shard +-- modification uses 2PC as multiple nodes are involved in modifications +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (UPDATE event_responses_no_pkey SET user_id = 1000 RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- a local SELECT followed by a remote multi-shard UPDATE requires to +-- use actual 2PC as multiple nodes are involved in modifications +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 10 +(1 row) + + UPDATE event_responses_no_pkey SET user_id = 1; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- a local SELECT followed by a remote single-shard UPDATE does not require to +-- use actual 2PC. This is because a single node is involved in modification +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 10 +(1 row) + + UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- a remote single-shard UPDATE followed by a local single shard SELECT +-- does not require to use actual 2PC. This is because a single node +-- is involved in modification +BEGIN; + UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 10 +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + \c - - - :master_port -- verify the local_hostname guc is used for local executions that should connect to the -- local host diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index c854ec48a..5051bbee9 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -2231,15 +2231,6 @@ NOTICE: executing the command locally: UPDATE single_node.another_schema_table_ (1 row) ROLLBACK; --- same without transaction block -WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *) -SELECT coordinated_transaction_should_use_2PC() FROM cte_1; -NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.another_schema_table_90630515 another_schema_table SET b = (another_schema_table.b OPERATOR(pg_catalog.+) 1) WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) RETURNING another_schema_table.a, another_schema_table.b) SELECT single_node.coordinated_transaction_should_use_2pc() AS coordinated_transaction_should_use_2pc FROM cte_1 - coordinated_transaction_should_use_2pc ---------------------------------------------------------------------- - t -(1 row) - -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index ff5cccf11..a88b4ac0d 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -43,7 +43,9 @@ test: coordinator_evaluation_modify test: coordinator_evaluation_select test: multi_mx_call test: multi_mx_function_call_delegation -test: multi_mx_modifications local_shard_execution local_shard_execution_replicated +test: multi_mx_modifications local_shard_execution_replicated +# the following test has to be run sequentially +test: local_shard_execution test: multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2 test: local_shard_copy test: undistribute_table_cascade_mx diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index b68863a7f..7a0fc8b8a 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -922,6 +922,17 @@ SELECT create_distributed_table('event_responses', 'event_id'); INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no'); + +CREATE TABLE event_responses_no_pkey ( + event_id int, + user_id int, + response invite_resp +); + +SELECT create_distributed_table('event_responses_no_pkey', 'event_id'); + + + CREATE OR REPLACE FUNCTION regular_func(p invite_resp) RETURNS int AS $$ DECLARE @@ -1114,6 +1125,281 @@ INSERT INTO event_responses VALUES (16, 666, 'maybe'), (17, 777, 'no') ON CONFLICT (event_id, user_id) DO UPDATE SET response = EXCLUDED.response RETURNING *; +-- set back to sane settings +RESET citus.enable_local_execution; +RESET citus.enable_fast_path_router_planner; + + +-- we'll test some 2PC states +SET citus.enable_metadata_sync TO OFF; + +-- coordinated_transaction_should_use_2PC prints the internal +-- state for 2PC decision on Citus. However, even if 2PC is decided, +-- we may not necessarily use 2PC over a connection unless it does +-- a modification +CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_should_use_2PC$$; + +-- make tests consistent +SET citus.max_adaptive_executor_pool_size TO 1; + +RESET citus.enable_metadata_sync; +SELECT recover_prepared_transactions(); + + +SET citus.log_remote_commands TO ON; + +-- we use event_id = 2 for local execution and event_id = 1 for reemote execution +--show it here, if anything changes here, all the tests below might be broken +-- we prefer this to avoid excessive logging below +SELECT * FROM event_responses_no_pkey WHERE event_id = 2; +SELECT * FROM event_responses_no_pkey WHERE event_id = 1; +RESET citus.log_remote_commands; +RESET citus.log_local_commands; +RESET client_min_messages; + +-- single shard local command without transaction block does set the +-- internal state for 2PC, but does not require any actual entries +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two local commands without transaction block set the internal 2PC state +-- but does not use remotely +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local modification followed by another single shard +-- local modification sets the 2PC state, but does not use remotely +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local modification followed by a single shard +-- remote modification uses 2PC because multiple nodes involved +-- in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local modification followed by a single shard +-- remote modification uses 2PC even if it is not in an explicit +-- tx block as multiple nodes involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + + +-- single shard remote modification followed by a single shard +-- local modification uses 2PC as multiple nodes involved +-- in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a single shard +-- local modification uses 2PC even if it is not in an explicit +-- tx block +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local SELECT command without transaction block does not set the +-- internal state for 2PC +WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two local SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2) +SELECT count(*) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two local SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- a local SELECT followed by a remote SELECT does not require to +-- use actual 2PC +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + SELECT count(*) FROM event_responses_no_pkey; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + SELECT * FROM event_responses_no_pkey WHERE event_id = 2; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a single shard +-- local SELECT does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a single shard +-- local SELECT does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- multi shard local SELECT command without transaction block does not set the +-- internal state for 2PC +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +BEGIN; + SELECT count(*) FROM event_responses_no_pkey; + SELECT count(*) FROM event_responses_no_pkey; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- multi-shard shard SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + SELECT count(*) FROM event_responses_no_pkey; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- multi shard SELECT followed by a single shard +-- remote single shard modification does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a multi shard +-- SELECT does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT count(*) FROM event_responses_no_pkey; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a multi shard +-- SELECT does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local modification followed by remote multi-shard +-- modification uses 2PC as multiple nodes are involved in modifications +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (UPDATE event_responses_no_pkey SET user_id = 1000 RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- a local SELECT followed by a remote multi-shard UPDATE requires to +-- use actual 2PC as multiple nodes are involved in modifications +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + UPDATE event_responses_no_pkey SET user_id = 1; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- a local SELECT followed by a remote single-shard UPDATE does not require to +-- use actual 2PC. This is because a single node is involved in modification +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- a remote single-shard UPDATE followed by a local single shard SELECT +-- does not require to use actual 2PC. This is because a single node +-- is involved in modification +BEGIN; + UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + \c - - - :master_port -- verify the local_hostname guc is used for local executions that should connect to the diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 74c857d4e..0e1714003 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -1102,10 +1102,6 @@ BEGIN; SELECT coordinated_transaction_should_use_2PC(); ROLLBACK; --- same without transaction block -WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *) -SELECT coordinated_transaction_should_use_2PC() FROM cte_1; - -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false;