Don't cancel queries in a cycle if they are all on a single node

parallelize-arbitrary-config-setup
Jelte Fennema 2022-03-24 14:37:00 +01:00
parent 7283d8daf7
commit ee7f0f9b5b
4 changed files with 38 additions and 7 deletions

View File

@ -167,6 +167,8 @@ CheckForDistributedDeadlocks(void)
LogDistributedDeadlockDebugMessage("Distributed deadlock found among the " LogDistributedDeadlockDebugMessage("Distributed deadlock found among the "
"following distributed transactions:"); "following distributed transactions:");
Bitmapset *involvedNodes = NULL;
/* /*
* We search for the youngest participant for two reasons * We search for the youngest participant for two reasons
* (i) predictable results (ii) cancel the youngest transaction * (i) predictable results (ii) cancel the youngest transaction
@ -203,6 +205,23 @@ CheckForDistributedDeadlocks(void)
{ {
youngestAliveTransaction = currentNode; youngestAliveTransaction = currentNode;
} }
involvedNodes = bms_add_members(involvedNodes,
currentNode->blockedOnNodes);
}
/*
* If only a single node is involved in the cycle we rely on the
* local deadlock detection from Postgres. We do this because
* Postgres its local deadlock detection is more advanced than
* Citus its distributed deadlock detection. Postgres deadlock
* detection can sometimes resolve a deadlock without canceling any
* query. It does this by changing the order of processes in the
* lock wait queues.
*/
if (bms_num_members(involvedNodes))
{
continue;
} }
/* we found the deadlock and its associated proc exists */ /* we found the deadlock and its associated proc exists */
@ -487,6 +506,8 @@ BuildAdjacencyListsForWaitGraph(WaitGraph *waitGraph)
waitingTransaction->waitsFor = lappend(waitingTransaction->waitsFor, waitingTransaction->waitsFor = lappend(waitingTransaction->waitsFor,
blockingTransaction); blockingTransaction);
waitingTransaction->blockedOnNodes = bms_add_member(
waitingTransaction->blockedOnNodes, edge->nodeId);
} }
return adjacencyList; return adjacencyList;
@ -511,6 +532,7 @@ GetOrCreateTransactionNode(HTAB *adjacencyList, DistributedTransactionId *transa
{ {
transactionNode->waitsFor = NIL; transactionNode->waitsFor = NIL;
transactionNode->initiatorProc = NULL; transactionNode->initiatorProc = NULL;
transactionNode->blockedOnNodes = NULL;
} }
return transactionNode; return transactionNode;

View File

@ -46,10 +46,11 @@ typedef struct PROCStack
} PROCStack; } PROCStack;
static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex); static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex,
WorkerNode *worker);
static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
static void AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, static void AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result,
int rowIndex); int rowIndex, WorkerNode *worker);
static void ReturnBlockedProcessGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); static void ReturnBlockedProcessGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
static WaitGraph * BuildLocalWaitGraph(bool onlyDistributedTx); static WaitGraph * BuildLocalWaitGraph(bool onlyDistributedTx);
static bool IsProcessWaitingForSafeOperations(PGPROC *proc); static bool IsProcessWaitingForSafeOperations(PGPROC *proc);
@ -203,7 +204,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
} }
/* receive dump_local_wait_edges results */ /* receive dump_local_wait_edges results */
foreach_ptr(connection, connectionList) forboth_ptr(connection, connectionList, workerNode, workerNodeList)
{ {
bool raiseInterrupts = true; bool raiseInterrupts = true;
@ -234,11 +235,12 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
{ {
if (onlyDistributedTx) if (onlyDistributedTx)
{ {
AddWaitEdgeFromResult(waitGraph, result, rowIndex); AddWaitEdgeFromResult(waitGraph, result, rowIndex, workerNode);
} }
else else
{ {
AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex); AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex,
workerNode);
} }
} }
@ -255,7 +257,8 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
* a PGresult. * a PGresult.
*/ */
static void static void
AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex) AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex,
WorkerNode *worker)
{ {
WaitEdge *waitEdge = AllocWaitEdge(waitGraph); WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
@ -270,6 +273,7 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6); waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6);
waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7); waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7);
waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8); waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8);
waitEdge->nodeId = worker->nodeId;
} }
@ -278,7 +282,8 @@ AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
* is read from a PGresult. * is read from a PGresult.
*/ */
static void static void
AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, int rowIndex) AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, int rowIndex,
WorkerNode *worker)
{ {
WaitEdge *waitEdge = AllocWaitEdge(waitGraph); WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
@ -293,6 +298,7 @@ AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, int
waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 8); waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 8);
waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 9); waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 9);
waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 10); waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 10);
waitEdge->nodeId = worker->nodeId;
} }

View File

@ -31,6 +31,7 @@ typedef struct TransactionNode
PGPROC *initiatorProc; PGPROC *initiatorProc;
bool transactionVisited; bool transactionVisited;
Bitmapset *blockedOnNodes;
} TransactionNode; } TransactionNode;

View File

@ -45,6 +45,8 @@ typedef struct WaitEdge
/* blocking transaction is also waiting on a lock */ /* blocking transaction is also waiting on a lock */
bool isBlockingXactWaiting; bool isBlockingXactWaiting;
int nodeId;
} WaitEdge; } WaitEdge;