mirror of https://github.com/citusdata/citus.git
Enable transaction recovery in connection API
parent
92c7567008
commit
6cbc1945f9
|
@ -16,9 +16,12 @@
|
||||||
|
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/transaction_management.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
#include "distributed/remote_transaction.h"
|
#include "distributed/remote_transaction.h"
|
||||||
|
#include "distributed/transaction_management.h"
|
||||||
|
#include "distributed/transaction_recovery.h"
|
||||||
|
#include "distributed/worker_manager.h"
|
||||||
#include "utils/hsearch.h"
|
#include "utils/hsearch.h"
|
||||||
|
|
||||||
|
|
||||||
|
@ -376,6 +379,7 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection)
|
||||||
RemoteTransaction *transaction = &connection->remoteTransaction;
|
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||||
StringInfoData command;
|
StringInfoData command;
|
||||||
const bool raiseErrors = true;
|
const bool raiseErrors = true;
|
||||||
|
WorkerNode *workerNode = NULL;
|
||||||
|
|
||||||
/* can't prepare a nonexistant transaction */
|
/* can't prepare a nonexistant transaction */
|
||||||
Assert(transaction->transactionState != REMOTE_TRANS_INVALID);
|
Assert(transaction->transactionState != REMOTE_TRANS_INVALID);
|
||||||
|
@ -388,6 +392,13 @@ StartRemoteTransactionPrepare(struct MultiConnection *connection)
|
||||||
|
|
||||||
Assign2PCIdentifier(connection);
|
Assign2PCIdentifier(connection);
|
||||||
|
|
||||||
|
/* log transactions to workers in pg_dist_transaction */
|
||||||
|
workerNode = FindWorkerNode(connection->hostname, connection->port);
|
||||||
|
if (workerNode != NULL)
|
||||||
|
{
|
||||||
|
LogTransactionRecord(workerNode->groupId, transaction->preparedName);
|
||||||
|
}
|
||||||
|
|
||||||
initStringInfo(&command);
|
initStringInfo(&command);
|
||||||
appendStringInfo(&command, "PREPARE TRANSACTION '%s'",
|
appendStringInfo(&command, "PREPARE TRANSACTION '%s'",
|
||||||
transaction->preparedName);
|
transaction->preparedName);
|
||||||
|
@ -826,7 +837,7 @@ Assign2PCIdentifier(MultiConnection *connection)
|
||||||
{
|
{
|
||||||
static uint64 sequence = 0;
|
static uint64 sequence = 0;
|
||||||
snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN,
|
snprintf(connection->remoteTransaction.preparedName, NAMEDATALEN,
|
||||||
"citus_%d_"UINT64_FORMAT,
|
"citus_%d_%d_"UINT64_FORMAT, GetLocalGroupId(),
|
||||||
MyProcPid, sequence++);
|
MyProcPid, sequence++);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,6 @@ PG_FUNCTION_INFO_V1(recover_prepared_transactions);
|
||||||
|
|
||||||
|
|
||||||
/* Local functions forward declarations */
|
/* Local functions forward declarations */
|
||||||
static void LogTransactionRecord(int groupId, char *transactionName);
|
|
||||||
static int RecoverPreparedTransactions(void);
|
static int RecoverPreparedTransactions(void);
|
||||||
static int RecoverWorkerTransactions(WorkerNode *workerNode);
|
static int RecoverWorkerTransactions(WorkerNode *workerNode);
|
||||||
static List * NameListDifference(List *nameList, List *subtractList);
|
static List * NameListDifference(List *nameList, List *subtractList);
|
||||||
|
@ -106,7 +105,7 @@ LogPreparedTransactions(List *connectionList)
|
||||||
* prepared on a worker. The presence of this record indicates that the
|
* prepared on a worker. The presence of this record indicates that the
|
||||||
* prepared transaction should be committed.
|
* prepared transaction should be committed.
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
LogTransactionRecord(int groupId, char *transactionName)
|
LogTransactionRecord(int groupId, char *transactionName)
|
||||||
{
|
{
|
||||||
Relation pgDistTransaction = NULL;
|
Relation pgDistTransaction = NULL;
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
/* Functions declarations for worker transactions */
|
/* Functions declarations for worker transactions */
|
||||||
extern void LogPreparedTransactions(List *connectionList);
|
extern void LogPreparedTransactions(List *connectionList);
|
||||||
|
extern void LogTransactionRecord(int groupId, char *transactionName);
|
||||||
|
|
||||||
|
|
||||||
#endif /* TRANSACTION_RECOVERY_H */
|
#endif /* TRANSACTION_RECOVERY_H */
|
||||||
|
|
|
@ -60,3 +60,114 @@ SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit';
|
||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
SET citus.shard_count TO 2;
|
||||||
|
SET citus.multi_shard_commit_protocol TO '2pc';
|
||||||
|
CREATE TABLE test_recovery (x text);
|
||||||
|
SELECT create_distributed_table('test_recovery', 'x');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_recovery VALUES ('hello');
|
||||||
|
-- Committed DDL commands should write 4 transaction recovery records
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE test_recovery ADD COLUMN y text;
|
||||||
|
ROLLBACK;
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ALTER TABLE test_recovery ADD COLUMN y text;
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
recover_prepared_transactions
|
||||||
|
-------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Committed master_modify_multiple_shards should write 4 transaction recovery records
|
||||||
|
BEGIN;
|
||||||
|
SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$);
|
||||||
|
master_modify_multiple_shards
|
||||||
|
-------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$);
|
||||||
|
master_modify_multiple_shards
|
||||||
|
-------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
recover_prepared_transactions
|
||||||
|
-------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Committed INSERT..SELECT should write 4 transaction recovery records
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
|
||||||
|
ROLLBACK;
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
recover_prepared_transactions
|
||||||
|
-------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
DROP TABLE test_recovery;
|
||||||
|
|
|
@ -35,3 +35,51 @@ SELECT count(*) FROM pg_dist_transaction;
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort';
|
SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort';
|
||||||
SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit';
|
SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit';
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
SET citus.shard_count TO 2;
|
||||||
|
SET citus.multi_shard_commit_protocol TO '2pc';
|
||||||
|
|
||||||
|
CREATE TABLE test_recovery (x text);
|
||||||
|
SELECT create_distributed_table('test_recovery', 'x');
|
||||||
|
INSERT INTO test_recovery VALUES ('hello');
|
||||||
|
|
||||||
|
-- Committed DDL commands should write 4 transaction recovery records
|
||||||
|
BEGIN;
|
||||||
|
ALTER TABLE test_recovery ADD COLUMN y text;
|
||||||
|
ROLLBACK;
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
||||||
|
ALTER TABLE test_recovery ADD COLUMN y text;
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
||||||
|
-- Committed master_modify_multiple_shards should write 4 transaction recovery records
|
||||||
|
BEGIN;
|
||||||
|
SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$);
|
||||||
|
ROLLBACK;
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
||||||
|
SELECT master_modify_multiple_shards($$UPDATE test_recovery SET y = 'world'$$);
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
||||||
|
-- Committed INSERT..SELECT should write 4 transaction recovery records
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
|
||||||
|
ROLLBACK;
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
||||||
|
INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
DROP TABLE test_recovery;
|
||||||
|
|
Loading…
Reference in New Issue