From 81198a1d023951aa0f13f40879f1fb0a3ee49354 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 11 Jul 2017 17:52:27 +0200 Subject: [PATCH 1/2] Add function for dumping local wait edges --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--7.0-5--7.0-6.sql | 18 + src/backend/distributed/citus.control | 2 +- .../distributed/transaction/backend_data.c | 26 + .../distributed/transaction/lock_graph.c | 537 ++++++++++++++++++ src/include/distributed/backend_data.h | 3 + src/include/distributed/lock_graph.h | 57 ++ .../isolation_dump_local_wait_edges.out | 134 +++++ src/test/regress/expected/multi_extension.out | 2 + src/test/regress/isolation_schedule | 1 + .../isolation_dump_local_wait_edges.spec | 87 +++ src/test/regress/sql/multi_extension.sql | 2 + 12 files changed, 871 insertions(+), 2 deletions(-) create mode 100644 src/backend/distributed/citus--7.0-5--7.0-6.sql create mode 100644 src/backend/distributed/transaction/lock_graph.c create mode 100644 src/include/distributed/lock_graph.h create mode 100644 src/test/regress/expected/isolation_dump_local_wait_edges.out create mode 100644 src/test/regress/specs/isolation_dump_local_wait_edges.spec diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index f7f094846..038332bbd 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \ 6.2-1 6.2-2 6.2-3 6.2-4 \ - 7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 + 7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -149,6 +149,8 @@ $(EXTENSION)--7.0-4.sql: $(EXTENSION)--7.0-3.sql $(EXTENSION)--7.0-3--7.0-4.sql cat $^ > $@ $(EXTENSION)--7.0-5.sql: $(EXTENSION)--7.0-4.sql $(EXTENSION)--7.0-4--7.0-5.sql cat $^ > $@ +$(EXTENSION)--7.0-6.sql: $(EXTENSION)--7.0-5.sql $(EXTENSION)--7.0-5--7.0-6.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--7.0-5--7.0-6.sql b/src/backend/distributed/citus--7.0-5--7.0-6.sql new file mode 100644 index 000000000..e30273161 --- /dev/null +++ b/src/backend/distributed/citus--7.0-5--7.0-6.sql @@ -0,0 +1,18 @@ +/* citus--7.0-5--7.0-6 */ + +CREATE FUNCTION pg_catalog.dump_local_wait_edges( + IN source_node_id int4, + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE 'c' STRICT +AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(int) +IS 'returns a local list of blocked transactions originating from source_node_id'; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 661f7164b..f959059f3 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '7.0-5' +default_version = '7.0-6' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 67455c0f9..e3c46149a 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -501,3 +501,29 @@ CurrentDistributedTransactionNumber(void) return MyBackendData->transactionId.transactionNumber; } + + +/* + * GetBackendDataForProc writes the backend data for the given process to + * result. If the process is part of a lock group (parallel query) it + * returns the leader data instead. + */ +void +GetBackendDataForProc(PGPROC *proc, BackendData *result) +{ + BackendData *backendData = NULL; + int pgprocno = proc->pgprocno; + + if (proc->lockGroupLeader != NULL) + { + pgprocno = proc->lockGroupLeader->pgprocno; + } + + backendData = &backendManagementShmemData->backends[pgprocno]; + + SpinLockAcquire(&backendData->mutex); + + memcpy(result, backendData, sizeof(BackendData)); + + SpinLockRelease(&backendData->mutex); +} diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c new file mode 100644 index 000000000..5416bba6c --- /dev/null +++ b/src/backend/distributed/transaction/lock_graph.c @@ -0,0 +1,537 @@ +/*------------------------------------------------------------------------- + * + * lock_graph.c + * + * Functions for obtaining local and global lock graphs in which each + * node is a distributed transaction, and an edge represent a waiting-for + * relationship. + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "libpq-fe.h" +#include "miscadmin.h" + +#include "access/hash.h" +#include "distributed/backend_data.h" +#include "distributed/hash_helpers.h" +#include "distributed/lock_graph.h" +#include "distributed/metadata_cache.h" +#include "storage/proc.h" +#include "utils/builtins.h" +#include "utils/hsearch.h" +#include "utils/timestamp.h" + + +/* + * PROCStack is a stack of PGPROC pointers used to perform a depth-first search + * through the lock graph. + */ +typedef struct PROCStack +{ + int procCount; + PGPROC **procs; +} PROCStack; + + +static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); +static WaitGraph * BuildWaitGraphForSourceNode(int sourceNodeId); +static void LockLockData(void); +static void UnlockLockData(void); +static void AddEdgesForLockWaits(WaitGraph *waitGraph, PGPROC *waitingProc, + PROCStack *remaining); +static void AddEdgesForWaitQueue(WaitGraph *waitGraph, PGPROC *waitingProc, + PROCStack *remaining); +static void AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc, + PROCStack *remaining); +static WaitEdge * AllocWaitEdge(WaitGraph *waitGraph); +static bool IsProcessWaitingForLock(PGPROC *proc); +static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc); +static bool IsConflictingLockMask(int holdMask, int conflictMask); +static bool IsInDistributedTransaction(BackendData *backendData); + + +PG_FUNCTION_INFO_V1(dump_local_wait_edges); + + +/* + * dump_local_wait_edges returns wait edges for distributed transactions + * running on the node on which it is called, which originate from the source node. + */ +Datum +dump_local_wait_edges(PG_FUNCTION_ARGS) +{ + int32 sourceNodeId = PG_GETARG_INT32(0); + + WaitGraph *waitGraph = BuildWaitGraphForSourceNode(sourceNodeId); + ReturnWaitGraph(waitGraph, fcinfo); + + return (Datum) 0; +} + + +/* + * ReturnWaitGraph returns a wait graph for a set returning function. + */ +static void +ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc = NULL; + Tuplestorestate *tupstore = NULL; + MemoryContext per_query_ctx = NULL; + MemoryContext oldcontext = NULL; + size_t curEdgeNum = 0; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg( + "set-valued function called in context that cannot accept a set"))); + } + if (!(rsinfo->allowedModes & SFRM_Materialize)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not " \ + "allowed in this context"))); + } + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + { + elog(ERROR, "return type must be a row type"); + } + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + MemoryContextSwitchTo(oldcontext); + + /* + * Columns: + * 00: waiting_pid + * 01: waiting_node_id + * 02: waiting_transaction_num + * 03: waiting_transaction_stamp + * 04: blocking_pid + * 05: blocking__node_id + * 06: blocking_transaction_num + * 07: blocking_transaction_stamp + * 08: blocking_transaction_waiting + */ + for (curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++) + { + Datum values[9]; + bool nulls[9]; + WaitEdge *curEdge = &waitGraph->edges[curEdgeNum]; + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = Int32GetDatum(curEdge->waitingPid); + values[1] = Int32GetDatum(curEdge->waitingNodeId); + if (curEdge->waitingTransactionNum != 0) + { + values[2] = Int64GetDatum(curEdge->waitingTransactionNum); + values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp); + } + else + { + nulls[2] = true; + nulls[3] = true; + } + + values[4] = Int32GetDatum(curEdge->blockingPid); + values[5] = Int32GetDatum(curEdge->blockingNodeId); + if (curEdge->blockingTransactionNum != 0) + { + values[6] = Int64GetDatum(curEdge->blockingTransactionNum); + values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp); + } + else + { + nulls[6] = true; + nulls[7] = true; + } + values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting); + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); +} + + +/* + * BuildWaitGraphForSourceNode builds a wait graph for distributed transactions + * that originate from the given source node. + */ +static WaitGraph * +BuildWaitGraphForSourceNode(int sourceNodeId) +{ + WaitGraph *waitGraph = NULL; + int curBackend = 0; + bool visitedProcs[MaxBackends]; + PROCStack remaining; + + memset(visitedProcs, 0, MaxBackends); + + /* + * Try hard to avoid allocations while holding lock. Thus we pre-allocate + * space for locks in large batches - for common scenarios this should be + * more than enough space to build the list of wait edges without a single + * allocation. + */ + waitGraph = (WaitGraph *) palloc0(sizeof(WaitGraph)); + waitGraph->localNodeId = GetLocalGroupId(); + waitGraph->allocatedSize = MaxBackends * 3; + waitGraph->edgeCount = 0; + waitGraph->edges = (WaitEdge *) palloc(waitGraph->allocatedSize * sizeof(WaitEdge)); + + remaining.procs = (PGPROC **) palloc(sizeof(PGPROC *) * MaxBackends); + remaining.procCount = 0; + + LockLockData(); + + /* + * Build lock-graph. We do so by first finding all procs which we are + * interested in (originating on our source system, and blocked). Once + * those are collected, do depth first search over all procs blocking + * those. To avoid redundantly visiting procs, keep track of which procs + * already have been visited in a pgproc-indexed visitedProcs[] array. + */ + + /* build list of starting procs */ + for (curBackend = 0; curBackend < MaxBackends; curBackend++) + { + PGPROC *currentProc = &ProcGlobal->allProcs[curBackend]; + BackendData currentBackendData; + + /* skip if the PGPROC slot is unused */ + if (currentProc->pid == 0) + { + continue; + } + + GetBackendDataForProc(currentProc, ¤tBackendData); + + /* + * Only start searching from distributed transactions originating on the source + * node. Other deadlocks may exist, but the source node can only resolve those + * that involve its own transactions. + */ + if (sourceNodeId != currentBackendData.transactionId.initiatorNodeIdentifier || + !IsInDistributedTransaction(¤tBackendData)) + { + continue; + } + + /* skip if the process is not blocked */ + if (!IsProcessWaitingForLock(currentProc)) + { + continue; + } + + remaining.procs[remaining.procCount++] = currentProc; + } + + while (remaining.procCount > 0) + { + PGPROC *waitingProc = remaining.procs[--remaining.procCount]; + + /* + * We might find a process again if multiple distributed transactions are + * waiting for it, but we add all edges on the first visit so we don't need + * to visit it again. This also avoids getting into an infinite loop in + * case of a local deadlock. + */ + if (visitedProcs[waitingProc->pgprocno]) + { + continue; + } + + visitedProcs[waitingProc->pgprocno] = true; + + /* only blocked processes result in wait edges */ + if (!IsProcessWaitingForLock(waitingProc)) + { + continue; + } + + /* + * Record an edge for everyone already holding the lock in a + * conflicting manner ("hard edges" in postgres parlance). + */ + AddEdgesForLockWaits(waitGraph, waitingProc, &remaining); + + /* + * Record an edge for everyone in front of us in the wait-queue + * for the lock ("soft edges" in postgres parlance). + */ + AddEdgesForWaitQueue(waitGraph, waitingProc, &remaining); + } + + UnlockLockData(); + + return waitGraph; +} + + +/* + * LockLockData takes locks the shared lock data structure, which prevents + * concurrent lock acquisitions/releases. + */ +static void +LockLockData(void) +{ + int partitionNum = 0; + + for (partitionNum = 0; partitionNum < NUM_LOCK_PARTITIONS; partitionNum++) + { + LWLockAcquire(LockHashPartitionLockByIndex(partitionNum), LW_SHARED); + } +} + + +/* + * UnlockLockData unlocks the locks on the shared lock data structure in reverse + * order since LWLockRelease searches the given lock from the end of the + * held_lwlocks array. + */ +static void +UnlockLockData(void) +{ + int partitionNum = 0; + + for (partitionNum = NUM_LOCK_PARTITIONS - 1; partitionNum >= 0; partitionNum--) + { + LWLockRelease(LockHashPartitionLockByIndex(partitionNum)); + } +} + + +/* + * AddEdgesForLockWaits adds an edge to the wait graph for every granted lock + * that waitingProc is waiting for. + * + * This function iterates over the procLocks data structure in shared memory, + * which also contains entries for locks which have not been granted yet, but + * it does not reflect the order of the wait queue. We therefore handle the + * wait queue separately. + */ +static void +AddEdgesForLockWaits(WaitGraph *waitGraph, PGPROC *waitingProc, PROCStack *remaining) +{ + /* the lock for which this process is waiting */ + LOCK *waitLock = waitingProc->waitLock; + + /* determine the conflict mask for the lock level used by the process */ + LockMethod lockMethodTable = GetLocksMethodTable(waitLock); + int conflictMask = lockMethodTable->conflictTab[waitingProc->waitLockMode]; + + /* iterate through the queue of processes holding the lock */ + SHM_QUEUE *procLocks = &waitLock->procLocks; + PROCLOCK *procLock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, lockLink)); + + while (procLock != NULL) + { + PGPROC *currentProc = procLock->tag.myProc; + + /* skip processes from the same lock group and ones that don't conflict */ + if (!IsSameLockGroup(waitingProc, currentProc) && + IsConflictingLockMask(procLock->holdMask, conflictMask)) + { + AddWaitEdge(waitGraph, waitingProc, currentProc, remaining); + } + + procLock = (PROCLOCK *) SHMQueueNext(procLocks, &procLock->lockLink, + offsetof(PROCLOCK, lockLink)); + } +} + + +/* + * AddEdgesForWaitQueue adds an edge to the wait graph for processes in front of + * waitingProc in the wait queue that are trying to acquire a conflicting lock. + */ +static void +AddEdgesForWaitQueue(WaitGraph *waitGraph, PGPROC *waitingProc, PROCStack *remaining) +{ + /* the lock for which this process is waiting */ + LOCK *waitLock = waitingProc->waitLock; + + /* determine the conflict mask for the lock level used by the process */ + LockMethod lockMethodTable = GetLocksMethodTable(waitLock); + int conflictMask = lockMethodTable->conflictTab[waitingProc->waitLockMode]; + + /* iterate through the wait queue */ + PROC_QUEUE *waitQueue = &(waitLock->waitProcs); + int queueSize = waitQueue->size; + PGPROC *currentProc = (PGPROC *) waitQueue->links.next; + + /* + * Iterate through the queue from the start until we encounter waitingProc, + * since we only care about processes in front of waitingProc in the queue. + */ + while (queueSize-- > 0 && currentProc != waitingProc) + { + int awaitMask = LOCKBIT_ON(currentProc->waitLockMode); + + /* skip processes from the same lock group and ones that don't conflict */ + if (!IsSameLockGroup(waitingProc, currentProc) && + IsConflictingLockMask(awaitMask, conflictMask)) + { + AddWaitEdge(waitGraph, waitingProc, currentProc, remaining); + } + + currentProc = (PGPROC *) currentProc->links.next; + } +} + + +/* + * AddWaitEdge adds a new wait edge to a wait graph. The nodes in the graph are + * transactions and an edge indicates the "waiting" process is blocked on a lock + * held by the "blocking" process. + * + * If the blocking process is itself waiting then it is added to the remaining + * stack. + */ +static void +AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc, + PROCStack *remaining) +{ + WaitEdge *curEdge = AllocWaitEdge(waitGraph); + BackendData waitingBackendData; + BackendData blockingBackendData; + + GetBackendDataForProc(waitingProc, &waitingBackendData); + GetBackendDataForProc(blockingProc, &blockingBackendData); + + curEdge->isBlockingXactWaiting = IsProcessWaitingForLock(blockingProc); + if (curEdge->isBlockingXactWaiting) + { + remaining->procs[remaining->procCount++] = blockingProc; + } + + curEdge->waitingPid = waitingProc->pid; + + if (IsInDistributedTransaction(&waitingBackendData)) + { + DistributedTransactionId *waitingTransactionId = + &waitingBackendData.transactionId; + + curEdge->waitingNodeId = waitingTransactionId->initiatorNodeIdentifier; + curEdge->waitingTransactionNum = waitingTransactionId->transactionNumber; + curEdge->waitingTransactionStamp = waitingTransactionId->timestamp; + } + else + { + curEdge->waitingNodeId = waitGraph->localNodeId; + curEdge->waitingTransactionNum = 0; + curEdge->waitingTransactionStamp = 0; + } + + curEdge->blockingPid = blockingProc->pid; + + if (IsInDistributedTransaction(&blockingBackendData)) + { + DistributedTransactionId *blockingTransactionId = + &blockingBackendData.transactionId; + + curEdge->blockingNodeId = blockingTransactionId->initiatorNodeIdentifier; + curEdge->blockingTransactionNum = blockingTransactionId->transactionNumber; + curEdge->blockingTransactionStamp = blockingTransactionId->timestamp; + } + else + { + curEdge->blockingNodeId = waitGraph->localNodeId; + curEdge->blockingTransactionNum = 0; + curEdge->blockingTransactionStamp = 0; + } +} + + +/* + * AllocWaitEdge allocates a wait edge as part of the given wait graph. + * If the wait graph has insufficient space its size is doubled using + * repalloc. + */ +static WaitEdge * +AllocWaitEdge(WaitGraph *waitGraph) +{ + /* ensure space for new edge */ + if (waitGraph->allocatedSize == waitGraph->edgeCount) + { + waitGraph->allocatedSize *= 2; + waitGraph->edges = (WaitEdge *) + repalloc(waitGraph->edges, sizeof(WaitEdge) * + waitGraph->allocatedSize); + } + + return &waitGraph->edges[waitGraph->edgeCount++]; +} + + +/* + * IsProcessWaitingForLock returns whether a given process is waiting for a lock. + */ +static bool +IsProcessWaitingForLock(PGPROC *proc) +{ + return proc->waitStatus == STATUS_WAITING; +} + + +/* + * IsSameLockGroup returns whether two processes are part of the same lock group, + * meaning they are either the same process, or have the same lock group leader. + */ +static bool +IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc) +{ + return leftProc == rightProc || + (leftProc->lockGroupLeader != NULL && + leftProc->lockGroupLeader == rightProc->lockGroupLeader); +} + + +/* + * IsConflictingLockMask returns whether the given conflict mask conflicts with the + * holdMask. + * + * holdMask is a bitmask with the i-th bit turned on if a lock mode i is held. + * + * conflictMask is a bitmask with the j-th bit turned on if it conflicts with + * lock mode i. + */ +static bool +IsConflictingLockMask(int holdMask, int conflictMask) +{ + return (holdMask & conflictMask) != 0; +} + + +/* + * IsInDistributedTransaction returns whether the given backend is in a + * distributed transaction. + */ +static bool +IsInDistributedTransaction(BackendData *backendData) +{ + return backendData->transactionId.transactionNumber != 0; +} diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index 6d0e52adc..f5020c16d 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -16,6 +16,8 @@ #include "datatype/timestamp.h" #include "distributed/transaction_identifier.h" #include "nodes/pg_list.h" +#include "storage/lwlock.h" +#include "storage/proc.h" #include "storage/s_lock.h" @@ -35,5 +37,6 @@ extern void InitializeBackendManagement(void); extern void InitializeBackendData(void); extern void UnSetDistributedTransactionId(void); extern void AssignDistributedTransactionId(void); +extern void GetBackendDataForProc(PGPROC *proc, BackendData *result); #endif /* BACKEND_DATA_H */ diff --git a/src/include/distributed/lock_graph.h b/src/include/distributed/lock_graph.h new file mode 100644 index 000000000..548b12ecb --- /dev/null +++ b/src/include/distributed/lock_graph.h @@ -0,0 +1,57 @@ +/* + * lock_graph.h + * + * Data structures and functions for gathering lock graphs between + * distributed transactions. + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef LOCK_GRAPH_H +#define LOCK_GRAPH_H + + +#include "postgres.h" +#include "datatype/timestamp.h" + + +/* + * Describes an edge in a waiting-for graph of locks. This isn't used for + * deadlock-checking directly, but to gather the information necessary to + * do so. + * + * The datatypes here are a bit looser than strictly necessary, because + * they're transported as the return type from an SQL function. + */ +typedef struct WaitEdge +{ + int waitingPid; + int waitingNodeId; + int64 waitingTransactionNum; + TimestampTz waitingTransactionStamp; + + int blockingPid; + int blockingNodeId; + int64 blockingTransactionNum; + TimestampTz blockingTransactionStamp; + + /* blocking transaction is also waiting on a lock */ + bool isBlockingXactWaiting; +} WaitEdge; + + +/* + * WaitGraph represent a graph of wait edges as an adjacency list. + */ +typedef struct WaitGraph +{ + int localNodeId; + int allocatedSize; + int edgeCount; + WaitEdge *edges; +} WaitGraph; + + +#endif /* LOCK_GRAPH_H */ diff --git a/src/test/regress/expected/isolation_dump_local_wait_edges.out b/src/test/regress/expected/isolation_dump_local_wait_edges.out new file mode 100644 index 000000000..d0adacd3f --- /dev/null +++ b/src/test/regress/expected/isolation_dump_local_wait_edges.out @@ -0,0 +1,134 @@ +Parsed test spec with 4 sessions + +starting permutation: dist11-begin dist13-begin dist11-update dist13-update detector-dump-wait-edges dist11-abort dist13-abort +step dist11-begin: + BEGIN; + SELECT assign_distributed_transaction_id(11, 1, '2017-01-01 00:00:00+0'); + +assign_distributed_transaction_id + + +step dist13-begin: + BEGIN; + SELECT assign_distributed_transaction_id(13, 1, '2017-01-01 00:00:00+0'); + +assign_distributed_transaction_id + + +step dist11-update: + UPDATE local_table SET y = 1 WHERE x = 1; + +step dist13-update: + UPDATE local_table SET y = 3 WHERE x = 1; + +step detector-dump-wait-edges: + SELECT + waiting_node_id, + waiting_transaction_num, + blocking_node_id, + blocking_transaction_num, + blocking_transaction_waiting + FROM + dump_local_wait_edges(13); + +waiting_node_idwaiting_transaction_numblocking_node_idblocking_transaction_numblocking_transaction_waiting + +13 1 11 1 f +step dist11-abort: + ABORT; + +step dist13-update: <... completed> +step dist13-abort: + ABORT; + + +starting permutation: local-begin dist13-begin local-update dist13-update detector-dump-wait-edges local-abort dist13-abort +step local-begin: + BEGIN; + +step dist13-begin: + BEGIN; + SELECT assign_distributed_transaction_id(13, 1, '2017-01-01 00:00:00+0'); + +assign_distributed_transaction_id + + +step local-update: + UPDATE local_table SET y = 2 WHERE x = 1; + +step dist13-update: + UPDATE local_table SET y = 3 WHERE x = 1; + +step detector-dump-wait-edges: + SELECT + waiting_node_id, + waiting_transaction_num, + blocking_node_id, + blocking_transaction_num, + blocking_transaction_waiting + FROM + dump_local_wait_edges(13); + +waiting_node_idwaiting_transaction_numblocking_node_idblocking_transaction_numblocking_transaction_waiting + +13 1 0 f +step local-abort: + ABORT; + +step dist13-update: <... completed> +step dist13-abort: + ABORT; + + +starting permutation: dist11-begin local-begin dist13-begin dist11-update local-update dist13-update detector-dump-wait-edges dist11-abort local-abort dist13-abort +step dist11-begin: + BEGIN; + SELECT assign_distributed_transaction_id(11, 1, '2017-01-01 00:00:00+0'); + +assign_distributed_transaction_id + + +step local-begin: + BEGIN; + +step dist13-begin: + BEGIN; + SELECT assign_distributed_transaction_id(13, 1, '2017-01-01 00:00:00+0'); + +assign_distributed_transaction_id + + +step dist11-update: + UPDATE local_table SET y = 1 WHERE x = 1; + +step local-update: + UPDATE local_table SET y = 2 WHERE x = 1; + +step dist13-update: + UPDATE local_table SET y = 3 WHERE x = 1; + +step detector-dump-wait-edges: + SELECT + waiting_node_id, + waiting_transaction_num, + blocking_node_id, + blocking_transaction_num, + blocking_transaction_waiting + FROM + dump_local_wait_edges(13); + +waiting_node_idwaiting_transaction_numblocking_node_idblocking_transaction_numblocking_transaction_waiting + +13 1 0 t +0 11 1 f +step dist11-abort: + ABORT; + +step local-update: <... completed> +step local-abort: + ABORT; + +step dist13-update: <... completed> +step dist13-abort: + ABORT; + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 038c3b992..062528ba9 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -114,6 +114,8 @@ ALTER EXTENSION citus UPDATE TO '7.0-1'; ALTER EXTENSION citus UPDATE TO '7.0-2'; ALTER EXTENSION citus UPDATE TO '7.0-3'; ALTER EXTENSION citus UPDATE TO '7.0-4'; +ALTER EXTENSION citus UPDATE TO '7.0-5'; +ALTER EXTENSION citus UPDATE TO '7.0-6'; -- show running version SHOW citus.version; citus.version diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 8b963518e..84e796045 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -10,3 +10,4 @@ test: isolation_concurrent_dml isolation_data_migration test: isolation_drop_shards isolation_copy_placement_vs_modification test: isolation_insert_vs_vacuum isolation_transaction_recovery test: isolation_distributed_transaction_id +test: isolation_dump_local_wait_edges diff --git a/src/test/regress/specs/isolation_dump_local_wait_edges.spec b/src/test/regress/specs/isolation_dump_local_wait_edges.spec new file mode 100644 index 000000000..06a2ad97a --- /dev/null +++ b/src/test/regress/specs/isolation_dump_local_wait_edges.spec @@ -0,0 +1,87 @@ +setup +{ + CREATE TABLE local_table (x int primary key, y int); + INSERT INTO local_table VALUES (1,0); +} + +teardown +{ + DROP TABLE local_table; +} + +session "dist11" + +step "dist11-begin" +{ + BEGIN; + SELECT assign_distributed_transaction_id(11, 1, '2017-01-01 00:00:00+0'); +} + +step "dist11-update" +{ + UPDATE local_table SET y = 1 WHERE x = 1; +} + +step "dist11-abort" +{ + ABORT; +} + +session "local" + +step "local-begin" +{ + BEGIN; +} + +step "local-update" +{ + UPDATE local_table SET y = 2 WHERE x = 1; +} + +step "local-abort" +{ + ABORT; +} + +session "dist13" + +step "dist13-begin" +{ + BEGIN; + SELECT assign_distributed_transaction_id(13, 1, '2017-01-01 00:00:00+0'); +} + +step "dist13-update" +{ + UPDATE local_table SET y = 3 WHERE x = 1; +} + +step "dist13-abort" +{ + ABORT; +} + + +session "detector" + +step "detector-dump-wait-edges" +{ + SELECT + waiting_node_id, + waiting_transaction_num, + blocking_node_id, + blocking_transaction_num, + blocking_transaction_waiting + FROM + dump_local_wait_edges(13); +} + +# Distributed transaction blocked by another distributed transaction +permutation "dist11-begin" "dist13-begin" "dist11-update" "dist13-update" "detector-dump-wait-edges" "dist11-abort" "dist13-abort" + +# Distributed transaction blocked by a regular transaction +permutation "local-begin" "dist13-begin" "local-update" "dist13-update" "detector-dump-wait-edges" "local-abort" "dist13-abort" + +# Distributed transaction blocked by a regular transaction blocked by a distributed transaction +permutation "dist11-begin" "local-begin" "dist13-begin" "dist11-update" "local-update" "dist13-update" "detector-dump-wait-edges" "dist11-abort" "local-abort" "dist13-abort" diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 5895adb0d..5d8d124ba 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -114,6 +114,8 @@ ALTER EXTENSION citus UPDATE TO '7.0-1'; ALTER EXTENSION citus UPDATE TO '7.0-2'; ALTER EXTENSION citus UPDATE TO '7.0-3'; ALTER EXTENSION citus UPDATE TO '7.0-4'; +ALTER EXTENSION citus UPDATE TO '7.0-5'; +ALTER EXTENSION citus UPDATE TO '7.0-6'; -- show running version SHOW citus.version; From 80ea233ec1c5fb1510d9a075b32034c8bbf1f321 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 17 Jul 2017 17:26:01 +0200 Subject: [PATCH 2/2] Add function for dumping global wait edges --- .../distributed/citus--7.0-5--7.0-6.sql | 16 ++ .../distributed/transaction/lock_graph.c | 242 ++++++++++++++++-- src/include/distributed/lock_graph.h | 3 + .../isolation_dump_global_wait_edges.out | 85 ++++++ src/test/regress/isolation_schedule | 2 +- .../isolation_dump_global_wait_edges.spec | 85 ++++++ 6 files changed, 416 insertions(+), 17 deletions(-) create mode 100644 src/test/regress/expected/isolation_dump_global_wait_edges.out create mode 100644 src/test/regress/specs/isolation_dump_global_wait_edges.spec diff --git a/src/backend/distributed/citus--7.0-5--7.0-6.sql b/src/backend/distributed/citus--7.0-5--7.0-6.sql index e30273161..e09520be4 100644 --- a/src/backend/distributed/citus--7.0-5--7.0-6.sql +++ b/src/backend/distributed/citus--7.0-5--7.0-6.sql @@ -16,3 +16,19 @@ LANGUAGE 'c' STRICT AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(int) IS 'returns a local list of blocked transactions originating from source_node_id'; + +CREATE FUNCTION pg_catalog.dump_global_wait_edges( + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE 'c' STRICT +AS $$MODULE_PATHNAME$$, $$dump_global_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_global_wait_edges() +IS 'returns a global list of blocked transactions originating from this node'; diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index 5416bba6c..9a790c0d4 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -19,9 +19,11 @@ #include "access/hash.h" #include "distributed/backend_data.h" +#include "distributed/connection_management.h" #include "distributed/hash_helpers.h" #include "distributed/lock_graph.h" #include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" #include "storage/proc.h" #include "utils/builtins.h" #include "utils/hsearch.h" @@ -39,6 +41,10 @@ typedef struct PROCStack } PROCStack; +static void AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex); +static int64 ParseIntField(PGresult *result, int rowIndex, int colIndex); +static bool ParseBoolField(PGresult *result, int rowIndex, int colIndex); +static TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex); static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); static WaitGraph * BuildWaitGraphForSourceNode(int sourceNodeId); static void LockLockData(void); @@ -57,6 +63,210 @@ static bool IsInDistributedTransaction(BackendData *backendData); PG_FUNCTION_INFO_V1(dump_local_wait_edges); +PG_FUNCTION_INFO_V1(dump_global_wait_edges); + + +/* + * dump_global_wait_edges returns global wait edges for distributed transactions + * originating from the node on which it is started. + */ +Datum +dump_global_wait_edges(PG_FUNCTION_ARGS) +{ + WaitGraph *waitGraph = NULL; + + waitGraph = BuildGlobalWaitGraph(); + + ReturnWaitGraph(waitGraph, fcinfo); + + return (Datum) 0; +} + + +/* + * BuildGlobalWaitGraph builds a wait graph for distributed transactions + * that originate from this node, including edges from all (other) worker + * nodes. + */ +WaitGraph * +BuildGlobalWaitGraph(void) +{ + List *workerNodeList = ActivePrimaryNodeList(); + ListCell *workerNodeCell = NULL; + char *nodeUser = CitusExtensionOwnerName(); + List *connectionList = NIL; + ListCell *connectionCell = NULL; + int localNodeId = GetLocalGroupId(); + + WaitGraph *waitGraph = BuildWaitGraphForSourceNode(localNodeId); + + /* open connections in parallel */ + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + MultiConnection *connection = NULL; + int connectionFlags = 0; + + if (workerNode->groupId == localNodeId) + { + /* we already have local wait edges */ + continue; + } + + connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, + nodeUser, NULL); + + connectionList = lappend(connectionList, connection); + } + + FinishConnectionListEstablishment(connectionList); + + /* send commands in parallel */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + int querySent = false; + char *command = NULL; + const char *params[1]; + + params[0] = psprintf("%d", GetLocalGroupId()); + command = "SELECT * FROM dump_local_wait_edges($1)"; + + querySent = SendRemoteCommandParams(connection, command, 1, + NULL, params); + if (querySent == 0) + { + ReportConnectionError(connection, ERROR); + } + } + + /* receive dump_local_wait_edges results */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + PGresult *result = NULL; + bool raiseInterrupts = true; + int64 rowIndex = 0; + int64 rowCount = 0; + int64 colCount = 0; + + result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, ERROR); + } + + rowCount = PQntuples(result); + colCount = PQnfields(result); + + if (colCount != 9) + { + ereport(ERROR, (errmsg("unexpected number of columns from " + "dump_local_wait_edges"))); + } + + for (rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + AddWaitEdgeFromResult(waitGraph, result, rowIndex); + } + + PQclear(result); + ForgetResults(connection); + } + + return waitGraph; +} + + +/* + * AddWaitEdgeFromResult adds an edge to the wait graph that is read from + * a PGresult. + */ +static void +AddWaitEdgeFromResult(WaitGraph *waitGraph, PGresult *result, int rowIndex) +{ + WaitEdge *waitEdge = AllocWaitEdge(waitGraph); + + waitEdge->waitingPid = ParseIntField(result, rowIndex, 0); + waitEdge->waitingNodeId = ParseIntField(result, rowIndex, 1); + waitEdge->waitingTransactionNum = ParseIntField(result, rowIndex, 2); + waitEdge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3); + waitEdge->blockingPid = ParseIntField(result, rowIndex, 4); + waitEdge->blockingNodeId = ParseIntField(result, rowIndex, 5); + waitEdge->blockingTransactionNum = ParseIntField(result, rowIndex, 6); + waitEdge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7); + waitEdge->isBlockingXactWaiting = ParseBoolField(result, rowIndex, 8); +} + + +/* + * ParseIntField parses a int64 from a remote result or returns 0 if the + * result is NULL. + */ +static int64 +ParseIntField(PGresult *result, int rowIndex, int colIndex) +{ + char *resultString = NULL; + + if (PQgetisnull(result, rowIndex, colIndex)) + { + return 0; + } + + resultString = PQgetvalue(result, rowIndex, colIndex); + + return pg_strtouint64(resultString, NULL, 10); +} + + +/* + * ParseBoolField parses a bool from a remote result or returns false if the + * result is NULL. + */ +static bool +ParseBoolField(PGresult *result, int rowIndex, int colIndex) +{ + char *resultString = NULL; + + if (PQgetisnull(result, rowIndex, colIndex)) + { + return false; + } + + resultString = PQgetvalue(result, rowIndex, colIndex); + if (strlen(resultString) != 1) + { + return false; + } + + return resultString[0] == 't'; +} + + +/* + * ParseTimestampTzField parses a timestamptz from a remote result or returns + * 0 if the result is NULL. + */ +static TimestampTz +ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex) +{ + char *resultString = NULL; + Datum resultStringDatum = 0; + Datum timestampDatum = 0; + + if (PQgetisnull(result, rowIndex, colIndex)) + { + return 0; + } + + resultString = PQgetvalue(result, rowIndex, colIndex); + resultStringDatum = CStringGetDatum(resultString); + timestampDatum = DirectFunctionCall3(timestamptz_in, resultStringDatum, 0, -1); + + return DatumGetTimestampTz(timestampDatum); +} /* @@ -81,22 +291,22 @@ dump_local_wait_edges(PG_FUNCTION_ARGS) static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) { - ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; - TupleDesc tupdesc = NULL; - Tuplestorestate *tupstore = NULL; + ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupleDesc = NULL; + Tuplestorestate *tupleStore = NULL; MemoryContext per_query_ctx = NULL; - MemoryContext oldcontext = NULL; + MemoryContext oldContext = NULL; size_t curEdgeNum = 0; /* check to see if caller supports us returning a tuplestore */ - if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + if (resultInfo == NULL || !IsA(resultInfo, ReturnSetInfo)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg( "set-valued function called in context that cannot accept a set"))); } - if (!(rsinfo->allowedModes & SFRM_Materialize)) + if (!(resultInfo->allowedModes & SFRM_Materialize)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -105,19 +315,19 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) } /* Build a tuple descriptor for our result type */ - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + if (get_call_result_type(fcinfo, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE) { elog(ERROR, "return type must be a row type"); } - per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; - oldcontext = MemoryContextSwitchTo(per_query_ctx); + per_query_ctx = resultInfo->econtext->ecxt_per_query_memory; + oldContext = MemoryContextSwitchTo(per_query_ctx); - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->returnMode = SFRM_Materialize; - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - MemoryContextSwitchTo(oldcontext); + tupleStore = tuplestore_begin_heap(true, false, work_mem); + resultInfo->returnMode = SFRM_Materialize; + resultInfo->setResult = tupleStore; + resultInfo->setDesc = tupleDesc; + MemoryContextSwitchTo(oldContext); /* * Columns: @@ -167,11 +377,11 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) } values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting); - tuplestore_putvalues(tupstore, tupdesc, values, nulls); + tuplestore_putvalues(tupleStore, tupleDesc, values, nulls); } /* clean up and return the tuplestore */ - tuplestore_donestoring(tupstore); + tuplestore_donestoring(tupleStore); } diff --git a/src/include/distributed/lock_graph.h b/src/include/distributed/lock_graph.h index 548b12ecb..5084e38bc 100644 --- a/src/include/distributed/lock_graph.h +++ b/src/include/distributed/lock_graph.h @@ -54,4 +54,7 @@ typedef struct WaitGraph } WaitGraph; +extern WaitGraph * BuildGlobalWaitGraph(void); + + #endif /* LOCK_GRAPH_H */ diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out new file mode 100644 index 000000000..585a9fd9e --- /dev/null +++ b/src/test/regress/expected/isolation_dump_global_wait_edges.out @@ -0,0 +1,85 @@ +Parsed test spec with 4 sessions + +starting permutation: s1-begin s2-begin s1-update s2-update detector-dump-wait-edges s1-abort s2-abort +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-update: + UPDATE distributed_table SET y = 1 WHERE x = 1; + +step s2-update: + UPDATE distributed_table SET y = 2 WHERE x = 1; + +step detector-dump-wait-edges: + SELECT + waiting_transaction_num, + blocking_transaction_num, + blocking_transaction_waiting + FROM + dump_global_wait_edges() + ORDER BY + waiting_transaction_num, + blocking_transaction_num, + blocking_transaction_waiting; + +waiting_transaction_numblocking_transaction_numblocking_transaction_waiting + +192 191 f +step s1-abort: + ABORT; + +step s2-update: <... completed> +step s2-abort: + ABORT; + + +starting permutation: s1-begin s2-begin s3-begin s1-update s2-update s3-update detector-dump-wait-edges s1-abort s2-abort s3-abort +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s3-begin: + BEGIN; + +step s1-update: + UPDATE distributed_table SET y = 1 WHERE x = 1; + +step s2-update: + UPDATE distributed_table SET y = 2 WHERE x = 1; + +step s3-update: + UPDATE distributed_table SET y = 3 WHERE x = 1; + +step detector-dump-wait-edges: + SELECT + waiting_transaction_num, + blocking_transaction_num, + blocking_transaction_waiting + FROM + dump_global_wait_edges() + ORDER BY + waiting_transaction_num, + blocking_transaction_num, + blocking_transaction_waiting; + +waiting_transaction_numblocking_transaction_numblocking_transaction_waiting + +196 195 f +197 195 f +197 196 t +step s1-abort: + ABORT; + +step s2-update: <... completed> +step s2-abort: + ABORT; + +step s3-update: <... completed> +step s3-abort: + ABORT; + diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 84e796045..32aef1141 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -10,4 +10,4 @@ test: isolation_concurrent_dml isolation_data_migration test: isolation_drop_shards isolation_copy_placement_vs_modification test: isolation_insert_vs_vacuum isolation_transaction_recovery test: isolation_distributed_transaction_id -test: isolation_dump_local_wait_edges +test: isolation_dump_local_wait_edges isolation_dump_global_wait_edges diff --git a/src/test/regress/specs/isolation_dump_global_wait_edges.spec b/src/test/regress/specs/isolation_dump_global_wait_edges.spec new file mode 100644 index 000000000..869ccf569 --- /dev/null +++ b/src/test/regress/specs/isolation_dump_global_wait_edges.spec @@ -0,0 +1,85 @@ +setup +{ + CREATE TABLE distributed_table (x int primary key, y int); + SELECT create_distributed_table('distributed_table', 'x'); + INSERT INTO distributed_table VALUES (1,0); +} + +teardown +{ + DROP TABLE distributed_table; +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-update" +{ + UPDATE distributed_table SET y = 1 WHERE x = 1; +} + +step "s1-abort" +{ + ABORT; +} + +session "s2" + +step "s2-begin" +{ + BEGIN; +} + +step "s2-update" +{ + UPDATE distributed_table SET y = 2 WHERE x = 1; +} + +step "s2-abort" +{ + ABORT; +} + +session "s3" + +step "s3-begin" +{ + BEGIN; +} + +step "s3-update" +{ + UPDATE distributed_table SET y = 3 WHERE x = 1; +} + +step "s3-abort" +{ + ABORT; +} + + +session "detector" + +step "detector-dump-wait-edges" +{ + SELECT + waiting_transaction_num, + blocking_transaction_num, + blocking_transaction_waiting + FROM + dump_global_wait_edges() + ORDER BY + waiting_transaction_num, + blocking_transaction_num, + blocking_transaction_waiting; +} + +# Distributed transaction blocked by another distributed transaction +permutation "s1-begin" "s2-begin" "s1-update" "s2-update" "detector-dump-wait-edges" "s1-abort" "s2-abort" + +# Distributed transaction blocked by another distributed transaction blocked by another distributed transaction +permutation "s1-begin" "s2-begin" "s3-begin" "s1-update" "s2-update" "s3-update" "detector-dump-wait-edges" "s1-abort" "s2-abort" "s3-abort"