Merge pull request #1523 from citusdata/deadlock_detection_main

Convert the global wait edges to adjacency lists
pull/1501/head
Önder Kalacı 2017-07-27 20:14:49 +03:00 committed by GitHub
commit 6698ca8d9e
5 changed files with 394 additions and 0 deletions

View File

@ -0,0 +1,144 @@
/*-------------------------------------------------------------------------
*
* test/src/distributed_deadlock_detection.c
*
* This file contains functions to exercise distributed deadlock detection
* related lower level functionality.
*
* Copyright (c) 20167, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "access/hash.h"
#include "distributed/backend_data.h"
#include "distributed/distributed_deadlock_detection.h"
#include "distributed/hash_helpers.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/transaction_identifier.h"
#include "nodes/pg_list.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"
static char * WaitsForToString(List *waitsFor);
PG_FUNCTION_INFO_V1(get_adjacency_list_wait_graph);
/*
* get_adjacency_list_wait_graph returns the wait graph in adjacency list format. For the
* details see BuildAdjacencyListForWaitGraph().
*
* This function is mostly useful for testing and debugging purposes.
*/
Datum
get_adjacency_list_wait_graph(PG_FUNCTION_ARGS)
{
ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = NULL;
MemoryContext perQueryContext = NULL;
MemoryContext oldContext = NULL;
WaitGraph *waitGraph = NULL;
HTAB *adjacencyList = NULL;
HASH_SEQ_STATUS status;
TransactionNode *transactionNode = NULL;
const int attributeCount = 2;
Datum values[attributeCount];
bool isNulls[attributeCount];
CheckCitusVersion(ERROR);
/* check to see if caller supports us returning a tuplestore */
if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context " \
"that cannot accept a set")));
}
if (!(returnSetInfo->allowedModes & SFRM_Materialize))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));
}
/* build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
{
elog(ERROR, "return type must be a row type");
}
perQueryContext = returnSetInfo->econtext->ecxt_per_query_memory;
oldContext = MemoryContextSwitchTo(perQueryContext);
tupleStore = tuplestore_begin_heap(true, false, work_mem);
returnSetInfo->returnMode = SFRM_Materialize;
returnSetInfo->setResult = tupleStore;
returnSetInfo->setDesc = tupleDescriptor;
MemoryContextSwitchTo(oldContext);
waitGraph = BuildGlobalWaitGraph();
adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph);
/* iterate on all nodes */
hash_seq_init(&status, adjacencyList);
while ((transactionNode = (TransactionNode *) hash_seq_search(&status)) != 0)
{
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[0] = UInt64GetDatum(transactionNode->transactionId.transactionNumber);
values[1] = CStringGetDatum(WaitsForToString(transactionNode->waitsFor));
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
}
/* clean up and return the tuplestore */
tuplestore_donestoring(tupleStore);
PG_RETURN_VOID();
}
/*
* WaitsForToString is only intended for testing and debugging. It gets a
* waitsForList and returns the list of transaction nodes' transactionNumber
* in a string.
*/
static char *
WaitsForToString(List *waitsFor)
{
StringInfo transactionIdStr = makeStringInfo();
ListCell *waitsForCell = NULL;
foreach(waitsForCell, waitsFor)
{
TransactionNode *waitingNode = (TransactionNode *) lfirst(waitsForCell);
if (transactionIdStr->len != 0)
{
appendStringInfoString(transactionIdStr, ",");
}
appendStringInfo(transactionIdStr, "%ld",
waitingNode->transactionId.transactionNumber);
}
return transactionIdStr->data;
}

View File

@ -0,0 +1,193 @@
/*-------------------------------------------------------------------------
*
* distributed_deadlock_detection.c
*
* Functions for performing distributed deadlock detection.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/hash.h"
#include "distributed/backend_data.h"
#include "distributed/distributed_deadlock_detection.h"
#include "distributed/hash_helpers.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/transaction_identifier.h"
#include "nodes/pg_list.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"
static TransactionNode * GetOrCreateTransactionNode(HTAB *adjacencyList,
DistributedTransactionId *
transactionId);
static uint32 DistributedTransactionIdHash(const void *key, Size keysize);
static int DistributedTransactionIdCompare(const void *a, const void *b, Size keysize);
/*
* BuildAdjacencyListsForWaitGraph converts the input wait graph to
* an adjacency list for further processing.
*
* The input wait graph consists of set of wait edges between all
* backends in the Citus cluster.
*
* We represent the adjacency list with an HTAB structure. Each node is
* represented with a DistributedTransactionId and each edge is represented with
* a TransactionNode structure.
*
* While iterating over the input wait edges, we follow the algorithm
* below:
* for each edge in waitGraph:
* - find the corresponding nodes for waiting and
* blocking transactions in the adjacency list
* - if not found, add new node(s) to the list
* - Add blocking transaction to the waiting transaction's waitFor
* list
*
* The format of the adjacency list becomes the following:
* [transactionId] = [transactionNode->waitsFor {list of waiting transaction nodes}]
*/
extern HTAB *
BuildAdjacencyListsForWaitGraph(WaitGraph *waitGraph)
{
HASHCTL info;
uint32 hashFlags = 0;
HTAB *adjacencyList = NULL;
int edgeIndex = 0;
int edgeCount = waitGraph->edgeCount;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(DistributedTransactionId);
info.entrysize = sizeof(TransactionNode);
info.hash = DistributedTransactionIdHash;
info.match = DistributedTransactionIdCompare;
info.hcxt = CurrentMemoryContext;
hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
adjacencyList = hash_create("distributed deadlock detection", 64, &info, hashFlags);
for (edgeIndex = 0; edgeIndex < edgeCount; edgeIndex++)
{
WaitEdge *edge = &waitGraph->edges[edgeIndex];
TransactionNode *waitingTransaction = NULL;
TransactionNode *blockingTransaction = NULL;
DistributedTransactionId waitingId = {
edge->waitingNodeId,
edge->waitingTransactionNum,
edge->waitingTransactionStamp
};
DistributedTransactionId blockingId = {
edge->blockingNodeId,
edge->blockingTransactionNum,
edge->blockingTransactionStamp
};
waitingTransaction =
GetOrCreateTransactionNode(adjacencyList, &waitingId);
blockingTransaction =
GetOrCreateTransactionNode(adjacencyList, &blockingId);
waitingTransaction->waitsFor = lappend(waitingTransaction->waitsFor,
blockingTransaction);
}
return adjacencyList;
}
/*
* GetOrCreateTransactionNode searches distributedTransactionHash for the given
* given transactionId. If the transaction is not found, a new transaction node
* with the given transaction identifier is added.
*/
static TransactionNode *
GetOrCreateTransactionNode(HTAB *adjacencyList, DistributedTransactionId *transactionId)
{
TransactionNode *transactionNode = NULL;
bool found = false;
transactionNode = (TransactionNode *) hash_search(adjacencyList, transactionId,
HASH_ENTER, &found);
if (!found)
{
transactionNode->waitsFor = NIL;
}
return transactionNode;
}
/*
* DistributedTransactionIdHash returns hashed value for a given distributed
* transaction id.
*/
static uint32
DistributedTransactionIdHash(const void *key, Size keysize)
{
DistributedTransactionId *entry = (DistributedTransactionId *) key;
uint32 hash = 0;
hash = hash_uint32(entry->initiatorNodeIdentifier);
hash = hash_combine(hash, hash_any((unsigned char *) &entry->transactionNumber,
sizeof(int64)));
hash = hash_combine(hash, hash_any((unsigned char *) &entry->timestamp,
sizeof(TimestampTz)));
return hash;
}
/*
* DistributedTransactionIdCompare compares DistributedTransactionId's a and b
* and returns -1 if a < b, 1 if a > b, 0 if they are equal.
*
* DistributedTransactionId are first compared by their timestamp, then transaction
* number, then node identifier.
*/
static int
DistributedTransactionIdCompare(const void *a, const void *b, Size keysize)
{
DistributedTransactionId *xactIdA = (DistributedTransactionId *) a;
DistributedTransactionId *xactIdB = (DistributedTransactionId *) b;
if (!TimestampDifferenceExceeds(xactIdB->timestamp, xactIdA->timestamp, 0))
{
/* ! (B <= A) = A < B */
return -1;
}
else if (!TimestampDifferenceExceeds(xactIdA->timestamp, xactIdB->timestamp, 0))
{
/* ! (A <= B) = A > B */
return 1;
}
else if (xactIdA->transactionNumber < xactIdB->transactionNumber)
{
return -1;
}
else if (xactIdA->transactionNumber > xactIdB->transactionNumber)
{
return 1;
}
else if (xactIdA->initiatorNodeIdentifier < xactIdB->initiatorNodeIdentifier)
{
return -1;
}
else if (xactIdA->initiatorNodeIdentifier > xactIdB->initiatorNodeIdentifier)
{
return 1;
}
else
{
return 0;
}
}

View File

@ -0,0 +1,35 @@
/*-------------------------------------------------------------------------
*
* distributed_deadlock_detection.h
* Type and function declarations used for performing distributed deadlock
* detection.
*
* Copyright (c) 2017, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef DISTRIBUTED_DEADLOCK_DETECTION_H
#define DISTRIBUTED_DEADLOCK_DETECTION_H
#include "postgres.h"
#include "access/hash.h"
#include "distributed/backend_data.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/transaction_identifier.h"
#include "nodes/pg_list.h"
typedef struct TransactionNode
{
DistributedTransactionId transactionId;
/* list of TransactionNode that this distributed transaction is waiting for */
List *waitsFor;
} TransactionNode;
HTAB * BuildAdjacencyListsForWaitGraph(WaitGraph *waitGraph);
#endif /* DISTRIBUTED_DEADLOCK_DETECTION_H */

View File

@ -25,9 +25,15 @@ step detector-dump-wait-edges:
blocking_transaction_num,
blocking_transaction_waiting;
SELECT * FROM get_adjacency_list_wait_graph() ORDER BY 1;
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
192 191 f
transactionnumberwaitingtransactionnumbers
191
192 191
step s1-abort:
ABORT;
@ -67,11 +73,18 @@ step detector-dump-wait-edges:
blocking_transaction_num,
blocking_transaction_waiting;
SELECT * FROM get_adjacency_list_wait_graph() ORDER BY 1;
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
196 195 f
197 195 f
197 196 t
transactionnumberwaitingtransactionnumbers
195
196 195
197 195,196
step s1-abort:
ABORT;

View File

@ -3,6 +3,13 @@ setup
CREATE TABLE distributed_table (x int primary key, y int);
SELECT create_distributed_table('distributed_table', 'x');
INSERT INTO distributed_table VALUES (1,0);
CREATE OR REPLACE FUNCTION get_adjacency_list_wait_graph(OUT transactionNumber int, OUT waitingTransactionNumbers cstring)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS 'citus', $$get_adjacency_list_wait_graph$$;
COMMENT ON FUNCTION get_adjacency_list_wait_graph(OUT transactionNumber int, OUT waitingTransactionNumbers cstring)
IS 'returns flattened wait graph';
}
teardown
@ -76,6 +83,8 @@ step "detector-dump-wait-edges"
waiting_transaction_num,
blocking_transaction_num,
blocking_transaction_waiting;
SELECT * FROM get_adjacency_list_wait_graph() ORDER BY 1;
}
# Distributed transaction blocked by another distributed transaction