Do not assign distributed transaction ids for local execution

In the past, for all modifications on the local execution,
we enabled 2PC (with 6a7ed7b309).

This also required us to enable coordinated transactions
via https://github.com/citusdata/citus/pull/4831 .

However, it does have a very substantial impact on the
distributed deadlock detection. The distributed deadlock
detection is designed to avoid single-statement transactions
because they cannot lead to any actual deadlocks.

The implementation is to skip backends without distributed
transactions are assigned. Now that we assign single
statement local executions in the lock graphs, we are
conflicting with the design of distributed deadlock
detection.

In general, we should fix it. However, one might
think that it is not a big deal, even if the processes
show up in the lock graphs, the deadlock detection
should not be causing any false positives. That is
false, unless https://github.com/citusdata/citus/issues/1803
is fixed. Now that local processes are considered as a single
distributed backend, the lock graphs might find:

    local execution 1 [tx id: 1] -> any local process [tx id: 0]
    any local process [tx id: 0] -> local execution 2 [tx id: 2]

And, decides that there is a distributed deadlock.

This commit is:
   (a) right thing to do, as local execuion should not need any
       distributed tx id
   (b) Eliminates performance issues that might come up with
       deadlock detection does a lot of unncessary checks
   (c) After moving local execution after the remote execution
       via https://github.com/citusdata/citus/pull/4301, the
       vauge requirement for assigning distributed tx ids are
       already gone.
pull/5901/head
Onder Kalaci 2022-04-12 15:16:58 +02:00
parent 6254f30305
commit a2debe0f02
7 changed files with 1060 additions and 29 deletions

View File

@ -207,19 +207,6 @@ ExecuteLocalTaskListExtended(List *taskList,
uint64 totalRowsProcessed = 0; uint64 totalRowsProcessed = 0;
ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); ParamListInfo paramListInfo = copyParamList(orig_paramListInfo);
/*
* Even if we are executing local tasks, we still enable
* coordinated transaction. This is because
* (a) we might be in a transaction, and the next commands may
* require coordinated transaction
* (b) we might be executing some tasks locally and the others
* via remote execution
*
* Also, there is no harm enabling coordinated transaction even if
* we only deal with local tasks in the transaction.
*/
UseCoordinatedTransaction();
LocalExecutorLevel++; LocalExecutorLevel++;
PG_TRY(); PG_TRY();
{ {

View File

@ -205,8 +205,17 @@ InCoordinatedTransaction(void)
void void
Use2PCForCoordinatedTransaction(void) Use2PCForCoordinatedTransaction(void)
{ {
Assert(InCoordinatedTransaction()); /*
* If this transaction is also a coordinated
* transaction, use 2PC. Otherwise, this
* state change does nothing.
*
* In other words, when this flag is set,
* we "should" use 2PC when needed (e.g.,
* we are in a coordinated transaction and
* the coordinated transaction does a remote
* modification).
*/
ShouldCoordinatedTransactionUse2PC = true; ShouldCoordinatedTransactionUse2PC = true;
} }

View File

@ -1983,6 +1983,17 @@ SELECT create_distributed_table('event_responses', 'event_id');
(1 row) (1 row)
INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no'); INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no');
CREATE TABLE event_responses_no_pkey (
event_id int,
user_id int,
response invite_resp
);
SELECT create_distributed_table('event_responses_no_pkey', 'event_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION regular_func(p invite_resp) CREATE OR REPLACE FUNCTION regular_func(p invite_resp)
RETURNS int AS $$ RETURNS int AS $$
DECLARE DECLARE
@ -2432,6 +2443,755 @@ DEBUG: Creating router plan
17 | 777 | no 17 | 777 | no
(2 rows) (2 rows)
-- set back to sane settings
RESET citus.enable_local_execution;
RESET citus.enable_fast_path_router_planner;
-- we'll test some 2PC states
SET citus.enable_metadata_sync TO OFF;
-- coordinated_transaction_should_use_2PC prints the internal
-- state for 2PC decision on Citus. However, even if 2PC is decided,
-- we may not necessarily use 2PC over a connection unless it does
-- a modification
CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC()
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
$$coordinated_transaction_should_use_2PC$$;
-- make tests consistent
SET citus.max_adaptive_executor_pool_size TO 1;
RESET citus.enable_metadata_sync;
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
SET citus.log_remote_commands TO ON;
-- we use event_id = 2 for local execution and event_id = 1 for reemote execution
--show it here, if anything changes here, all the tests below might be broken
-- we prefer this to avoid excessive logging below
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 2
NOTICE: executing the command locally: SELECT event_id, user_id, response FROM public.event_responses_no_pkey_1480007 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 2)
event_id | user_id | response
---------------------------------------------------------------------
(0 rows)
SELECT * FROM event_responses_no_pkey WHERE event_id = 1;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 1
NOTICE: issuing SELECT event_id, user_id, response FROM public.event_responses_no_pkey_1480004 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 1)
event_id | user_id | response
---------------------------------------------------------------------
(0 rows)
RESET citus.log_remote_commands;
RESET citus.log_local_commands;
RESET client_min_messages;
-- single shard local command without transaction block does set the
-- internal state for 2PC, but does not require any actual entries
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two local commands without transaction block set the internal 2PC state
-- but does not use remotely
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local modification followed by another single shard
-- local modification sets the 2PC state, but does not use remotely
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local modification followed by a single shard
-- remote modification uses 2PC because multiple nodes involved
-- in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local modification followed by a single shard
-- remote modification uses 2PC even if it is not in an explicit
-- tx block as multiple nodes involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a single shard
-- local modification uses 2PC as multiple nodes involved
-- in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a single shard
-- local modification uses 2PC even if it is not in an explicit
-- tx block
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local SELECT command without transaction block does not set the
-- internal state for 2PC
WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
ERROR: The transaction is not a coordinated transaction
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two local SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2)
SELECT count(*) FROM cte_1, cte_2;
count
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two local SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
9
(1 row)
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
9
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- a local SELECT followed by a remote SELECT does not require to
-- use actual 2PC
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
9
(1 row)
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
13
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
(9 rows)
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a single shard
-- local SELECT does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
9
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a single shard
-- local SELECT does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- multi shard local SELECT command without transaction block does not set the
-- internal state for 2PC
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
BEGIN;
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
17
(1 row)
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
17
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- multi-shard shard SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
17
(1 row)
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- multi shard SELECT followed by a single shard
-- remote single shard modification does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a multi shard
-- SELECT does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
20
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a multi shard
-- SELECT does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local modification followed by remote multi-shard
-- modification uses 2PC as multiple nodes are involved in modifications
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (UPDATE event_responses_no_pkey SET user_id = 1000 RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- a local SELECT followed by a remote multi-shard UPDATE requires to
-- use actual 2PC as multiple nodes are involved in modifications
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
10
(1 row)
UPDATE event_responses_no_pkey SET user_id = 1;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- a local SELECT followed by a remote single-shard UPDATE does not require to
-- use actual 2PC. This is because a single node is involved in modification
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
10
(1 row)
UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- a remote single-shard UPDATE followed by a local single shard SELECT
-- does not require to use actual 2PC. This is because a single node
-- is involved in modification
BEGIN;
UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
10
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
\c - - - :master_port \c - - - :master_port
-- verify the local_hostname guc is used for local executions that should connect to the -- verify the local_hostname guc is used for local executions that should connect to the
-- local host -- local host

View File

@ -2231,15 +2231,6 @@ NOTICE: executing the command locally: UPDATE single_node.another_schema_table_
(1 row) (1 row)
ROLLBACK; ROLLBACK;
-- same without transaction block
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.another_schema_table_90630515 another_schema_table SET b = (another_schema_table.b OPERATOR(pg_catalog.+) 1) WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) RETURNING another_schema_table.a, another_schema_table.b) SELECT single_node.coordinated_transaction_should_use_2pc() AS coordinated_transaction_should_use_2pc FROM cte_1
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
-- if the local execution is disabled, we cannot failover to -- if the local execution is disabled, we cannot failover to
-- local execution and the queries would fail -- local execution and the queries would fail
SET citus.enable_local_execution TO false; SET citus.enable_local_execution TO false;

View File

@ -43,7 +43,9 @@ test: coordinator_evaluation_modify
test: coordinator_evaluation_select test: coordinator_evaluation_select
test: multi_mx_call test: multi_mx_call
test: multi_mx_function_call_delegation test: multi_mx_function_call_delegation
test: multi_mx_modifications local_shard_execution local_shard_execution_replicated test: multi_mx_modifications local_shard_execution_replicated
# the following test has to be run sequentially
test: local_shard_execution
test: multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2 test: multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
test: local_shard_copy test: local_shard_copy
test: undistribute_table_cascade_mx test: undistribute_table_cascade_mx

View File

@ -922,6 +922,17 @@ SELECT create_distributed_table('event_responses', 'event_id');
INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no'); INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no');
CREATE TABLE event_responses_no_pkey (
event_id int,
user_id int,
response invite_resp
);
SELECT create_distributed_table('event_responses_no_pkey', 'event_id');
CREATE OR REPLACE FUNCTION regular_func(p invite_resp) CREATE OR REPLACE FUNCTION regular_func(p invite_resp)
RETURNS int AS $$ RETURNS int AS $$
DECLARE DECLARE
@ -1114,6 +1125,281 @@ INSERT INTO event_responses VALUES (16, 666, 'maybe'), (17, 777, 'no')
ON CONFLICT (event_id, user_id) ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response RETURNING *; DO UPDATE SET response = EXCLUDED.response RETURNING *;
-- set back to sane settings
RESET citus.enable_local_execution;
RESET citus.enable_fast_path_router_planner;
-- we'll test some 2PC states
SET citus.enable_metadata_sync TO OFF;
-- coordinated_transaction_should_use_2PC prints the internal
-- state for 2PC decision on Citus. However, even if 2PC is decided,
-- we may not necessarily use 2PC over a connection unless it does
-- a modification
CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC()
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
$$coordinated_transaction_should_use_2PC$$;
-- make tests consistent
SET citus.max_adaptive_executor_pool_size TO 1;
RESET citus.enable_metadata_sync;
SELECT recover_prepared_transactions();
SET citus.log_remote_commands TO ON;
-- we use event_id = 2 for local execution and event_id = 1 for reemote execution
--show it here, if anything changes here, all the tests below might be broken
-- we prefer this to avoid excessive logging below
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
SELECT * FROM event_responses_no_pkey WHERE event_id = 1;
RESET citus.log_remote_commands;
RESET citus.log_local_commands;
RESET client_min_messages;
-- single shard local command without transaction block does set the
-- internal state for 2PC, but does not require any actual entries
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two local commands without transaction block set the internal 2PC state
-- but does not use remotely
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local modification followed by another single shard
-- local modification sets the 2PC state, but does not use remotely
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local modification followed by a single shard
-- remote modification uses 2PC because multiple nodes involved
-- in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local modification followed by a single shard
-- remote modification uses 2PC even if it is not in an explicit
-- tx block as multiple nodes involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a single shard
-- local modification uses 2PC as multiple nodes involved
-- in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a single shard
-- local modification uses 2PC even if it is not in an explicit
-- tx block
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local SELECT command without transaction block does not set the
-- internal state for 2PC
WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two local SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2)
SELECT count(*) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two local SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- a local SELECT followed by a remote SELECT does not require to
-- use actual 2PC
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
SELECT count(*) FROM event_responses_no_pkey;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a single shard
-- local SELECT does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a single shard
-- local SELECT does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- multi shard local SELECT command without transaction block does not set the
-- internal state for 2PC
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
BEGIN;
SELECT count(*) FROM event_responses_no_pkey;
SELECT count(*) FROM event_responses_no_pkey;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- multi-shard shard SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
SELECT count(*) FROM event_responses_no_pkey;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- multi shard SELECT followed by a single shard
-- remote single shard modification does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a multi shard
-- SELECT does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT count(*) FROM event_responses_no_pkey;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a multi shard
-- SELECT does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local modification followed by remote multi-shard
-- modification uses 2PC as multiple nodes are involved in modifications
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (UPDATE event_responses_no_pkey SET user_id = 1000 RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- a local SELECT followed by a remote multi-shard UPDATE requires to
-- use actual 2PC as multiple nodes are involved in modifications
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
UPDATE event_responses_no_pkey SET user_id = 1;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- a local SELECT followed by a remote single-shard UPDATE does not require to
-- use actual 2PC. This is because a single node is involved in modification
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- a remote single-shard UPDATE followed by a local single shard SELECT
-- does not require to use actual 2PC. This is because a single node
-- is involved in modification
BEGIN;
UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
\c - - - :master_port \c - - - :master_port
-- verify the local_hostname guc is used for local executions that should connect to the -- verify the local_hostname guc is used for local executions that should connect to the

View File

@ -1102,10 +1102,6 @@ BEGIN;
SELECT coordinated_transaction_should_use_2PC(); SELECT coordinated_transaction_should_use_2PC();
ROLLBACK; ROLLBACK;
-- same without transaction block
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
-- if the local execution is disabled, we cannot failover to -- if the local execution is disabled, we cannot failover to
-- local execution and the queries would fail -- local execution and the queries would fail
SET citus.enable_local_execution TO false; SET citus.enable_local_execution TO false;