diff --git a/src/backend/distributed/test/distributed_deadlock_detection.c b/src/backend/distributed/test/distributed_deadlock_detection.c
new file mode 100644
index 000000000..dc3373dfc
--- /dev/null
+++ b/src/backend/distributed/test/distributed_deadlock_detection.c
@@ -0,0 +1,149 @@
+/*-------------------------------------------------------------------------
+ *
+ * test/src/distributed_deadlock_detection.c
+ *
+ * This file contains functions to exercise distributed deadlock detection
+ * related lower level functionality.
+ *
+ * Copyright (c) 2017, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "access/hash.h"
+#include "distributed/backend_data.h"
+#include "distributed/distributed_deadlock_detection.h"
+#include "distributed/hash_helpers.h"
+#include "distributed/listutils.h"
+#include "distributed/lock_graph.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/transaction_identifier.h"
+#include "nodes/pg_list.h"
+#include "utils/hsearch.h"
+#include "utils/timestamp.h"
+
+
+static char * WaitsForToString(List *waitsFor);
+
+
+PG_FUNCTION_INFO_V1(get_adjacency_list_wait_graph);
+
+
+/*
+ * get_adjacency_list_wait_graph returns the wait graph in adjacency list format. For the
+ * details see BuildAdjacencyListsForWaitGraph().
+ *
+ * The result is materialized into a tuplestore with one row per transaction node:
+ * the node's transaction number and a comma separated list of the transaction
+ * numbers it waits for.
+ *
+ * This function is mostly useful for testing and debugging purposes.
+ */
+Datum
+get_adjacency_list_wait_graph(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc tupleDescriptor = NULL;
+	Tuplestorestate *tupleStore = NULL;
+	MemoryContext perQueryContext = NULL;
+	MemoryContext oldContext = NULL;
+
+	WaitGraph *waitGraph = NULL;
+	HTAB *adjacencyList = NULL;
+	HASH_SEQ_STATUS status;
+	TransactionNode *transactionNode = NULL;
+
+	const int attributeCount = 2;
+	Datum values[attributeCount];
+	bool isNulls[attributeCount];
+
+	CheckCitusVersion(ERROR);
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo))
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context " \
+						"that cannot accept a set")));
+	}
+
+	if (!(returnSetInfo->allowedModes & SFRM_Materialize))
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not " \
+						"allowed in this context")));
+	}
+
+	/* build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
+	{
+		elog(ERROR, "return type must be a row type");
+	}
+
+	perQueryContext = returnSetInfo->econtext->ecxt_per_query_memory;
+
+	/* create the tuplestore in the per-query memory context */
+	oldContext = MemoryContextSwitchTo(perQueryContext);
+
+	tupleStore = tuplestore_begin_heap(true, false, work_mem);
+	returnSetInfo->returnMode = SFRM_Materialize;
+	returnSetInfo->setResult = tupleStore;
+	returnSetInfo->setDesc = tupleDescriptor;
+
+	MemoryContextSwitchTo(oldContext);
+
+	waitGraph = BuildGlobalWaitGraph();
+	adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph);
+
+	/* iterate on all nodes */
+	hash_seq_init(&status, adjacencyList);
+
+	while ((transactionNode = (TransactionNode *) hash_seq_search(&status)) != NULL)
+	{
+		memset(values, 0, sizeof(values));
+		memset(isNulls, false, sizeof(isNulls));
+
+		values[0] = UInt64GetDatum(transactionNode->transactionId.transactionNumber);
+		values[1] = CStringGetDatum(WaitsForToString(transactionNode->waitsFor));
+
+		tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
+	}
+
+	/* clean up and return the tuplestore */
+	tuplestore_donestoring(tupleStore);
+
+	PG_RETURN_VOID();
+}
+
+
+/*
+ * WaitsForToString is only intended for testing and debugging. It gets a
+ * waitsFor list and returns the transactionNumber of each transaction node
+ * in it as a comma separated string.
+ */
+static char *
+WaitsForToString(List *waitsFor)
+{
+	StringInfo transactionIdStr = makeStringInfo();
+	ListCell *waitsForCell = NULL;
+
+	foreach(waitsForCell, waitsFor)
+	{
+		TransactionNode *waitingNode = (TransactionNode *) lfirst(waitsForCell);
+
+		if (transactionIdStr->len != 0)
+		{
+			appendStringInfoString(transactionIdStr, ",");
+		}
+
+		appendStringInfo(transactionIdStr, UINT64_FORMAT,
+						 waitingNode->transactionId.transactionNumber);
+	}
+
+	return transactionIdStr->data;
+}
diff --git a/src/backend/distributed/transaction/distributed_deadlock_detection.c b/src/backend/distributed/transaction/distributed_deadlock_detection.c
new file mode 100644
index 000000000..fc11e3b77
--- /dev/null
+++ b/src/backend/distributed/transaction/distributed_deadlock_detection.c
@@ -0,0 +1,193 @@
+/*-------------------------------------------------------------------------
+ *
+ * distributed_deadlock_detection.c
+ *
+ * Functions for performing distributed deadlock detection.
+ *
+ * Copyright (c) 2017, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "miscadmin.h"
+
+#include "access/hash.h"
+#include "distributed/backend_data.h"
+#include "distributed/distributed_deadlock_detection.h"
+#include "distributed/hash_helpers.h"
+#include "distributed/listutils.h"
+#include "distributed/lock_graph.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/transaction_identifier.h"
+#include "nodes/pg_list.h"
+#include "utils/hsearch.h"
+#include "utils/timestamp.h"
+
+
+static TransactionNode * GetOrCreateTransactionNode(HTAB *adjacencyList,
+													DistributedTransactionId *
+													transactionId);
+static uint32 DistributedTransactionIdHash(const void *key, Size keysize);
+static int DistributedTransactionIdCompare(const void *a, const void *b, Size keysize);
+
+
+/*
+ * BuildAdjacencyListsForWaitGraph converts the input wait graph to
+ * an adjacency list for further processing.
+ *
+ * The input wait graph consists of the set of wait edges between all
+ * backends in the Citus cluster.
+ *
+ * We represent the adjacency list with an HTAB structure, allocated in the
+ * current memory context. Each node is keyed by its DistributedTransactionId,
+ * stored as a TransactionNode, and each edge is a waitsFor list entry.
+ *
+ * While iterating over the input wait edges, we follow the algorithm
+ * below:
+ *    for each edge in waitGraph:
+ *      - find the corresponding nodes for waiting and
+ *        blocking transactions in the adjacency list
+ *      - if not found, add new node(s) to the list
+ *      - add blocking transaction to the waiting transaction's waitsFor
+ *        list
+ *
+ * The format of the adjacency list becomes the following:
+ *  [transactionId] = [transactionNode->waitsFor {list of blocking transaction nodes}]
+ */
+HTAB *
+BuildAdjacencyListsForWaitGraph(WaitGraph *waitGraph)
+{
+	HASHCTL info;
+	uint32 hashFlags = 0;
+	HTAB *adjacencyList = NULL;
+	int edgeIndex = 0;
+	int edgeCount = waitGraph->edgeCount;
+
+	memset(&info, 0, sizeof(info));
+	info.keysize = sizeof(DistributedTransactionId);
+	info.entrysize = sizeof(TransactionNode);
+	info.hash = DistributedTransactionIdHash;
+	info.match = DistributedTransactionIdCompare;
+	info.hcxt = CurrentMemoryContext;
+	hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
+
+	adjacencyList = hash_create("distributed deadlock detection", 64, &info, hashFlags);
+
+	for (edgeIndex = 0; edgeIndex < edgeCount; edgeIndex++)
+	{
+		WaitEdge *edge = &waitGraph->edges[edgeIndex];
+		TransactionNode *waitingTransaction = NULL;
+		TransactionNode *blockingTransaction = NULL;
+
+		DistributedTransactionId waitingId = {
+			.initiatorNodeIdentifier = edge->waitingNodeId,
+			.transactionNumber = edge->waitingTransactionNum,
+			.timestamp = edge->waitingTransactionStamp
+		};
+
+		DistributedTransactionId blockingId = {
+			.initiatorNodeIdentifier = edge->blockingNodeId,
+			.transactionNumber = edge->blockingTransactionNum,
+			.timestamp = edge->blockingTransactionStamp
+		};
+
+		waitingTransaction =
+			GetOrCreateTransactionNode(adjacencyList, &waitingId);
+		blockingTransaction =
+			GetOrCreateTransactionNode(adjacencyList, &blockingId);
+
+		waitingTransaction->waitsFor = lappend(waitingTransaction->waitsFor,
+											   blockingTransaction);
+	}
+
+	return adjacencyList;
+}
+
+
+/*
+ * GetOrCreateTransactionNode searches the adjacencyList for the given
+ * transactionId. If the transaction is not found, a new transaction node
+ * with the given transaction identifier and an empty waitsFor list is added.
+ */
+static TransactionNode *
+GetOrCreateTransactionNode(HTAB *adjacencyList, DistributedTransactionId *transactionId)
+{
+	TransactionNode *transactionNode = NULL;
+	bool found = false;
+
+	transactionNode = (TransactionNode *) hash_search(adjacencyList, transactionId,
+													  HASH_ENTER, &found);
+	if (!found)
+	{
+		transactionNode->waitsFor = NIL;
+	}
+
+	return transactionNode;
+}
+
+
+/*
+ * DistributedTransactionIdHash returns hashed value for a given distributed
+ * transaction id, combining its node identifier, number and timestamp.
+ */
+static uint32
+DistributedTransactionIdHash(const void *key, Size keysize)
+{
+	const DistributedTransactionId *entry = (const DistributedTransactionId *) key;
+	uint32 hash = 0;
+
+	hash = hash_uint32(entry->initiatorNodeIdentifier);
+	hash = hash_combine(hash, hash_any((const unsigned char *) &entry->transactionNumber,
+									   sizeof(entry->transactionNumber)));
+	hash = hash_combine(hash, hash_any((const unsigned char *) &entry->timestamp,
+									   sizeof(entry->timestamp)));
+
+	return hash;
+}
+
+
+/*
+ * DistributedTransactionIdCompare compares DistributedTransactionId's a and b
+ * and returns -1 if a < b, 1 if a > b, 0 if they are equal.
+ *
+ * DistributedTransactionId are first compared by their timestamp, then transaction
+ * number, then node identifier.
+ */
+static int
+DistributedTransactionIdCompare(const void *a, const void *b, Size keysize)
+{
+	const DistributedTransactionId *xactIdA = (const DistributedTransactionId *) a;
+	const DistributedTransactionId *xactIdB = (const DistributedTransactionId *) b;
+
+	if (!TimestampDifferenceExceeds(xactIdB->timestamp, xactIdA->timestamp, 0))
+	{
+		/* ! (B <= A) = A < B */
+		return -1;
+	}
+	else if (!TimestampDifferenceExceeds(xactIdA->timestamp, xactIdB->timestamp, 0))
+	{
+		/* ! (A <= B) = A > B */
+		return 1;
+	}
+	else if (xactIdA->transactionNumber < xactIdB->transactionNumber)
+	{
+		return -1;
+	}
+	else if (xactIdA->transactionNumber > xactIdB->transactionNumber)
+	{
+		return 1;
+	}
+	else if (xactIdA->initiatorNodeIdentifier < xactIdB->initiatorNodeIdentifier)
+	{
+		return -1;
+	}
+	else if (xactIdA->initiatorNodeIdentifier > xactIdB->initiatorNodeIdentifier)
+	{
+		return 1;
+	}
+	else
+	{
+		return 0;
+	}
+}
diff --git a/src/include/distributed/distributed_deadlock_detection.h b/src/include/distributed/distributed_deadlock_detection.h
new file mode 100644
index 000000000..f00c98317
--- /dev/null
+++ b/src/include/distributed/distributed_deadlock_detection.h
@@ -0,0 +1,35 @@
+/*-------------------------------------------------------------------------
+ *
+ * distributed_deadlock_detection.h
+ *	  Type and function declarations used for performing distributed deadlock
+ *	  detection.
+ *
+ * Copyright (c) 2017, Citus Data, Inc.
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef DISTRIBUTED_DEADLOCK_DETECTION_H
+#define DISTRIBUTED_DEADLOCK_DETECTION_H
+
+#include "postgres.h"
+
+#include "access/hash.h"
+#include "distributed/backend_data.h"
+#include "distributed/listutils.h"
+#include "distributed/lock_graph.h"
+#include "distributed/transaction_identifier.h"
+#include "nodes/pg_list.h"
+
+typedef struct TransactionNode
+{
+	DistributedTransactionId transactionId;
+
+	/* list of TransactionNode that this distributed transaction is waiting for */
+	List *waitsFor;
+} TransactionNode;
+
+
+extern HTAB * BuildAdjacencyListsForWaitGraph(WaitGraph *waitGraph);
+
+
+#endif /* DISTRIBUTED_DEADLOCK_DETECTION_H */
diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out
index 585a9fd9e..16b50ba93 100644
--- a/src/test/regress/expected/isolation_dump_global_wait_edges.out
+++ b/src/test/regress/expected/isolation_dump_global_wait_edges.out
@@ -25,9 +25,15 @@ step detector-dump-wait-edges:
         blocking_transaction_num,
         blocking_transaction_waiting;
 
+    SELECT * FROM get_adjacency_list_wait_graph() ORDER BY 1;
+
 waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
 
 192            191            f              
+transactionnumberwaitingtransactionnumbers
+
+191                           
+192            191            
 step s1-abort: 
     ABORT;
 
@@ -67,11 +73,18 @@ step detector-dump-wait-edges:
         blocking_transaction_num,
         blocking_transaction_waiting;
 
+    SELECT * FROM get_adjacency_list_wait_graph() ORDER BY 1;
+
 waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
 
 196            195            f              
 197            195            f              
 197            196            t              
+transactionnumberwaitingtransactionnumbers
+
+195                           
+196            195            
+197            195,196        
 step s1-abort: 
     ABORT;
 
diff --git a/src/test/regress/specs/isolation_dump_global_wait_edges.spec b/src/test/regress/specs/isolation_dump_global_wait_edges.spec
index 869ccf569..23f2b761e 100644
--- a/src/test/regress/specs/isolation_dump_global_wait_edges.spec
+++ b/src/test/regress/specs/isolation_dump_global_wait_edges.spec
@@ -3,6 +3,13 @@ setup
     CREATE TABLE distributed_table (x int primary key, y int);
     SELECT create_distributed_table('distributed_table', 'x');
     INSERT INTO distributed_table VALUES (1,0);
+
+    CREATE OR REPLACE FUNCTION get_adjacency_list_wait_graph(OUT transactionNumber bigint, OUT waitingTransactionNumbers cstring)
+    RETURNS SETOF RECORD
+    LANGUAGE C STRICT
+    AS 'citus', $$get_adjacency_list_wait_graph$$;
+    COMMENT ON FUNCTION get_adjacency_list_wait_graph(OUT transactionNumber bigint, OUT waitingTransactionNumbers cstring)
+    IS 'returns flattened wait graph';
 }
 
 teardown
@@ -76,6 +83,8 @@ step "detector-dump-wait-edges"
         waiting_transaction_num,
         blocking_transaction_num,
         blocking_transaction_waiting;
+
+    SELECT * FROM get_adjacency_list_wait_graph() ORDER BY 1;
 }
 
 # Distributed transaction blocked by another distributed transaction