diff --git a/src/backend/distributed/transaction/distributed_deadlock_detection.c b/src/backend/distributed/transaction/distributed_deadlock_detection.c index 82c274661..623d7d184 100644 --- a/src/backend/distributed/transaction/distributed_deadlock_detection.c +++ b/src/backend/distributed/transaction/distributed_deadlock_detection.c @@ -457,26 +457,30 @@ BuildAdjacencyListsForWaitGraph(WaitGraph *waitGraph) HTAB *adjacencyList = hash_create("distributed deadlock detection", 64, &info, hashFlags); - for (int edgeIndex = 0; edgeIndex < edgeCount; edgeIndex++) { WaitEdge *edge = &waitGraph->edges[edgeIndex]; bool transactionOriginator = false; + DistributedTransactionId waitingId = { edge->waitingNodeId, transactionOriginator, edge->waitingTransactionNum, - edge->waitingTransactionStamp + edge->waitingTransactionStamp, + edge->waitingGPid, }; DistributedTransactionId blockingId = { edge->blockingNodeId, transactionOriginator, edge->blockingTransactionNum, - edge->blockingTransactionStamp + edge->blockingTransactionStamp, + edge->blockingGPid, }; + elog(LOG, UINT64_FORMAT "->" UINT64_FORMAT, waitingId.gpid, blockingId.gpid); + TransactionNode *waitingTransaction = GetOrCreateTransactionNode(adjacencyList, &waitingId); TransactionNode *blockingTransaction = @@ -528,6 +532,8 @@ DistributedTransactionIdHash(const void *key, Size keysize) sizeof(int64))); hash = hash_combine(hash, hash_any((unsigned char *) &entry->timestamp, sizeof(TimestampTz))); + hash = hash_combine(hash, hash_any((unsigned char *) &entry->gpid, + sizeof(uint64))); return hash; } @@ -546,7 +552,15 @@ DistributedTransactionIdCompare(const void *a, const void *b, Size keysize) DistributedTransactionId *xactIdA = (DistributedTransactionId *) a; DistributedTransactionId *xactIdB = (DistributedTransactionId *) b; - if (!TimestampDifferenceExceeds(xactIdB->timestamp, xactIdA->timestamp, 0)) + if (xactIdA->gpid < xactIdB->gpid) + { + return -1; + } + else if (xactIdA->gpid > xactIdB->gpid) + { + return 1; + } + else if (!TimestampDifferenceExceeds(xactIdB->timestamp, xactIdA->timestamp, 0)) { /* ! (B <= A) = A < B */ return -1; @@ -594,11 +608,13 @@ LogCancellingBackend(TransactionNode *transactionNode) StringInfo logMessage = makeStringInfo(); - appendStringInfo(logMessage, "Cancelling the following backend " - "to resolve distributed deadlock " - "(transaction number = " UINT64_FORMAT ", pid = %d)", + appendStringInfo(logMessage, + "Cancelling the following backend " + "to resolve distributed deadlock " + "(transaction number = " UINT64_FORMAT + ", gpid = " UINT64_FORMAT ")", transactionNode->transactionId.transactionNumber, - transactionNode->initiatorProc->pid); + transactionNode->transactionId.gpid); LogDistributedDeadlockDebugMessage(logMessage->data); } @@ -620,12 +636,14 @@ LogTransactionNode(TransactionNode *transactionNode) DistributedTransactionId *transactionId = &(transactionNode->transactionId); appendStringInfo(logMessage, - "[DistributedTransactionId: (%d, " UINT64_FORMAT ", %s)] = ", + "[DistributedTransactionId: (" UINT64_FORMAT ", %d," UINT64_FORMAT + ", %s)] = ", + transactionId->gpid, transactionId->initiatorNodeIdentifier, transactionId->transactionNumber, timestamptz_to_str(transactionId->timestamp)); - appendStringInfo(logMessage, "[WaitsFor transaction numbers: %s]", + appendStringInfo(logMessage, "[WaitsFor gpids: %s]", WaitsForToString(transactionNode->waitsFor)); /* log the backend query if the proc is associated with the transaction */ @@ -679,7 +697,7 @@ WaitsForToString(List *waitsFor) } appendStringInfo(transactionIdStr, UINT64_FORMAT, - waitingNode->transactionId.transactionNumber); + waitingNode->transactionId.gpid); } return transactionIdStr->data; diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index 62b5e4e04..f3b43f6e8 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -45,7 +45,6 @@ typedef struct PROCStack } PROCStack; -static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex); static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); static void AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result, int rowIndex); @@ -175,24 +174,12 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) { StringInfo queryString = makeStringInfo(); - if (onlyDistributedTx) - { - appendStringInfo(queryString, - "SELECT waiting_pid, waiting_node_id, " - "waiting_transaction_num, waiting_transaction_stamp, " - "blocking_pid, blocking_node_id, blocking_transaction_num, " - "blocking_transaction_stamp, blocking_transaction_waiting " - "FROM dump_local_wait_edges()"); - } - else - { - appendStringInfo(queryString, - "SELECT waiting_global_pid, waiting_pid, " - "waiting_node_id, waiting_transaction_num, waiting_transaction_stamp, " - "blocking_global_pid,blocking_pid, blocking_node_id, " - "blocking_transaction_num, blocking_transaction_stamp, blocking_transaction_waiting " - "FROM citus_internal_local_blocked_processes()"); - } + appendStringInfo(queryString, + "SELECT waiting_global_pid, waiting_pid, " + "waiting_node_id, waiting_transaction_num, waiting_transaction_stamp, " + "blocking_global_pid,blocking_pid, blocking_node_id, " + "blocking_transaction_num, blocking_transaction_stamp, blocking_transaction_waiting " + "FROM citus_internal_local_blocked_processes()"); int querySent = SendRemoteCommand(connection, queryString->data); if (querySent == 0) @@ -216,13 +203,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) int64 rowCount = PQntuples(result); int64 colCount = PQnfields(result); - if (onlyDistributedTx && colCount != 9) - { - ereport(WARNING, (errmsg("unexpected number of columns from " - "dump_local_wait_edges"))); - continue; - } - else if (!onlyDistributedTx && colCount != 11) + if (colCount != 11) { ereport(WARNING, (errmsg("unexpected number of columns from " "citus_internal_local_blocked_processes"))); @@ -231,14 +212,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) { - if (onlyDistributedTx) - { - AddWaitEdgeFromResult(waitGraph, result, rowIndex); - } - else - { - AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex); - } + AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex); } PQclear(result); @@ -249,29 +223,6 @@ BuildGlobalWaitGraph(bool onlyDistributedTx) } -/* - * AddWaitEdgeFromResult adds an edge to the wait graph that is read from - * a PGresult. - */ -static void -AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex) -{ - WaitEdge *waitEdge = AllocWaitEdge(waitGraph); - - waitEdge->waitingGPid = 0; /* not requested for deadlock detection */ - waitEdge->waitingPid = ParseIntField(result, rowIndex, 0); - waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1); - waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2); - waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3); - waitEdge->blockingGPid = 0; /* not requested for deadlock detection */ - waitEdge->blockingPid = ParseIntField(result, rowIndex, 4); - waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5); - waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6); - waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7); - waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8); -} - - /* * AddWaitEdgeFromBlockedProcessResult adds an edge to the wait graph that * is read from a PGresult. diff --git a/src/include/distributed/transaction_identifier.h b/src/include/distributed/transaction_identifier.h index d206d1b85..df9cbed89 100644 --- a/src/include/distributed/transaction_identifier.h +++ b/src/include/distributed/transaction_identifier.h @@ -36,6 +36,7 @@ typedef struct DistributedTransactionId bool transactionOriginator; uint64 transactionNumber; TimestampTz timestamp; + uint64 gpid; } DistributedTransactionId;