mirror of https://github.com/citusdata/citus.git
Use gpids in deadlock detector
The deadlock detector had a bug where it would detect deadlocks even when there were none. This could happen when the lock graph contains a combination of transactions had a distributed transaction id assigned, but also some transactions that did not. Multiple different transactions without a transaction id were considered the same transaction (with transaction id 0). Thus a cycle would be instantly detected if two of those transactions were waiting on eachother. However, due to the nature of the deadlock detector these transaction without distributed transaction ids would normally not be included in the lock graph at all. Only when a transaction that has a distributed transaction id was waiting on one without, would it be included. Fixes #1803use-gpids-in-deadlock-detection
parent
f34fc37478
commit
d65edaf4df
|
@ -457,26 +457,30 @@ BuildAdjacencyListsForWaitGraph(WaitGraph *waitGraph)
|
|||
|
||||
HTAB *adjacencyList = hash_create("distributed deadlock detection", 64, &info,
|
||||
hashFlags);
|
||||
|
||||
for (int edgeIndex = 0; edgeIndex < edgeCount; edgeIndex++)
|
||||
{
|
||||
WaitEdge *edge = &waitGraph->edges[edgeIndex];
|
||||
bool transactionOriginator = false;
|
||||
|
||||
|
||||
DistributedTransactionId waitingId = {
|
||||
edge->waitingNodeId,
|
||||
transactionOriginator,
|
||||
edge->waitingTransactionNum,
|
||||
edge->waitingTransactionStamp
|
||||
edge->waitingTransactionStamp,
|
||||
edge->waitingGPid,
|
||||
};
|
||||
|
||||
DistributedTransactionId blockingId = {
|
||||
edge->blockingNodeId,
|
||||
transactionOriginator,
|
||||
edge->blockingTransactionNum,
|
||||
edge->blockingTransactionStamp
|
||||
edge->blockingTransactionStamp,
|
||||
edge->blockingGPid,
|
||||
};
|
||||
|
||||
elog(LOG, UINT64_FORMAT "->" UINT64_FORMAT, waitingId.gpid, blockingId.gpid);
|
||||
|
||||
TransactionNode *waitingTransaction =
|
||||
GetOrCreateTransactionNode(adjacencyList, &waitingId);
|
||||
TransactionNode *blockingTransaction =
|
||||
|
@ -528,6 +532,8 @@ DistributedTransactionIdHash(const void *key, Size keysize)
|
|||
sizeof(int64)));
|
||||
hash = hash_combine(hash, hash_any((unsigned char *) &entry->timestamp,
|
||||
sizeof(TimestampTz)));
|
||||
hash = hash_combine(hash, hash_any((unsigned char *) &entry->gpid,
|
||||
sizeof(uint64)));
|
||||
|
||||
return hash;
|
||||
}
|
||||
|
@ -546,7 +552,15 @@ DistributedTransactionIdCompare(const void *a, const void *b, Size keysize)
|
|||
DistributedTransactionId *xactIdA = (DistributedTransactionId *) a;
|
||||
DistributedTransactionId *xactIdB = (DistributedTransactionId *) b;
|
||||
|
||||
if (!TimestampDifferenceExceeds(xactIdB->timestamp, xactIdA->timestamp, 0))
|
||||
if (xactIdA->gpid < xactIdB->gpid)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
else if (xactIdA->gpid > xactIdB->gpid)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else if (!TimestampDifferenceExceeds(xactIdB->timestamp, xactIdA->timestamp, 0))
|
||||
{
|
||||
/* ! (B <= A) = A < B */
|
||||
return -1;
|
||||
|
@ -594,11 +608,13 @@ LogCancellingBackend(TransactionNode *transactionNode)
|
|||
|
||||
StringInfo logMessage = makeStringInfo();
|
||||
|
||||
appendStringInfo(logMessage, "Cancelling the following backend "
|
||||
"to resolve distributed deadlock "
|
||||
"(transaction number = " UINT64_FORMAT ", pid = %d)",
|
||||
appendStringInfo(logMessage,
|
||||
"Cancelling the following backend "
|
||||
"to resolve distributed deadlock "
|
||||
"(transaction number = " UINT64_FORMAT
|
||||
", gpid = " UINT64_FORMAT ")",
|
||||
transactionNode->transactionId.transactionNumber,
|
||||
transactionNode->initiatorProc->pid);
|
||||
transactionNode->transactionId.gpid);
|
||||
|
||||
LogDistributedDeadlockDebugMessage(logMessage->data);
|
||||
}
|
||||
|
@ -620,12 +636,14 @@ LogTransactionNode(TransactionNode *transactionNode)
|
|||
DistributedTransactionId *transactionId = &(transactionNode->transactionId);
|
||||
|
||||
appendStringInfo(logMessage,
|
||||
"[DistributedTransactionId: (%d, " UINT64_FORMAT ", %s)] = ",
|
||||
"[DistributedTransactionId: (" UINT64_FORMAT ", %d," UINT64_FORMAT
|
||||
", %s)] = ",
|
||||
transactionId->gpid,
|
||||
transactionId->initiatorNodeIdentifier,
|
||||
transactionId->transactionNumber,
|
||||
timestamptz_to_str(transactionId->timestamp));
|
||||
|
||||
appendStringInfo(logMessage, "[WaitsFor transaction numbers: %s]",
|
||||
appendStringInfo(logMessage, "[WaitsFor gpids: %s]",
|
||||
WaitsForToString(transactionNode->waitsFor));
|
||||
|
||||
/* log the backend query if the proc is associated with the transaction */
|
||||
|
@ -679,7 +697,7 @@ WaitsForToString(List *waitsFor)
|
|||
}
|
||||
|
||||
appendStringInfo(transactionIdStr, UINT64_FORMAT,
|
||||
waitingNode->transactionId.transactionNumber);
|
||||
waitingNode->transactionId.gpid);
|
||||
}
|
||||
|
||||
return transactionIdStr->data;
|
||||
|
|
|
@ -45,7 +45,6 @@ typedef struct PROCStack
|
|||
} PROCStack;
|
||||
|
||||
|
||||
static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex);
|
||||
static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
|
||||
static void AddWaitEdgeFromBlockedProcessResult(WaitGraph *waitGraph, PGresult *result,
|
||||
int rowIndex);
|
||||
|
@ -175,24 +174,12 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
|
|||
{
|
||||
StringInfo queryString = makeStringInfo();
|
||||
|
||||
if (onlyDistributedTx)
|
||||
{
|
||||
appendStringInfo(queryString,
|
||||
"SELECT waiting_pid, waiting_node_id, "
|
||||
"waiting_transaction_num, waiting_transaction_stamp, "
|
||||
"blocking_pid, blocking_node_id, blocking_transaction_num, "
|
||||
"blocking_transaction_stamp, blocking_transaction_waiting "
|
||||
"FROM dump_local_wait_edges()");
|
||||
}
|
||||
else
|
||||
{
|
||||
appendStringInfo(queryString,
|
||||
"SELECT waiting_global_pid, waiting_pid, "
|
||||
"waiting_node_id, waiting_transaction_num, waiting_transaction_stamp, "
|
||||
"blocking_global_pid,blocking_pid, blocking_node_id, "
|
||||
"blocking_transaction_num, blocking_transaction_stamp, blocking_transaction_waiting "
|
||||
"FROM citus_internal_local_blocked_processes()");
|
||||
}
|
||||
appendStringInfo(queryString,
|
||||
"SELECT waiting_global_pid, waiting_pid, "
|
||||
"waiting_node_id, waiting_transaction_num, waiting_transaction_stamp, "
|
||||
"blocking_global_pid,blocking_pid, blocking_node_id, "
|
||||
"blocking_transaction_num, blocking_transaction_stamp, blocking_transaction_waiting "
|
||||
"FROM citus_internal_local_blocked_processes()");
|
||||
|
||||
int querySent = SendRemoteCommand(connection, queryString->data);
|
||||
if (querySent == 0)
|
||||
|
@ -216,13 +203,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
|
|||
int64 rowCount = PQntuples(result);
|
||||
int64 colCount = PQnfields(result);
|
||||
|
||||
if (onlyDistributedTx && colCount != 9)
|
||||
{
|
||||
ereport(WARNING, (errmsg("unexpected number of columns from "
|
||||
"dump_local_wait_edges")));
|
||||
continue;
|
||||
}
|
||||
else if (!onlyDistributedTx && colCount != 11)
|
||||
if (colCount != 11)
|
||||
{
|
||||
ereport(WARNING, (errmsg("unexpected number of columns from "
|
||||
"citus_internal_local_blocked_processes")));
|
||||
|
@ -231,14 +212,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
|
|||
|
||||
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
|
||||
{
|
||||
if (onlyDistributedTx)
|
||||
{
|
||||
AddWaitEdgeFromResult(waitGraph, result, rowIndex);
|
||||
}
|
||||
else
|
||||
{
|
||||
AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex);
|
||||
}
|
||||
AddWaitEdgeFromBlockedProcessResult(waitGraph, result, rowIndex);
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
|
@ -249,29 +223,6 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* AddWaitEdgeFromResult adds an edge to the wait graph that is read from
|
||||
* a PGresult.
|
||||
*/
|
||||
static void
|
||||
AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex)
|
||||
{
|
||||
WaitEdge *waitEdge = AllocWaitEdge(waitGraph);
|
||||
|
||||
waitEdge->waitingGPid = 0; /* not requested for deadlock detection */
|
||||
waitEdge->waitingPid = ParseIntField(result, rowIndex, 0);
|
||||
waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1);
|
||||
waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2);
|
||||
waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3);
|
||||
waitEdge->blockingGPid = 0; /* not requested for deadlock detection */
|
||||
waitEdge->blockingPid = ParseIntField(result, rowIndex, 4);
|
||||
waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5);
|
||||
waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6);
|
||||
waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7);
|
||||
waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AddWaitEdgeFromBlockedProcessResult adds an edge to the wait graph that
|
||||
* is read from a PGresult.
|
||||
|
|
|
@ -36,6 +36,7 @@ typedef struct DistributedTransactionId
|
|||
bool transactionOriginator;
|
||||
uint64 transactionNumber;
|
||||
TimestampTz timestamp;
|
||||
uint64 gpid;
|
||||
} DistributedTransactionId;
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue