Add function for dumping local wait edges

pull/1495/head
Marco Slot 2017-07-11 17:52:27 +02:00
parent b04aa9bf85
commit 81198a1d02
12 changed files with 871 additions and 2 deletions

View File

@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
6.2-1 6.2-2 6.2-3 6.2-4 \
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -149,6 +149,8 @@ $(EXTENSION)--7.0-4.sql: $(EXTENSION)--7.0-3.sql $(EXTENSION)--7.0-3--7.0-4.sql
cat $^ > $@
$(EXTENSION)--7.0-5.sql: $(EXTENSION)--7.0-4.sql $(EXTENSION)--7.0-4--7.0-5.sql
cat $^ > $@
$(EXTENSION)--7.0-6.sql: $(EXTENSION)--7.0-5.sql $(EXTENSION)--7.0-5--7.0-6.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,18 @@
/* citus--7.0-5--7.0-6 */
CREATE FUNCTION pg_catalog.dump_local_wait_edges(
IN source_node_id int4,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE 'c' STRICT
AS $$MODULE_PATHNAME$$, $$dump_local_wait_edges$$;
COMMENT ON FUNCTION pg_catalog.dump_local_wait_edges(int)
IS 'returns a local list of blocked transactions originating from source_node_id';

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '7.0-5'
default_version = '7.0-6'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -501,3 +501,29 @@ CurrentDistributedTransactionNumber(void)
return MyBackendData->transactionId.transactionNumber;
}
/*
* GetBackendDataForProc writes the backend data for the given process to
* result. If the process is part of a lock group (parallel query) it
* returns the leader data instead.
*/
void
GetBackendDataForProc(PGPROC *proc, BackendData *result)
{
BackendData *backendData = NULL;
int pgprocno = proc->pgprocno;
if (proc->lockGroupLeader != NULL)
{
pgprocno = proc->lockGroupLeader->pgprocno;
}
backendData = &backendManagementShmemData->backends[pgprocno];
SpinLockAcquire(&backendData->mutex);
memcpy(result, backendData, sizeof(BackendData));
SpinLockRelease(&backendData->mutex);
}

View File

@ -0,0 +1,537 @@
/*-------------------------------------------------------------------------
*
* lock_graph.c
*
* Functions for obtaining local and global lock graphs in which each
* node is a distributed transaction, and an edge represent a waiting-for
* relationship.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "access/hash.h"
#include "distributed/backend_data.h"
#include "distributed/hash_helpers.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "storage/proc.h"
#include "utils/builtins.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"
/*
* PROCStack is a stack of PGPROC pointers used to perform a depth-first search
* through the lock graph.
*/
typedef struct PROCStack
{
int procCount;
PGPROC **procs;
} PROCStack;
static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
static WaitGraph * BuildWaitGraphForSourceNode(int sourceNodeId);
static void LockLockData(void);
static void UnlockLockData(void);
static void AddEdgesForLockWaits(WaitGraph *waitGraph, PGPROC *waitingProc,
PROCStack *remaining);
static void AddEdgesForWaitQueue(WaitGraph *waitGraph, PGPROC *waitingProc,
PROCStack *remaining);
static void AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
PROCStack *remaining);
static WaitEdge * AllocWaitEdge(WaitGraph *waitGraph);
static bool IsProcessWaitingForLock(PGPROC *proc);
static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc);
static bool IsConflictingLockMask(int holdMask, int conflictMask);
static bool IsInDistributedTransaction(BackendData *backendData);
PG_FUNCTION_INFO_V1(dump_local_wait_edges);
/*
* dump_local_wait_edges returns wait edges for distributed transactions
* running on the node on which it is called, which originate from the source node.
*/
Datum
dump_local_wait_edges(PG_FUNCTION_ARGS)
{
int32 sourceNodeId = PG_GETARG_INT32(0);
WaitGraph *waitGraph = BuildWaitGraphForSourceNode(sourceNodeId);
ReturnWaitGraph(waitGraph, fcinfo);
return (Datum) 0;
}
/*
* ReturnWaitGraph returns a wait graph for a set returning function.
*/
static void
ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc = NULL;
Tuplestorestate *tupstore = NULL;
MemoryContext per_query_ctx = NULL;
MemoryContext oldcontext = NULL;
size_t curEdgeNum = 0;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"set-valued function called in context that cannot accept a set")));
}
if (!(rsinfo->allowedModes & SFRM_Materialize))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));
}
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
{
elog(ERROR, "return type must be a row type");
}
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);
/*
* Columns:
* 00: waiting_pid
* 01: waiting_node_id
* 02: waiting_transaction_num
* 03: waiting_transaction_stamp
* 04: blocking_pid
* 05: blocking__node_id
* 06: blocking_transaction_num
* 07: blocking_transaction_stamp
* 08: blocking_transaction_waiting
*/
for (curEdgeNum = 0; curEdgeNum < waitGraph->edgeCount; curEdgeNum++)
{
Datum values[9];
bool nulls[9];
WaitEdge *curEdge = &waitGraph->edges[curEdgeNum];
memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(curEdge->waitingPid);
values[1] = Int32GetDatum(curEdge->waitingNodeId);
if (curEdge->waitingTransactionNum != 0)
{
values[2] = Int64GetDatum(curEdge->waitingTransactionNum);
values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp);
}
else
{
nulls[2] = true;
nulls[3] = true;
}
values[4] = Int32GetDatum(curEdge->blockingPid);
values[5] = Int32GetDatum(curEdge->blockingNodeId);
if (curEdge->blockingTransactionNum != 0)
{
values[6] = Int64GetDatum(curEdge->blockingTransactionNum);
values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp);
}
else
{
nulls[6] = true;
nulls[7] = true;
}
values[8] = BoolGetDatum(curEdge->isBlockingXactWaiting);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
/* clean up and return the tuplestore */
tuplestore_donestoring(tupstore);
}
/*
* BuildWaitGraphForSourceNode builds a wait graph for distributed transactions
* that originate from the given source node.
*/
static WaitGraph *
BuildWaitGraphForSourceNode(int sourceNodeId)
{
WaitGraph *waitGraph = NULL;
int curBackend = 0;
bool visitedProcs[MaxBackends];
PROCStack remaining;
memset(visitedProcs, 0, MaxBackends);
/*
* Try hard to avoid allocations while holding lock. Thus we pre-allocate
* space for locks in large batches - for common scenarios this should be
* more than enough space to build the list of wait edges without a single
* allocation.
*/
waitGraph = (WaitGraph *) palloc0(sizeof(WaitGraph));
waitGraph->localNodeId = GetLocalGroupId();
waitGraph->allocatedSize = MaxBackends * 3;
waitGraph->edgeCount = 0;
waitGraph->edges = (WaitEdge *) palloc(waitGraph->allocatedSize * sizeof(WaitEdge));
remaining.procs = (PGPROC **) palloc(sizeof(PGPROC *) * MaxBackends);
remaining.procCount = 0;
LockLockData();
/*
* Build lock-graph. We do so by first finding all procs which we are
* interested in (originating on our source system, and blocked). Once
* those are collected, do depth first search over all procs blocking
* those. To avoid redundantly visiting procs, keep track of which procs
* already have been visited in a pgproc-indexed visitedProcs[] array.
*/
/* build list of starting procs */
for (curBackend = 0; curBackend < MaxBackends; curBackend++)
{
PGPROC *currentProc = &ProcGlobal->allProcs[curBackend];
BackendData currentBackendData;
/* skip if the PGPROC slot is unused */
if (currentProc->pid == 0)
{
continue;
}
GetBackendDataForProc(currentProc, &currentBackendData);
/*
* Only start searching from distributed transactions originating on the source
* node. Other deadlocks may exist, but the source node can only resolve those
* that involve its own transactions.
*/
if (sourceNodeId != currentBackendData.transactionId.initiatorNodeIdentifier ||
!IsInDistributedTransaction(&currentBackendData))
{
continue;
}
/* skip if the process is not blocked */
if (!IsProcessWaitingForLock(currentProc))
{
continue;
}
remaining.procs[remaining.procCount++] = currentProc;
}
while (remaining.procCount > 0)
{
PGPROC *waitingProc = remaining.procs[--remaining.procCount];
/*
* We might find a process again if multiple distributed transactions are
* waiting for it, but we add all edges on the first visit so we don't need
* to visit it again. This also avoids getting into an infinite loop in
* case of a local deadlock.
*/
if (visitedProcs[waitingProc->pgprocno])
{
continue;
}
visitedProcs[waitingProc->pgprocno] = true;
/* only blocked processes result in wait edges */
if (!IsProcessWaitingForLock(waitingProc))
{
continue;
}
/*
* Record an edge for everyone already holding the lock in a
* conflicting manner ("hard edges" in postgres parlance).
*/
AddEdgesForLockWaits(waitGraph, waitingProc, &remaining);
/*
* Record an edge for everyone in front of us in the wait-queue
* for the lock ("soft edges" in postgres parlance).
*/
AddEdgesForWaitQueue(waitGraph, waitingProc, &remaining);
}
UnlockLockData();
return waitGraph;
}
/*
* LockLockData takes locks the shared lock data structure, which prevents
* concurrent lock acquisitions/releases.
*/
static void
LockLockData(void)
{
int partitionNum = 0;
for (partitionNum = 0; partitionNum < NUM_LOCK_PARTITIONS; partitionNum++)
{
LWLockAcquire(LockHashPartitionLockByIndex(partitionNum), LW_SHARED);
}
}
/*
* UnlockLockData unlocks the locks on the shared lock data structure in reverse
* order since LWLockRelease searches the given lock from the end of the
* held_lwlocks array.
*/
static void
UnlockLockData(void)
{
int partitionNum = 0;
for (partitionNum = NUM_LOCK_PARTITIONS - 1; partitionNum >= 0; partitionNum--)
{
LWLockRelease(LockHashPartitionLockByIndex(partitionNum));
}
}
/*
* AddEdgesForLockWaits adds an edge to the wait graph for every granted lock
* that waitingProc is waiting for.
*
* This function iterates over the procLocks data structure in shared memory,
* which also contains entries for locks which have not been granted yet, but
* it does not reflect the order of the wait queue. We therefore handle the
* wait queue separately.
*/
static void
AddEdgesForLockWaits(WaitGraph *waitGraph, PGPROC *waitingProc, PROCStack *remaining)
{
/* the lock for which this process is waiting */
LOCK *waitLock = waitingProc->waitLock;
/* determine the conflict mask for the lock level used by the process */
LockMethod lockMethodTable = GetLocksMethodTable(waitLock);
int conflictMask = lockMethodTable->conflictTab[waitingProc->waitLockMode];
/* iterate through the queue of processes holding the lock */
SHM_QUEUE *procLocks = &waitLock->procLocks;
PROCLOCK *procLock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
offsetof(PROCLOCK, lockLink));
while (procLock != NULL)
{
PGPROC *currentProc = procLock->tag.myProc;
/* skip processes from the same lock group and ones that don't conflict */
if (!IsSameLockGroup(waitingProc, currentProc) &&
IsConflictingLockMask(procLock->holdMask, conflictMask))
{
AddWaitEdge(waitGraph, waitingProc, currentProc, remaining);
}
procLock = (PROCLOCK *) SHMQueueNext(procLocks, &procLock->lockLink,
offsetof(PROCLOCK, lockLink));
}
}
/*
* AddEdgesForWaitQueue adds an edge to the wait graph for processes in front of
* waitingProc in the wait queue that are trying to acquire a conflicting lock.
*/
static void
AddEdgesForWaitQueue(WaitGraph *waitGraph, PGPROC *waitingProc, PROCStack *remaining)
{
/* the lock for which this process is waiting */
LOCK *waitLock = waitingProc->waitLock;
/* determine the conflict mask for the lock level used by the process */
LockMethod lockMethodTable = GetLocksMethodTable(waitLock);
int conflictMask = lockMethodTable->conflictTab[waitingProc->waitLockMode];
/* iterate through the wait queue */
PROC_QUEUE *waitQueue = &(waitLock->waitProcs);
int queueSize = waitQueue->size;
PGPROC *currentProc = (PGPROC *) waitQueue->links.next;
/*
* Iterate through the queue from the start until we encounter waitingProc,
* since we only care about processes in front of waitingProc in the queue.
*/
while (queueSize-- > 0 && currentProc != waitingProc)
{
int awaitMask = LOCKBIT_ON(currentProc->waitLockMode);
/* skip processes from the same lock group and ones that don't conflict */
if (!IsSameLockGroup(waitingProc, currentProc) &&
IsConflictingLockMask(awaitMask, conflictMask))
{
AddWaitEdge(waitGraph, waitingProc, currentProc, remaining);
}
currentProc = (PGPROC *) currentProc->links.next;
}
}
/*
* AddWaitEdge adds a new wait edge to a wait graph. The nodes in the graph are
* transactions and an edge indicates the "waiting" process is blocked on a lock
* held by the "blocking" process.
*
* If the blocking process is itself waiting then it is added to the remaining
* stack.
*/
static void
AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
PROCStack *remaining)
{
WaitEdge *curEdge = AllocWaitEdge(waitGraph);
BackendData waitingBackendData;
BackendData blockingBackendData;
GetBackendDataForProc(waitingProc, &waitingBackendData);
GetBackendDataForProc(blockingProc, &blockingBackendData);
curEdge->isBlockingXactWaiting = IsProcessWaitingForLock(blockingProc);
if (curEdge->isBlockingXactWaiting)
{
remaining->procs[remaining->procCount++] = blockingProc;
}
curEdge->waitingPid = waitingProc->pid;
if (IsInDistributedTransaction(&waitingBackendData))
{
DistributedTransactionId *waitingTransactionId =
&waitingBackendData.transactionId;
curEdge->waitingNodeId = waitingTransactionId->initiatorNodeIdentifier;
curEdge->waitingTransactionNum = waitingTransactionId->transactionNumber;
curEdge->waitingTransactionStamp = waitingTransactionId->timestamp;
}
else
{
curEdge->waitingNodeId = waitGraph->localNodeId;
curEdge->waitingTransactionNum = 0;
curEdge->waitingTransactionStamp = 0;
}
curEdge->blockingPid = blockingProc->pid;
if (IsInDistributedTransaction(&blockingBackendData))
{
DistributedTransactionId *blockingTransactionId =
&blockingBackendData.transactionId;
curEdge->blockingNodeId = blockingTransactionId->initiatorNodeIdentifier;
curEdge->blockingTransactionNum = blockingTransactionId->transactionNumber;
curEdge->blockingTransactionStamp = blockingTransactionId->timestamp;
}
else
{
curEdge->blockingNodeId = waitGraph->localNodeId;
curEdge->blockingTransactionNum = 0;
curEdge->blockingTransactionStamp = 0;
}
}
/*
* AllocWaitEdge allocates a wait edge as part of the given wait graph.
* If the wait graph has insufficient space its size is doubled using
* repalloc.
*/
static WaitEdge *
AllocWaitEdge(WaitGraph *waitGraph)
{
/* ensure space for new edge */
if (waitGraph->allocatedSize == waitGraph->edgeCount)
{
waitGraph->allocatedSize *= 2;
waitGraph->edges = (WaitEdge *)
repalloc(waitGraph->edges, sizeof(WaitEdge) *
waitGraph->allocatedSize);
}
return &waitGraph->edges[waitGraph->edgeCount++];
}
/*
* IsProcessWaitingForLock returns whether a given process is waiting for a lock.
*/
static bool
IsProcessWaitingForLock(PGPROC *proc)
{
return proc->waitStatus == STATUS_WAITING;
}
/*
* IsSameLockGroup returns whether two processes are part of the same lock group,
* meaning they are either the same process, or have the same lock group leader.
*/
static bool
IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc)
{
return leftProc == rightProc ||
(leftProc->lockGroupLeader != NULL &&
leftProc->lockGroupLeader == rightProc->lockGroupLeader);
}
/*
* IsConflictingLockMask returns whether the given conflict mask conflicts with the
* holdMask.
*
* holdMask is a bitmask with the i-th bit turned on if a lock mode i is held.
*
* conflictMask is a bitmask with the j-th bit turned on if it conflicts with
* lock mode i.
*/
static bool
IsConflictingLockMask(int holdMask, int conflictMask)
{
return (holdMask & conflictMask) != 0;
}
/*
* IsInDistributedTransaction returns whether the given backend is in a
* distributed transaction.
*/
static bool
IsInDistributedTransaction(BackendData *backendData)
{
return backendData->transactionId.transactionNumber != 0;
}

View File

@ -16,6 +16,8 @@
#include "datatype/timestamp.h"
#include "distributed/transaction_identifier.h"
#include "nodes/pg_list.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/s_lock.h"
@ -35,5 +37,6 @@ extern void InitializeBackendManagement(void);
extern void InitializeBackendData(void);
extern void UnSetDistributedTransactionId(void);
extern void AssignDistributedTransactionId(void);
extern void GetBackendDataForProc(PGPROC *proc, BackendData *result);
#endif /* BACKEND_DATA_H */

View File

@ -0,0 +1,57 @@
/*
* lock_graph.h
*
* Data structures and functions for gathering lock graphs between
* distributed transactions.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef LOCK_GRAPH_H
#define LOCK_GRAPH_H
#include "postgres.h"
#include "datatype/timestamp.h"
/*
* Describes an edge in a waiting-for graph of locks. This isn't used for
* deadlock-checking directly, but to gather the information necessary to
* do so.
*
* The datatypes here are a bit looser than strictly necessary, because
* they're transported as the return type from an SQL function.
*/
typedef struct WaitEdge
{
int waitingPid;
int waitingNodeId;
int64 waitingTransactionNum;
TimestampTz waitingTransactionStamp;
int blockingPid;
int blockingNodeId;
int64 blockingTransactionNum;
TimestampTz blockingTransactionStamp;
/* blocking transaction is also waiting on a lock */
bool isBlockingXactWaiting;
} WaitEdge;
/*
* WaitGraph represent a graph of wait edges as an adjacency list.
*/
typedef struct WaitGraph
{
int localNodeId;
int allocatedSize;
int edgeCount;
WaitEdge *edges;
} WaitGraph;
#endif /* LOCK_GRAPH_H */

View File

@ -0,0 +1,134 @@
Parsed test spec with 4 sessions
starting permutation: dist11-begin dist13-begin dist11-update dist13-update detector-dump-wait-edges dist11-abort dist13-abort
step dist11-begin:
BEGIN;
SELECT assign_distributed_transaction_id(11, 1, '2017-01-01 00:00:00+0');
assign_distributed_transaction_id
step dist13-begin:
BEGIN;
SELECT assign_distributed_transaction_id(13, 1, '2017-01-01 00:00:00+0');
assign_distributed_transaction_id
step dist11-update:
UPDATE local_table SET y = 1 WHERE x = 1;
step dist13-update:
UPDATE local_table SET y = 3 WHERE x = 1;
<waiting ...>
step detector-dump-wait-edges:
SELECT
waiting_node_id,
waiting_transaction_num,
blocking_node_id,
blocking_transaction_num,
blocking_transaction_waiting
FROM
dump_local_wait_edges(13);
waiting_node_idwaiting_transaction_numblocking_node_idblocking_transaction_numblocking_transaction_waiting
13 1 11 1 f
step dist11-abort:
ABORT;
step dist13-update: <... completed>
step dist13-abort:
ABORT;
starting permutation: local-begin dist13-begin local-update dist13-update detector-dump-wait-edges local-abort dist13-abort
step local-begin:
BEGIN;
step dist13-begin:
BEGIN;
SELECT assign_distributed_transaction_id(13, 1, '2017-01-01 00:00:00+0');
assign_distributed_transaction_id
step local-update:
UPDATE local_table SET y = 2 WHERE x = 1;
step dist13-update:
UPDATE local_table SET y = 3 WHERE x = 1;
<waiting ...>
step detector-dump-wait-edges:
SELECT
waiting_node_id,
waiting_transaction_num,
blocking_node_id,
blocking_transaction_num,
blocking_transaction_waiting
FROM
dump_local_wait_edges(13);
waiting_node_idwaiting_transaction_numblocking_node_idblocking_transaction_numblocking_transaction_waiting
13 1 0 f
step local-abort:
ABORT;
step dist13-update: <... completed>
step dist13-abort:
ABORT;
starting permutation: dist11-begin local-begin dist13-begin dist11-update local-update dist13-update detector-dump-wait-edges dist11-abort local-abort dist13-abort
step dist11-begin:
BEGIN;
SELECT assign_distributed_transaction_id(11, 1, '2017-01-01 00:00:00+0');
assign_distributed_transaction_id
step local-begin:
BEGIN;
step dist13-begin:
BEGIN;
SELECT assign_distributed_transaction_id(13, 1, '2017-01-01 00:00:00+0');
assign_distributed_transaction_id
step dist11-update:
UPDATE local_table SET y = 1 WHERE x = 1;
step local-update:
UPDATE local_table SET y = 2 WHERE x = 1;
<waiting ...>
step dist13-update:
UPDATE local_table SET y = 3 WHERE x = 1;
<waiting ...>
step detector-dump-wait-edges:
SELECT
waiting_node_id,
waiting_transaction_num,
blocking_node_id,
blocking_transaction_num,
blocking_transaction_waiting
FROM
dump_local_wait_edges(13);
waiting_node_idwaiting_transaction_numblocking_node_idblocking_transaction_numblocking_transaction_waiting
13 1 0 t
0 11 1 f
step dist11-abort:
ABORT;
step local-update: <... completed>
step local-abort:
ABORT;
step dist13-update: <... completed>
step dist13-abort:
ABORT;

View File

@ -114,6 +114,8 @@ ALTER EXTENSION citus UPDATE TO '7.0-1';
ALTER EXTENSION citus UPDATE TO '7.0-2';
ALTER EXTENSION citus UPDATE TO '7.0-3';
ALTER EXTENSION citus UPDATE TO '7.0-4';
ALTER EXTENSION citus UPDATE TO '7.0-5';
ALTER EXTENSION citus UPDATE TO '7.0-6';
-- show running version
SHOW citus.version;
citus.version

View File

@ -10,3 +10,4 @@ test: isolation_concurrent_dml isolation_data_migration
test: isolation_drop_shards isolation_copy_placement_vs_modification
test: isolation_insert_vs_vacuum isolation_transaction_recovery
test: isolation_distributed_transaction_id
test: isolation_dump_local_wait_edges

View File

@ -0,0 +1,87 @@
setup
{
CREATE TABLE local_table (x int primary key, y int);
INSERT INTO local_table VALUES (1,0);
}
teardown
{
DROP TABLE local_table;
}
session "dist11"
step "dist11-begin"
{
BEGIN;
SELECT assign_distributed_transaction_id(11, 1, '2017-01-01 00:00:00+0');
}
step "dist11-update"
{
UPDATE local_table SET y = 1 WHERE x = 1;
}
step "dist11-abort"
{
ABORT;
}
session "local"
step "local-begin"
{
BEGIN;
}
step "local-update"
{
UPDATE local_table SET y = 2 WHERE x = 1;
}
step "local-abort"
{
ABORT;
}
session "dist13"
step "dist13-begin"
{
BEGIN;
SELECT assign_distributed_transaction_id(13, 1, '2017-01-01 00:00:00+0');
}
step "dist13-update"
{
UPDATE local_table SET y = 3 WHERE x = 1;
}
step "dist13-abort"
{
ABORT;
}
session "detector"
step "detector-dump-wait-edges"
{
SELECT
waiting_node_id,
waiting_transaction_num,
blocking_node_id,
blocking_transaction_num,
blocking_transaction_waiting
FROM
dump_local_wait_edges(13);
}
# Distributed transaction blocked by another distributed transaction
permutation "dist11-begin" "dist13-begin" "dist11-update" "dist13-update" "detector-dump-wait-edges" "dist11-abort" "dist13-abort"
# Distributed transaction blocked by a regular transaction
permutation "local-begin" "dist13-begin" "local-update" "dist13-update" "detector-dump-wait-edges" "local-abort" "dist13-abort"
# Distributed transaction blocked by a regular transaction blocked by a distributed transaction
permutation "dist11-begin" "local-begin" "dist13-begin" "dist11-update" "local-update" "dist13-update" "detector-dump-wait-edges" "dist11-abort" "local-abort" "dist13-abort"

View File

@ -114,6 +114,8 @@ ALTER EXTENSION citus UPDATE TO '7.0-1';
ALTER EXTENSION citus UPDATE TO '7.0-2';
ALTER EXTENSION citus UPDATE TO '7.0-3';
ALTER EXTENSION citus UPDATE TO '7.0-4';
ALTER EXTENSION citus UPDATE TO '7.0-5';
ALTER EXTENSION citus UPDATE TO '7.0-6';
-- show running version
SHOW citus.version;