diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index e9f7d9af6..01bfe0eea 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -16,9 +16,12 @@ #include "access/xact.h" #include "distributed/connection_management.h" -#include "distributed/transaction_management.h" +#include "distributed/metadata_cache.h" #include "distributed/remote_commands.h" #include "distributed/remote_transaction.h" +#include "distributed/transaction_management.h" +#include "distributed/transaction_recovery.h" +#include "distributed/worker_manager.h" #include "utils/hsearch.h" @@ -376,6 +379,7 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection) RemoteTransaction *transaction = &connection->remoteTransaction; StringInfoData command; const bool raiseErrors = true; + WorkerNode *workerNode = NULL; /* can't prepare a nonexistant transaction */ Assert(transaction->transactionState != REMOTE_TRANS_INVALID); @@ -388,6 +392,13 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection) Assign2PCIdentifier(connection); + /* log transactions to workers in pg_dist_transaction */ + workerNode = FindWorkerNode(connection->hostname, connection->port); + if (workerNode != NULL) + { + LogTransactionRecord(workerNode->groupId, transaction->preparedName); + } + initStringInfo(&command); appendStringInfo(&command, "PREPARE TRANSACTION '%s'", transaction->preparedName); @@ -826,7 +837,7 @@ Assign2PCIdentifier(MultiConnection *connection) { static uint64 sequence = 0; snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN, - "citus_%d_"UINT64_FORMAT, + "citus_%d_%d_"UINT64_FORMAT, GetLocalGroupId(), MyProcPid, sequence++); } diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 9c59d57af..88e4e25d1 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -45,7 +45,6 @@ PG_FUNCTION_INFO_V1(recover_prepared_transactions); /* Local functions forward declarations */ -static void LogTransactionRecord(int groupId, char *transactionName); static int RecoverPreparedTransactions(void); static int RecoverWorkerTransactions(WorkerNode *workerNode); static List * NameListDifference(List *nameList, List *subtractList); @@ -106,7 +105,7 @@ LogPreparedTransactions(List *connectionList) * prepared on a worker. The presence of this record indicates that the * prepared transaction should be committed. */ -static void +void LogTransactionRecord(int groupId, char *transactionName) { Relation pgDistTransaction = NULL; diff --git a/src/include/distributed/transaction_recovery.h b/src/include/distributed/transaction_recovery.h index 82c7d0dde..15bbecd08 100644 --- a/src/include/distributed/transaction_recovery.h +++ b/src/include/distributed/transaction_recovery.h @@ -17,6 +17,7 @@ /* Functions declarations for worker transactions */ extern void LogPreparedTransactions(List *connectionList); +extern void LogTransactionRecord(int groupId, char *transactionName); #endif /* TRANSACTION_RECOVERY_H */ diff --git a/src/test/regress/expected/multi_transaction_recovery.out b/src/test/regress/expected/multi_transaction_recovery.out index a0bc331bf..3ef82a7c0 100644 --- a/src/test/regress/expected/multi_transaction_recovery.out +++ b/src/test/regress/expected/multi_transaction_recovery.out @@ -60,3 +60,114 @@ SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit'; 1 (1 row) +\c - - - :master_port +SET citus.shard_replication_factor TO 2; +SET citus.shard_count TO 2; +SET citus.multi_shard_commit_protocol TO '2pc'; +CREATE TABLE test_recovery (x text); +SELECT create_distributed_table('test_recovery', 'x'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO test_recovery VALUES ('hello'); +-- Committed DDL commands should write 4 transaction recovery records +BEGIN; +ALTER TABLE test_recovery ADD COLUMN y text; +ROLLBACK; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +ALTER TABLE test_recovery ADD COLUMN y text; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 4 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +-- Committed master_modify_multiple_shards should write 4 transaction recovery records +BEGIN; +SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + +ROLLBACK; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$); + master_modify_multiple_shards +------------------------------- + 1 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +------- + 4 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +-- Committed INSERT..SELECT should write 4 transaction recovery records +BEGIN; +INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery; +ROLLBACK; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 4 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + +\c - - - :master_port +DROP TABLE test_recovery; diff --git a/src/test/regress/sql/multi_transaction_recovery.sql b/src/test/regress/sql/multi_transaction_recovery.sql index b21e297f6..5f8f6bb3c 100644 --- a/src/test/regress/sql/multi_transaction_recovery.sql +++ b/src/test/regress/sql/multi_transaction_recovery.sql @@ -35,3 +35,51 @@ SELECT count(*) FROM pg_dist_transaction; \c - - - :worker_1_port SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort'; SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit'; + +\c - - - :master_port +SET citus.shard_replication_factor TO 2; +SET citus.shard_count TO 2; +SET citus.multi_shard_commit_protocol TO '2pc'; + +CREATE TABLE test_recovery (x text); +SELECT create_distributed_table('test_recovery', 'x'); +INSERT INTO test_recovery VALUES ('hello'); + +-- Committed DDL commands should write 4 transaction recovery records +BEGIN; +ALTER TABLE test_recovery ADD COLUMN y text; +ROLLBACK; +SELECT count(*) FROM pg_dist_transaction; + +ALTER TABLE test_recovery ADD COLUMN y text; + +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); +SELECT count(*) FROM pg_dist_transaction; + +-- Committed master_modify_multiple_shards should write 4 transaction recovery records +BEGIN; +SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$); +ROLLBACK; +SELECT count(*) FROM pg_dist_transaction; + +SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$); + +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); +SELECT count(*) FROM pg_dist_transaction; + +-- Committed INSERT..SELECT should write 4 transaction recovery records +BEGIN; +INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery; +ROLLBACK; +SELECT count(*) FROM pg_dist_transaction; + +INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery; + +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); +SELECT count(*) FROM pg_dist_transaction; + +\c - - - :master_port +DROP TABLE test_recovery;