mirror of https://github.com/citusdata/citus.git
Add logging infrasture for distributed deadlock detection
We added a new GUC citus.log_distributed_deadlock_detection which is off by default. When set to on, we log some debug messages related to the distributed deadlock to the server logs.pull/1529/head
parent
e5d5bdff51
commit
59133415b0
|
@ -23,6 +23,7 @@
|
||||||
#include "distributed/citus_nodefuncs.h"
|
#include "distributed/citus_nodefuncs.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
|
#include "distributed/distributed_deadlock_detection.h"
|
||||||
#include "distributed/maintenanced.h"
|
#include "distributed/maintenanced.h"
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
|
@ -361,6 +362,17 @@ RegisterCitusConfigVariables(void)
|
||||||
0,
|
0,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomBoolVariable(
|
||||||
|
"citus.log_distributed_deadlock_detection",
|
||||||
|
gettext_noop("Log distributed deadlock detection related processing in "
|
||||||
|
"the server log"),
|
||||||
|
NULL,
|
||||||
|
&LogDistributedDeadlockDetection,
|
||||||
|
false,
|
||||||
|
PGC_SIGHUP,
|
||||||
|
GUC_NO_SHOW_ALL,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomBoolVariable(
|
DefineCustomBoolVariable(
|
||||||
"citus.explain_distributed_queries",
|
"citus.explain_distributed_queries",
|
||||||
gettext_noop("Enables Explain for distributed queries."),
|
gettext_noop("Enables Explain for distributed queries."),
|
||||||
|
|
|
@ -26,9 +26,6 @@
|
||||||
#include "utils/timestamp.h"
|
#include "utils/timestamp.h"
|
||||||
|
|
||||||
|
|
||||||
static char * WaitsForToString(List *waitsFor);
|
|
||||||
|
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(get_adjacency_list_wait_graph);
|
PG_FUNCTION_INFO_V1(get_adjacency_list_wait_graph);
|
||||||
|
|
||||||
|
|
||||||
|
@ -114,31 +111,3 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* WaitsForToString is only intended for testing and debugging. It gets a
|
|
||||||
* waitsForList and returns the list of transaction nodes' transactionNumber
|
|
||||||
* in a string.
|
|
||||||
*/
|
|
||||||
static char *
|
|
||||||
WaitsForToString(List *waitsFor)
|
|
||||||
{
|
|
||||||
StringInfo transactionIdStr = makeStringInfo();
|
|
||||||
ListCell *waitsForCell = NULL;
|
|
||||||
|
|
||||||
foreach(waitsForCell, waitsFor)
|
|
||||||
{
|
|
||||||
TransactionNode *waitingNode = (TransactionNode *) lfirst(waitsForCell);
|
|
||||||
|
|
||||||
if (transactionIdStr->len != 0)
|
|
||||||
{
|
|
||||||
appendStringInfoString(transactionIdStr, ",");
|
|
||||||
}
|
|
||||||
|
|
||||||
appendStringInfo(transactionIdStr, "%ld",
|
|
||||||
waitingNode->transactionId.transactionNumber);
|
|
||||||
}
|
|
||||||
|
|
||||||
return transactionIdStr->data;
|
|
||||||
}
|
|
||||||
|
|
|
@ -10,7 +10,9 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
|
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
#include "pgstat.h"
|
||||||
|
|
||||||
#include "access/hash.h"
|
#include "access/hash.h"
|
||||||
#include "distributed/backend_data.h"
|
#include "distributed/backend_data.h"
|
||||||
|
@ -55,7 +57,9 @@ static TransactionNode * GetOrCreateTransactionNode(HTAB *adjacencyList,
|
||||||
transactionId);
|
transactionId);
|
||||||
static uint32 DistributedTransactionIdHash(const void *key, Size keysize);
|
static uint32 DistributedTransactionIdHash(const void *key, Size keysize);
|
||||||
static int DistributedTransactionIdCompare(const void *a, const void *b, Size keysize);
|
static int DistributedTransactionIdCompare(const void *a, const void *b, Size keysize);
|
||||||
|
static void LogCancellingBackend(TransactionNode *transactionNode);
|
||||||
|
static void LogTransactionNode(TransactionNode *transactionNode);
|
||||||
|
static void LogDistributedDeadlockDebugMessage(const char *errorMessage);
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(check_distributed_deadlocks);
|
PG_FUNCTION_INFO_V1(check_distributed_deadlocks);
|
||||||
|
|
||||||
|
@ -134,6 +138,9 @@ CheckForDistributedDeadlocks(void)
|
||||||
/* there should be at least two transactions to get into a deadlock */
|
/* there should be at least two transactions to get into a deadlock */
|
||||||
Assert(list_length(deadlockPath) > 1);
|
Assert(list_length(deadlockPath) > 1);
|
||||||
|
|
||||||
|
LogDistributedDeadlockDebugMessage("Distributed deadlock found among the "
|
||||||
|
"following distributed transactions:");
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We search for the youngest participant for two reasons
|
* We search for the youngest participant for two reasons
|
||||||
* (i) predictable results (ii) cancel the youngest transaction
|
* (i) predictable results (ii) cancel the youngest transaction
|
||||||
|
@ -154,6 +161,8 @@ CheckForDistributedDeadlocks(void)
|
||||||
|
|
||||||
AssociateDistributedTransactionWithBackendProc(currentNode);
|
AssociateDistributedTransactionWithBackendProc(currentNode);
|
||||||
|
|
||||||
|
LogTransactionNode(currentNode);
|
||||||
|
|
||||||
if (currentNode->transactionId.initiatorNodeIdentifier ==
|
if (currentNode->transactionId.initiatorNodeIdentifier ==
|
||||||
GetLocalGroupId() &&
|
GetLocalGroupId() &&
|
||||||
timestamptz_cmp_internal(currentTimestamp, youngestTimestamp) == 1)
|
timestamptz_cmp_internal(currentTimestamp, youngestTimestamp) == 1)
|
||||||
|
@ -166,6 +175,7 @@ CheckForDistributedDeadlocks(void)
|
||||||
Assert(youngestTransaction->initiatorProc != NULL);
|
Assert(youngestTransaction->initiatorProc != NULL);
|
||||||
|
|
||||||
CancelTransactionDueToDeadlock(youngestTransaction->initiatorProc);
|
CancelTransactionDueToDeadlock(youngestTransaction->initiatorProc);
|
||||||
|
LogCancellingBackend(youngestTransaction);
|
||||||
|
|
||||||
hash_seq_term(&status);
|
hash_seq_term(&status);
|
||||||
|
|
||||||
|
@ -522,3 +532,116 @@ DistributedTransactionIdCompare(const void *a, const void *b, Size keysize)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LogCancellingBackend should only be called when a distributed transaction's
|
||||||
|
* backend is cancelled due to distributed deadlocks. It sends which transaction
|
||||||
|
* is cancelled and its corresponding pid to the log.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
LogCancellingBackend(TransactionNode *transactionNode)
|
||||||
|
{
|
||||||
|
StringInfo logMessage = NULL;
|
||||||
|
|
||||||
|
if (!LogDistributedDeadlockDetection)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
logMessage = makeStringInfo();
|
||||||
|
|
||||||
|
appendStringInfo(logMessage, "Cancelling the following backend "
|
||||||
|
"to resolve distributed deadlock "
|
||||||
|
"(transaction numner = %ld, pid = %d)",
|
||||||
|
transactionNode->transactionId.transactionNumber,
|
||||||
|
transactionNode->initiatorProc->pid);
|
||||||
|
|
||||||
|
LogDistributedDeadlockDebugMessage(logMessage->data);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LogTransactionNode converts the transaction node to a human readable form
|
||||||
|
* and sends to the logs via LogDistributedDeadlockDebugMessage().
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
LogTransactionNode(TransactionNode *transactionNode)
|
||||||
|
{
|
||||||
|
StringInfo logMessage = NULL;
|
||||||
|
DistributedTransactionId *transactionId = NULL;
|
||||||
|
|
||||||
|
if (!LogDistributedDeadlockDetection)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
logMessage = makeStringInfo();
|
||||||
|
transactionId = &(transactionNode->transactionId);
|
||||||
|
|
||||||
|
appendStringInfo(logMessage, "[DistributedTransactionId: (%d, %ld, %s)] = ",
|
||||||
|
transactionId->initiatorNodeIdentifier,
|
||||||
|
transactionId->transactionNumber,
|
||||||
|
timestamptz_to_str(transactionId->timestamp));
|
||||||
|
|
||||||
|
appendStringInfo(logMessage, "[WaitsFor transaction numbers: %s]",
|
||||||
|
WaitsForToString(transactionNode->waitsFor));
|
||||||
|
|
||||||
|
/* log the backend query if the proc is associated with the transaction */
|
||||||
|
if (transactionNode->initiatorProc != NULL)
|
||||||
|
{
|
||||||
|
const char *backendQuery =
|
||||||
|
pgstat_get_backend_current_activity(transactionNode->initiatorProc->pid,
|
||||||
|
false);
|
||||||
|
|
||||||
|
appendStringInfo(logMessage, "[Backend Query: %s]", backendQuery);
|
||||||
|
}
|
||||||
|
|
||||||
|
LogDistributedDeadlockDebugMessage(logMessage->data);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LogDistributedDeadlockDebugMessage checks EnableDistributedDeadlockDebugging flag. If
|
||||||
|
* it is true, the input message is sent to the logs with LOG level. Also, current timestamp
|
||||||
|
* is prepanded to the message.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
LogDistributedDeadlockDebugMessage(const char *errorMessage)
|
||||||
|
{
|
||||||
|
if (!LogDistributedDeadlockDetection)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ereport(LOG, (errmsg("[%s] %s", timestamptz_to_str(GetCurrentTimestamp()),
|
||||||
|
errorMessage)));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* WaitsForToString is only intended for testing and debugging. It gets a
|
||||||
|
* waitsForList and returns the list of transaction nodes' transactionNumber
|
||||||
|
* in a string.
|
||||||
|
*/
|
||||||
|
char *
|
||||||
|
WaitsForToString(List *waitsFor)
|
||||||
|
{
|
||||||
|
StringInfo transactionIdStr = makeStringInfo();
|
||||||
|
ListCell *waitsForCell = NULL;
|
||||||
|
|
||||||
|
foreach(waitsForCell, waitsFor)
|
||||||
|
{
|
||||||
|
TransactionNode *waitingNode = (TransactionNode *) lfirst(waitsForCell);
|
||||||
|
|
||||||
|
if (transactionIdStr->len != 0)
|
||||||
|
{
|
||||||
|
appendStringInfoString(transactionIdStr, ",");
|
||||||
|
}
|
||||||
|
|
||||||
|
appendStringInfo(transactionIdStr, "%ld",
|
||||||
|
waitingNode->transactionId.transactionNumber);
|
||||||
|
}
|
||||||
|
|
||||||
|
return transactionIdStr->data;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue