mirror of https://github.com/citusdata/citus.git
Initial attempt at fixing deadlock detection
The deadlock detector had a bug where it would detect deadlocks even when there were none. This could happen when the lock graph contained a mix of transactions that had a distributed transaction id assigned and transactions that did not. Multiple different transactions without a distributed transaction id were all considered to be the same transaction (with transaction id 0). Thus a cycle would be detected immediately if two of them were waiting on each other. However, due to the nature of the deadlock detector, these transactions without distributed transaction ids would normally not be included in the lock graph at all. Only when a transaction that has a distributed transaction id was waiting on one without would the latter be included.
parallelize-arbitrary-config-setup
parent
84dcd659ff
commit
c6077feb76
|
@ -458,24 +458,26 @@ BuildAdjacencyListsForWaitGraph(WaitGraph *waitGraph)
|
|||
|
||||
HTAB *adjacencyList = hash_create("distributed deadlock detection", 64, &info,
|
||||
hashFlags);
|
||||
|
||||
for (int edgeIndex = 0; edgeIndex < edgeCount; edgeIndex++)
|
||||
{
|
||||
WaitEdge *edge = &waitGraph->edges[edgeIndex];
|
||||
bool transactionOriginator = false;
|
||||
|
||||
|
||||
DistributedTransactionId waitingId = {
|
||||
edge->waitingNodeId,
|
||||
transactionOriginator,
|
||||
edge->waitingTransactionNum,
|
||||
edge->waitingTransactionStamp
|
||||
edge->waitingTransactionStamp,
|
||||
edge->waitingGPid,
|
||||
};
|
||||
|
||||
DistributedTransactionId blockingId = {
|
||||
edge->blockingNodeId,
|
||||
transactionOriginator,
|
||||
edge->blockingTransactionNum,
|
||||
edge->blockingTransactionStamp
|
||||
edge->blockingTransactionStamp,
|
||||
edge->blockingGPid,
|
||||
};
|
||||
|
||||
TransactionNode *waitingTransaction =
|
||||
|
@ -529,6 +531,8 @@ DistributedTransactionIdHash(const void *key, Size keysize)
|
|||
sizeof(int64)));
|
||||
hash = hash_combine(hash, hash_any((unsigned char *) &entry->timestamp,
|
||||
sizeof(TimestampTz)));
|
||||
hash = hash_combine(hash, hash_any((unsigned char *) &entry->gpid,
|
||||
sizeof(uint64)));
|
||||
|
||||
return hash;
|
||||
}
|
||||
|
@ -547,7 +551,15 @@ DistributedTransactionIdCompare(const void *a, const void *b, Size keysize)
|
|||
DistributedTransactionId *xactIdA = (DistributedTransactionId *) a;
|
||||
DistributedTransactionId *xactIdB = (DistributedTransactionId *) b;
|
||||
|
||||
if (!TimestampDifferenceExceeds(xactIdB->timestamp, xactIdA->timestamp, 0))
|
||||
if (xactIdA->gpid < xactIdB->gpid)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
else if (xactIdA->gpid < xactIdB->gpid)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else if (!TimestampDifferenceExceeds(xactIdB->timestamp, xactIdA->timestamp, 0))
|
||||
{
|
||||
/* ! (B <= A) = A < B */
|
||||
return -1;
|
||||
|
@ -595,11 +607,13 @@ LogCancellingBackend(TransactionNode *transactionNode)
|
|||
|
||||
StringInfo logMessage = makeStringInfo();
|
||||
|
||||
appendStringInfo(logMessage, "Cancelling the following backend "
|
||||
"to resolve distributed deadlock "
|
||||
"(transaction number = " UINT64_FORMAT ", pid = %d)",
|
||||
appendStringInfo(logMessage,
|
||||
"Cancelling the following backend "
|
||||
"to resolve distributed deadlock "
|
||||
"(transaction number = " UINT64_FORMAT
|
||||
", gpid = " UINT64_FORMAT ")",
|
||||
transactionNode->transactionId.transactionNumber,
|
||||
transactionNode->initiatorProc->pid);
|
||||
transactionNode->transactionId.gpid);
|
||||
|
||||
LogDistributedDeadlockDebugMessage(logMessage->data);
|
||||
}
|
||||
|
@ -621,12 +635,14 @@ LogTransactionNode(TransactionNode *transactionNode)
|
|||
DistributedTransactionId *transactionId = &(transactionNode->transactionId);
|
||||
|
||||
appendStringInfo(logMessage,
|
||||
"[DistributedTransactionId: (%d, " UINT64_FORMAT ", %s)] = ",
|
||||
"[DistributedTransactionId: (" UINT64_FORMAT ", %d," UINT64_FORMAT
|
||||
", %s)] = ",
|
||||
transactionId->gpid,
|
||||
transactionId->initiatorNodeIdentifier,
|
||||
transactionId->transactionNumber,
|
||||
timestamptz_to_str(transactionId->timestamp));
|
||||
|
||||
appendStringInfo(logMessage, "[WaitsFor transaction numbers: %s]",
|
||||
appendStringInfo(logMessage, "[WaitsFor gpids: %s]",
|
||||
WaitsForToString(transactionNode->waitsFor));
|
||||
|
||||
/* log the backend query if the proc is associated with the transaction */
|
||||
|
@ -680,7 +696,7 @@ WaitsForToString(List *waitsFor)
|
|||
}
|
||||
|
||||
appendStringInfo(transactionIdStr, UINT64_FORMAT,
|
||||
waitingNode->transactionId.transactionNumber);
|
||||
waitingNode->transactionId.gpid);
|
||||
}
|
||||
|
||||
return transactionIdStr->data;
|
||||
|
|
|
@ -36,6 +36,7 @@ typedef struct DistributedTransactionId
|
|||
bool transactionOriginator;
|
||||
uint64 transactionNumber;
|
||||
TimestampTz timestamp;
|
||||
uint64 gpid;
|
||||
} DistributedTransactionId;
|
||||
|
||||
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
test: intermediate_result_pruning_create prepared_statements_create_load ch_benchmarks_create_load dropped_columns_create_load distributed_planning_create_load local_dist_join_load nested_execution_create partitioned_indexes_create
|
||||
test: intermediate_result_pruning_create prepared_statements_create_load ch_benchmarks_create_load dropped_columns_create_load distributed_planning_create_load local_dist_join_load nested_execution_create partitioned_indexes_create sequences_create
|
||||
test: connectivity_checks
|
||||
test: schemas_create
|
||||
test: views_create
|
||||
test: sequences_create
|
||||
test: index_create
|
||||
test: function_create
|
||||
test: arbitrary_configs_truncate_create
|
||||
|
|
Loading…
Reference in New Issue