From 95da74c47f4f95361e6b053ef8febcde0fed3f1c Mon Sep 17 00:00:00 2001
From: Muhammad Usama
Date: Tue, 4 Mar 2025 15:11:01 +0500
Subject: [PATCH] Fix deadlock with transaction recovery that is possible
 during Citus upgrades (#7910)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

DESCRIPTION: Fixes a deadlock with transaction recovery that is possible
during Citus upgrades.

Fixes #7875.

This commit addresses two interrelated deadlock issues uncovered during Citus
upgrades:

1. Local Deadlock:
   - **Problem:**
     In `RecoverWorkerTransactions()`, a new connection is created for each
     worker node to perform transaction recovery, which locks the
     `pg_dist_transaction` catalog table until the end of the transaction.
     When `RecoverTwoPhaseCommits()` calls this function for each worker node,
     the order in which locks on `pg_dist_authinfo` and `pg_dist_transaction`
     are acquired can alternate. This reversal can lead to a deadlock if any
     concurrent process requires locks on these tables.
   - **Fix:**
     Pre-establish all worker node connections upfront so that
     `RecoverWorkerTransactions()` operates with a single, consistent
     connection. This ensures that locks on `pg_dist_authinfo` and
     `pg_dist_transaction` are always acquired in the correct order, thereby
     preventing the local deadlock.

2. Distributed Deadlock:
   - **Problem:**
     After resolving the local deadlock, a distributed deadlock issue emerges.
     The maintenance daemon calls `RecoverWorkerTransactions()` on each worker
     node, including the local node, which leads to a complex locking
     sequence:
       - A RowExclusiveLock is taken on the `pg_dist_transaction` table in
         `RecoverWorkerTransactions()`.
       - An extension update then tries to acquire an AccessExclusiveLock on
         the same table and is blocked by the RowExclusiveLock.
       - A subsequent query (e.g., a SELECT on `pg_prepared_xacts`) issued on
         a separate connection to the local node gets blocked due to locks
         held during a call to `BuildCitusTableCacheEntry()`.
       - The maintenance daemon waits for this query, resulting in a circular
         wait that stalls the entire cluster.
   - **Fix:**
     Avoid cache lookups for internal PostgreSQL tables by implementing an
     early bailout for relation IDs below `FirstNormalObjectId` (system
     objects). This eliminates unnecessary calls to
     `BuildCitusTableCacheEntry()`, reducing lock contention and mitigating
     the distributed deadlock. Furthermore, this optimization improves
     performance in fast connect→query_catalog→disconnect cycles by
     eliminating redundant cache creation and lookups.

3. Also reverts the commit that disabled the relevant test cases.
---
 .../distributed/metadata/metadata_cache.c     | 12 +++++
 .../transaction/transaction_recovery.c        | 51 +++++++++++++++++--
 .../citus_tests/upgrade/citus_upgrade_test.py | 18 -------
 3 files changed, 58 insertions(+), 23 deletions(-)

diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c
index b603e9dda..79cc61092 100644
--- a/src/backend/distributed/metadata/metadata_cache.c
+++ b/src/backend/distributed/metadata/metadata_cache.c
@@ -661,6 +661,18 @@ GetTableTypeName(Oid tableId)
 bool
 IsCitusTable(Oid relationId)
 {
+ /*
+ * PostgreSQL's OID generator assigns user object OIDs starting
+ * from FirstNormalObjectId. This means no user object can have
+ * an OID lower than FirstNormalObjectId. Therefore, if the
+ * relationId is less than FirstNormalObjectId
+ * (i.e., in PostgreSQL's reserved range), we can immediately
+ * return false, since such objects cannot be Citus tables.
+ */ + if (relationId < FirstNormalObjectId) + { + return false; + } return LookupCitusTableCacheEntry(relationId) != NULL; } diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 0eede84ca..59432a313 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -56,7 +56,8 @@ PG_FUNCTION_INFO_V1(recover_prepared_transactions); /* Local functions forward declarations */ -static int RecoverWorkerTransactions(WorkerNode *workerNode); +static int RecoverWorkerTransactions(WorkerNode *workerNode, + MultiConnection *connection); static List * PendingWorkerTransactionList(MultiConnection *connection); static bool IsTransactionInProgress(HTAB *activeTransactionNumberSet, char *preparedTransactionName); @@ -127,10 +128,51 @@ RecoverTwoPhaseCommits(void) LockTransactionRecovery(ShareUpdateExclusiveLock); List *workerList = ActivePrimaryNodeList(NoLock); + List *workerConnections = NIL; WorkerNode *workerNode = NULL; + MultiConnection *connection = NULL; + + /* + * Pre-establish all connections to worker nodes. + * + * We do this to enforce a consistent lock acquisition order and prevent deadlocks. + * Currently, during extension updates, we take strong locks on the Citus + * catalog tables in a specific order: first on pg_dist_authinfo, then on + * pg_dist_transaction. It's critical that any operation locking these two + * tables adheres to this order, or a deadlock could occur. + * + * Note that RecoverWorkerTransactions() retains its lock until the end + * of the transaction, while GetNodeConnection() releases its lock after + * the catalog lookup. So when there are multiple workers in the active primary + * node list, the lock acquisition order may reverse in subsequent iterations + * of the loop calling RecoverWorkerTransactions(), increasing the risk + * of deadlock. + * + * By establishing all worker connections upfront, we ensure that + * RecoverWorkerTransactions() deals with a single distributed catalog table, + * thereby preventing deadlocks regardless of the lock acquisition sequence + * used in the upgrade extension script. + */ + foreach_declared_ptr(workerNode, workerList) { - recoveredTransactionCount += RecoverWorkerTransactions(workerNode); + int connectionFlags = 0; + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + + connection = GetNodeConnection(connectionFlags, nodeName, nodePort); + Assert(connection != NULL); + + /* + * We don't verify connection validity here. + * Instead, RecoverWorkerTransactions() performs the necessary + * sanity checks on the connection state. + */ + workerConnections = lappend(workerConnections, connection); + } + forboth_ptr(workerNode, workerList, connection, workerConnections) + { + recoveredTransactionCount += RecoverWorkerTransactions(workerNode, connection); } return recoveredTransactionCount; @@ -142,7 +184,7 @@ RecoverTwoPhaseCommits(void) * started by this node on the specified worker. 
*/ static int -RecoverWorkerTransactions(WorkerNode *workerNode) +RecoverWorkerTransactions(WorkerNode *workerNode, MultiConnection *connection) { int recoveredTransactionCount = 0; @@ -160,8 +202,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode) bool recoveryFailed = false; - int connectionFlags = 0; - MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); + Assert(connection != NULL); if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK) { ereport(WARNING, (errmsg("transaction recovery cannot connect to %s:%d", diff --git a/src/test/regress/citus_tests/upgrade/citus_upgrade_test.py b/src/test/regress/citus_tests/upgrade/citus_upgrade_test.py index c25a34482..1ab448031 100755 --- a/src/test/regress/citus_tests/upgrade/citus_upgrade_test.py +++ b/src/test/regress/citus_tests/upgrade/citus_upgrade_test.py @@ -62,16 +62,10 @@ def run_citus_upgrade_tests(config, before_upgrade_schedule, after_upgrade_sched install_citus(config.post_tar_path) - # disable 2pc recovery for all nodes to work around https://github.com/citusdata/citus/issues/7875 - disable_2pc_recovery_for_all_nodes(config.bindir, config) - restart_databases(config.bindir, config.datadir, config.mixed_mode, config) run_alter_citus(config.bindir, config.mixed_mode, config) verify_upgrade(config, config.mixed_mode, config.node_name_to_ports.values()) - # re-enable 2pc recovery for all nodes - enable_2pc_recovery_for_all_nodes(config.bindir, config) - run_test_on_coordinator(config, after_upgrade_schedule) remove_citus(config.post_tar_path) @@ -152,18 +146,6 @@ def restart_database(pg_path, abs_data_path, node_name, node_ports, logfile_pref subprocess.run(command, check=True) -def disable_2pc_recovery_for_all_nodes(pg_path, config): - for port in config.node_name_to_ports.values(): - utils.psql(pg_path, port, "ALTER SYSTEM SET citus.recover_2pc_interval TO -1;") - utils.psql(pg_path, port, "SELECT pg_reload_conf();") - - -def enable_2pc_recovery_for_all_nodes(pg_path, config): - for port in config.node_name_to_ports.values(): - utils.psql(pg_path, port, "ALTER SYSTEM RESET citus.recover_2pc_interval;") - utils.psql(pg_path, port, "SELECT pg_reload_conf();") - - def run_alter_citus(pg_path, mixed_mode, config): for port in config.node_name_to_ports.values(): if mixed_mode and port in (