Use local group ID when querying for prepared transactions

pull/1682/head
Marco Slot 2017-09-26 21:52:29 -07:00 committed by Marco Slot
parent 11adb9b034
commit 9e516513fc
4 changed files with 215 additions and 2 deletions

View File

@ -384,7 +384,7 @@ PendingWorkerTransactionList(MultiConnection *connection)
int rowCount = 0;
int rowIndex = 0;
List *transactionNames = NIL;
int coordinatorId = 0;
int coordinatorId = GetLocalGroupId();
appendStringInfo(command, "SELECT gid FROM pg_prepared_xacts "
"WHERE gid LIKE 'citus_%d_%%'",

View File

@ -0,0 +1,135 @@
-- Tests for running transaction recovery from a worker node
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO streaming;
CREATE TABLE test_recovery (x text);
SELECT create_distributed_table('test_recovery', 'x');
create_distributed_table
--------------------------
(1 row)
\c - - - :worker_1_port
SET citus.multi_shard_commit_protocol TO '2pc';
-- Ensure pg_dist_transaction is empty for test
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
-- If the groupid of the worker changes this query will produce a
-- different result and the prepared statement names should be adapted
-- accordingly.
SELECT * FROM pg_dist_local_group;
groupid
---------
12
(1 row)
BEGIN;
CREATE TABLE table_should_abort (value int);
PREPARE TRANSACTION 'citus_12_should_abort';
BEGIN;
CREATE TABLE table_should_commit (value int);
PREPARE TRANSACTION 'citus_12_should_commit';
BEGIN;
CREATE TABLE should_be_sorted_into_middle (value int);
PREPARE TRANSACTION 'citus_12_should_be_sorted_into_middle';
-- Add "fake" pg_dist_transaction records and run recovery
INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_commit');
INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_be_forgotten');
SELECT recover_prepared_transactions();
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: ROLLBACK PREPARED 'citus_12_should_abort'
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: ROLLBACK PREPARED 'citus_12_should_be_sorted_into_middle'
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: COMMIT PREPARED 'citus_12_should_commit'
recover_prepared_transactions
-------------------------------
3
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
SELECT count(*) FROM pg_tables WHERE tablename = 'table_should_abort';
count
-------
0
(1 row)
SELECT count(*) FROM pg_tables WHERE tablename = 'table_should_commit';
count
-------
1
(1 row)
-- plain INSERT does not use 2PC
INSERT INTO test_recovery VALUES ('hello');
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
-- Multi-statement transactions should write 2 transaction recovery records
BEGIN;
INSERT INTO test_recovery VALUES ('hello');
INSERT INTO test_recovery VALUES ('world');
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
-------
2
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
-- Committed INSERT..SELECT via coordinator should write 4 transaction recovery records
INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s;
SELECT count(*) FROM pg_dist_transaction;
count
-------
4
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
-- Committed COPY should write 3 transaction records (2 fall into the same shard)
COPY test_recovery (x) FROM STDIN CSV;
SELECT count(*) FROM pg_dist_transaction;
count
-------
3
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
DROP TABLE table_should_commit;
\c - - - :master_port
DROP TABLE test_recovery_ref;
ERROR: table "test_recovery_ref" does not exist
DROP TABLE test_recovery;

View File

@ -26,7 +26,7 @@ test: multi_mx_tpch_query7_nested multi_mx_ddl
test: multi_mx_repartition_udt_prepare
test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
test: multi_mx_metadata
test: multi_mx_modifications
test: multi_mx_modifications multi_mx_transaction_recovery
test: multi_mx_modifying_xacts
test: multi_mx_explain
test: multi_mx_reference_table

View File

@ -0,0 +1,78 @@
-- Tests for running transaction recovery from a worker node
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO streaming;
CREATE TABLE test_recovery (x text);
SELECT create_distributed_table('test_recovery', 'x');
\c - - - :worker_1_port
SET citus.multi_shard_commit_protocol TO '2pc';
-- Ensure pg_dist_transaction is empty for test
SELECT recover_prepared_transactions();
SELECT count(*) FROM pg_dist_transaction;
-- If the groupid of the worker changes this query will produce a
-- different result and the prepared statement names should be adapted
-- accordingly.
SELECT * FROM pg_dist_local_group;
BEGIN;
CREATE TABLE table_should_abort (value int);
PREPARE TRANSACTION 'citus_12_should_abort';
BEGIN;
CREATE TABLE table_should_commit (value int);
PREPARE TRANSACTION 'citus_12_should_commit';
BEGIN;
CREATE TABLE should_be_sorted_into_middle (value int);
PREPARE TRANSACTION 'citus_12_should_be_sorted_into_middle';
-- Add "fake" pg_dist_transaction records and run recovery
INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_commit');
INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_be_forgotten');
SELECT recover_prepared_transactions();
SELECT count(*) FROM pg_dist_transaction;
SELECT count(*) FROM pg_tables WHERE tablename = 'table_should_abort';
SELECT count(*) FROM pg_tables WHERE tablename = 'table_should_commit';
-- plain INSERT does not use 2PC
INSERT INTO test_recovery VALUES ('hello');
SELECT count(*) FROM pg_dist_transaction;
-- Multi-statement transactions should write 2 transaction recovery records
BEGIN;
INSERT INTO test_recovery VALUES ('hello');
INSERT INTO test_recovery VALUES ('world');
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- Committed INSERT..SELECT via coordinator should write 4 transaction recovery records
INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- Committed COPY should write 3 transaction records (2 fall into the same shard)
COPY test_recovery (x) FROM STDIN CSV;
hello-0
hello-1
world-0
world-1
\.
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
DROP TABLE table_should_commit;
\c - - - :master_port
DROP TABLE test_recovery_ref;
DROP TABLE test_recovery;