From d65edaf4df18c7db2ec2865c9b136f9411aa8af4 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Thu, 17 Mar 2022 15:59:00 +0100 Subject: [PATCH] Use gpids in deadlock detector The deadlock detector had a bug where it would detect deadlocks even when there were none. This could happen when the lock graph contains a combination of transactions that had a distributed transaction id assigned, but also some transactions that did not. Multiple different transactions without a transaction id were considered the same transaction (with transaction id 0). Thus a cycle would be instantly detected if two of those transactions were waiting on each other. However, due to the nature of the deadlock detector these transactions without distributed transaction ids would normally not be included in the lock graph at all. Only when a transaction that has a distributed transaction id was waiting on one without, would it be included. Fixes #1803 --- .../distributed_deadlock_detection.c | 40 ++++++++---- .../distributed/transaction/lock_graph.c | 65 +++---------------- .../distributed/transaction_identifier.h | 1 + 3 files changed, 38 insertions(+), 68 deletions(-) diff --git a/src/backend/distributed/transaction/distributed_deadlock_detection.c b/src/backend/distributed/transaction/distributed_deadlock_detection.c index 82c274661..623d7d184 100644 --- a/src/backend/distributed/transaction/distributed_deadlock_detection.c +++ b/src/backend/distributed/transaction/distributed_deadlock_detection.c @@ -457,26 +457,30 @@ BuildAdjacencyListsForWaitGraph(WaitGraph *waitGraph) HTAB *adjacencyList = hash_create("distributed deadlock detection", 64, &info, hashFlags); - for (int edgeIndex = 0; edgeIndex < edgeCount; edgeIndex++) { WaitEdge *edge = &waitGraph->edges[edgeIndex]; bool transactionOriginator = false; + DistributedTransactionId waitingId = { edge->waitingNodeId, transactionOriginator, edge->waitingTransactionNum, - edge->waitingTransactionStamp + edge->waitingTransactionStamp, + 
edge->waitingGPid, }; DistributedTransactionId blockingId = { edge->blockingNodeId, transactionOriginator, edge->blockingTransactionNum, - edge->blockingTransactionStamp + edge->blockingTransactionStamp, + edge->blockingGPid, }; + elog(LOG, UINT64_FORMAT "->" UINT64_FORMAT, waitingId.gpid, blockingId.gpid); + TransactionNode *waitingTransaction = GetOrCreateTransactionNode(adjacencyList, &waitingId); TransactionNode *blockingTransaction = @@ -528,6 +532,8 @@ DistributedTransactionIdHash(const void *key, Size keysize) sizeof(int64))); hash = hash_combine(hash, hash_any((unsigned char *) &entry->timestamp, sizeof(TimestampTz))); + hash = hash_combine(hash, hash_any((unsigned char *) &entry->gpid, + sizeof(uint64))); return hash; } @@ -546,7 +552,15 @@ DistributedTransactionIdCompare(const void *a, const void *b, Size keysize) DistributedTransactionId *xactIdA = (DistributedTransactionId *) a; DistributedTransactionId *xactIdB = (DistributedTransactionId *) b; - if (!TimestampDifferenceExceeds(xactIdB->timestamp, xactIdA->timestamp, 0)) + if (xactIdA->gpid < xactIdB->gpid) + { + return -1; + } + else if (xactIdA->gpid > xactIdB->gpid) + { + return 1; + } + else if (!TimestampDifferenceExceeds(xactIdB->timestamp, xactIdA->timestamp, 0)) { /* ! 
(B <= A) = A < B */ return -1; @@ -594,11 +608,13 @@ LogCancellingBackend(TransactionNode *transactionNode) StringInfo logMessage = makeStringInfo(); - appendStringInfo(logMessage, "Cancelling the following backend " - "to resolve distributed deadlock " - "(transaction number = " UINT64_FORMAT ", pid = %d)", + appendStringInfo(logMessage, + "Cancelling the following backend " + "to resolve distributed deadlock " + "(transaction number = " UINT64_FORMAT + ", gpid = " UINT64_FORMAT ")", transactionNode->transactionId.transactionNumber, - transactionNode->initiatorProc->pid); + transactionNode->transactionId.gpid); LogDistributedDeadlockDebugMessage(logMessage->data); } @@ -620,12 +636,14 @@ LogTransactionNode(TransactionNode *transactionNode) DistributedTransactionId *transactionId = &(transactionNode->transactionId); appendStringInfo(logMessage, - "[DistributedTransactionId: (%d, " UINT64_FORMAT ", %s)] = ", + "[DistributedTransactionId: (" UINT64_FORMAT ", %d," UINT64_FORMAT + ", %s)] = ", + transactionId->gpid, transactionId->initiatorNodeIdentifier, transactionId->transactionNumber, timestamptz_to_str(transactionId->timestamp)); - appendStringInfo(logMessage, "[WaitsFor transaction numbers: %s]", + appendStringInfo(logMessage, "[WaitsFor gpids: %s]", WaitsForToString(transactionNode->waitsFor)); /* log the backend query if the proc is associated with the transaction */ @@ -679,7 +697,7 @@ WaitsForToString(List *waitsFor) } appendStringInfo(transactionIdStr, UINT64_FORMAT, - waitingNode->transactionId.transactionNumber); + waitingNode->transactionId.gpid); } return transactionIdStr->data; diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index 62b5e4e04..f3b43f6e8 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -45,7 +45,6 @@ typedef struct PROCStack } PROCStack; -static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult 
*result, int rowIndex); static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); static void AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, int rowIndex); @@ -175,24 +174,12 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) { StringInfo queryString = makeStringInfo(); - if (onlyDistributedTx) - { - appendStringInfo(queryString, - "SELECT waiting_pid, waiting_node_id, " - "waiting_transaction_num, waiting_transaction_stamp, " - "blocking_pid, blocking_node_id, blocking_transaction_num, " - "blocking_transaction_stamp, blocking_transaction_waiting " - "FROM dump_local_wait_edges()"); - } - else - { - appendStringInfo(queryString, - "SELECT waiting_global_pid, waiting_pid, " - "waiting_node_id, waiting_transaction_num, waiting_transaction_stamp, " - "blocking_global_pid,blocking_pid, blocking_node_id, " - "blocking_transaction_num, blocking_transaction_stamp, blocking_transaction_waiting " - "FROM citus_internal_local_blocked_processes()"); - } + appendStringInfo(queryString, + "SELECT waiting_global_pid, waiting_pid, " + "waiting_node_id, waiting_transaction_num, waiting_transaction_stamp, " + "blocking_global_pid,blocking_pid, blocking_node_id, " + "blocking_transaction_num, blocking_transaction_stamp, blocking_transaction_waiting " + "FROM citus_internal_local_blocked_processes()"); int querySent = SendRemoteCommand(connection, queryString->data); if (querySent == 0) @@ -216,13 +203,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) int64 rowCount = PQntuples(result); int64 colCount = PQnfields(result); - if (onlyDistributedTx && colCount != 9) - { - ereport(WARNING, (errmsg("unexpected number of columns from " - "dump_local_wait_edges"))); - continue; - } - else if (!onlyDistributedTx && colCount != 11) + if (colCount != 11) { ereport(WARNING, (errmsg("unexpected number of columns from " "citus_internal_local_blocked_processes"))); @@ -231,14 +212,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) for (int64 rowIndex = 
0; rowIndex < rowCount; rowIndex++) { - if (onlyDistributedTx) - { - AddWaitEdgeFromResult(waitGraph, result, rowIndex); - } - else - { - AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex); - } + AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex); } PQclear(result); @@ -249,29 +223,6 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) } -/* - * AddWaitEdgeFromResult adds an edge to the wait graph that is read from - * a PGresult. - */ -static void -AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex) -{ - WaitEdge *waitEdge = AllocWaitEdge(waitGraph); - - waitEdge->waitingGPid = 0; /* not requested for deadlock detection */ - waitEdge->waitingPid = ParseIntField(result, rowIndex, 0); - waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1); - waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2); - waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3); - waitEdge->blockingGPid = 0; /* not requested for deadlock detection */ - waitEdge->blockingPid = ParseIntField(result, rowIndex, 4); - waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5); - waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6); - waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7); - waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8); -} - - /* * AddWaitEdgeFromBlockedProcessResult adds an edge to the wait graph that * is read from a PGresult. diff --git a/src/include/distributed/transaction_identifier.h b/src/include/distributed/transaction_identifier.h index d206d1b85..df9cbed89 100644 --- a/src/include/distributed/transaction_identifier.h +++ b/src/include/distributed/transaction_identifier.h @@ -36,6 +36,7 @@ typedef struct DistributedTransactionId bool transactionOriginator; uint64 transactionNumber; TimestampTz timestamp; + uint64 gpid; } DistributedTransactionId;