diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index f7f094846..038332bbd 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \ 6.2-1 6.2-2 6.2-3 6.2-4 \ - 7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 + 7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -149,6 +149,8 @@ $(EXTENSION)--7.0-4.sql: $(EXTENSION)--7.0-3.sql $(EXTENSION)--7.0-3--7.0-4.sql cat $^ > $@ $(EXTENSION)--7.0-5.sql: $(EXTENSION)--7.0-4.sql $(EXTENSION)--7.0-4--7.0-5.sql cat $^ > $@ +$(EXTENSION)--7.0-6.sql: $(EXTENSION)--7.0-5.sql $(EXTENSION)--7.0-5--7.0-6.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--7.0-5--7.0-6.sql b/src/backend/distributed/citus--7.0-5--7.0-6.sql new file mode 100644 index 000000000..e30273161 --- /dev/null +++ b/src/backend/distributed/citus--7.0-5--7.0-6.sql @@ -0,0 +1,18 @@ +/* citus--7.0-5--7.0-6 */ + +CREATE FUNCTION pg_catalog.dump_local_wait_edges( + IN source_node_id int4, + OUT waiting_pid int4, + OUT waiting_node_id int4, + OUT waiting_transaction_num int8, + OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int4, + OUT blocking_node_id int4, + OUT blocking_transaction_num int8, + OUT blocking_transaction_stamp timestamptz, + OUT blocking_transaction_waiting bool) +RETURNS SETOF RECORD +LANGUAGE 'c' STRICT +AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$; +COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(int) +IS 'returns a local list of blocked transactions originating from source_node_id'; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 661f7164b..f959059f3 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '7.0-5' +default_version = '7.0-6' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 67455c0f9..e3c46149a 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -501,3 +501,29 @@ CurrentDistributedTransactionNumber(void) return MyBackendData->transactionId.transactionNumber; } + + +/* + * GetBackendDataForProc writes the backend data for the given process to + * result. If the process is part of a lock group (parallel query) it + * returns the leader data instead. + */ +void +GetBackendDataForProc(PGPROC *proc, BackendData *result) +{ + BackendData *backendData = NULL; + int pgprocno = proc->pgprocno; + + if (proc->lockGroupLeader != NULL) + { + pgprocno = proc->lockGroupLeader->pgprocno; + } + + backendData = &backendManagementShmemData->backends[pgprocno]; + + SpinLockAcquire(&backendData->mutex); + + memcpy(result, backendData, sizeof(BackendData)); + + SpinLockRelease(&backendData->mutex); +} diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c new file mode 100644 index 000000000..5416bba6c --- /dev/null +++ b/src/backend/distributed/transaction/lock_graph.c @@ -0,0 +1,537 @@ +/*------------------------------------------------------------------------- + * + * lock_graph.c + * + * Functions for obtaining local and global lock graphs in which each + * node is a distributed transaction, and an edge represent a waiting-for + * relationship. + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "libpq-fe.h" +#include "miscadmin.h" + +#include "access/hash.h" +#include "distributed/backend_data.h" +#include "distributed/hash_helpers.h" +#include "distributed/lock_graph.h" +#include "distributed/metadata_cache.h" +#include "storage/proc.h" +#include "utils/builtins.h" +#include "utils/hsearch.h" +#include "utils/timestamp.h" + + +/* + * PROCStack is a stack of PGPROC pointers used to perform a depth-first search + * through the lock graph. + */ +typedef struct PROCStack +{ + int procCount; + PGPROC **procs; +} PROCStack; + + +static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); +static WaitGraph * BuildWaitGraphForSourceNode(int sourceNodeId); +static void LockLockData(void); +static void UnlockLockData(void); +static void AddEdgesForLockWaits(WaitGraph *waitGraph, PGPROC *waitingProc, + PROCStack *remaining); +static void AddEdgesForWaitQueue(WaitGraph *waitGraph, PGPROC *waitingProc, + PROCStack *remaining); +static void AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc, + PROCStack *remaining); +static WaitEdge * AllocWaitEdge(WaitGraph *waitGraph); +static bool IsProcessWaitingForLock(PGPROC *proc); +static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc); +static bool IsConflictingLockMask(int holdMask, int conflictMask); +static bool IsInDistributedTransaction(BackendData *backendData); + + +PG_FUNCTION_INFO_V1(dump_local_wait_edges); + + +/* + * dump_local_wait_edges returns wait edges for distributed transactions + * running on the node on which it is called, which originate from the source node. + */ +Datum +dump_local_wait_edges(PG_FUNCTION_ARGS) +{ + int32 sourceNodeId = PG_GETARG_INT32(0); + + WaitGraph *waitGraph = BuildWaitGraphForSourceNode(sourceNodeId); + ReturnWaitGraph(waitGraph, fcinfo); + + return (Datum) 0; +} + + +/* + * ReturnWaitGraph returns a wait graph for a set returning function. + */ +static void +ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc = NULL; + Tuplestorestate *tupstore = NULL; + MemoryContext per_query_ctx = NULL; + MemoryContext oldcontext = NULL; + size_t curEdgeNum = 0; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg( + "set-valued function called in context that cannot accept a set"))); + } + if (!(rsinfo->allowedModes & SFRM_Materialize)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not " \ + "allowed in this context"))); + } + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + { + elog(ERROR, "return type must be a row type"); + } + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + MemoryContextSwitchTo(oldcontext); + + /* + * Columns: + * 00: waiting_pid + * 01: waiting_node_id + * 02: waiting_transaction_num + * 03: waiting_transaction_stamp + * 04: blocking_pid + * 05: blocking__node_id + * 06: blocking_transaction_num + * 07: blocking_transaction_stamp + * 08: blocking_transaction_waiting + */ + for (curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++) + { + Datum values[9]; + bool nulls[9]; + WaitEdge *curEdge = &waitGraph->edges[curEdgeNum]; + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = Int32GetDatum(curEdge->waitingPid); + values[1] = Int32GetDatum(curEdge->waitingNodeId); + if (curEdge->waitingTransactionNum != 0) + { + values[2] = Int64GetDatum(curEdge->waitingTransactionNum); + values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp); + } + else + { + nulls[2] = true; + nulls[3] = true; + } + + values[4] = Int32GetDatum(curEdge->blockingPid); + values[5] = Int32GetDatum(curEdge->blockingNodeId); + if (curEdge->blockingTransactionNum != 0) + { + values[6] = Int64GetDatum(curEdge->blockingTransactionNum); + values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp); + } + else + { + nulls[6] = true; + nulls[7] = true; + } + values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting); + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); +} + + +/* + * BuildWaitGraphForSourceNode builds a wait graph for distributed transactions + * that originate from the given source node. + */ +static WaitGraph * +BuildWaitGraphForSourceNode(int sourceNodeId) +{ + WaitGraph *waitGraph = NULL; + int curBackend = 0; + bool visitedProcs[MaxBackends]; + PROCStack remaining; + + memset(visitedProcs, 0, MaxBackends); + + /* + * Try hard to avoid allocations while holding lock. Thus we pre-allocate + * space for locks in large batches - for common scenarios this should be + * more than enough space to build the list of wait edges without a single + * allocation. + */ + waitGraph = (WaitGraph *) palloc0(sizeof(WaitGraph)); + waitGraph->localNodeId = GetLocalGroupId(); + waitGraph->allocatedSize = MaxBackends * 3; + waitGraph->edgeCount = 0; + waitGraph->edges = (WaitEdge *) palloc(waitGraph->allocatedSize * sizeof(WaitEdge)); + + remaining.procs = (PGPROC **) palloc(sizeof(PGPROC *) * MaxBackends); + remaining.procCount = 0; + + LockLockData(); + + /* + * Build lock-graph. We do so by first finding all procs which we are + * interested in (originating on our source system, and blocked). Once + * those are collected, do depth first search over all procs blocking + * those. To avoid redundantly visiting procs, keep track of which procs + * already have been visited in a pgproc-indexed visitedProcs[] array. + */ + + /* build list of starting procs */ + for (curBackend = 0; curBackend < MaxBackends; curBackend++) + { + PGPROC *currentProc = &ProcGlobal->allProcs[curBackend]; + BackendData currentBackendData; + + /* skip if the PGPROC slot is unused */ + if (currentProc->pid == 0) + { + continue; + } + + GetBackendDataForProc(currentProc, ¤tBackendData); + + /* + * Only start searching from distributed transactions originating on the source + * node. Other deadlocks may exist, but the source node can only resolve those + * that involve its own transactions. + */ + if (sourceNodeId != currentBackendData.transactionId.initiatorNodeIdentifier || + !IsInDistributedTransaction(¤tBackendData)) + { + continue; + } + + /* skip if the process is not blocked */ + if (!IsProcessWaitingForLock(currentProc)) + { + continue; + } + + remaining.procs[remaining.procCount++] = currentProc; + } + + while (remaining.procCount > 0) + { + PGPROC *waitingProc = remaining.procs[--remaining.procCount]; + + /* + * We might find a process again if multiple distributed transactions are + * waiting for it, but we add all edges on the first visit so we don't need + * to visit it again. This also avoids getting into an infinite loop in + * case of a local deadlock. + */ + if (visitedProcs[waitingProc->pgprocno]) + { + continue; + } + + visitedProcs[waitingProc->pgprocno] = true; + + /* only blocked processes result in wait edges */ + if (!IsProcessWaitingForLock(waitingProc)) + { + continue; + } + + /* + * Record an edge for everyone already holding the lock in a + * conflicting manner ("hard edges" in postgres parlance). + */ + AddEdgesForLockWaits(waitGraph, waitingProc, &remaining); + + /* + * Record an edge for everyone in front of us in the wait-queue + * for the lock ("soft edges" in postgres parlance). + */ + AddEdgesForWaitQueue(waitGraph, waitingProc, &remaining); + } + + UnlockLockData(); + + return waitGraph; +} + + +/* + * LockLockData takes locks the shared lock data structure, which prevents + * concurrent lock acquisitions/releases. + */ +static void +LockLockData(void) +{ + int partitionNum = 0; + + for (partitionNum = 0; partitionNum < NUM_LOCK_PARTITIONS; partitionNum++) + { + LWLockAcquire(LockHashPartitionLockByIndex(partitionNum), LW_SHARED); + } +} + + +/* + * UnlockLockData unlocks the locks on the shared lock data structure in reverse + * order since LWLockRelease searches the given lock from the end of the + * held_lwlocks array. + */ +static void +UnlockLockData(void) +{ + int partitionNum = 0; + + for (partitionNum = NUM_LOCK_PARTITIONS - 1; partitionNum >= 0; partitionNum--) + { + LWLockRelease(LockHashPartitionLockByIndex(partitionNum)); + } +} + + +/* + * AddEdgesForLockWaits adds an edge to the wait graph for every granted lock + * that waitingProc is waiting for. + * + * This function iterates over the procLocks data structure in shared memory, + * which also contains entries for locks which have not been granted yet, but + * it does not reflect the order of the wait queue. We therefore handle the + * wait queue separately. + */ +static void +AddEdgesForLockWaits(WaitGraph *waitGraph, PGPROC *waitingProc, PROCStack *remaining) +{ + /* the lock for which this process is waiting */ + LOCK *waitLock = waitingProc->waitLock; + + /* determine the conflict mask for the lock level used by the process */ + LockMethod lockMethodTable = GetLocksMethodTable(waitLock); + int conflictMask = lockMethodTable->conflictTab[waitingProc->waitLockMode]; + + /* iterate through the queue of processes holding the lock */ + SHM_QUEUE *procLocks = &waitLock->procLocks; + PROCLOCK *procLock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, lockLink)); + + while (procLock != NULL) + { + PGPROC *currentProc = procLock->tag.myProc; + + /* skip processes from the same lock group and ones that don't conflict */ + if (!IsSameLockGroup(waitingProc, currentProc) && + IsConflictingLockMask(procLock->holdMask, conflictMask)) + { + AddWaitEdge(waitGraph, waitingProc, currentProc, remaining); + } + + procLock = (PROCLOCK *) SHMQueueNext(procLocks, &procLock->lockLink, + offsetof(PROCLOCK, lockLink)); + } +} + + +/* + * AddEdgesForWaitQueue adds an edge to the wait graph for processes in front of + * waitingProc in the wait queue that are trying to acquire a conflicting lock. + */ +static void +AddEdgesForWaitQueue(WaitGraph *waitGraph, PGPROC *waitingProc, PROCStack *remaining) +{ + /* the lock for which this process is waiting */ + LOCK *waitLock = waitingProc->waitLock; + + /* determine the conflict mask for the lock level used by the process */ + LockMethod lockMethodTable = GetLocksMethodTable(waitLock); + int conflictMask = lockMethodTable->conflictTab[waitingProc->waitLockMode]; + + /* iterate through the wait queue */ + PROC_QUEUE *waitQueue = &(waitLock->waitProcs); + int queueSize = waitQueue->size; + PGPROC *currentProc = (PGPROC *) waitQueue->links.next; + + /* + * Iterate through the queue from the start until we encounter waitingProc, + * since we only care about processes in front of waitingProc in the queue. + */ + while (queueSize-- > 0 && currentProc != waitingProc) + { + int awaitMask = LOCKBIT_ON(currentProc->waitLockMode); + + /* skip processes from the same lock group and ones that don't conflict */ + if (!IsSameLockGroup(waitingProc, currentProc) && + IsConflictingLockMask(awaitMask, conflictMask)) + { + AddWaitEdge(waitGraph, waitingProc, currentProc, remaining); + } + + currentProc = (PGPROC *) currentProc->links.next; + } +} + + +/* + * AddWaitEdge adds a new wait edge to a wait graph. The nodes in the graph are + * transactions and an edge indicates the "waiting" process is blocked on a lock + * held by the "blocking" process. + * + * If the blocking process is itself waiting then it is added to the remaining + * stack. + */ +static void +AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc, + PROCStack *remaining) +{ + WaitEdge *curEdge = AllocWaitEdge(waitGraph); + BackendData waitingBackendData; + BackendData blockingBackendData; + + GetBackendDataForProc(waitingProc, &waitingBackendData); + GetBackendDataForProc(blockingProc, &blockingBackendData); + + curEdge->isBlockingXactWaiting = IsProcessWaitingForLock(blockingProc); + if (curEdge->isBlockingXactWaiting) + { + remaining->procs[remaining->procCount++] = blockingProc; + } + + curEdge->waitingPid = waitingProc->pid; + + if (IsInDistributedTransaction(&waitingBackendData)) + { + DistributedTransactionId *waitingTransactionId = + &waitingBackendData.transactionId; + + curEdge->waitingNodeId = waitingTransactionId->initiatorNodeIdentifier; + curEdge->waitingTransactionNum = waitingTransactionId->transactionNumber; + curEdge->waitingTransactionStamp = waitingTransactionId->timestamp; + } + else + { + curEdge->waitingNodeId = waitGraph->localNodeId; + curEdge->waitingTransactionNum = 0; + curEdge->waitingTransactionStamp = 0; + } + + curEdge->blockingPid = blockingProc->pid; + + if (IsInDistributedTransaction(&blockingBackendData)) + { + DistributedTransactionId *blockingTransactionId = + &blockingBackendData.transactionId; + + curEdge->blockingNodeId = blockingTransactionId->initiatorNodeIdentifier; + curEdge->blockingTransactionNum = blockingTransactionId->transactionNumber; + curEdge->blockingTransactionStamp = blockingTransactionId->timestamp; + } + else + { + curEdge->blockingNodeId = waitGraph->localNodeId; + curEdge->blockingTransactionNum = 0; + curEdge->blockingTransactionStamp = 0; + } +} + + +/* + * AllocWaitEdge allocates a wait edge as part of the given wait graph. + * If the wait graph has insufficient space its size is doubled using + * repalloc. + */ +static WaitEdge * +AllocWaitEdge(WaitGraph *waitGraph) +{ + /* ensure space for new edge */ + if (waitGraph->allocatedSize == waitGraph->edgeCount) + { + waitGraph->allocatedSize *= 2; + waitGraph->edges = (WaitEdge *) + repalloc(waitGraph->edges, sizeof(WaitEdge) * + waitGraph->allocatedSize); + } + + return &waitGraph->edges[waitGraph->edgeCount++]; +} + + +/* + * IsProcessWaitingForLock returns whether a given process is waiting for a lock. + */ +static bool +IsProcessWaitingForLock(PGPROC *proc) +{ + return proc->waitStatus == STATUS_WAITING; +} + + +/* + * IsSameLockGroup returns whether two processes are part of the same lock group, + * meaning they are either the same process, or have the same lock group leader. + */ +static bool +IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc) +{ + return leftProc == rightProc || + (leftProc->lockGroupLeader != NULL && + leftProc->lockGroupLeader == rightProc->lockGroupLeader); +} + + +/* + * IsConflictingLockMask returns whether the given conflict mask conflicts with the + * holdMask. + * + * holdMask is a bitmask with the i-th bit turned on if a lock mode i is held. + * + * conflictMask is a bitmask with the j-th bit turned on if it conflicts with + * lock mode i. + */ +static bool +IsConflictingLockMask(int holdMask, int conflictMask) +{ + return (holdMask & conflictMask) != 0; +} + + +/* + * IsInDistributedTransaction returns whether the given backend is in a + * distributed transaction. + */ +static bool +IsInDistributedTransaction(BackendData *backendData) +{ + return backendData->transactionId.transactionNumber != 0; +} diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index 6d0e52adc..f5020c16d 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -16,6 +16,8 @@ #include "datatype/timestamp.h" #include "distributed/transaction_identifier.h" #include "nodes/pg_list.h" +#include "storage/lwlock.h" +#include "storage/proc.h" #include "storage/s_lock.h" @@ -35,5 +37,6 @@ extern void InitializeBackendManagement(void); extern void InitializeBackendData(void); extern void UnSetDistributedTransactionId(void); extern void AssignDistributedTransactionId(void); +extern void GetBackendDataForProc(PGPROC *proc, BackendData *result); #endif /* BACKEND_DATA_H */ diff --git a/src/include/distributed/lock_graph.h b/src/include/distributed/lock_graph.h new file mode 100644 index 000000000..548b12ecb --- /dev/null +++ b/src/include/distributed/lock_graph.h @@ -0,0 +1,57 @@ +/* + * lock_graph.h + * + * Data structures and functions for gathering lock graphs between + * distributed transactions. + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef LOCK_GRAPH_H +#define LOCK_GRAPH_H + + +#include "postgres.h" +#include "datatype/timestamp.h" + + +/* + * Describes an edge in a waiting-for graph of locks. This isn't used for + * deadlock-checking directly, but to gather the information necessary to + * do so. + * + * The datatypes here are a bit looser than strictly necessary, because + * they're transported as the return type from an SQL function. + */ +typedef struct WaitEdge +{ + int waitingPid; + int waitingNodeId; + int64 waitingTransactionNum; + TimestampTz waitingTransactionStamp; + + int blockingPid; + int blockingNodeId; + int64 blockingTransactionNum; + TimestampTz blockingTransactionStamp; + + /* blocking transaction is also waiting on a lock */ + bool isBlockingXactWaiting; +} WaitEdge; + + +/* + * WaitGraph represent a graph of wait edges as an adjacency list. + */ +typedef struct WaitGraph +{ + int localNodeId; + int allocatedSize; + int edgeCount; + WaitEdge *edges; +} WaitGraph; + + +#endif /* LOCK_GRAPH_H */ diff --git a/src/test/regress/expected/isolation_dump_local_wait_edges.out b/src/test/regress/expected/isolation_dump_local_wait_edges.out new file mode 100644 index 000000000..d0adacd3f --- /dev/null +++ b/src/test/regress/expected/isolation_dump_local_wait_edges.out @@ -0,0 +1,134 @@ +Parsed test spec with 4 sessions + +starting permutation: dist11-begin dist13-begin dist11-update dist13-update detector-dump-wait-edges dist11-abort dist13-abort +step dist11-begin: + BEGIN; + SELECT assign_distributed_transaction_id(11, 1, '2017-01-01 00:00:00+0'); + +assign_distributed_transaction_id + + +step dist13-begin: + BEGIN; + SELECT assign_distributed_transaction_id(13, 1, '2017-01-01 00:00:00+0'); + +assign_distributed_transaction_id + + +step dist11-update: + UPDATE local_table SET y = 1 WHERE x = 1; + +step dist13-update: + UPDATE local_table SET y = 3 WHERE x = 1; + +step detector-dump-wait-edges: + SELECT + waiting_node_id, + waiting_transaction_num, + blocking_node_id, + blocking_transaction_num, + blocking_transaction_waiting + FROM + dump_local_wait_edges(13); + +waiting_node_idwaiting_transaction_numblocking_node_idblocking_transaction_numblocking_transaction_waiting + +13 1 11 1 f +step dist11-abort: + ABORT; + +step dist13-update: <... completed> +step dist13-abort: + ABORT; + + +starting permutation: local-begin dist13-begin local-update dist13-update detector-dump-wait-edges local-abort dist13-abort +step local-begin: + BEGIN; + +step dist13-begin: + BEGIN; + SELECT assign_distributed_transaction_id(13, 1, '2017-01-01 00:00:00+0'); + +assign_distributed_transaction_id + + +step local-update: + UPDATE local_table SET y = 2 WHERE x = 1; + +step dist13-update: + UPDATE local_table SET y = 3 WHERE x = 1; + +step detector-dump-wait-edges: + SELECT + waiting_node_id, + waiting_transaction_num, + blocking_node_id, + blocking_transaction_num, + blocking_transaction_waiting + FROM + dump_local_wait_edges(13); + +waiting_node_idwaiting_transaction_numblocking_node_idblocking_transaction_numblocking_transaction_waiting + +13 1 0 f +step local-abort: + ABORT; + +step dist13-update: <... completed> +step dist13-abort: + ABORT; + + +starting permutation: dist11-begin local-begin dist13-begin dist11-update local-update dist13-update detector-dump-wait-edges dist11-abort local-abort dist13-abort +step dist11-begin: + BEGIN; + SELECT assign_distributed_transaction_id(11, 1, '2017-01-01 00:00:00+0'); + +assign_distributed_transaction_id + + +step local-begin: + BEGIN; + +step dist13-begin: + BEGIN; + SELECT assign_distributed_transaction_id(13, 1, '2017-01-01 00:00:00+0'); + +assign_distributed_transaction_id + + +step dist11-update: + UPDATE local_table SET y = 1 WHERE x = 1; + +step local-update: + UPDATE local_table SET y = 2 WHERE x = 1; + +step dist13-update: + UPDATE local_table SET y = 3 WHERE x = 1; + +step detector-dump-wait-edges: + SELECT + waiting_node_id, + waiting_transaction_num, + blocking_node_id, + blocking_transaction_num, + blocking_transaction_waiting + FROM + dump_local_wait_edges(13); + +waiting_node_idwaiting_transaction_numblocking_node_idblocking_transaction_numblocking_transaction_waiting + +13 1 0 t +0 11 1 f +step dist11-abort: + ABORT; + +step local-update: <... completed> +step local-abort: + ABORT; + +step dist13-update: <... completed> +step dist13-abort: + ABORT; + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 038c3b992..062528ba9 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -114,6 +114,8 @@ ALTER EXTENSION citus UPDATE TO '7.0-1'; ALTER EXTENSION citus UPDATE TO '7.0-2'; ALTER EXTENSION citus UPDATE TO '7.0-3'; ALTER EXTENSION citus UPDATE TO '7.0-4'; +ALTER EXTENSION citus UPDATE TO '7.0-5'; +ALTER EXTENSION citus UPDATE TO '7.0-6'; -- show running version SHOW citus.version; citus.version diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 8b963518e..84e796045 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -10,3 +10,4 @@ test: isolation_concurrent_dml isolation_data_migration test: isolation_drop_shards isolation_copy_placement_vs_modification test: isolation_insert_vs_vacuum isolation_transaction_recovery test: isolation_distributed_transaction_id +test: isolation_dump_local_wait_edges diff --git a/src/test/regress/specs/isolation_dump_local_wait_edges.spec b/src/test/regress/specs/isolation_dump_local_wait_edges.spec new file mode 100644 index 000000000..06a2ad97a --- /dev/null +++ b/src/test/regress/specs/isolation_dump_local_wait_edges.spec @@ -0,0 +1,87 @@ +setup +{ + CREATE TABLE local_table (x int primary key, y int); + INSERT INTO local_table VALUES (1,0); +} + +teardown +{ + DROP TABLE local_table; +} + +session "dist11" + +step "dist11-begin" +{ + BEGIN; + SELECT assign_distributed_transaction_id(11, 1, '2017-01-01 00:00:00+0'); +} + +step "dist11-update" +{ + UPDATE local_table SET y = 1 WHERE x = 1; +} + +step "dist11-abort" +{ + ABORT; +} + +session "local" + +step "local-begin" +{ + BEGIN; +} + +step "local-update" +{ + UPDATE local_table SET y = 2 WHERE x = 1; +} + +step "local-abort" +{ + ABORT; +} + +session "dist13" + +step "dist13-begin" +{ + BEGIN; + SELECT assign_distributed_transaction_id(13, 1, '2017-01-01 00:00:00+0'); +} + +step "dist13-update" +{ + UPDATE local_table SET y = 3 WHERE x = 1; +} + +step "dist13-abort" +{ + ABORT; +} + + +session "detector" + +step "detector-dump-wait-edges" +{ + SELECT + waiting_node_id, + waiting_transaction_num, + blocking_node_id, + blocking_transaction_num, + blocking_transaction_waiting + FROM + dump_local_wait_edges(13); +} + +# Distributed transaction blocked by another distributed transaction +permutation "dist11-begin" "dist13-begin" "dist11-update" "dist13-update" "detector-dump-wait-edges" "dist11-abort" "dist13-abort" + +# Distributed transaction blocked by a regular transaction +permutation "local-begin" "dist13-begin" "local-update" "dist13-update" "detector-dump-wait-edges" "local-abort" "dist13-abort" + +# Distributed transaction blocked by a regular transaction blocked by a distributed transaction +permutation "dist11-begin" "local-begin" "dist13-begin" "dist11-update" "local-update" "dist13-update" "detector-dump-wait-edges" "dist11-abort" "local-abort" "dist13-abort" diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 5895adb0d..5d8d124ba 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -114,6 +114,8 @@ ALTER EXTENSION citus UPDATE TO '7.0-1'; ALTER EXTENSION citus UPDATE TO '7.0-2'; ALTER EXTENSION citus UPDATE TO '7.0-3'; ALTER EXTENSION citus UPDATE TO '7.0-4'; +ALTER EXTENSION citus UPDATE TO '7.0-5'; +ALTER EXTENSION citus UPDATE TO '7.0-6'; -- show running version SHOW citus.version;