diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index cfd4e1ea8..797ba15fd 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \ 6.2-1 6.2-2 6.2-3 6.2-4 \ - 7.0-1 + 7.0-1 7.0-2 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -141,6 +141,8 @@ $(EXTENSION)--6.2-4.sql: $(EXTENSION)--6.2-3.sql $(EXTENSION)--6.2-3--6.2-4.sql cat $^ > $@ $(EXTENSION)--7.0-1.sql: $(EXTENSION)--6.2-4.sql $(EXTENSION)--6.2-4--7.0-1.sql cat $^ > $@ +$(EXTENSION)--7.0-2.sql: $(EXTENSION)--7.0-1.sql $(EXTENSION)--7.0-1--7.0-2.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--7.0-1--7.0-2.sql b/src/backend/distributed/citus--7.0-1--7.0-2.sql new file mode 100644 index 000000000..36f479089 --- /dev/null +++ b/src/backend/distributed/citus--7.0-1--7.0-2.sql @@ -0,0 +1,42 @@ +/* citus--7.0-1--7.0-2.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION assign_distributed_transaction_id(originNodeId int8, transactionId int8, transactionStamp timestamptz) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$assign_distributed_transaction_id$$; +COMMENT ON FUNCTION assign_distributed_transaction_id(originNodeId int8, transactionId int8, transactionStamp timestamptz) + IS 'sets distributed transaction id'; + +CREATE FUNCTION pg_catalog.dump_local_wait_edges( + IN source_node_id int, IN local_node_id int, + OUT waiting_pid int4, + OUT waiting_node_id int4, OUT waiting_transaction_id int8, OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int, + OUT blocking_nodeid int4, OUT blocking_transactionid int8, OUT blocking_transactionstamp timestamptz, + OUT blocking_also_blocked bool) +RETURNS SETOF RECORD +LANGUAGE 'c' +STRICT +AS $$citus$$, $$dump_local_wait_edges$$; + +CREATE FUNCTION pg_catalog.dump_all_wait_edges( + OUT pid int4, + OUT nodeId int8, OUT transactionId int8, OUT transactionStamp timestamptz, + OUT blocked_pid int, + OUT blocked_nodeid int8, OUT blocked_transactionid int8, OUT blocked_transactionstamp timestamptz, + OUT blocked_blocked bool) +RETURNS SETOF RECORD +LANGUAGE 'c' +STRICT +AS $$citus$$, $$dump_all_wait_edges$$; + + +CREATE FUNCTION pg_catalog.this_machine_kills_deadlocks() +RETURNS bool +LANGUAGE 'c' +STRICT +AS $$citus$$, $$this_machine_kills_deadlocks$$; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index ecc22e45e..abd503b5e 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '7.0-1' +default_version = '7.0-2' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index ed7e1a2f5..c8a831efc 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -78,8 +78,7 @@ static void AssignInsertTaskShardId(Query *jobQuery, List *taskList); static void ExecuteSingleModifyTask(CitusScanState 
*scanState, Task *task, bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); -static List * GetModifyConnections(List *taskPlacementList, bool markCritical, - bool startedInTransaction); +static List * GetModifyConnections(List *taskPlacementList, bool markCritical); static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, bool isModificationQuery, bool expectResults); static int64 ExecuteModifyTasks(List *taskList, bool expectResults, @@ -666,8 +665,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult char *queryString = task->queryString; bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC); - bool startedInTransaction = - InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA; if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD) { @@ -706,8 +703,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult * table) and start a transaction (when in a transaction). */ connectionList = GetModifyConnections(taskPlacementList, - taskRequiresTwoPhaseCommit, - startedInTransaction); + taskRequiresTwoPhaseCommit); /* prevent replicas of the same shard from diverging */ AcquireExecutorShardLock(task, operation); @@ -808,7 +804,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult * transaction in progress. */ static List * -GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTransactions) +GetModifyConnections(List *taskPlacementList, bool markCritical) { ListCell *taskPlacementCell = NULL; List *multiConnectionList = NIL; @@ -828,26 +824,6 @@ GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTrans */ multiConnection = StartPlacementConnection(connectionFlags, taskPlacement, NULL); - /* - * If already in a transaction, disallow expanding set of remote - * transactions. That prevents some forms of distributed deadlocks. - */ - if (noNewTransactions) - { - RemoteTransaction *transaction = &multiConnection->remoteTransaction; - - if (EnableDeadlockPrevention && - transaction->transactionState == REMOTE_TRANS_INVALID) - { - ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), - errmsg("no transaction participant matches %s:%d", - taskPlacement->nodeName, taskPlacement->nodePort), - errdetail("Transactions which modify distributed tables " - "may only target nodes affected by the " - "modification command which began the transaction."))); - } - } - if (markCritical) { MarkRemoteTransactionCritical(multiConnection); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index c4931d0cb..3f3e37488 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -197,6 +197,7 @@ void StartupCitusBackend(void) { InitializeMaintenanceDaemonBackend(); + InitializeTransactionManagementBackend(); } diff --git a/src/backend/distributed/transaction/deadlock.c b/src/backend/distributed/transaction/deadlock.c new file mode 100644 index 000000000..7d6d9de77 --- /dev/null +++ b/src/backend/distributed/transaction/deadlock.c @@ -0,0 +1,926 @@ +/*------------------------------------------------------------------------- + * + * deadlock.c + * + * Basic implementation of distributed deadlock detection. The algorithm used + * is quite naive, in the hope that that's sufficient. 
The approach basically is: + * + * 1) regularly poll all workers to see if they have any sessions blocked longer than + * deadlock_timeout * fuzz_factor. This avoids triggering more expensive + * work if there's no potential for deadlocks. (currently unimplemented) + * + * 2) If 1) finds there's potential for deadlock, query each worker node for a + * wait-graph for transactions originating on the corresponding + * coordinator. This graph can include non-distributed transactions, + * transactions originating on other systems, etc., just not as the + * "starting points". All the nodes are identified by their distributed + * transaction id (inventing one for non-distributed transactions). + * + * 3) The coordinator combines the wait-graphs from each worker, and builds a + * wait-graph spanning all of these. + * + * 4) Perform cycle detection for each distributed transaction started on the + * local node. The algorithm used is a simple graph traversal that finds a + * deadlock when the starting node is reached again. As already visited + * nodes are skipped, this is O(edges) for a single distributed + * transaction. This means the total cost is O(edges * + * distributed_transactions), where distributed_transactions is limited by + * max_connections. The cost can be limited by only considering + * transactions that have been running for longer than deadlock_timeout + * (unimplemented). + * If necessary this could be implemented in O(transactions + edges). + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "libpq-fe.h" +#include "miscadmin.h" + +#include "access/hash.h" +#include "distributed/connection_management.h" +#include "distributed/hash_helpers.h" +#include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" +#include "distributed/transaction_management.h" +#include "storage/proc.h" +#include "utils/builtins.h" +#include "utils/hsearch.h" + +typedef struct LockDepNode +{ + TmgmtTransactionId transactionId; + List *deps; + int initial_pid; + bool visited; +} LockDepNode; + +/* + * Describes an edge in a waiting-for graph of locks. This isn't used for + * deadlock-checking itself, but to gather the information necessary to do so. + * It can be acquired locally, or from a remote node. + * + * The datatypes here are a bit looser than strictly necessary, because + * they're transported as the return type from an SQL function. + */ +typedef struct WaitEdge +{ + int waitingPid; + int waitingNodeId; + int64 waitingTransactionId; + TimestampTz waitingTransactionStamp; + + int blockingPid; + int blockingNodeId; + int64 blockingTransactionId; + TimestampTz blockingTransactionStamp; + bool blockingAlsoBlocked; +} WaitEdge; + +/* FIXME: better name? More adjacency list than graph...
*/ +typedef struct WaitGraph +{ + size_t numAllocated; + size_t numUsed; + WaitEdge *edges; +} WaitGraph; + +#define NUM_DUMP_LOCKS_COLS 9 + +/* + * Columns: + * 00: waiting_pid + * 01: waiting_node_id + * 02: waiting_transaction_id + * 03: waiting_transactionstamp + * 04: blocking_pid + * 05: blocking_node_id + * 06: blocking_transaction_id + * 07: blocking_transactionstamp + * 08: blocking_also_blocked + */ + +static WaitGraph * BuildLocalWaitForGraph(int sourceNodeId, int localNodeId); +static WaitEdge * AllocWaitEdge(WaitGraph *waitGraph); + +static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); +static void LoadRemoteEdges(WaitGraph *waitGraph); +static void WaitEdgeFromResult(WaitEdge *edge, PGresult *result, size_t rowIndex); +static int64 ParseIntField(PGresult *result, int rowIndex, int colIndex, bool *isNull); +static TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex, + bool *isNull); + +static uint32 TransactionIdHash(const void *key, Size keysize); +static int TransactionIdCompare(const void *a, const void *b, Size keysize); +static LockDepNode * LookupDepNode(HTAB *lockDepNodeHash, + TmgmtTransactionId *transactionId); + +PG_FUNCTION_INFO_V1(this_machine_kills_deadlocks); +PG_FUNCTION_INFO_V1(dump_local_wait_edges); +PG_FUNCTION_INFO_V1(dump_all_wait_edges); + + +Datum +this_machine_kills_deadlocks(PG_FUNCTION_ARGS) +{ + WaitGraph *waitGraph = NULL; + HASHCTL info; + uint32 hashFlags = 0; + HTAB *lockDepNodeHash = NULL; + int localNodeId = 0; /* FIXME: get correct local node id */ + int curBackend = 0; + size_t curEdgeNum; + + /* create distributed transaction id -> LockDepNode hash */ + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(TmgmtTransactionId); + info.entrysize = sizeof(LockDepNode); + info.hash = TransactionIdHash; + info.match = TransactionIdCompare; + info.hcxt = CurrentMemoryContext; + hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE); + + lockDepNodeHash = hash_create("deadlock detection", + 64, &info, hashFlags); + + waitGraph = BuildLocalWaitForGraph(0, 0); + LoadRemoteEdges(waitGraph); + + /* + * FIXME: should be doable without additional lock now, could just do this + * in the hash, after building it? + */ + LWLockAcquire(&TmgmtShmemControl->lock, LW_SHARED); + for (curBackend = 0; curBackend < MaxBackends; curBackend++) + { + TmgmtBackendData *CurBackendData = &TmgmtShmemControl->sessions[curBackend]; + PGPROC *curProc; + LockDepNode *initialNode = NULL; + + curProc = &ProcGlobal->allProcs[curBackend]; + + /* Skip if unused (FIXME: prepared xacts!) */ + if (curProc->pid == 0) + { + continue; + } + + /* Skip if not a distributed transaction. */ + if (CurBackendData->transactionId.transactionId == 0) + { + continue; + } + + /* Skip if not originating from the interesting node */ + if (localNodeId != CurBackendData->transactionId.nodeId) + { + continue; + } + + /* + * It'd be nice if we could only start at transactions that are + * actually blocked, but we do not know that at this point. + */ + + initialNode = LookupDepNode(lockDepNodeHash, &CurBackendData->transactionId); + initialNode->initial_pid = curProc->pid; + } + LWLockRelease(&TmgmtShmemControl->lock); + + /* + * Flatten wait edges, which include wait-edges between processes on each + * node, to a graph only including wait-for dependencies between + * distributed transactions.
+ */ + for (curEdgeNum = 0; curEdgeNum < waitGraph->numUsed; curEdgeNum++) + { + WaitEdge *curEdge = &waitGraph->edges[curEdgeNum]; + TmgmtTransactionId from; + TmgmtTransactionId to; + LockDepNode *fromNode; + LockDepNode *toNode; + + from.nodeId = curEdge->waitingNodeId; + from.transactionId = curEdge->waitingTransactionId; + from.timestamp = curEdge->waitingTransactionStamp; + + to.nodeId = curEdge->blockingNodeId; + to.transactionId = curEdge->blockingTransactionId; + to.timestamp = curEdge->blockingTransactionStamp; + + fromNode = LookupDepNode(lockDepNodeHash, &from); + toNode = LookupDepNode(lockDepNodeHash, &to); + + fromNode->deps = lappend(fromNode->deps, toNode); + } + + /* avoids the need need to call hash_seq_term */ + hash_freeze(lockDepNodeHash); + + /* + * Find lock cycles by doing a tree traversal for each node that starts + * with one of the local ones. + */ + { + LockDepNode *curNode; + HASH_SEQ_STATUS initNodeSeq; + + hash_seq_init(&initNodeSeq, lockDepNodeHash); + while ((curNode = (LockDepNode *) hash_seq_search(&initNodeSeq)) != 0) + { + HASH_SEQ_STATUS resetVisitedSeq; + LockDepNode *resetVisitedNode; + List *todoList = NIL; + + /* Only start at transactions originating locally and blocked. */ + if (curNode->initial_pid == 0) + { + continue; + } + + elog(WARNING, "doing deadlock detection for %d", curNode->initial_pid); + + /* reset visited fields */ + hash_seq_init(&resetVisitedSeq, lockDepNodeHash); + while ((resetVisitedNode = (LockDepNode *) hash_seq_search( + &resetVisitedSeq)) != 0) + { + resetVisitedNode->visited = false; + } + + todoList = list_copy(curNode->deps); + + while (todoList) + { + LockDepNode *visitNode = linitial(todoList); + ListCell *recurseCell = NULL; + + todoList = list_delete_first(todoList); + + /* There's a deadlock if we found our original node */ + if (visitNode == curNode) + { + elog(WARNING, "found deadlock, killing: %d", curNode->initial_pid); + kill(curNode->initial_pid, SIGINT); + pg_usleep(100000); + kill(curNode->initial_pid, SIGINT); + PG_RETURN_BOOL(true); + } + + /* don't do redundant work (or cycle in indep. 
deadlocks) */ + if (visitNode->visited) + { + continue; + } + visitNode->visited = true; + + foreach(recurseCell, visitNode->deps) + { + todoList = lappend(todoList, lfirst(recurseCell)); + } + } + } + } + + PG_RETURN_BOOL(false); +} + + +Datum +dump_local_wait_edges(PG_FUNCTION_ARGS) +{ + int32 sourceNodeId = PG_GETARG_INT32(0); + int32 localNodeId = PG_GETARG_INT32(1); + WaitGraph *waitGraph = NULL; + + waitGraph = BuildLocalWaitForGraph(sourceNodeId, localNodeId); + + ReturnWaitGraph(waitGraph, fcinfo); + + return (Datum) 0; +} + + +Datum +dump_all_wait_edges(PG_FUNCTION_ARGS) +{ + WaitGraph *waitGraph = NULL; + + waitGraph = BuildLocalWaitForGraph(0, 0); + LoadRemoteEdges(waitGraph); + + ReturnWaitGraph(waitGraph, fcinfo); + + return (Datum) 0; +} + + +static void +RecordWaitEdge(WaitGraph *waitGraph, + int localNodeId, bool blockingAlsoBlocked, + TmgmtBackendData *fromData, PGPROC *fromProc, + TmgmtBackendData *toData, PGPROC *toProc) +{ + WaitEdge *curEdge = AllocWaitEdge(waitGraph); + + curEdge->waitingPid = fromProc->pid; + if (fromData->transactionId.transactionId == 0) + { + curEdge->waitingNodeId = localNodeId; + curEdge->waitingTransactionId = 0; + curEdge->waitingTransactionStamp = 0; + } + else + { + curEdge->waitingNodeId = fromData->transactionId.nodeId; + curEdge->waitingTransactionId = fromData->transactionId.transactionId; + curEdge->waitingTransactionStamp = fromData->transactionId.timestamp; + } + + curEdge->blockingPid = toProc->pid; + if (toData->transactionId.transactionId == 0) + { + curEdge->blockingNodeId = localNodeId; + curEdge->blockingTransactionId = 0; + curEdge->blockingTransactionStamp = 0; + } + else + { + curEdge->blockingNodeId = toData->transactionId.nodeId; + curEdge->blockingTransactionId = toData->transactionId.transactionId; + curEdge->blockingTransactionStamp = toData->transactionId.timestamp; + } + curEdge->blockingAlsoBlocked = blockingAlsoBlocked; +} + + +static WaitGraph * +BuildLocalWaitForGraph(int sourceNodeId, int localNodeId) +{ + WaitGraph *waitGraph = (WaitGraph *) palloc(sizeof(WaitGraph)); + int curBackend; + int partitionNum; + List *todoList = NIL; + bool *visited = NULL; + + /* + * Try hard to avoid allocations while holding lock. Thus we pre-allocate + * space for locks in large batches - for common scenarios this should be + * more than enough space to build the list of wait edges without a single + * allocation. + * + * FIXME: Better todoList datatype, pg style list is a bad idea, way too + * many allocations. + */ + waitGraph->numAllocated = MaxBackends * 3; + waitGraph->numUsed = 0; + waitGraph->edges = (WaitEdge *) palloc(waitGraph->numAllocated * sizeof(WaitEdge)); + + visited = palloc0(sizeof(bool) * MaxBackends); + + /* + * Build lock-graph. We do so by first finding all procs which we are + * interested in (originating on our source system, and blocked). Once + * those are collected, do depth first search over all procs blocking + * those. To avoid redundantly dropping procs, keep track of which procs + * already have been visisted in a pgproc indexed visited[] array. 
+ */ + + LWLockAcquire(&TmgmtShmemControl->lock, LW_SHARED); + for (partitionNum = 0; partitionNum < NUM_LOCK_PARTITIONS; partitionNum++) + { + LWLockAcquire(LockHashPartitionLockByIndex(partitionNum), LW_SHARED); + } + + /* build list of starting procs */ + for (curBackend = 0; curBackend < MaxBackends; curBackend++) + { + TmgmtBackendData *CurBackendData = &TmgmtShmemControl->sessions[curBackend]; + PGPROC *curProc; + + curProc = &ProcGlobal->allProcs[curBackend]; + + /* Skip if unused (FIXME: prepared xacts!) */ + if (curProc->pid == 0) + { + continue; + } + + /* Skip if not a distributed transaction. */ + if (CurBackendData->transactionId.transactionId == 0) + { + continue; + } + + /* Skip if not not originating from interesting node */ + if (sourceNodeId != CurBackendData->transactionId.nodeId) + { + continue; + } + + /* Skip if not blocked */ + if (curProc->links.next == NULL || curProc->waitLock == NULL) + { + continue; + } + + todoList = lappend(todoList, curProc); + } + + + while (todoList) + { + PGPROC *curProc = (PGPROC *) linitial(todoList); + TmgmtBackendData *curBackendData = + &TmgmtShmemControl->sessions[curProc->pgprocno]; + + /* pop from todo list */ + todoList = list_delete_first(todoList); + + /* avoid redundant (potentially cyclic!) iteration */ + if (visited[curProc->pgprocno]) + { + continue; + } + visited[curProc->pgprocno] = true; + + /* FIXME: deal with group locking */ + + /* FIXME: move body to different function */ + if (curProc->links.next != NULL && curProc->waitLock != NULL) + { + LOCK *lock = curProc->waitLock; + SHM_QUEUE *procLocks = &(lock->procLocks); + PROCLOCK *proclock; + int conflictMask; + int numLockModes; + LockMethod lockMethodTable; + + lockMethodTable = GetLocksMethodTable(lock); + numLockModes = lockMethodTable->numLockModes; + conflictMask = lockMethodTable->conflictTab[curProc->waitLockMode]; + + /* + * Record an edge for everyone already holding the lock in a + * conflicting manner ("hard edges" in postgres parlance). + */ + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, lockLink)); + + while (proclock) + { + PGPROC *leader; + TmgmtBackendData *blockingBackendData = NULL; + PGPROC *nextProc; + + nextProc = proclock->tag.myProc; + leader = nextProc->lockGroupLeader == NULL ? nextProc : + nextProc->lockGroupLeader; + + blockingBackendData = &TmgmtShmemControl->sessions[leader->pgprocno]; + + /* A proc never blocks itself or any other lock group member */ + if (leader != curProc) + { + int lockMethodOff; + + /* Have to check conflicts with every locktype held. */ + for (lockMethodOff = 1; lockMethodOff <= numLockModes; + lockMethodOff++) + { + if ((proclock->holdMask & LOCKBIT_ON(lockMethodOff)) && + (conflictMask & LOCKBIT_ON(lockMethodOff))) + { + bool blockingAlsoBlocked = nextProc->links.next != NULL && + nextProc->waitLock != NULL; + + RecordWaitEdge(waitGraph, + localNodeId, blockingAlsoBlocked, + curBackendData, curProc, + blockingBackendData, nextProc); + + if (blockingAlsoBlocked) + { + todoList = lappend(todoList, nextProc); + } + } + } + } + proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, + offsetof(PROCLOCK, lockLink)); + } + + + /* + * Record an edge for everyone in front of us in the wait-queue + * for the lock ("soft edges" in postgres parlance). Postgres can + * re-order the wait-queue, but that seems pretty hard to do in a + * distributed context. 
+ */ + { + PROC_QUEUE *waitQueue = &(lock->waitProcs); + int queueSize = waitQueue->size; + PGPROC *nextProc = (PGPROC *) waitQueue->links.next; + + while (queueSize-- > 0) + { + PGPROC *leader; + + /* done when we reached ourselves */ + if (curProc == nextProc) + { + break; + } + + leader = nextProc->lockGroupLeader == NULL ? nextProc : + nextProc->lockGroupLeader; + + if ((LOCKBIT_ON(leader->waitLockMode) & conflictMask) != 0) + { + bool blockingAlsoBlocked = nextProc->links.next != NULL && + nextProc->waitLock != NULL; + TmgmtBackendData *blockingBackendData = NULL; + + blockingBackendData = + &TmgmtShmemControl->sessions[leader->pgprocno]; + + RecordWaitEdge(waitGraph, + localNodeId, blockingAlsoBlocked, + curBackendData, curProc, + blockingBackendData, nextProc); + + if (blockingAlsoBlocked) + { + todoList = lappend(todoList, nextProc); + } + } + + nextProc = (PGPROC *) nextProc->links.next; + } + } + } + } + + for (partitionNum = 0; partitionNum < NUM_LOCK_PARTITIONS; partitionNum++) + { + LWLockRelease(LockHashPartitionLockByIndex(partitionNum)); + } + + LWLockRelease(&TmgmtShmemControl->lock); + + return waitGraph; +} + + +static WaitEdge * +AllocWaitEdge(WaitGraph *waitGraph) +{ + /* ensure space for new edge */ + if (waitGraph->numAllocated == waitGraph->numUsed) + { + waitGraph->numAllocated *= 2; + waitGraph->edges = (WaitEdge *) + repalloc(waitGraph->edges, sizeof(WaitEdge) * + waitGraph->numAllocated); + } + + return &waitGraph->edges[waitGraph->numUsed++]; +} + + +static int64 +ParseIntField(PGresult *result, int rowIndex, int colIndex, bool *isNull) +{ + if (PQgetisnull(result, rowIndex, colIndex)) + { + if (!isNull) + { + ereport(ERROR, (errmsg("remote field unexpectedly is NULL"))); + } + *isNull = true; + return 0; + } + + if (isNull) + { + *isNull = false; + } + + return pg_strtouint64(PQgetvalue(result, rowIndex, colIndex), NULL, 10); +} + + +static TimestampTz +ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex, bool *isNull) +{ + Datum timestampDatum; + + if (PQgetisnull(result, rowIndex, colIndex)) + { + if (!isNull) + { + ereport(ERROR, (errmsg("remote field unexpectedly is NULL"))); + } + *isNull = true; + return 0; + } + + if (isNull) + { + *isNull = false; + } + + timestampDatum = + DirectFunctionCall3(timestamptz_in, CStringGetDatum(PQgetvalue(result, rowIndex, + colIndex)), + 0, -1); + return DatumGetTimestampTz(timestampDatum); +} + + +static void +ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc = NULL; + Tuplestorestate *tupstore = NULL; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + size_t curEdgeNum; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg( + "set-valued function called in context that cannot accept a set"))); + } + if (!(rsinfo->allowedModes & SFRM_Materialize)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not " \ + "allowed in this context"))); + } + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + { + elog(ERROR, "return type must be a row type"); + } + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + 
rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + MemoryContextSwitchTo(oldcontext); + + /* + * Columns: + * 00: waiting_pid + * 01: waiting_node_id + * 02: waiting_transaction_id + * 03: waiting_transactionstamp + * 04: blocking_pid + * 05: blocking_node_id + * 06: blocking_transaction_id + * 07: blocking_transactionstamp + * 08: blocking_also_blocked + */ + for (curEdgeNum = 0; curEdgeNum < waitGraph->numUsed; curEdgeNum++) + { + Datum values[NUM_DUMP_LOCKS_COLS]; + bool nulls[NUM_DUMP_LOCKS_COLS]; + WaitEdge *curEdge = &waitGraph->edges[curEdgeNum]; + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = Int32GetDatum(curEdge->waitingPid); + values[1] = Int32GetDatum(curEdge->waitingNodeId); + if (curEdge->waitingTransactionId != 0) + { + values[2] = Int64GetDatum(curEdge->waitingTransactionId); + values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp); + } + else + { + nulls[2] = true; + nulls[3] = true; + } + + values[4] = Int32GetDatum(curEdge->blockingPid); + values[5] = Int32GetDatum(curEdge->blockingNodeId); + if (curEdge->blockingTransactionId != 0) + { + values[6] = Int64GetDatum(curEdge->blockingTransactionId); + values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp); + } + else + { + nulls[6] = true; + nulls[7] = true; + } + values[8] = BoolGetDatum(curEdge->blockingAlsoBlocked); + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); +} + + +static void +WaitEdgeFromResult(WaitEdge *edge, PGresult *result, size_t rowIndex) +{ + bool isNull = false; + + /* + * Remote Columns: + * 00: waiting_pid + * 01: waiting_node_id + * 02: waiting_transaction_id + * 03: waiting_transactionstamp + * 04: blocking_pid + * 05: blocking_node_id + * 06: blocking_transaction_id + * 07: blocking_transactionstamp + * 08: blocking_also_blocked + */ + edge->waitingPid = ParseIntField(result, rowIndex, 0, NULL); + edge->waitingNodeId = ParseIntField(result, rowIndex, 1, NULL); + edge->waitingTransactionId = ParseIntField(result, rowIndex, 2, &isNull); + if (isNull) + { + edge->waitingTransactionId = 0; + } + edge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3, &isNull); + if (isNull) + { + edge->waitingTransactionStamp = 0; + } + + edge->blockingPid = ParseIntField(result, rowIndex, 4, NULL); + edge->blockingNodeId = ParseIntField(result, rowIndex, 5, &isNull); + edge->blockingTransactionId = ParseIntField(result, rowIndex, 6, &isNull); + if (isNull) + { + edge->blockingTransactionId = 0; + } + edge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7, &isNull); + if (isNull) + { + edge->blockingTransactionStamp = 0; + } + edge->blockingAlsoBlocked = ParseIntField(result, rowIndex, 8, &isNull) != 0; +} + + +static void +LoadRemoteEdges(WaitGraph *waitGraph) +{ + List *workerNodeList = ActiveWorkerNodeList(); + ListCell *workerNodeCell = NULL; + char *nodeUser = CitusExtensionOwnerName(); + List *connectionList = NIL; + ListCell *connectionCell = NULL; + + + /* open connections in parallel */ + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + MultiConnection *connection = NULL; + int connectionFlags = 0; + + connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, + nodeUser, NULL); + + connectionList 
= lappend(connectionList, connection); + } + + /* finish opening connections */ + FinishConnectionListEstablishment(connectionList); + + /* send commands in parallel */ + forboth(workerNodeCell, workerNodeList, connectionCell, connectionList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + int querySent = false; + char *command = NULL; + const char *params[2]; + + + params[0] = psprintf("%d", 0); /* FIXME: better id for master */ + params[1] = psprintf("%d", workerNode->nodeId); + command = "SELECT * FROM dump_local_wait_edges($1, $2)"; + + querySent = SendRemoteCommandParams(connection, command, 2, + NULL, params); + if (querySent == 0) + { + ReportConnectionError(connection, ERROR); + } + } + + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + PGresult *result = NULL; + bool raiseInterrupts = true; + int64 rowIndex = 0; + int64 rowCount = 0; + int64 colCount = 0; + + result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, ERROR); + } + +#define NUM_DUMP_LOCKS_COLS 9 + rowCount = PQntuples(result); + colCount = PQnfields(result); + + if (colCount != NUM_DUMP_LOCKS_COLS) + { + elog(ERROR, "unexpected number of columns returned by dump_local_wait_edges"); + } + + for (rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + WaitEdge *curEdge = AllocWaitEdge(waitGraph); + + WaitEdgeFromResult(curEdge, result, rowIndex); + } + + PQclear(result); + ForgetResults(connection); + } +} + + +static uint32 +TransactionIdHash(const void *key, Size keysize) +{ + TmgmtTransactionId *entry = (TmgmtTransactionId *) key; + uint32 hash = 0; + + hash = hash_any((unsigned char *) &entry->nodeId, sizeof(int64)); + hash = hash_combine(hash, hash_any((unsigned char *) &entry->transactionId, + sizeof(int64))); + hash = hash_combine(hash, hash_any((unsigned char *) &entry->timestamp, + sizeof(TimestampTz))); + + return hash; +} + + +static int +TransactionIdCompare(const void *a, const void *b, Size keysize) +{ + TmgmtTransactionId *ta = (TmgmtTransactionId *) a; + TmgmtTransactionId *tb = (TmgmtTransactionId *) b; + + /* NB: Not used for sorting, just equality...
*/ + if (ta->nodeId != tb->nodeId || + ta->transactionId != tb->transactionId || + ta->timestamp != tb->timestamp) + { + return 1; + } + else + { + return 0; + } +} + + +static LockDepNode * +LookupDepNode(HTAB *lockDepNodeHash, TmgmtTransactionId *transactionId) +{ + bool found = false; + LockDepNode *node = (LockDepNode *) + hash_search(lockDepNodeHash, transactionId, HASH_ENTER, &found); + + if (!found) + { + node->deps = NIL; + node->initial_pid = false; + node->visited = false; + } + + return node; +} diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 01bfe0eea..306c19424 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -70,6 +70,7 @@ FinishRemoteTransactionBegin(struct MultiConnection *connection) RemoteTransaction *transaction = &connection->remoteTransaction; PGresult *result = NULL; const bool raiseErrors = true; + char *query = NULL; Assert(transaction->transactionState == REMOTE_TRANS_STARTING); @@ -91,6 +92,33 @@ FinishRemoteTransactionBegin(struct MultiConnection *connection) { Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS); } + + { + const char *tzstr = timestamptz_to_str( + MyTmgmtBackendData->transactionId.timestamp); + + query = psprintf("SELECT assign_distributed_transaction_id(" + UINT64_FORMAT ","UINT64_FORMAT ", '%s')", + MyTmgmtBackendData->transactionId.nodeId, + MyTmgmtBackendData->transactionId.transactionId, + tzstr); + } + + /* FIXME: this is a bad idea performancewise */ + if (!SendRemoteCommand(connection, query)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, true); + } + result = GetRemoteCommandResult(connection, raiseErrors); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, raiseErrors); + } + + PQclear(result); + ForgetResults(connection); } @@ -263,7 +291,7 @@ RemoteTransactionCommit(MultiConnection *connection) /* - * StartRemoteTransactionAbort initiates abortin the transaction in a + * StartRemoteTransactionAbort initiates aborting the transaction in a * non-blocking manner. */ void diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index a0d14d9b0..e6faedffd 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -21,11 +21,15 @@ #include "access/xact.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_shard_transaction.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" +#include "storage/ipc.h" +#include "storage/proc.h" #include "utils/hsearch.h" #include "utils/guc.h" +#include "port/atomics.h" CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE; @@ -40,9 +44,6 @@ XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; /* list of connections that are part of the current coordinated transaction */ dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions); - -static bool subXactAbortAttempted = false; - /* * Should this coordinated transaction use 2PC? Set by * CoordinatedTransactionUse2PC(), e.g. 
if DDL was issued and @@ -50,6 +51,10 @@ static bool subXactAbortAttempted = false; */ bool CoordinatedTransactionUses2PC = false; + +static bool subXactAbortAttempted = false; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + /* transaction management functions */ static void CoordinatedTransactionCallback(XactEvent event, void *arg); static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, @@ -58,6 +63,39 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction /* remaining functions */ static void AdjustMaxPreparedTransactions(void); +TmgmtShmemControlData *TmgmtShmemControl = NULL; +TmgmtBackendData *MyTmgmtBackendData = NULL; + +PG_FUNCTION_INFO_V1(assign_distributed_transaction_id); + +Datum +assign_distributed_transaction_id(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + Assert(MyTmgmtBackendData); + + /* FIXME: spinlock? */ + MyTmgmtBackendData->transactionId.nodeId = PG_GETARG_INT64(0); + MyTmgmtBackendData->transactionId.transactionId = PG_GETARG_INT64(1); + MyTmgmtBackendData->transactionId.timestamp = PG_GETARG_TIMESTAMPTZ(2); + + PG_RETURN_VOID(); +} + + +static void +UnsetDistributedTransactionId(void) +{ + if (MyTmgmtBackendData) + { + /* FIXME: spinlock? */ + MyTmgmtBackendData->transactionId.nodeId = 0; + MyTmgmtBackendData->transactionId.transactionId = 0; + MyTmgmtBackendData->transactionId.timestamp = 0; + } +} + /* * BeginCoordinatedTransaction begins a coordinated transaction. No @@ -73,6 +111,20 @@ BeginCoordinatedTransaction(void) } CurrentCoordinatedTransactionState = COORD_TRANS_STARTED; + + Assert(MyTmgmtBackendData); + + /* FIXME: Spinlock? Consistency is nice ;) */ + /* FIXME: determine proper local node id for MX etc */ + MyTmgmtBackendData->transactionId.nodeId = 0; + MyTmgmtBackendData->transactionId.transactionId = + pg_atomic_fetch_add_u64(&TmgmtShmemControl->nextTransactionId, 1); + MyTmgmtBackendData->transactionId.timestamp = GetCurrentTimestamp(); + + elog(DEBUG3, "assigning xact: (%lu, %lu, %lu)", + MyTmgmtBackendData->transactionId.nodeId, + MyTmgmtBackendData->transactionId.transactionId, + MyTmgmtBackendData->transactionId.timestamp); } @@ -117,6 +169,59 @@ CoordinatedTransactionUse2PC(void) } +static size_t +TmgmtShmemSize(void) +{ + Size size = 0; + + size = add_size(size, sizeof(TmgmtShmemControlData)); + size = add_size(size, mul_size(sizeof(TmgmtBackendData), MaxBackends)); + + return size; +} + + +static void +TmgmtShmemInit(void) +{ + bool alreadyInitialized = false; + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + TmgmtShmemControl = + (TmgmtShmemControlData *) ShmemInitStruct("Transaction Management", + TmgmtShmemSize(), + &alreadyInitialized); + if (!alreadyInitialized) + { + /* initialize lwlock */ + LWLockTranche *tranche = &TmgmtShmemControl->lockTranche; + + /* start by zeroing out all the memory */ + memset(TmgmtShmemControl, 0, TmgmtShmemSize()); + + TmgmtShmemControl->numSessions = MaxBackends; + + /* initialize lock */ + TmgmtShmemControl->trancheId = LWLockNewTrancheId(); + tranche->array_base = &TmgmtShmemControl->lock; + tranche->array_stride = sizeof(LWLock); + tranche->name = "Distributed Transaction Management"; + LWLockRegisterTranche(TmgmtShmemControl->trancheId, tranche); + LWLockInitialize(&TmgmtShmemControl->lock, + TmgmtShmemControl->trancheId); + + pg_atomic_init_u64(&TmgmtShmemControl->nextTransactionId, 7); + } + LWLockRelease(AddinShmemInitLock); + + if (prev_shmem_startup_hook != NULL) + { + prev_shmem_startup_hook(); + } +} 
+ + void InitializeTransactionManagement(void) { @@ -125,6 +230,26 @@ InitializeTransactionManagement(void) RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL); AdjustMaxPreparedTransactions(); + + /* allocate shared memory */ + RequestAddinShmemSpace(TmgmtShmemSize()); + + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = TmgmtShmemInit; +} + + +void +InitializeTransactionManagementBackend(void) +{ + /* Fill this backend's lock information */ + LWLockAcquire(&TmgmtShmemControl->lock, LW_EXCLUSIVE); + MyTmgmtBackendData = &TmgmtShmemControl->sessions[MyProc->pgprocno]; + MyTmgmtBackendData->databaseId = MyDatabaseId; + + /* FIXME: get id usable for MX, where multiple nodes can start distributed transactions */ + MyTmgmtBackendData->transactionId.nodeId = 0; + LWLockRelease(&TmgmtShmemControl->lock); } @@ -168,6 +293,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) XactModificationLevel = XACT_MODIFICATION_NONE; dlist_init(&InProgressTransactions); CoordinatedTransactionUses2PC = false; + + UnsetDistributedTransactionId(); } break; @@ -204,6 +331,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) dlist_init(&InProgressTransactions); CoordinatedTransactionUses2PC = false; subXactAbortAttempted = false; + + UnsetDistributedTransactionId(); } break; diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index da5652157..9ba302d93 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -22,6 +22,7 @@ #include "access/xact.h" #include "libpq/pqsignal.h" +#include "distributed/deadlock.h" #include "distributed/maintenanced.h" #include "distributed/metadata_cache.h" #include "postmaster/bgworker.h" @@ -258,6 +259,22 @@ CitusMaintenanceDaemonMain(Datum main_arg) * tasks should do their own time math about whether to re-run checks. */ + { + Datum foundDeadlock; + + StartTransactionCommand(); + foundDeadlock = this_machine_kills_deadlocks(NULL); + CommitTransactionCommand(); + + /* + * Check sooner if we just found a deadlock. + */ + if (foundDeadlock) + { + timeout = 100; + } + } + /* * Wait until timeout, or until somebody wakes us up. */ diff --git a/src/include/distributed/deadlock.h b/src/include/distributed/deadlock.h new file mode 100644 index 000000000..cc8317c3b --- /dev/null +++ b/src/include/distributed/deadlock.h @@ -0,0 +1,15 @@ +/*------------------------------------------------------------------------- + * + * deadlock.h + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef DEADLOCK_H +#define DEADLOCK_H + +extern Datum this_machine_kills_deadlocks(PG_FUNCTION_ARGS); + +#endif /* DEADLOCK_H */ diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index dd0c691ed..3f9d79427 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -11,6 +11,9 @@ #include "lib/ilist.h" +#include "datatype/timestamp.h" +#include "storage/lwlock.h" + /* describes what kind of modifications have occurred in the current transaction */ typedef enum { @@ -52,6 +55,39 @@ typedef enum COMMIT_PROTOCOL_2PC = 2 } CommitProtocolType; + +/* FIXME: move to different header? 
*/ +typedef struct TmgmtTransactionId +{ + uint64 nodeId; + uint64 transactionId; + TimestampTz timestamp; +} TmgmtTransactionId; + +typedef struct TmgmtBackendData +{ + Oid databaseId; + TmgmtTransactionId transactionId; +} TmgmtBackendData; + + +typedef struct TmgmtShmemControlData +{ + int trancheId; + LWLockTranche lockTranche; + LWLock lock; + + int numSessions; + + pg_atomic_uint64 nextTransactionId; + + TmgmtBackendData sessions[FLEXIBLE_ARRAY_MEMBER]; +} TmgmtShmemControlData; + +extern TmgmtBackendData *MyTmgmtBackendData; +extern TmgmtShmemControlData *TmgmtShmemControl; + + /* config variable managed via guc.c */ extern int MultiShardCommitProtocol; @@ -76,6 +112,7 @@ extern void CoordinatedTransactionUse2PC(void); /* initialization function(s) */ extern void InitializeTransactionManagement(void); +extern void InitializeTransactionManagementBackend(void); #endif /* TRANSACTION_MANAGMENT_H */ diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 5e30d5868..7ccccfc53 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -111,6 +111,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-2'; ALTER EXTENSION citus UPDATE TO '6.2-3'; ALTER EXTENSION citus UPDATE TO '6.2-4'; ALTER EXTENSION citus UPDATE TO '7.0-1'; +ALTER EXTENSION citus UPDATE TO '7.0-2'; -- show running version SHOW citus.version; citus.version diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index a03010787..b7c1c9793 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -136,14 +136,15 @@ SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id; BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport'); -ERROR: no transaction participant matches localhost:57638 -DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction. COMMIT; -- unless we disable deadlock prevention BEGIN; SET citus.enable_deadlock_prevention TO off; INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport'); +ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200001" +DETAIL: Key (lab_id, name)=(6, Leslie Lamport) already exists. +CONTEXT: while executing command on localhost:57638 ABORT; -- SELECTs may occur after a modification: First check that selecting -- from the modified node works. @@ -152,7 +153,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs'); SELECT count(*) FROM researchers WHERE lab_id = 6; count ------- - 0 + 1 (1 row) ABORT; @@ -168,7 +169,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs'); SELECT count(*) FROM researchers WHERE lab_id = 6; count ------- - 0 + 1 (1 row) ABORT; @@ -193,9 +194,10 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs':: (2 rows) SELECT * FROM labs WHERE id = 6; - id | name -----+------ -(0 rows) + id | name +----+----------- + 6 | Bell Labs +(1 row) -- COPY can happen after single row INSERT BEGIN; @@ -256,9 +258,10 @@ DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists. 
COMMIT; -- verify rollback SELECT * FROM researchers WHERE lab_id = 6; - id | lab_id | name -----+--------+------ -(0 rows) + id | lab_id | name +----+--------+---------------- + 9 | 6 | Leslie Lamport +(1 row) SELECT count(*) FROM pg_dist_transaction; count @@ -283,9 +286,10 @@ DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists. COMMIT; -- verify rollback SELECT * FROM researchers WHERE lab_id = 6; - id | lab_id | name -----+--------+------ -(0 rows) + id | lab_id | name +----+--------+---------------- + 9 | 6 | Leslie Lamport +(1 row) SELECT count(*) FROM pg_dist_transaction; count @@ -301,9 +305,10 @@ COMMIT; SELECT * FROM researchers WHERE lab_id = 6; id | lab_id | name ----+--------+---------------------- + 9 | 6 | Leslie Lamport 17 | 6 | 'Bjarne Stroustrup' 18 | 6 | 'Dennis Ritchie' -(2 rows) +(3 rows) -- verify 2pc SELECT count(*) FROM pg_dist_transaction; @@ -360,9 +365,10 @@ ERROR: could not commit transaction on any active node SELECT * FROM researchers WHERE lab_id = 6; id | lab_id | name ----+--------+---------------------- + 9 | 6 | Leslie Lamport 17 | 6 | 'Bjarne Stroustrup' 18 | 6 | 'Dennis Ritchie' -(2 rows) +(3 rows) -- cleanup triggers and the function SELECT * from run_command_on_placements('researchers', 'drop trigger reject_large_researcher_id on %s') @@ -400,7 +406,7 @@ ALTER TABLE labs ADD COLUMN motto text; SELECT master_modify_multiple_shards('DELETE FROM labs'); master_modify_multiple_shards ------------------------------- - 7 + 8 (1 row) ALTER TABLE labs ADD COLUMN score float; @@ -880,7 +886,6 @@ SELECT create_distributed_table('hash_modifying_xacts', 'key'); BEGIN; INSERT INTO hash_modifying_xacts VALUES (1, 1); INSERT INTO reference_modifying_xacts VALUES (10, 10); -ERROR: no transaction participant matches localhost:57638 COMMIT; -- it is allowed when turning off deadlock prevention BEGIN; diff --git a/src/test/regress/expected/multi_mx_modifying_xacts.out b/src/test/regress/expected/multi_mx_modifying_xacts.out index 92d29bf82..59cb523ea 100644 --- a/src/test/regress/expected/multi_mx_modifying_xacts.out +++ b/src/test/regress/expected/multi_mx_modifying_xacts.out @@ -140,8 +140,6 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id; BEGIN; INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport'); -ERROR: no transaction participant matches localhost:57638 -DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction. COMMIT; -- have the same test on the other worker node \c - - - :worker_2_port @@ -163,8 +161,6 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id; BEGIN; INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport'); -ERROR: no transaction participant matches localhost:57638 -DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction. 
COMMIT; -- switch back to the worker node \c - - - :worker_1_port @@ -175,7 +171,7 @@ INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); SELECT count(*) FROM researchers_mx WHERE lab_id = 6; count ------- - 0 + 2 (1 row) ABORT; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 5f5c40c47..5753e19d9 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -111,6 +111,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-2'; ALTER EXTENSION citus UPDATE TO '6.2-3'; ALTER EXTENSION citus UPDATE TO '6.2-4'; ALTER EXTENSION citus UPDATE TO '7.0-1'; +ALTER EXTENSION citus UPDATE TO '7.0-2'; -- show running version SHOW citus.version;
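The regression-test changes above drop the old "no transaction participant matches" error: multi-node transactions are no longer rejected up front, so the detector has to handle some of the cross-worker deadlocks that the removed router-executor check used to prevent. A hypothetical scenario (table name and shard placement invented for the example) in which neither worker's local deadlock detector ever sees a complete cycle:

-- hypothetical setup: rows with key = 1 and key = 2 hash to shards on different workers
CREATE TABLE points (key int PRIMARY KEY, value int);
SELECT create_distributed_table('points', 'key');

-- session A                               -- session B (concurrently)
BEGIN;                                     -- BEGIN;
UPDATE points SET value = 0 WHERE key = 1; -- UPDATE points SET value = 0 WHERE key = 2;
UPDATE points SET value = 0 WHERE key = 2; -- UPDATE points SET value = 0 WHERE key = 1;
-- A now waits for B on one worker while B waits for A on the other; only the
-- combined wait graph contains the cycle, which is what the maintenance
-- daemon's periodic this_machine_kills_deadlocks() call is meant to break.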
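The upgrade script citus--7.0-1--7.0-2.sql is the user-visible surface of the patch. As a rough sketch of how those entry points are driven — all literal values below are illustrative, not taken from the patch — the coordinator tags each worker session via assign_distributed_transaction_id() right after BEGIN (see FinishRemoteTransactionBegin), LoadRemoteEdges() runs dump_local_wait_edges() on every worker, and the maintenance daemon periodically calls this_machine_kills_deadlocks() on the coordinator:

-- what FinishRemoteTransactionBegin sends to a worker after BEGIN
-- (node id, transaction id, and timestamp are made-up values)
SELECT assign_distributed_transaction_id(0, 7, '2017-05-01 10:00:00+00');

-- what LoadRemoteEdges runs on each worker: edges for transactions that
-- originate on node 0, reported by the worker known here as node 2
SELECT * FROM dump_local_wait_edges(0, 2);

-- coordinator-side: local edges plus the edges collected from all workers
SELECT * FROM dump_all_wait_edges();

-- one round of detection; returns true if a cycle was found and the
-- offending local backend was sent SIGINT
SELECT this_machine_kills_deadlocks();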
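Step 4 of the algorithm outlined at the top of deadlock.c — cycle detection over the flattened, transaction-level wait graph — is implemented as a C graph traversal. Purely as an illustration of the same idea, not how the patch does it, the search can be sketched in SQL over dump_all_wait_edges(). The sketch keys nodes on the transaction id alone for brevity (the C code uses the full node id, transaction id, timestamp triple) and relies on the column order documented in deadlock.c, where the second group of columns describes the blocking side:

WITH RECURSIVE edge AS (
    SELECT transactionid AS waiting, blocked_transactionid AS blocking
    FROM dump_all_wait_edges()
    WHERE transactionid IS NOT NULL
      AND blocked_transactionid IS NOT NULL
), walk AS (
    -- start a walk at every edge, remembering the path taken so far
    SELECT waiting AS start, blocking AS reached, ARRAY[waiting] AS path
    FROM edge
    UNION ALL
    -- follow outgoing edges, never re-entering a transaction already on the path
    SELECT w.start, e.blocking, w.path || e.waiting
    FROM walk w
    JOIN edge e ON e.waiting = w.reached
    WHERE e.waiting <> ALL (w.path)
)
SELECT DISTINCT start AS deadlocked_transaction_id
FROM walk
WHERE reached = start;  -- back at the starting transaction: a cycle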