diff --git a/src/backend/distributed/transaction/distributed_deadlock_detection.c b/src/backend/distributed/transaction/distributed_deadlock_detection.c
index 3863b8bba..94389ae15 100644
--- a/src/backend/distributed/transaction/distributed_deadlock_detection.c
+++ b/src/backend/distributed/transaction/distributed_deadlock_detection.c
@@ -167,6 +167,9 @@ CheckForDistributedDeadlocks(void)
 			LogDistributedDeadlockDebugMessage("Distributed deadlock found among the "
 											   "following distributed transactions:");
 
+			/* ids of the nodes that the transactions in this cycle are blocked on */
+			Bitmapset *involvedNodes = NULL;
+
 			/*
 			 * We search for the youngest participant for two reasons
 			 * (i) predictable results (ii) cancel the youngest transaction
@@ -203,6 +206,23 @@ CheckForDistributedDeadlocks(void)
 				{
 					youngestAliveTransaction = currentNode;
 				}
+
+				involvedNodes = bms_add_members(involvedNodes,
+												currentNode->blockedOnNodes);
+			}
+
+			/*
+			 * If only a single node is involved in the cycle we rely on the
+			 * local deadlock detection from Postgres. We do this because
+			 * Postgres' local deadlock detection is more advanced than
+			 * Citus' distributed deadlock detection. Postgres deadlock
+			 * detection can sometimes resolve a deadlock without canceling any
+			 * query. It does this by changing the order of processes in the
+			 * lock wait queues.
+			 */
+			if (bms_num_members(involvedNodes) == 1)
+			{
+				continue;
 			}
 
 			/* we found the deadlock and its associated proc exists */
@@ -487,6 +507,8 @@ BuildAdjacencyListsForWaitGraph(WaitGraph *waitGraph)
 
 		waitingTransaction->waitsFor = lappend(waitingTransaction->waitsFor,
 											   blockingTransaction);
+		waitingTransaction->blockedOnNodes = bms_add_member(
+			waitingTransaction->blockedOnNodes, edge->nodeId);
 	}
 
 	return adjacencyList;
@@ -511,6 +533,7 @@ GetOrCreateTransactionNode(HTAB *adjacencyList, DistributedTransactionId *transa
 	{
 		transactionNode->waitsFor = NIL;
 		transactionNode->initiatorProc = NULL;
+		transactionNode->blockedOnNodes = NULL;
 	}
 
 	return transactionNode;
diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c
index 8f8c21915..b487cb93d 100644
--- a/src/backend/distributed/transaction/lock_graph.c
+++ b/src/backend/distributed/transaction/lock_graph.c
@@ -46,10 +46,11 @@ typedef struct PROCStack
 } PROCStack;
 
 
-static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex);
+static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex,
+								  WorkerNode *worker);
 static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
 static void AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result,
-												int rowIndex);
+												int rowIndex, WorkerNode *worker);
 static void ReturnBlockedProcessGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
 static WaitGraph * BuildLocalWaitGraph(bool onlyDistributedTx);
 static bool IsProcessWaitingForSafeOperations(PGPROC *proc);
@@ -203,7 +204,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
 	}
 
 	/* receive dump_local_wait_edges results */
-	foreach_ptr(connection, connectionList)
+	forboth_ptr(connection, connectionList, workerNode, workerNodeList)
 	{
 		bool raiseInterrupts = true;
 
@@ -234,11 +235,12 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
 		{
 			if (onlyDistributedTx)
 			{
-				AddWaitEdgeFromResult(waitGraph, result, rowIndex);
+				AddWaitEdgeFromResult(waitGraph, result, rowIndex, workerNode);
 			}
 			else
 			{
-				AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex);
+				AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex,
+													workerNode);
 			}
 		}
 
@@ -255,7 +257,8 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
  * a PGresult.
  */
 static void
-AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
+AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex,
+					  WorkerNode *worker)
 {
 	WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
 
@@ -270,6 +273,7 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
 	waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6);
 	waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7);
 	waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8);
+	waitEdge->nodeId = worker->nodeId;
 }
 
 
@@ -278,7 +282,8 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
  * is read from a PGresult.
  */
 static void
-AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
+AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, int rowIndex,
+									WorkerNode *worker)
 {
 	WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
 
@@ -293,6 +298,7 @@ AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, int
 	waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 8);
 	waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 9);
 	waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 10);
+	waitEdge->nodeId = worker->nodeId;
 }
 
 
diff --git a/src/include/distributed/distributed_deadlock_detection.h b/src/include/distributed/distributed_deadlock_detection.h
index 23f6554ef..8c5f0fd6a 100644
--- a/src/include/distributed/distributed_deadlock_detection.h
+++ b/src/include/distributed/distributed_deadlock_detection.h
@@ -31,6 +31,9 @@ typedef struct TransactionNode
 	PGPROC *initiatorProc;
 
 	bool transactionVisited;
+
+	/* ids of the nodes whose wait edges show this transaction waiting */
+	Bitmapset *blockedOnNodes;
 } TransactionNode;
 
 
diff --git a/src/include/distributed/lock_graph.h b/src/include/distributed/lock_graph.h
index f204ebb03..dca8cac5e 100644
--- a/src/include/distributed/lock_graph.h
+++ b/src/include/distributed/lock_graph.h
@@ -45,6 +45,9 @@ typedef struct WaitEdge
 
 	/* blocking transaction is also waiting on a lock */
 	bool isBlockingXactWaiting;
+
+	/* id of the worker node whose result this edge was read from */
+	int nodeId;
 } WaitEdge;
 
 