Merge pull request #5901 from citusdata/fix_local_procs_deadlocks

Do not assign distributed transaction ids for local execution
pull/5906/head
Önder Kalacı 2022-04-25 09:25:24 +02:00 committed by GitHub
commit f938f393e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1060 additions and 29 deletions

View File

@ -207,19 +207,6 @@ ExecuteLocalTaskListExtended(List *taskList,
uint64 totalRowsProcessed = 0;
ParamListInfo paramListInfo = copyParamList(orig_paramListInfo);
/*
* Even if we are executing local tasks, we still enable
* coordinated transaction. This is because
* (a) we might be in a transaction, and the next commands may
* require coordinated transaction
* (b) we might be executing some tasks locally and the others
* via remote execution
*
* Also, there is no harm enabling coordinated transaction even if
* we only deal with local tasks in the transaction.
*/
UseCoordinatedTransaction();
LocalExecutorLevel++;
PG_TRY();
{

View File

@ -205,8 +205,17 @@ InCoordinatedTransaction(void)
void
Use2PCForCoordinatedTransaction(void)
{
Assert(InCoordinatedTransaction());
/*
* If this transaction is also a coordinated
* transaction, use 2PC. Otherwise, this
* state change does nothing.
*
* In other words, when this flag is set,
* we "should" use 2PC when needed (e.g.,
* we are in a coordinated transaction and
* the coordinated transaction does a remote
* modification).
*/
ShouldCoordinatedTransactionUse2PC = true;
}

View File

@ -1983,6 +1983,17 @@ SELECT create_distributed_table('event_responses', 'event_id');
(1 row)
INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no');
CREATE TABLE event_responses_no_pkey (
event_id int,
user_id int,
response invite_resp
);
SELECT create_distributed_table('event_responses_no_pkey', 'event_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION regular_func(p invite_resp)
RETURNS int AS $$
DECLARE
@ -2432,6 +2443,755 @@ DEBUG: Creating router plan
17 | 777 | no
(2 rows)
-- set back to sane settings
RESET citus.enable_local_execution;
RESET citus.enable_fast_path_router_planner;
-- we'll test some 2PC states
SET citus.enable_metadata_sync TO OFF;
-- coordinated_transaction_should_use_2PC prints the internal
-- state for 2PC decision on Citus. However, even if 2PC is decided,
-- we may not necessarily use 2PC over a connection unless it does
-- a modification
CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC()
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
$$coordinated_transaction_should_use_2PC$$;
-- make tests consistent
SET citus.max_adaptive_executor_pool_size TO 1;
RESET citus.enable_metadata_sync;
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
SET citus.log_remote_commands TO ON;
-- we use event_id = 2 for local execution and event_id = 1 for reemote execution
--show it here, if anything changes here, all the tests below might be broken
-- we prefer this to avoid excessive logging below
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 2
NOTICE: executing the command locally: SELECT event_id, user_id, response FROM public.event_responses_no_pkey_1480007 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 2)
event_id | user_id | response
---------------------------------------------------------------------
(0 rows)
SELECT * FROM event_responses_no_pkey WHERE event_id = 1;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 1
NOTICE: issuing SELECT event_id, user_id, response FROM public.event_responses_no_pkey_1480004 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 1)
event_id | user_id | response
---------------------------------------------------------------------
(0 rows)
RESET citus.log_remote_commands;
RESET citus.log_local_commands;
RESET client_min_messages;
-- single shard local command without transaction block does set the
-- internal state for 2PC, but does not require any actual entries
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two local commands without transaction block set the internal 2PC state
-- but does not use remotely
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local modification followed by another single shard
-- local modification sets the 2PC state, but does not use remotely
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local modification followed by a single shard
-- remote modification uses 2PC because multiple nodes involved
-- in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local modification followed by a single shard
-- remote modification uses 2PC even if it is not in an explicit
-- tx block as multiple nodes involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a single shard
-- local modification uses 2PC as multiple nodes involved
-- in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a single shard
-- local modification uses 2PC even if it is not in an explicit
-- tx block
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local SELECT command without transaction block does not set the
-- internal state for 2PC
WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
ERROR: The transaction is not a coordinated transaction
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two local SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2)
SELECT count(*) FROM cte_1, cte_2;
count
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two local SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
9
(1 row)
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
9
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- a local SELECT followed by a remote SELECT does not require to
-- use actual 2PC
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
9
(1 row)
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
13
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
(9 rows)
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a single shard
-- local SELECT does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
9
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a single shard
-- local SELECT does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- multi shard local SELECT command without transaction block does not set the
-- internal state for 2PC
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
BEGIN;
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
17
(1 row)
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
17
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- multi-shard shard SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
17
(1 row)
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- multi shard SELECT followed by a single shard
-- remote single shard modification does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a multi shard
-- SELECT does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
20
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a multi shard
-- SELECT does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local modification followed by remote multi-shard
-- modification uses 2PC as multiple nodes are involved in modifications
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (UPDATE event_responses_no_pkey SET user_id = 1000 RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- a local SELECT followed by a remote multi-shard UPDATE requires to
-- use actual 2PC as multiple nodes are involved in modifications
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
10
(1 row)
UPDATE event_responses_no_pkey SET user_id = 1;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- a local SELECT followed by a remote single-shard UPDATE does not require to
-- use actual 2PC. This is because a single node is involved in modification
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
10
(1 row)
UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- a remote single-shard UPDATE followed by a local single shard SELECT
-- does not require to use actual 2PC. This is because a single node
-- is involved in modification
BEGIN;
UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
10
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
\c - - - :master_port
-- verify the local_hostname guc is used for local executions that should connect to the
-- local host

View File

@ -2231,15 +2231,6 @@ NOTICE: executing the command locally: UPDATE single_node.another_schema_table_
(1 row)
ROLLBACK;
-- same without transaction block
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.another_schema_table_90630515 another_schema_table SET b = (another_schema_table.b OPERATOR(pg_catalog.+) 1) WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) RETURNING another_schema_table.a, another_schema_table.b) SELECT single_node.coordinated_transaction_should_use_2pc() AS coordinated_transaction_should_use_2pc FROM cte_1
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
-- if the local execution is disabled, we cannot failover to
-- local execution and the queries would fail
SET citus.enable_local_execution TO false;

View File

@ -43,7 +43,9 @@ test: coordinator_evaluation_modify
test: coordinator_evaluation_select
test: multi_mx_call
test: multi_mx_function_call_delegation
test: multi_mx_modifications local_shard_execution local_shard_execution_replicated
test: multi_mx_modifications local_shard_execution_replicated
# the following test has to be run sequentially
test: local_shard_execution
test: multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
test: local_shard_copy
test: undistribute_table_cascade_mx

View File

@ -922,6 +922,17 @@ SELECT create_distributed_table('event_responses', 'event_id');
INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no');
CREATE TABLE event_responses_no_pkey (
event_id int,
user_id int,
response invite_resp
);
SELECT create_distributed_table('event_responses_no_pkey', 'event_id');
CREATE OR REPLACE FUNCTION regular_func(p invite_resp)
RETURNS int AS $$
DECLARE
@ -1114,6 +1125,281 @@ INSERT INTO event_responses VALUES (16, 666, 'maybe'), (17, 777, 'no')
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response RETURNING *;
-- set back to sane settings
RESET citus.enable_local_execution;
RESET citus.enable_fast_path_router_planner;
-- we'll test some 2PC states
SET citus.enable_metadata_sync TO OFF;
-- coordinated_transaction_should_use_2PC prints the internal
-- state for 2PC decision on Citus. However, even if 2PC is decided,
-- we may not necessarily use 2PC over a connection unless it does
-- a modification
CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC()
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
$$coordinated_transaction_should_use_2PC$$;
-- make tests consistent
SET citus.max_adaptive_executor_pool_size TO 1;
RESET citus.enable_metadata_sync;
SELECT recover_prepared_transactions();
SET citus.log_remote_commands TO ON;
-- we use event_id = 2 for local execution and event_id = 1 for reemote execution
--show it here, if anything changes here, all the tests below might be broken
-- we prefer this to avoid excessive logging below
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
SELECT * FROM event_responses_no_pkey WHERE event_id = 1;
RESET citus.log_remote_commands;
RESET citus.log_local_commands;
RESET client_min_messages;
-- single shard local command without transaction block does set the
-- internal state for 2PC, but does not require any actual entries
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two local commands without transaction block set the internal 2PC state
-- but does not use remotely
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local modification followed by another single shard
-- local modification sets the 2PC state, but does not use remotely
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local modification followed by a single shard
-- remote modification uses 2PC because multiple nodes involved
-- in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local modification followed by a single shard
-- remote modification uses 2PC even if it is not in an explicit
-- tx block as multiple nodes involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a single shard
-- local modification uses 2PC as multiple nodes involved
-- in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a single shard
-- local modification uses 2PC even if it is not in an explicit
-- tx block
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local SELECT command without transaction block does not set the
-- internal state for 2PC
WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two local SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2)
SELECT count(*) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two local SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- a local SELECT followed by a remote SELECT does not require to
-- use actual 2PC
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
SELECT count(*) FROM event_responses_no_pkey;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a single shard
-- local SELECT does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a single shard
-- local SELECT does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- multi shard local SELECT command without transaction block does not set the
-- internal state for 2PC
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
BEGIN;
SELECT count(*) FROM event_responses_no_pkey;
SELECT count(*) FROM event_responses_no_pkey;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- multi-shard shard SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
SELECT count(*) FROM event_responses_no_pkey;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- multi shard SELECT followed by a single shard
-- remote single shard modification does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a multi shard
-- SELECT does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT count(*) FROM event_responses_no_pkey;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a multi shard
-- SELECT does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local modification followed by remote multi-shard
-- modification uses 2PC as multiple nodes are involved in modifications
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (UPDATE event_responses_no_pkey SET user_id = 1000 RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- a local SELECT followed by a remote multi-shard UPDATE requires to
-- use actual 2PC as multiple nodes are involved in modifications
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
UPDATE event_responses_no_pkey SET user_id = 1;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- a local SELECT followed by a remote single-shard UPDATE does not require to
-- use actual 2PC. This is because a single node is involved in modification
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- a remote single-shard UPDATE followed by a local single shard SELECT
-- does not require to use actual 2PC. This is because a single node
-- is involved in modification
BEGIN;
UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
\c - - - :master_port
-- verify the local_hostname guc is used for local executions that should connect to the

View File

@ -1102,10 +1102,6 @@ BEGIN;
SELECT coordinated_transaction_should_use_2PC();
ROLLBACK;
-- same without transaction block
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
-- if the local execution is disabled, we cannot failover to
-- local execution and the queries would fail
SET citus.enable_local_execution TO false;