mirror of https://github.com/citusdata/citus.git
Merge pull request #4381 from citusdata/recover-transactions-when-removing-node
Delete transactions when removing node (pull/4390/head)
commit
3d8a7c1741
|
@ -42,6 +42,7 @@
|
|||
#include "distributed/shardinterval_utils.h"
|
||||
#include "distributed/shared_connection_stats.h"
|
||||
#include "distributed/string_utils.h"
|
||||
#include "distributed/transaction_recovery.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
|
@ -1131,6 +1132,12 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
|
|||
"To proceed, either drop the distributed tables or use "
|
||||
"undistribute_table() function to convert them to local tables")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Secondary nodes are read-only, so 2PC is never used for them.
|
||||
* Hence, no items can be inserted into pg_dist_transaction for secondary nodes.
|
||||
*/
|
||||
DeleteWorkerTransactions(workerNode);
|
||||
}
|
||||
|
||||
DeleteNodeRow(workerNode->workerName, nodePort);
|
||||
|
|
|
@ -319,7 +319,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
|||
* In addition, if the transaction started after the call to
|
||||
* ActiveDistributedTransactionNumbers and finished just before our
|
||||
* pg_dist_transaction snapshot, then it may still be in the process
|
||||
* of comitting the prepared transactions in the post-commit callback
|
||||
* of committing the prepared transactions in the post-commit callback
|
||||
* and we should not touch the prepared transactions.
|
||||
*
|
||||
* To handle these cases, we just leave the records and prepared
|
||||
|
@ -511,3 +511,47 @@ RecoverPreparedTransactionOnWorker(MultiConnection *connection, char *transactio
|
|||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DeleteWorkerTransactions deletes the entries on pg_dist_transaction for a given
|
||||
* worker node. It's implemented to be called at master_remove_node.
|
||||
*/
|
||||
void
|
||||
DeleteWorkerTransactions(WorkerNode *workerNode)
|
||||
{
|
||||
if (workerNode == NULL)
|
||||
{
|
||||
/*
|
||||
* We don't expect this, but let's be defensive since crashing is much worse
|
||||
* than leaving pg_dist_transction entries.
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
bool indexOK = true;
|
||||
int scanKeyCount = 1;
|
||||
ScanKeyData scanKey[1];
|
||||
int32 groupId = workerNode->groupId;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
||||
Relation pgDistTransaction = table_open(DistTransactionRelationId(),
|
||||
RowExclusiveLock);
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid,
|
||||
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
|
||||
|
||||
SysScanDesc scanDescriptor = systable_beginscan(pgDistTransaction,
|
||||
DistTransactionGroupIndexId(),
|
||||
indexOK,
|
||||
NULL, scanKeyCount, scanKey);
|
||||
|
||||
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||
{
|
||||
simple_heap_delete(pgDistTransaction, &heapTuple->t_self);
|
||||
}
|
||||
|
||||
CommandCounterIncrement();
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgDistTransaction, NoLock);
|
||||
}
|
||||
|
|
|
@ -19,6 +19,6 @@ extern int Recover2PCInterval;
|
|||
/* Functions declarations for worker transactions */
|
||||
extern void LogTransactionRecord(int32 groupId, char *transactionName);
|
||||
extern int RecoverTwoPhaseCommits(void);
|
||||
|
||||
extern void DeleteWorkerTransactions(WorkerNode *workerNode);
|
||||
|
||||
#endif /* TRANSACTION_RECOVERY_H */
|
||||
|
|
|
@ -23,7 +23,7 @@ INSERT INTO dest_table (a, b) VALUES (1, 1);
|
|||
INSERT INTO dest_table (a, b) VALUES (2, 1);
|
||||
INSERT INTO source_table (a, b) VALUES (1, 5);
|
||||
INSERT INTO source_table (a, b) VALUES (10, 10);
|
||||
-- simluate actually having secondary nodes
|
||||
-- simulate actually having secondary nodes
|
||||
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node;
|
||||
nodeid | groupid | nodename | nodeport | noderack | isactive | noderole | nodecluster
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -28,12 +28,33 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
|||
1
|
||||
(1 row)
|
||||
|
||||
-- test recovery when removing node
|
||||
CREATE TABLE recovery_test (x int, y int);
|
||||
SELECT create_distributed_table('recovery_test','x');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
DROP TABLE recovery_test;
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
master_remove_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT recover_prepared_transactions();
|
||||
recover_prepared_transactions
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM pg_dist_transaction;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- verify node is removed
|
||||
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
||||
count
|
||||
|
@ -132,7 +153,7 @@ WHERE
|
|||
nodeport = :worker_2_port;
|
||||
shardid | shardstate | shardlength | nodename | nodeport
|
||||
---------------------------------------------------------------------
|
||||
1380000 | 1 | 0 | localhost | 57638
|
||||
1380004 | 1 | 0 | localhost | 57638
|
||||
(1 row)
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
|
@ -161,7 +182,7 @@ WHERE
|
|||
nodeport = :worker_2_port;
|
||||
shardid | shardstate | shardlength | nodename | nodeport
|
||||
---------------------------------------------------------------------
|
||||
1380000 | 1 | 0 | localhost | 57638
|
||||
1380004 | 1 | 0 | localhost | 57638
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
|
|
|
@ -1059,7 +1059,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
|||
--
|
||||
SET citus.replicate_reference_tables_on_activate TO off;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
select master_remove_node('localhost', :worker_2_port);
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
master_remove_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -362,6 +362,7 @@ SELECT pg_reload_conf();
|
|||
DROP TABLE test_recovery_ref;
|
||||
DROP TABLE test_recovery;
|
||||
DROP TABLE test_recovery_single;
|
||||
DROP TABLE test_reference_table;
|
||||
SELECT 1 FROM master_remove_node('localhost', :master_port);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -19,7 +19,7 @@ INSERT INTO dest_table (a, b) VALUES (2, 1);
|
|||
INSERT INTO source_table (a, b) VALUES (1, 5);
|
||||
INSERT INTO source_table (a, b) VALUES (10, 10);
|
||||
|
||||
-- simluate actually having secondary nodes
|
||||
-- simulate actually having secondary nodes
|
||||
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node;
|
||||
UPDATE pg_dist_node SET noderole = 'secondary';
|
||||
|
||||
|
|
|
@ -26,7 +26,13 @@ SELECT master_remove_node('localhost', 55555);
|
|||
-- verify node exist before removal
|
||||
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
||||
|
||||
-- test recovery when removing node
|
||||
CREATE TABLE recovery_test (x int, y int);
|
||||
SELECT create_distributed_table('recovery_test','x');
|
||||
DROP TABLE recovery_test;
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
SELECT recover_prepared_transactions();
|
||||
SELECT count(*) FROM pg_dist_transaction;
|
||||
|
||||
-- verify node is removed
|
||||
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
||||
|
|
|
@ -654,7 +654,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
|||
SET citus.replicate_reference_tables_on_activate TO off;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
select master_remove_node('localhost', :worker_2_port);
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
|
||||
CREATE TABLE ref (a int primary key, b int);
|
||||
SELECT create_reference_table('ref');
|
||||
|
|
|
@ -200,5 +200,6 @@ SELECT pg_reload_conf();
|
|||
DROP TABLE test_recovery_ref;
|
||||
DROP TABLE test_recovery;
|
||||
DROP TABLE test_recovery_single;
|
||||
DROP TABLE test_reference_table;
|
||||
|
||||
SELECT 1 FROM master_remove_node('localhost', :master_port);
|
||||
|
|
Loading…
Reference in New Issue