From 196064836c5a12c1c95d2fd33f94f8bb890640f0 Mon Sep 17 00:00:00 2001 From: Naisila Puka <37271756+naisila@users.noreply.github.com> Date: Wed, 10 Mar 2021 20:01:37 +0300 Subject: [PATCH] Skip 2PC for readonly connections in a transaction (#4587) * Skip 2PC for readonly connections in a transaction * Use ConnectionModifiedPlacement() function * Remove the second check of ConnectionModifiedPlacement() * Add order by to prevent flaky output * Test using pg_dist_transaction --- .../transaction/remote_transaction.c | 17 ++- .../expected/multi_transaction_recovery.out | 107 ++++++++++++++++++ .../sql/multi_transaction_recovery.sql | 48 ++++++++ 3 files changed, 170 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index c19552e22..8b67a2186 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -20,6 +20,7 @@ #include "distributed/connection_management.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" +#include "distributed/placement_connection.h" #include "distributed/remote_commands.h" #include "distributed/remote_transaction.h" #include "distributed/transaction_identifier.h" @@ -782,8 +783,16 @@ CoordinatedRemoteTransactionsPrepare(void) continue; } - StartRemoteTransactionPrepare(connection); - connectionList = lappend(connectionList, connection); + /* + * Check if any DML or DDL is executed over the connection on any + * placement/table. If yes, we start preparing the transaction, otherwise + * we skip prepare since the connection didn't perform any write (read-only) + */ + if (ConnectionModifiedPlacement(connection)) + { + StartRemoteTransactionPrepare(connection); + connectionList = lappend(connectionList, connection); + } } bool raiseInterrupts = true; @@ -798,6 +807,10 @@ CoordinatedRemoteTransactionsPrepare(void) if (transaction->transactionState != REMOTE_TRANS_PREPARING) { + /* + * Verify that the connection didn't modify any placement + */ + Assert(!ConnectionModifiedPlacement(connection)); continue; } diff --git a/src/test/regress/expected/multi_transaction_recovery.out b/src/test/regress/expected/multi_transaction_recovery.out index 7c5cef8e4..575e62068 100644 --- a/src/test/regress/expected/multi_transaction_recovery.out +++ b/src/test/regress/expected/multi_transaction_recovery.out @@ -331,6 +331,111 @@ SELECT count(*) FROM pg_dist_transaction; 2 (1 row) +-- check that read-only participants skip prepare +SET citus.shard_count TO 4; +CREATE TABLE test_2pcskip (a int); +SELECT create_distributed_table('test_2pcskip', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO test_2pcskip SELECT i FROM generate_series(0, 5)i; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- for the following test, ensure that 6 and 7 go to different shards on different workers +SELECT count(DISTINCT nodeport) FROM pg_dist_shard_placement WHERE shardid IN (get_shard_id_for_distribution_column('test_2pcskip', 6),get_shard_id_for_distribution_column('test_2pcskip', 7)); + count +--------------------------------------------------------------------- + 2 +(1 row) + +-- only two of the connections will perform a write (INSERT) +SET citus.force_max_query_parallelization TO ON; +BEGIN; +-- these inserts use two connections +INSERT INTO test_2pcskip VALUES (6); +INSERT INTO test_2pcskip VALUES (7); +-- we know this will use more than two connections +SELECT count(*) FROM test_2pcskip; + count +--------------------------------------------------------------------- + 8 +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 2 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- only two of the connections will perform a write (INSERT) +BEGIN; +-- this insert uses two connections +INSERT INTO test_2pcskip SELECT i FROM generate_series(6, 7)i; +-- we know this will use more than two connections +SELECT COUNT(*) FROM test_2pcskip; + count +--------------------------------------------------------------------- + 10 +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 2 +(1 row) + +-- check that reads from a reference table don't trigger 2PC +-- despite repmodel being 2PC +CREATE TABLE test_reference (b int); +SELECT create_reference_table('test_reference'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO test_reference VALUES(1); +INSERT INTO test_reference VALUES(2); +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +BEGIN; +SELECT * FROM test_reference ORDER BY 1; + b +--------------------------------------------------------------------- + 1 + 2 +(2 rows) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + -- Test whether auto-recovery runs ALTER SYSTEM SET citus.recover_2pc_interval TO 10; SELECT pg_reload_conf(); @@ -362,6 +467,8 @@ SELECT pg_reload_conf(); DROP TABLE test_recovery_ref; DROP TABLE test_recovery; DROP TABLE test_recovery_single; +DROP TABLE test_2pcskip; +DROP TABLE test_reference; SELECT 1 FROM master_remove_node('localhost', :master_port); ?column? --------------------------------------------------------------------- diff --git a/src/test/regress/sql/multi_transaction_recovery.sql b/src/test/regress/sql/multi_transaction_recovery.sql index 3b6efefba..8aeb97ce6 100644 --- a/src/test/regress/sql/multi_transaction_recovery.sql +++ b/src/test/regress/sql/multi_transaction_recovery.sql @@ -186,6 +186,52 @@ INSERT INTO test_recovery_single VALUES ('hello-2'); COMMIT; SELECT count(*) FROM pg_dist_transaction; +-- check that read-only participants skip prepare +SET citus.shard_count TO 4; +CREATE TABLE test_2pcskip (a int); +SELECT create_distributed_table('test_2pcskip', 'a'); +INSERT INTO test_2pcskip SELECT i FROM generate_series(0, 5)i; +SELECT recover_prepared_transactions(); + +-- for the following test, ensure that 6 and 7 go to different shards on different workers +SELECT count(DISTINCT nodeport) FROM pg_dist_shard_placement WHERE shardid IN (get_shard_id_for_distribution_column('test_2pcskip', 6),get_shard_id_for_distribution_column('test_2pcskip', 7)); +-- only two of the connections will perform a write (INSERT) +SET citus.force_max_query_parallelization TO ON; +BEGIN; +-- these inserts use two connections +INSERT INTO test_2pcskip VALUES (6); +INSERT INTO test_2pcskip VALUES (7); +-- we know this will use more than two connections +SELECT count(*) FROM test_2pcskip; +COMMIT; + +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- only two of the connections will perform a write (INSERT) +BEGIN; +-- this insert uses two connections +INSERT INTO test_2pcskip SELECT i FROM generate_series(6, 7)i; +-- we know this will use more than two connections +SELECT COUNT(*) FROM test_2pcskip; +COMMIT; + +SELECT count(*) FROM pg_dist_transaction; + +-- check that reads from a reference table don't trigger 2PC +-- despite repmodel being 2PC +CREATE TABLE test_reference (b int); +SELECT create_reference_table('test_reference'); +INSERT INTO test_reference VALUES(1); +INSERT INTO test_reference VALUES(2); +SELECT recover_prepared_transactions(); + +BEGIN; +SELECT * FROM test_reference ORDER BY 1; +COMMIT; + +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); -- Test whether auto-recovery runs ALTER SYSTEM SET citus.recover_2pc_interval TO 10; @@ -200,5 +246,7 @@ SELECT pg_reload_conf(); DROP TABLE test_recovery_ref; DROP TABLE test_recovery; DROP TABLE test_recovery_single; +DROP TABLE test_2pcskip; +DROP TABLE test_reference; SELECT 1 FROM master_remove_node('localhost', :master_port);