From 936775e8e36b84b29d6cc74dad5cd4911f74e609 Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Thu, 3 Dec 2020 14:31:27 +0300 Subject: [PATCH] Delete transactions when removing node With this commit, we delete entries in pg_dist_transaction for the primary nodes that are removed by `master_remove_node`. --- .../distributed/metadata/node_metadata.c | 7 +++ .../transaction/transaction_recovery.c | 46 ++++++++++++++++++- .../distributed/transaction_recovery.h | 2 +- .../expected/multi_read_from_secondaries.out | 2 +- .../multi_remove_node_reference_table.out | 25 +++++++++- .../multi_replicate_reference_table.out | 2 +- .../expected/multi_transaction_recovery.out | 1 + .../sql/multi_read_from_secondaries.sql | 2 +- .../sql/multi_remove_node_reference_table.sql | 6 +++ .../sql/multi_replicate_reference_table.sql | 2 +- .../sql/multi_transaction_recovery.sql | 1 + 11 files changed, 88 insertions(+), 8 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 92340f638..65176152a 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -42,6 +42,7 @@ #include "distributed/shardinterval_utils.h" #include "distributed/shared_connection_stats.h" #include "distributed/string_utils.h" +#include "distributed/transaction_recovery.h" #include "distributed/version_compat.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" @@ -1131,6 +1132,12 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) "To proceed, either drop the distributed tables or use " "undistribute_table() function to convert them to local tables"))); } + + /* + * Secondary nodes are read-only, so 2PC is never used for them. + * Hence, no items can be inserted into pg_dist_transaction for secondary nodes. 
+ */ + DeleteWorkerTransactions(workerNode); } DeleteNodeRow(workerNode->workerName, nodePort); diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index a3f13388a..ac1f704d7 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -319,7 +319,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode) * In addition, if the transaction started after the call to * ActiveDistributedTransactionNumbers and finished just before our * pg_dist_transaction snapshot, then it may still be in the process - * of comitting the prepared transactions in the post-commit callback + * of committing the prepared transactions in the post-commit callback * and we should not touch the prepared transactions. * * To handle these cases, we just leave the records and prepared @@ -511,3 +511,47 @@ RecoverPreparedTransactionOnWorker(MultiConnection *connection, char *transactio return true; } + + +/* + * DeleteWorkerTransactions deletes the entries on pg_dist_transaction for a given + * worker node. It's implemented to be called at master_remove_node. + */ +void +DeleteWorkerTransactions(WorkerNode *workerNode) +{ + if (workerNode == NULL) + { + /* + * We don't expect this, but let's be defensive since crashing is much worse + * than leaving pg_dist_transaction entries. 
+ */ + return; + } + + bool indexOK = true; + int scanKeyCount = 1; + ScanKeyData scanKey[1]; + int32 groupId = workerNode->groupId; + HeapTuple heapTuple = NULL; + + Relation pgDistTransaction = table_open(DistTransactionRelationId(), + RowExclusiveLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId)); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistTransaction, + DistTransactionGroupIndexId(), + indexOK, + NULL, scanKeyCount, scanKey); + + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) + { + simple_heap_delete(pgDistTransaction, &heapTuple->t_self); + } + + CommandCounterIncrement(); + systable_endscan(scanDescriptor); + table_close(pgDistTransaction, NoLock); +} diff --git a/src/include/distributed/transaction_recovery.h b/src/include/distributed/transaction_recovery.h index 0df696c01..811dbb949 100644 --- a/src/include/distributed/transaction_recovery.h +++ b/src/include/distributed/transaction_recovery.h @@ -19,6 +19,6 @@ extern int Recover2PCInterval; /* Functions declarations for worker transactions */ extern void LogTransactionRecord(int32 groupId, char *transactionName); extern int RecoverTwoPhaseCommits(void); - +extern void DeleteWorkerTransactions(WorkerNode *workerNode); #endif /* TRANSACTION_RECOVERY_H */ diff --git a/src/test/regress/expected/multi_read_from_secondaries.out b/src/test/regress/expected/multi_read_from_secondaries.out index d7818ec72..6e7dc64bc 100644 --- a/src/test/regress/expected/multi_read_from_secondaries.out +++ b/src/test/regress/expected/multi_read_from_secondaries.out @@ -23,7 +23,7 @@ INSERT INTO dest_table (a, b) VALUES (1, 1); INSERT INTO dest_table (a, b) VALUES (2, 1); INSERT INTO source_table (a, b) VALUES (1, 5); INSERT INTO source_table (a, b) VALUES (10, 10); --- simluate actually having secondary nodes +-- simulate actually having secondary nodes SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, 
noderole, nodecluster FROM pg_dist_node; nodeid | groupid | nodename | nodeport | noderack | isactive | noderole | nodecluster --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_remove_node_reference_table.out b/src/test/regress/expected/multi_remove_node_reference_table.out index 8e87d4e9c..dd51e1500 100644 --- a/src/test/regress/expected/multi_remove_node_reference_table.out +++ b/src/test/regress/expected/multi_remove_node_reference_table.out @@ -28,12 +28,33 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; 1 (1 row) +-- test recovery when removing node +CREATE TABLE recovery_test (x int, y int); +SELECT create_distributed_table('recovery_test','x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +DROP TABLE recovery_test; SELECT master_remove_node('localhost', :worker_2_port); master_remove_node --------------------------------------------------------------------- (1 row) +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + -- verify node is removed SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; count @@ -132,7 +153,7 @@ WHERE nodeport = :worker_2_port; shardid | shardstate | shardlength | nodename | nodeport --------------------------------------------------------------------- - 1380000 | 1 | 0 | localhost | 57638 + 1380004 | 1 | 0 | localhost | 57638 (1 row) SELECT shardcount, replicationfactor, distributioncolumntype @@ -161,7 +182,7 @@ WHERE nodeport = :worker_2_port; shardid | shardstate | shardlength | nodename | nodeport --------------------------------------------------------------------- - 1380000 | 1 | 0 | localhost | 57638 + 1380004 | 1 | 0 | 
localhost | 57638 (1 row) \c - - - :master_port diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index d09b2e378..a177dbfe3 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -1059,7 +1059,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); -- SET citus.replicate_reference_tables_on_activate TO off; SET citus.shard_replication_factor TO 1; -select master_remove_node('localhost', :worker_2_port); +SELECT master_remove_node('localhost', :worker_2_port); master_remove_node --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_transaction_recovery.out b/src/test/regress/expected/multi_transaction_recovery.out index 7c5cef8e4..e2359bb4b 100644 --- a/src/test/regress/expected/multi_transaction_recovery.out +++ b/src/test/regress/expected/multi_transaction_recovery.out @@ -362,6 +362,7 @@ SELECT pg_reload_conf(); DROP TABLE test_recovery_ref; DROP TABLE test_recovery; DROP TABLE test_recovery_single; +DROP TABLE test_reference_table; SELECT 1 FROM master_remove_node('localhost', :master_port); ?column? 
--------------------------------------------------------------------- diff --git a/src/test/regress/sql/multi_read_from_secondaries.sql b/src/test/regress/sql/multi_read_from_secondaries.sql index 3f530561d..c9ef21b11 100644 --- a/src/test/regress/sql/multi_read_from_secondaries.sql +++ b/src/test/regress/sql/multi_read_from_secondaries.sql @@ -19,7 +19,7 @@ INSERT INTO dest_table (a, b) VALUES (2, 1); INSERT INTO source_table (a, b) VALUES (1, 5); INSERT INTO source_table (a, b) VALUES (10, 10); --- simluate actually having secondary nodes +-- simulate actually having secondary nodes SELECT nodeid, groupid, nodename, nodeport, noderack, isactive, noderole, nodecluster FROM pg_dist_node; UPDATE pg_dist_node SET noderole = 'secondary'; diff --git a/src/test/regress/sql/multi_remove_node_reference_table.sql b/src/test/regress/sql/multi_remove_node_reference_table.sql index 5f7e8357c..a79aa30ed 100644 --- a/src/test/regress/sql/multi_remove_node_reference_table.sql +++ b/src/test/regress/sql/multi_remove_node_reference_table.sql @@ -26,7 +26,13 @@ SELECT master_remove_node('localhost', 55555); -- verify node exist before removal SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; +-- test recovery when removing node +CREATE TABLE recovery_test (x int, y int); +SELECT create_distributed_table('recovery_test','x'); +DROP TABLE recovery_test; SELECT master_remove_node('localhost', :worker_2_port); +SELECT recover_prepared_transactions(); +SELECT count(*) FROM pg_dist_transaction; -- verify node is removed SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index 8acd08f32..b692d6bf3 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -654,7 +654,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SET 
citus.replicate_reference_tables_on_activate TO off; SET citus.shard_replication_factor TO 1; -select master_remove_node('localhost', :worker_2_port); +SELECT master_remove_node('localhost', :worker_2_port); CREATE TABLE ref (a int primary key, b int); SELECT create_reference_table('ref'); diff --git a/src/test/regress/sql/multi_transaction_recovery.sql b/src/test/regress/sql/multi_transaction_recovery.sql index 3b6efefba..4080e2b8c 100644 --- a/src/test/regress/sql/multi_transaction_recovery.sql +++ b/src/test/regress/sql/multi_transaction_recovery.sql @@ -200,5 +200,6 @@ SELECT pg_reload_conf(); DROP TABLE test_recovery_ref; DROP TABLE test_recovery; DROP TABLE test_recovery_single; +DROP TABLE test_reference_table; SELECT 1 FROM master_remove_node('localhost', :master_port);