Skip 2PC for readonly connections in a transaction (#4587)

* Skip 2PC for readonly connections in a transaction

* Use ConnectionModifiedPlacement() function

* Remove the second check of ConnectionModifiedPlacement()

* Add order by to prevent flaky output

* Test using pg_dist_transaction
pull/4809/head
Naisila Puka 2021-03-10 20:01:37 +03:00 committed by GitHub
parent 68a527ba17
commit 196064836c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 170 additions and 2 deletions

View File

@ -20,6 +20,7 @@
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/transaction_identifier.h"
@ -782,8 +783,16 @@ CoordinatedRemoteTransactionsPrepare(void)
continue;
}
StartRemoteTransactionPrepare(connection);
connectionList = lappend(connectionList, connection);
/*
* Check if any DML or DDL is executed over the connection on any
* placement/table. If yes, we start preparing the transaction, otherwise
* we skip prepare since the connection didn't perform any write (read-only)
*/
if (ConnectionModifiedPlacement(connection))
{
StartRemoteTransactionPrepare(connection);
connectionList = lappend(connectionList, connection);
}
}
bool raiseInterrupts = true;
@ -798,6 +807,10 @@ CoordinatedRemoteTransactionsPrepare(void)
if (transaction->transactionState != REMOTE_TRANS_PREPARING)
{
/*
* Verify that the connection didn't modify any placement
*/
Assert(!ConnectionModifiedPlacement(connection));
continue;
}

View File

@ -331,6 +331,111 @@ SELECT count(*) FROM pg_dist_transaction;
2
(1 row)
-- check that read-only participants skip prepare
SET citus.shard_count TO 4;
CREATE TABLE test_2pcskip (a int);
SELECT create_distributed_table('test_2pcskip', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_2pcskip SELECT i FROM generate_series(0, 5)i;
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- for the following test, ensure that 6 and 7 go to different shards on different workers
SELECT count(DISTINCT nodeport) FROM pg_dist_shard_placement WHERE shardid IN (get_shard_id_for_distribution_column('test_2pcskip', 6),get_shard_id_for_distribution_column('test_2pcskip', 7));
count
---------------------------------------------------------------------
2
(1 row)
-- only two of the connections will perform a write (INSERT)
SET citus.force_max_query_parallelization TO ON;
BEGIN;
-- these inserts use two connections
INSERT INTO test_2pcskip VALUES (6);
INSERT INTO test_2pcskip VALUES (7);
-- we know this will use more than two connections
SELECT count(*) FROM test_2pcskip;
count
---------------------------------------------------------------------
8
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
2
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- only two of the connections will perform a write (INSERT)
BEGIN;
-- this insert uses two connections
INSERT INTO test_2pcskip SELECT i FROM generate_series(6, 7)i;
-- we know this will use more than two connections
SELECT COUNT(*) FROM test_2pcskip;
count
---------------------------------------------------------------------
10
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
2
(1 row)
-- check that reads from a reference table don't trigger 2PC
-- despite repmodel being 2PC
CREATE TABLE test_reference (b int);
SELECT create_reference_table('test_reference');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_reference VALUES(1);
INSERT INTO test_reference VALUES(2);
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
BEGIN;
SELECT * FROM test_reference ORDER BY 1;
b
---------------------------------------------------------------------
1
2
(2 rows)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- Test whether auto-recovery runs
ALTER SYSTEM SET citus.recover_2pc_interval TO 10;
SELECT pg_reload_conf();
@ -362,6 +467,8 @@ SELECT pg_reload_conf();
DROP TABLE test_recovery_ref;
DROP TABLE test_recovery;
DROP TABLE test_recovery_single;
DROP TABLE test_2pcskip;
DROP TABLE test_reference;
SELECT 1 FROM master_remove_node('localhost', :master_port);
?column?
---------------------------------------------------------------------

View File

@ -186,6 +186,52 @@ INSERT INTO test_recovery_single VALUES ('hello-2');
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
-- check that read-only participants skip prepare
SET citus.shard_count TO 4;
CREATE TABLE test_2pcskip (a int);
SELECT create_distributed_table('test_2pcskip', 'a');
INSERT INTO test_2pcskip SELECT i FROM generate_series(0, 5)i;
SELECT recover_prepared_transactions();
-- for the following test, ensure that 6 and 7 go to different shards on different workers
SELECT count(DISTINCT nodeport) FROM pg_dist_shard_placement WHERE shardid IN (get_shard_id_for_distribution_column('test_2pcskip', 6),get_shard_id_for_distribution_column('test_2pcskip', 7));
-- only two of the connections will perform a write (INSERT)
SET citus.force_max_query_parallelization TO ON;
BEGIN;
-- these inserts use two connections
INSERT INTO test_2pcskip VALUES (6);
INSERT INTO test_2pcskip VALUES (7);
-- we know this will use more than two connections
SELECT count(*) FROM test_2pcskip;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- only two of the connections will perform a write (INSERT)
BEGIN;
-- this insert uses two connections
INSERT INTO test_2pcskip SELECT i FROM generate_series(6, 7)i;
-- we know this will use more than two connections
SELECT COUNT(*) FROM test_2pcskip;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
-- check that reads from a reference table don't trigger 2PC
-- despite repmodel being 2PC
CREATE TABLE test_reference (b int);
SELECT create_reference_table('test_reference');
INSERT INTO test_reference VALUES(1);
INSERT INTO test_reference VALUES(2);
SELECT recover_prepared_transactions();
BEGIN;
SELECT * FROM test_reference ORDER BY 1;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- Test whether auto-recovery runs
ALTER SYSTEM SET citus.recover_2pc_interval TO 10;
@ -200,5 +246,7 @@ SELECT pg_reload_conf();
DROP TABLE test_recovery_ref;
DROP TABLE test_recovery;
DROP TABLE test_recovery_single;
DROP TABLE test_2pcskip;
DROP TABLE test_reference;
SELECT 1 FROM master_remove_node('localhost', :master_port);