diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index f3eacf848..a79557d97 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -384,7 +384,7 @@ PendingWorkerTransactionList(MultiConnection *connection) int rowCount = 0; int rowIndex = 0; List *transactionNames = NIL; - int coordinatorId = 0; + int coordinatorId = GetLocalGroupId(); appendStringInfo(command, "SELECT gid FROM pg_prepared_xacts " "WHERE gid LIKE 'citus_%d_%%'", diff --git a/src/test/regress/expected/multi_mx_transaction_recovery.out b/src/test/regress/expected/multi_mx_transaction_recovery.out new file mode 100644 index 000000000..4e3e33e5e --- /dev/null +++ b/src/test/regress/expected/multi_mx_transaction_recovery.out @@ -0,0 +1,135 @@ +-- Tests for running transaction recovery from a worker node +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO streaming; +CREATE TABLE test_recovery (x text); +SELECT create_distributed_table('test_recovery', 'x'); + create_distributed_table +-------------------------- + +(1 row) + +\c - - - :worker_1_port +SET citus.multi_shard_commit_protocol TO '2pc'; +-- Ensure pg_dist_transaction is empty for test +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +-- If the groupid of the worker changes this query will produce a +-- different result and the prepared statement names should be adapted +-- accordingly. +SELECT * FROM pg_dist_local_group; + groupid +--------- + 12 +(1 row) + +BEGIN; +CREATE TABLE table_should_abort (value int); +PREPARE TRANSACTION 'citus_12_should_abort'; +BEGIN; +CREATE TABLE table_should_commit (value int); +PREPARE TRANSACTION 'citus_12_should_commit'; +BEGIN; +CREATE TABLE should_be_sorted_into_middle (value int); +PREPARE TRANSACTION 'citus_12_should_be_sorted_into_middle'; +-- Add "fake" pg_dist_transaction records and run recovery +INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_commit'); +INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_be_forgotten'); +SELECT recover_prepared_transactions(); +NOTICE: recovered a prepared transaction on localhost:57637 +CONTEXT: ROLLBACK PREPARED 'citus_12_should_abort' +NOTICE: recovered a prepared transaction on localhost:57637 +CONTEXT: ROLLBACK PREPARED 'citus_12_should_be_sorted_into_middle' +NOTICE: recovered a prepared transaction on localhost:57637 +CONTEXT: COMMIT PREPARED 'citus_12_should_commit' + recover_prepared_transactions +------------------------------- + 3 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +SELECT count(*) FROM pg_tables WHERE tablename = 'table_should_abort'; + count +------- + 0 +(1 row) + +SELECT count(*) FROM pg_tables WHERE tablename = 'table_should_commit'; + count +------- + 1 +(1 row) + +-- plain INSERT does not use 2PC +INSERT INTO test_recovery VALUES ('hello'); +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +-- Multi-statement transactions should write 2 transaction recovery records +BEGIN; +INSERT INTO test_recovery VALUES ('hello'); +INSERT INTO test_recovery VALUES ('world'); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 2 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +-- Committed INSERT..SELECT via coordinator should write 4 transaction recovery records +INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 4 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +-- Committed COPY should write 3 transaction records (2 fall into the same shard) +COPY test_recovery (x) FROM STDIN CSV; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 3 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +DROP TABLE table_should_commit; +\c - - - :master_port +DROP TABLE test_recovery_ref; +ERROR: table "test_recovery_ref" does not exist +DROP TABLE test_recovery; diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 3897f7cce..701157fb4 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -26,7 +26,7 @@ test: multi_mx_tpch_query7_nested multi_mx_ddl test: multi_mx_repartition_udt_prepare test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2 test: multi_mx_metadata -test: multi_mx_modifications +test: multi_mx_modifications multi_mx_transaction_recovery test: multi_mx_modifying_xacts test: multi_mx_explain test: multi_mx_reference_table diff --git a/src/test/regress/sql/multi_mx_transaction_recovery.sql b/src/test/regress/sql/multi_mx_transaction_recovery.sql new file mode 100644 index 000000000..b016ff2a1 --- /dev/null +++ b/src/test/regress/sql/multi_mx_transaction_recovery.sql @@ -0,0 +1,78 @@ +-- Tests for running transaction recovery from a worker node + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO streaming; + +CREATE TABLE test_recovery (x text); +SELECT create_distributed_table('test_recovery', 'x'); + +\c - - - :worker_1_port + +SET citus.multi_shard_commit_protocol TO '2pc'; + +-- Ensure pg_dist_transaction is empty for test +SELECT recover_prepared_transactions(); +SELECT count(*) FROM pg_dist_transaction; + +-- If the groupid of the worker changes this query will produce a +-- different result and the prepared statement names should be adapted +-- accordingly. +SELECT * FROM pg_dist_local_group; + +BEGIN; +CREATE TABLE table_should_abort (value int); +PREPARE TRANSACTION 'citus_12_should_abort'; + +BEGIN; +CREATE TABLE table_should_commit (value int); +PREPARE TRANSACTION 'citus_12_should_commit'; + +BEGIN; +CREATE TABLE should_be_sorted_into_middle (value int); +PREPARE TRANSACTION 'citus_12_should_be_sorted_into_middle'; + +-- Add "fake" pg_dist_transaction records and run recovery +INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_commit'); +INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_be_forgotten'); + +SELECT recover_prepared_transactions(); +SELECT count(*) FROM pg_dist_transaction; + +SELECT count(*) FROM pg_tables WHERE tablename = 'table_should_abort'; +SELECT count(*) FROM pg_tables WHERE tablename = 'table_should_commit'; + +-- plain INSERT does not use 2PC +INSERT INTO test_recovery VALUES ('hello'); +SELECT count(*) FROM pg_dist_transaction; + +-- Multi-statement transactions should write 2 transaction recovery records +BEGIN; +INSERT INTO test_recovery VALUES ('hello'); +INSERT INTO test_recovery VALUES ('world'); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- Committed INSERT..SELECT via coordinator should write 4 transaction recovery records +INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- Committed COPY should write 3 transaction records (2 fall into the same shard) +COPY test_recovery (x) FROM STDIN CSV; +hello-0 +hello-1 +world-0 +world-1 +\. + +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +DROP TABLE table_should_commit; + +\c - - - :master_port + +DROP TABLE test_recovery_ref; +DROP TABLE test_recovery;