mirror of https://github.com/citusdata/citus.git
Fix deadlock with transaction recovery that is possible during Citus upgrades (#7910)

DESCRIPTION: Fixes deadlock with transaction recovery that is possible during Citus upgrades.

Fixes #7875.

This commit addresses two interrelated deadlock issues uncovered during Citus upgrades:

1. Local deadlock
   - **Problem:** In `RecoverWorkerTransactions()`, a new connection is created for each worker node to perform transaction recovery by locking the `pg_dist_transaction` catalog table until the end of the transaction. When `RecoverTwoPhaseCommits()` calls this function for each worker node, the order of acquiring locks on `pg_dist_authinfo` and `pg_dist_transaction` can alternate between iterations. This reversal can lead to a deadlock if any concurrent process requires locks on these tables.
   - **Fix:** Pre-establish all worker node connections upfront so that `RecoverWorkerTransactions()` operates with a single, consistent connection. This ensures that locks on `pg_dist_authinfo` and `pg_dist_transaction` are always acquired in the correct order, thereby preventing the local deadlock.

2. Distributed deadlock
   - **Problem:** After resolving the local deadlock, a distributed deadlock emerges. The maintenance daemon calls `RecoverWorkerTransactions()` on each worker node, including the local node, which leads to the following locking sequence:
     - A RowExclusiveLock is taken on the `pg_dist_transaction` table in `RecoverWorkerTransactions()`.
     - An extension update then tries to acquire an AccessExclusiveLock on the same table and is blocked by the RowExclusiveLock.
     - A subsequent query (e.g., a SELECT on `pg_prepared_xacts`) issued over a separate connection on the local node is blocked by locks held during a call to `BuildCitusTableCacheEntry()`.
     - The maintenance daemon waits for this query, resulting in a circular wait that stalls the entire cluster.
   - **Fix:** Avoid cache lookups for internal PostgreSQL tables by bailing out early in `IsCitusTable()` for relation IDs below `FirstNormalObjectId` (i.e., system objects). This eliminates unnecessary calls to `BuildCitusTableCacheEntry()`, reducing lock contention and mitigating the distributed deadlock. As a side benefit, the early bailout also speeds up fast connect → query catalog → disconnect cycles by skipping redundant cache creation and lookups.

3. This commit also reverts the commit that disabled the relevant test cases.
parent 86107ca191
commit 43f3786c1f
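To make the lock-ordering argument in point 1 above concrete, here is a minimal, self-contained C sketch (illustrative only, not Citus code): two threads stand in for the recovery path and the extension-upgrade path, and two mutexes stand in for the catalog locks on `pg_dist_authinfo` and `pg_dist_transaction`. Because both participants take the locks in the same global order, no cyclic wait can form; swapping the two lock calls in only one of the threads would reintroduce the deadlock window.

```c
/* deadlock_order.c -- illustrative only, not Citus source.
 * Build with: cc -pthread deadlock_order.c */
#include <pthread.h>
#include <stdio.h>

/* stand-ins for the locks on pg_dist_authinfo and pg_dist_transaction */
static pthread_mutex_t authinfo_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t transaction_lock = PTHREAD_MUTEX_INITIALIZER;

/* every participant acquires the two locks in the same global order:
 * authinfo first, then transaction -- mirroring the order described
 * in the commit message above */
static void *
backend(void *name)
{
	pthread_mutex_lock(&authinfo_lock);
	pthread_mutex_lock(&transaction_lock);
	printf("%s: holds both locks\n", (const char *) name);
	pthread_mutex_unlock(&transaction_lock);
	pthread_mutex_unlock(&authinfo_lock);
	return NULL;
}

int
main(void)
{
	pthread_t recovery, upgrade;
	pthread_create(&recovery, NULL, backend, "recovery");
	pthread_create(&upgrade, NULL, backend, "upgrade");
	pthread_join(recovery, NULL);
	pthread_join(upgrade, NULL);
	return 0;
}
```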
@@ -660,6 +660,18 @@ GetTableTypeName(Oid tableId)
 bool
 IsCitusTable(Oid relationId)
 {
+	/*
+	 * PostgreSQL's OID generator assigns user operation OIDs starting
+	 * from FirstNormalObjectId. This means no user object can have
+	 * an OID lower than FirstNormalObjectId. Therefore, if the
+	 * relationId is less than FirstNormalObjectId
+	 * (i.e. in PostgreSQL's reserved range), we can immediately
+	 * return false, since such objects cannot be Citus tables.
+	 */
+	if (relationId < FirstNormalObjectId)
+	{
+		return false;
+	}
 	return LookupCitusTableCacheEntry(relationId) != NULL;
 }
 
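The guard in the hunk above is the whole of fix 2: PostgreSQL reserves OIDs below `FirstNormalObjectId` (16384, defined in `access/transam.h`) for system objects, so such relations can never be Citus tables and the metadata cache need not be touched. A hedged, stand-alone illustration of the same check follows; the `Oid` typedef and the `LookupInCache()` stub are placeholders for this sketch, not Citus's actual definitions.

```c
#include <stdbool.h>
#include <stdio.h>

typedef unsigned int Oid;                 /* stand-in for PostgreSQL's Oid */
#define FirstNormalObjectId ((Oid) 16384) /* value from access/transam.h */

/* placeholder for Citus's cache probe; the real lookup can build cache
 * entries and take catalog locks, which is exactly what the guard avoids */
static bool
LookupInCache(Oid relationId)
{
	(void) relationId;
	return false;
}

static bool
IsCitusTableSketch(Oid relationId)
{
	/* system objects can never be Citus tables: skip the cache entirely */
	if (relationId < FirstNormalObjectId)
	{
		return false;
	}
	return LookupInCache(relationId);
}

int
main(void)
{
	/* 1259 is pg_class's well-known OID: rejected without a cache lookup */
	printf("pg_class (1259): %d\n", IsCitusTableSketch(1259));
	printf("user oid (16385): %d\n", IsCitusTableSketch(16385));
	return 0;
}
```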
@@ -53,7 +53,8 @@ PG_FUNCTION_INFO_V1(recover_prepared_transactions);
 
 
 /* Local functions forward declarations */
-static int RecoverWorkerTransactions(WorkerNode *workerNode);
+static int RecoverWorkerTransactions(WorkerNode *workerNode,
+									 MultiConnection *connection);
 static List * PendingWorkerTransactionList(MultiConnection *connection);
 static bool IsTransactionInProgress(HTAB *activeTransactionNumberSet,
 									char *preparedTransactionName);
@@ -123,10 +124,51 @@ RecoverTwoPhaseCommits(void)
 	LockTransactionRecovery(ShareUpdateExclusiveLock);
 
 	List *workerList = ActivePrimaryNodeList(NoLock);
+	List *workerConnections = NIL;
 	WorkerNode *workerNode = NULL;
+	MultiConnection *connection = NULL;
+
+	/*
+	 * Pre-establish all connections to worker nodes.
+	 *
+	 * We do this to enforce a consistent lock acquisition order and prevent deadlocks.
+	 * Currently, during extension updates, we take strong locks on the Citus
+	 * catalog tables in a specific order: first on pg_dist_authinfo, then on
+	 * pg_dist_transaction. It's critical that any operation locking these two
+	 * tables adheres to this order, or a deadlock could occur.
+	 *
+	 * Note that RecoverWorkerTransactions() retains its lock until the end
+	 * of the transaction, while GetNodeConnection() releases its lock after
+	 * the catalog lookup. So when there are multiple workers in the active primary
+	 * node list, the lock acquisition order may reverse in subsequent iterations
+	 * of the loop calling RecoverWorkerTransactions(), increasing the risk
+	 * of deadlock.
+	 *
+	 * By establishing all worker connections upfront, we ensure that
+	 * RecoverWorkerTransactions() deals with a single distributed catalog table,
+	 * thereby preventing deadlocks regardless of the lock acquisition sequence
+	 * used in the upgrade extension script.
+	 */
+
 	foreach_declared_ptr(workerNode, workerList)
 	{
-		recoveredTransactionCount += RecoverWorkerTransactions(workerNode);
+		int connectionFlags = 0;
+		char *nodeName = workerNode->workerName;
+		int nodePort = workerNode->workerPort;
+
+		connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
+		Assert(connection != NULL);
+
+		/*
+		 * We don't verify connection validity here.
+		 * Instead, RecoverWorkerTransactions() performs the necessary
+		 * sanity checks on the connection state.
+		 */
+		workerConnections = lappend(workerConnections, connection);
+	}
+
+	forboth_ptr(workerNode, workerList, connection, workerConnections)
+	{
+		recoveredTransactionCount += RecoverWorkerTransactions(workerNode, connection);
 	}
 
 	return recoveredTransactionCount;
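The hunk above splits recovery into two phases: a first loop establishes one connection per worker, and a second loop walks the worker list and the connection list in lockstep (via Citus's `forboth_ptr`). A self-contained C sketch of that shape, with plain arrays standing in for Citus's list API and connection machinery (all names here are illustrative):

```c
#include <stdio.h>

/* illustrative stand-ins for WorkerNode and MultiConnection */
typedef struct { const char *name; int port; } Worker;
typedef struct { const Worker *target; } Connection;

enum { WORKER_COUNT = 2 };

int
main(void)
{
	Worker workers[WORKER_COUNT] = { { "w1", 5432 }, { "w2", 5433 } };
	Connection connections[WORKER_COUNT];

	/* phase 1: establish every connection up front, in one fixed order,
	 * before any per-worker recovery takes its long-lived catalog lock */
	for (int i = 0; i < WORKER_COUNT; i++)
	{
		connections[i] = (Connection) { &workers[i] };
	}

	/* phase 2: walk both arrays in lockstep (the forboth_ptr idea) and
	 * do the actual recovery work with the prebuilt connection */
	for (int i = 0; i < WORKER_COUNT; i++)
	{
		printf("recovering on %s:%d\n",
			   connections[i].target->name, connections[i].target->port);
	}
	return 0;
}
```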
@@ -138,7 +180,7 @@ RecoverTwoPhaseCommits(void)
  * started by this node on the specified worker.
  */
 static int
-RecoverWorkerTransactions(WorkerNode *workerNode)
+RecoverWorkerTransactions(WorkerNode *workerNode, MultiConnection *connection)
 {
 	int recoveredTransactionCount = 0;
 
@@ -156,8 +198,7 @@ RecoverWorkerTransactions(WorkerNode *workerNode)
 
 	bool recoveryFailed = false;
 
-	int connectionFlags = 0;
-	MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
+	Assert(connection != NULL);
 	if (connection->pgConn == NULL || PQstatus(connection->pgConn) != CONNECTION_OK)
 	{
 		ereport(WARNING, (errmsg("transaction recovery cannot connect to %s:%d",
@@ -62,16 +62,10 @@ def run_citus_upgrade_tests(config, before_upgrade_schedule, after_upgrade_sched
 
     install_citus(config.post_tar_path)
 
-    # disable 2pc recovery for all nodes to work around https://github.com/citusdata/citus/issues/7875
-    disable_2pc_recovery_for_all_nodes(config.bindir, config)
-
     restart_databases(config.bindir, config.datadir, config.mixed_mode, config)
     run_alter_citus(config.bindir, config.mixed_mode, config)
     verify_upgrade(config, config.mixed_mode, config.node_name_to_ports.values())
 
-    # re-enable 2pc recovery for all nodes
-    enable_2pc_recovery_for_all_nodes(config.bindir, config)
-
     run_test_on_coordinator(config, after_upgrade_schedule)
     remove_citus(config.post_tar_path)
@@ -152,18 +146,6 @@ def restart_database(pg_path, abs_data_path, node_name, node_ports, logfile_pref
     subprocess.run(command, check=True)
 
 
-def disable_2pc_recovery_for_all_nodes(pg_path, config):
-    for port in config.node_name_to_ports.values():
-        utils.psql(pg_path, port, "ALTER SYSTEM SET citus.recover_2pc_interval TO -1;")
-        utils.psql(pg_path, port, "SELECT pg_reload_conf();")
-
-
-def enable_2pc_recovery_for_all_nodes(pg_path, config):
-    for port in config.node_name_to_ports.values():
-        utils.psql(pg_path, port, "ALTER SYSTEM RESET citus.recover_2pc_interval;")
-        utils.psql(pg_path, port, "SELECT pg_reload_conf();")
-
-
 def run_alter_citus(pg_path, mixed_mode, config):
     for port in config.node_name_to_ports.values():
         if mixed_mode and port in (