mirror of https://github.com/citusdata/citus.git
Delete transactions when removing node
With this commit, we delete entries in pg_dist_transaction for the primary nodes that are removed by `master_remove_node`. pull/4381/head
parent
164d73ad8c
commit
936775e8e3
|
@ -42,6 +42,7 @@
|
||||||
#include "distributed/shardinterval_utils.h"
|
#include "distributed/shardinterval_utils.h"
|
||||||
#include "distributed/shared_connection_stats.h"
|
#include "distributed/shared_connection_stats.h"
|
||||||
#include "distributed/string_utils.h"
|
#include "distributed/string_utils.h"
|
||||||
|
#include "distributed/transaction_recovery.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "distributed/worker_transaction.h"
|
#include "distributed/worker_transaction.h"
|
||||||
|
@ -1131,6 +1132,12 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
|
||||||
"To proceed, either drop the distributed tables or use "
|
"To proceed, either drop the distributed tables or use "
|
||||||
"undistribute_table() function to convert them to local tables")));
|
"undistribute_table() function to convert them to local tables")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Secondary nodes are read-only, so 2PC is never used on them.
|
||||||
|
* Hence, no items can be inserted to pg_dist_transaction for secondary nodes.
|
||||||
|
*/
|
||||||
|
DeleteWorkerTransactions(workerNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
DeleteNodeRow(workerNode->workerName, nodePort);
|
DeleteNodeRow(workerNode->workerName, nodePort);
|
||||||
|
|
|
@ -319,7 +319,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
|
||||||
* In addition, if the transaction started after the call to
|
* In addition, if the transaction started after the call to
|
||||||
* ActiveDistributedTransactionNumbers and finished just before our
|
* ActiveDistributedTransactionNumbers and finished just before our
|
||||||
* pg_dist_transaction snapshot, then it may still be in the process
|
* pg_dist_transaction snapshot, then it may still be in the process
|
||||||
* of comitting the prepared transactions in the post-commit callback
|
* of committing the prepared transactions in the post-commit callback
|
||||||
* and we should not touch the prepared transactions.
|
* and we should not touch the prepared transactions.
|
||||||
*
|
*
|
||||||
* To handle these cases, we just leave the records and prepared
|
* To handle these cases, we just leave the records and prepared
|
||||||
|
@ -511,3 +511,47 @@ RecoverPreparedTransactionOnWorker(MultiConnection *connection, char *transactio
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DeleteWorkerTransactions deletes the entries in pg_dist_transaction for a given
|
||||||
|
* worker node. It is meant to be called from master_remove_node.
|
||||||
|
*
* Rows are selected by comparing the groupid column against workerNode->groupId,
* so every entry recorded for that group id is deleted. A NULL workerNode is
* tolerated: the function returns without touching pg_dist_transaction.
*/
|
||||||
|
void
|
||||||
|
DeleteWorkerTransactions(WorkerNode *workerNode)
|
||||||
|
{
|
||||||
|
if (workerNode == NULL)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We don't expect this, but let's be defensive since crashing is much worse
|
||||||
|
* than leaving pg_dist_transaction entries.
|
||||||
|
*/
|
||||||
|
return;
|
||||||
|
}
|
||||||
|

||||||
|
bool indexOK = true;
|
||||||
|
int scanKeyCount = 1;
|
||||||
|
ScanKeyData scanKey[1];
|
||||||
|
int32 groupId = workerNode->groupId;
|
||||||
|
HeapTuple heapTuple = NULL;
|
||||||
|

||||||
|
/* open pg_dist_transaction with RowExclusiveLock; rows are deleted from it below */
Relation pgDistTransaction = table_open(DistTransactionRelationId(),
|
||||||
|
RowExclusiveLock);
|
||||||
|

||||||
|
/* single scan key: groupid column equals the group id of the given worker */
ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid,
|
||||||
|
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
|
||||||
|

||||||
|
/* scan using DistTransactionGroupIndexId(); indexOK is true and the snapshot argument is NULL */
SysScanDesc scanDescriptor = systable_beginscan(pgDistTransaction,
|
||||||
|
DistTransactionGroupIndexId(),
|
||||||
|
indexOK,
|
||||||
|
NULL, scanKeyCount, scanKey);
|
||||||
|

||||||
|
/* delete every tuple the scan returns */
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||||
|
{
|
||||||
|
simple_heap_delete(pgDistTransaction, &heapTuple->t_self);
|
||||||
|
}
|
||||||
|

||||||
|
/* NOTE(review): presumably makes the deletions visible to later commands in this transaction - confirm */
CommandCounterIncrement();
|
||||||
|
systable_endscan(scanDescriptor);
|
||||||
|
/* NoLock is passed here, so closing the relation does not release the lock taken at table_open */
table_close(pgDistTransaction, NoLock);
|
||||||
|
}
|
||||||
|
|
|
@ -19,6 +19,6 @@ extern int Recover2PCInterval;
|
||||||
/* Functions declarations for worker transactions */
|
/* Functions declarations for worker transactions */
|
||||||
extern void LogTransactionRecord(int32 groupId, char *transactionName);
|
extern void LogTransactionRecord(int32 groupId, char *transactionName);
|
||||||
extern int RecoverTwoPhaseCommits(void);
|
extern int RecoverTwoPhaseCommits(void);
|
||||||
|
extern void DeleteWorkerTransactions(WorkerNode *workerNode);
|
||||||
|
|
||||||
#endif /* TRANSACTION_RECOVERY_H */
|
#endif /* TRANSACTION_RECOVERY_H */
|
||||||
|
|
|
@ -23,7 +23,7 @@ INSERT INTO dest_table (a, b) VALUES (1, 1);
|
||||||
INSERT INTO dest_table (a, b) VALUES (2, 1);
|
INSERT INTO dest_table (a, b) VALUES (2, 1);
|
||||||
INSERT INTO source_table (a, b) VALUES (1, 5);
|
INSERT INTO source_table (a, b) VALUES (1, 5);
|
||||||
INSERT INTO source_table (a, b) VALUES (10, 10);
|
INSERT INTO source_table (a, b) VALUES (10, 10);
|
||||||
-- simluate actually having secondary nodes
|
-- simulate actually having secondary nodes
|
||||||
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node;
|
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node;
|
||||||
nodeid | groupid | nodename | nodeport | noderack | isactive | noderole | nodecluster
|
nodeid | groupid | nodename | nodeport | noderack | isactive | noderole | nodecluster
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
|
@ -28,12 +28,33 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- test recovery when removing node
|
||||||
|
CREATE TABLE recovery_test (x int, y int);
|
||||||
|
SELECT create_distributed_table('recovery_test','x');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE recovery_test;
|
||||||
SELECT master_remove_node('localhost', :worker_2_port);
|
SELECT master_remove_node('localhost', :worker_2_port);
|
||||||
master_remove_node
|
master_remove_node
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
recover_prepared_transactions
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- verify node is removed
|
-- verify node is removed
|
||||||
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
||||||
count
|
count
|
||||||
|
@ -132,7 +153,7 @@ WHERE
|
||||||
nodeport = :worker_2_port;
|
nodeport = :worker_2_port;
|
||||||
shardid | shardstate | shardlength | nodename | nodeport
|
shardid | shardstate | shardlength | nodename | nodeport
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
1380000 | 1 | 0 | localhost | 57638
|
1380004 | 1 | 0 | localhost | 57638
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||||
|
@ -161,7 +182,7 @@ WHERE
|
||||||
nodeport = :worker_2_port;
|
nodeport = :worker_2_port;
|
||||||
shardid | shardstate | shardlength | nodename | nodeport
|
shardid | shardstate | shardlength | nodename | nodeport
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
1380000 | 1 | 0 | localhost | 57638
|
1380004 | 1 | 0 | localhost | 57638
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
|
|
@ -1059,7 +1059,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
--
|
--
|
||||||
SET citus.replicate_reference_tables_on_activate TO off;
|
SET citus.replicate_reference_tables_on_activate TO off;
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
select master_remove_node('localhost', :worker_2_port);
|
SELECT master_remove_node('localhost', :worker_2_port);
|
||||||
master_remove_node
|
master_remove_node
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
|
@ -362,6 +362,7 @@ SELECT pg_reload_conf();
|
||||||
DROP TABLE test_recovery_ref;
|
DROP TABLE test_recovery_ref;
|
||||||
DROP TABLE test_recovery;
|
DROP TABLE test_recovery;
|
||||||
DROP TABLE test_recovery_single;
|
DROP TABLE test_recovery_single;
|
||||||
|
DROP TABLE test_reference_table;
|
||||||
SELECT 1 FROM master_remove_node('localhost', :master_port);
|
SELECT 1 FROM master_remove_node('localhost', :master_port);
|
||||||
?column?
|
?column?
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
|
@ -19,7 +19,7 @@ INSERT INTO dest_table (a, b) VALUES (2, 1);
|
||||||
INSERT INTO source_table (a, b) VALUES (1, 5);
|
INSERT INTO source_table (a, b) VALUES (1, 5);
|
||||||
INSERT INTO source_table (a, b) VALUES (10, 10);
|
INSERT INTO source_table (a, b) VALUES (10, 10);
|
||||||
|
|
||||||
-- simluate actually having secondary nodes
|
-- simulate actually having secondary nodes
|
||||||
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node;
|
SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node;
|
||||||
UPDATE pg_dist_node SET noderole = 'secondary';
|
UPDATE pg_dist_node SET noderole = 'secondary';
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,13 @@ SELECT master_remove_node('localhost', 55555);
|
||||||
-- verify node exist before removal
|
-- verify node exist before removal
|
||||||
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
||||||
|
|
||||||
|
-- test recovery when removing node
|
||||||
|
CREATE TABLE recovery_test (x int, y int);
|
||||||
|
SELECT create_distributed_table('recovery_test','x');
|
||||||
|
DROP TABLE recovery_test;
|
||||||
SELECT master_remove_node('localhost', :worker_2_port);
|
SELECT master_remove_node('localhost', :worker_2_port);
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
||||||
-- verify node is removed
|
-- verify node is removed
|
||||||
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
||||||
|
|
|
@ -654,7 +654,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
SET citus.replicate_reference_tables_on_activate TO off;
|
SET citus.replicate_reference_tables_on_activate TO off;
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
select master_remove_node('localhost', :worker_2_port);
|
SELECT master_remove_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
CREATE TABLE ref (a int primary key, b int);
|
CREATE TABLE ref (a int primary key, b int);
|
||||||
SELECT create_reference_table('ref');
|
SELECT create_reference_table('ref');
|
||||||
|
|
|
@ -200,5 +200,6 @@ SELECT pg_reload_conf();
|
||||||
DROP TABLE test_recovery_ref;
|
DROP TABLE test_recovery_ref;
|
||||||
DROP TABLE test_recovery;
|
DROP TABLE test_recovery;
|
||||||
DROP TABLE test_recovery_single;
|
DROP TABLE test_recovery_single;
|
||||||
|
DROP TABLE test_reference_table;
|
||||||
|
|
||||||
SELECT 1 FROM master_remove_node('localhost', :master_port);
|
SELECT 1 FROM master_remove_node('localhost', :master_port);
|
||||||
|
|
Loading…
Reference in New Issue