From 570e1159c1b82191e0f5b94407942d68ce0b54d0 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Sun, 11 Jun 2017 18:30:48 -0700 Subject: [PATCH 1/3] WIP: Introduction of very basic distributed transaction identifier concept. This will be used, at first, for distributed deadlock detection. There it's important to be able to correlate transactions on workers with transactions on the coordinator. For that, distributed transactions (ones that actually issue BEGIN on workers etc.) get assigned a distributed transaction ID. That ID currently consists of (nodeId, transactionNumber, timestamp). TODO: * think about 2PC * locking * query functionality * ... --- src/backend/distributed/shared_library_init.c | 1 + .../transaction/remote_transaction.c | 30 +++- .../transaction/transaction_management.c | 135 +++++++++++++++++- .../distributed/transaction_management.h | 37 +++++ 4 files changed, 199 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index c4931d0cb..3f3e37488 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -197,6 +197,7 @@ void StartupCitusBackend(void) { InitializeMaintenanceDaemonBackend(); + InitializeTransactionManagementBackend(); } diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 01bfe0eea..306c19424 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -70,6 +70,7 @@ FinishRemoteTransactionBegin(struct MultiConnection *connection) RemoteTransaction *transaction = &connection->remoteTransaction; PGresult *result = NULL; const bool raiseErrors = true; + char *query = NULL; Assert(transaction->transactionState == REMOTE_TRANS_STARTING); @@ -91,6 +92,33 @@ FinishRemoteTransactionBegin(struct MultiConnection *connection) { Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS); } + + { + const char *tzstr = timestamptz_to_str( + MyTmgmtBackendData->transactionId.timestamp); + + query = psprintf("SELECT assign_distributed_transaction_id(" + UINT64_FORMAT ","UINT64_FORMAT ", '%s')", + MyTmgmtBackendData->transactionId.nodeId, + MyTmgmtBackendData->transactionId.transactionId, + tzstr); + } + + /* FIXME: this is a bad idea performance-wise */ + if (!SendRemoteCommand(connection, query)) + { + ReportConnectionError(connection, WARNING); + MarkRemoteTransactionFailed(connection, true); + } + result = GetRemoteCommandResult(connection, raiseErrors); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + MarkRemoteTransactionFailed(connection, raiseErrors); + } + + PQclear(result); + ForgetResults(connection); } @@ -263,7 +291,7 @@ RemoteTransactionCommit(MultiConnection *connection) /* - * StartRemoteTransactionAbort initiates abortin the transaction in a + * StartRemoteTransactionAbort initiates aborting the transaction in a * non-blocking manner.
*/ void diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index a0d14d9b0..e6faedffd 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -21,11 +21,15 @@ #include "access/xact.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_shard_transaction.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" +#include "storage/ipc.h" +#include "storage/proc.h" #include "utils/hsearch.h" #include "utils/guc.h" +#include "port/atomics.h" CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE; @@ -40,9 +44,6 @@ XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; /* list of connections that are part of the current coordinated transaction */ dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions); - -static bool subXactAbortAttempted = false; - /* * Should this coordinated transaction use 2PC? Set by * CoordinatedTransactionUse2PC(), e.g. if DDL was issued and @@ -50,6 +51,10 @@ static bool subXactAbortAttempted = false; */ bool CoordinatedTransactionUses2PC = false; + +static bool subXactAbortAttempted = false; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + /* transaction management functions */ static void CoordinatedTransactionCallback(XactEvent event, void *arg); static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, @@ -58,6 +63,39 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction /* remaining functions */ static void AdjustMaxPreparedTransactions(void); +TmgmtShmemControlData *TmgmtShmemControl = NULL; +TmgmtBackendData *MyTmgmtBackendData = NULL; + +PG_FUNCTION_INFO_V1(assign_distributed_transaction_id); + +Datum +assign_distributed_transaction_id(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + Assert(MyTmgmtBackendData); + + /* FIXME: spinlock? */ + MyTmgmtBackendData->transactionId.nodeId = PG_GETARG_INT64(0); + MyTmgmtBackendData->transactionId.transactionId = PG_GETARG_INT64(1); + MyTmgmtBackendData->transactionId.timestamp = PG_GETARG_TIMESTAMPTZ(2); + + PG_RETURN_VOID(); +} + + +static void +UnsetDistributedTransactionId(void) +{ + if (MyTmgmtBackendData) + { + /* FIXME: spinlock? */ + MyTmgmtBackendData->transactionId.nodeId = 0; + MyTmgmtBackendData->transactionId.transactionId = 0; + MyTmgmtBackendData->transactionId.timestamp = 0; + } +} + /* * BeginCoordinatedTransaction begins a coordinated transaction. No @@ -73,6 +111,20 @@ BeginCoordinatedTransaction(void) } CurrentCoordinatedTransactionState = COORD_TRANS_STARTED; + + Assert(MyTmgmtBackendData); + + /* FIXME: Spinlock? 
Consistency is nice ;) */ + /* FIXME: determine proper local node id for MX etc */ + MyTmgmtBackendData->transactionId.nodeId = 0; + MyTmgmtBackendData->transactionId.transactionId = + pg_atomic_fetch_add_u64(&TmgmtShmemControl->nextTransactionId, 1); + MyTmgmtBackendData->transactionId.timestamp = GetCurrentTimestamp(); + + elog(DEBUG3, "assigning xact: (%lu, %lu, %lu)", + MyTmgmtBackendData->transactionId.nodeId, + MyTmgmtBackendData->transactionId.transactionId, + MyTmgmtBackendData->transactionId.timestamp); } @@ -117,6 +169,59 @@ CoordinatedTransactionUse2PC(void) } +static size_t +TmgmtShmemSize(void) +{ + Size size = 0; + + size = add_size(size, sizeof(TmgmtShmemControlData)); + size = add_size(size, mul_size(sizeof(TmgmtBackendData), MaxBackends)); + + return size; +} + + +static void +TmgmtShmemInit(void) +{ + bool alreadyInitialized = false; + + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + TmgmtShmemControl = + (TmgmtShmemControlData *) ShmemInitStruct("Transaction Management", + TmgmtShmemSize(), + &alreadyInitialized); + if (!alreadyInitialized) + { + /* initialize lwlock */ + LWLockTranche *tranche = &TmgmtShmemControl->lockTranche; + + /* start by zeroing out all the memory */ + memset(TmgmtShmemControl, 0, TmgmtShmemSize()); + + TmgmtShmemControl->numSessions = MaxBackends; + + /* initialize lock */ + TmgmtShmemControl->trancheId = LWLockNewTrancheId(); + tranche->array_base = &TmgmtShmemControl->lock; + tranche->array_stride = sizeof(LWLock); + tranche->name = "Distributed Transaction Management"; + LWLockRegisterTranche(TmgmtShmemControl->trancheId, tranche); + LWLockInitialize(&TmgmtShmemControl->lock, + TmgmtShmemControl->trancheId); + + pg_atomic_init_u64(&TmgmtShmemControl->nextTransactionId, 7); + } + LWLockRelease(AddinShmemInitLock); + + if (prev_shmem_startup_hook != NULL) + { + prev_shmem_startup_hook(); + } +} + + void InitializeTransactionManagement(void) { @@ -125,6 +230,26 @@ InitializeTransactionManagement(void) RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL); AdjustMaxPreparedTransactions(); + + /* allocate shared memory */ + RequestAddinShmemSpace(TmgmtShmemSize()); + + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = TmgmtShmemInit; +} + + +void +InitializeTransactionManagementBackend(void) +{ + /* Fill this backend's lock information */ + LWLockAcquire(&TmgmtShmemControl->lock, LW_EXCLUSIVE); + MyTmgmtBackendData = &TmgmtShmemControl->sessions[MyProc->pgprocno]; + MyTmgmtBackendData->databaseId = MyDatabaseId; + + /* FIXME: get id usable for MX, where multiple nodes can start distributed transactions */ + MyTmgmtBackendData->transactionId.nodeId = 0; + LWLockRelease(&TmgmtShmemControl->lock); } @@ -168,6 +293,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) XactModificationLevel = XACT_MODIFICATION_NONE; dlist_init(&InProgressTransactions); CoordinatedTransactionUses2PC = false; + + UnsetDistributedTransactionId(); } break; @@ -204,6 +331,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) dlist_init(&InProgressTransactions); CoordinatedTransactionUses2PC = false; subXactAbortAttempted = false; + + UnsetDistributedTransactionId(); } break; diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index dd0c691ed..3f9d79427 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -11,6 +11,9 @@ #include "lib/ilist.h" +#include "datatype/timestamp.h" +#include 
"storage/lwlock.h" + /* describes what kind of modifications have occurred in the current transaction */ typedef enum { @@ -52,6 +55,39 @@ typedef enum COMMIT_PROTOCOL_2PC = 2 } CommitProtocolType; + +/* FIXME: move to different header? */ +typedef struct TmgmtTransactionId +{ + uint64 nodeId; + uint64 transactionId; + TimestampTz timestamp; +} TmgmtTransactionId; + +typedef struct TmgmtBackendData +{ + Oid databaseId; + TmgmtTransactionId transactionId; +} TmgmtBackendData; + + +typedef struct TmgmtShmemControlData +{ + int trancheId; + LWLockTranche lockTranche; + LWLock lock; + + int numSessions; + + pg_atomic_uint64 nextTransactionId; + + TmgmtBackendData sessions[FLEXIBLE_ARRAY_MEMBER]; +} TmgmtShmemControlData; + +extern TmgmtBackendData *MyTmgmtBackendData; +extern TmgmtShmemControlData *TmgmtShmemControl; + + /* config variable managed via guc.c */ extern int MultiShardCommitProtocol; @@ -76,6 +112,7 @@ extern void CoordinatedTransactionUse2PC(void); /* initialization function(s) */ extern void InitializeTransactionManagement(void); +extern void InitializeTransactionManagementBackend(void); #endif /* TRANSACTION_MANAGMENT_H */ From 1f39d07e65bff6f224ae7a456f8fbe88886083ee Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Sun, 11 Jun 2017 18:34:53 -0700 Subject: [PATCH 2/3] WIP: Basic distributed deadlock detector. Lots of things missing: * only perform deadlock detection if there's blocked transactions on the master * only do the actual cycle detection for transactions that are blocked * cancel member(s) of the cycle that are younger, to rpovide some forward guarantee * Fix bug requiring multiple cancel interrupts * Proper error messages for cancelled transactions, that presumably requires some hackery in our abort handler, overwriting the current error? * Test infrastructure, isolationtester unfortunately is unsuitable * ... 
--- src/backend/distributed/Makefile | 4 +- .../distributed/citus--7.0-1--7.0-2.sql | 42 + src/backend/distributed/citus.control | 2 +- .../distributed/transaction/deadlock.c | 926 ++++++++++++++++++ src/backend/distributed/utils/maintenanced.c | 17 + src/include/distributed/deadlock.h | 15 + src/test/regress/expected/multi_extension.out | 1 + src/test/regress/sql/multi_extension.sql | 1 + 8 files changed, 1006 insertions(+), 2 deletions(-) create mode 100644 src/backend/distributed/citus--7.0-1--7.0-2.sql create mode 100644 src/backend/distributed/transaction/deadlock.c create mode 100644 src/include/distributed/deadlock.h diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index cfd4e1ea8..797ba15fd 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \ 6.2-1 6.2-2 6.2-3 6.2-4 \ - 7.0-1 + 7.0-1 7.0-2 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -141,6 +141,8 @@ $(EXTENSION)--6.2-4.sql: $(EXTENSION)--6.2-3.sql $(EXTENSION)--6.2-3--6.2-4.sql cat $^ > $@ $(EXTENSION)--7.0-1.sql: $(EXTENSION)--6.2-4.sql $(EXTENSION)--6.2-4--7.0-1.sql cat $^ > $@ +$(EXTENSION)--7.0-2.sql: $(EXTENSION)--7.0-1.sql $(EXTENSION)--7.0-1--7.0-2.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--7.0-1--7.0-2.sql b/src/backend/distributed/citus--7.0-1--7.0-2.sql new file mode 100644 index 000000000..36f479089 --- /dev/null +++ b/src/backend/distributed/citus--7.0-1--7.0-2.sql @@ -0,0 +1,42 @@ +/* citus--7.0-1--7.0-2.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION assign_distributed_transaction_id(originNodeId int8, transactionId int8, transactionStamp timestamptz) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$assign_distributed_transaction_id$$; +COMMENT ON FUNCTION assign_distributed_transaction_id(originNodeId int8, transactionId int8, transactionStamp timestamptz) + IS 'sets distributed transaction id'; + +CREATE FUNCTION pg_catalog.dump_local_wait_edges( + IN source_node_id int, IN local_node_id int, + OUT waiting_pid int4, + OUT waiting_node_id int4, OUT waiting_transaction_id int8, OUT waiting_transaction_stamp timestamptz, + OUT blocking_pid int, + OUT blocking_nodeid int4, OUT blocking_transactionid int8, OUT blocking_transactionstamp timestamptz, + OUT blocking_also_blocked bool) +RETURNS SETOF RECORD +LANGUAGE 'c' +STRICT +AS $$citus$$, $$dump_local_wait_edges$$; + +CREATE FUNCTION pg_catalog.dump_all_wait_edges( + OUT pid int4, + OUT nodeId int8, OUT transactionId int8, OUT transactionStamp timestamptz, + OUT blocked_pid int, + OUT blocked_nodeid int8, OUT blocked_transactionid int8, OUT blocked_transactionstamp timestamptz, + OUT blocked_blocked bool) +RETURNS SETOF RECORD +LANGUAGE 'c' +STRICT +AS $$citus$$, $$dump_all_wait_edges$$; + + +CREATE FUNCTION pg_catalog.this_machine_kills_deadlocks() +RETURNS bool +LANGUAGE 'c' +STRICT +AS $$citus$$, $$this_machine_kills_deadlocks$$; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index ecc22e45e..abd503b5e 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ 
-1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '7.0-1' +default_version = '7.0-2' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/transaction/deadlock.c b/src/backend/distributed/transaction/deadlock.c new file mode 100644 index 000000000..7d6d9de77 --- /dev/null +++ b/src/backend/distributed/transaction/deadlock.c @@ -0,0 +1,926 @@ +/*------------------------------------------------------------------------- + * + * deadlock.c + * + * Basic implementation of distributed deadlock detection. The algorithm used + * is quite naive, in the hope that that's sufficient. The approach basically is: + * + * 1) regularly poll all workers to check whether they have any sessions blocked longer than + * deadlock_timeout * fuzz_factor. This avoids triggering more expensive + * work if there's no potential for deadlocks. (currently unimplemented) + * + * 2) If 1) finds there's potential for deadlock, query each worker node for a + * wait-graph of transactions originating on the corresponding + * coordinator. This graph can include non-distributed transactions, + * transactions originating on other systems, etc., just not as the + * "starting points". All the nodes are identified by their distributed + * transaction id (inventing one for non-distributed transactions). + * + * 3) The coordinator combines the wait-graphs from each worker, and builds a + * wait-graph spanning all of these. + * + * 4) Perform cycle detection for each distributed transaction started on the + * local node. The algorithm used is a simple graph traversal that finds a + * deadlock when the starting node is reached again. As already visited + * nodes are skipped, this is O(edges) for a single distributed + * transaction, meaning the total cost is O(edges * + * distributed_transactions), where distributed_transactions is limited by + * max_connections. The cost can be limited by only considering + * transactions that have been running for longer than deadlock_timeout + * (unimplemented). + * If necessary this could be implemented in O(transactions + edges). + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "libpq-fe.h" +#include "miscadmin.h" + +#include "access/hash.h" +#include "distributed/connection_management.h" +#include "distributed/hash_helpers.h" +#include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" +#include "distributed/transaction_management.h" +#include "storage/proc.h" +#include "utils/builtins.h" +#include "utils/hsearch.h" + +typedef struct LockDepNode +{ + TmgmtTransactionId transactionId; + List *deps; + int initial_pid; + bool visited; +} LockDepNode; + +/* + * Describes an edge in a waiting-for graph of locks. This isn't used for + * deadlock-checking itself, but to gather the information necessary to do so. + * It can be acquired locally, or from a remote node. + * + * The datatypes here are a bit looser than strictly necessary, because + * they're transported as the return type from an SQL function. + */ +typedef struct WaitEdge +{ + int waitingPid; + int waitingNodeId; + int64 waitingTransactionId; + TimestampTz waitingTransactionStamp; + + int blockingPid; + int blockingNodeId; + int64 blockingTransactionId; + TimestampTz blockingTransactionStamp; + bool blockingAlsoBlocked; +} WaitEdge; + +/* FIXME: better name? More adjacency list than graph...
*/ +typedef struct WaitGraph +{ + size_t numAllocated; + size_t numUsed; + WaitEdge *edges; +} WaitGraph; + +#define NUM_DUMP_LOCKS_COLS 9 + +/* + * Columns: + * 00: waiting_pid + * 01: waiting_node_id + * 02: waiting_transaction_id + * 03: waiting_transactionstamp + * 04: blocking_pid + * 05: blocking_node_id + * 06: blocking_transaction_id + * 07: blocking_transactionstamp + * 08: blocking_also_blocked + */ + +static WaitGraph * BuildLocalWaitForGraph(int sourceNodeId, int localNodeId); +static WaitEdge * AllocWaitEdge(WaitGraph *waitGraph); + +static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo); +static void LoadRemoteEdges(WaitGraph *waitGraph); +static void WaitEdgeFromResult(WaitEdge *edge, PGresult *result, size_t rowIndex); +static int64 ParseIntField(PGresult *result, int rowIndex, int colIndex, bool *isNull); +static TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex, + bool *isNull); + +static uint32 TransactionIdHash(const void *key, Size keysize); +static int TransactionIdCompare(const void *a, const void *b, Size keysize); +static LockDepNode * LookupDepNode(HTAB *lockDepNodeHash, + TmgmtTransactionId *transactionId); + +PG_FUNCTION_INFO_V1(this_machine_kills_deadlocks); +PG_FUNCTION_INFO_V1(dump_local_wait_edges); +PG_FUNCTION_INFO_V1(dump_all_wait_edges); + + +Datum +this_machine_kills_deadlocks(PG_FUNCTION_ARGS) +{ + WaitGraph *waitGraph = NULL; + HASHCTL info; + uint32 hashFlags = 0; + HTAB *lockDepNodeHash = NULL; + int localNodeId = 0; /* FIXME: get correct local node id */ + int curBackend = 0; + size_t curEdgeNum; + + /* create distributed transaction id -> LockDepNode hash */ + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(TmgmtTransactionId); + info.entrysize = sizeof(LockDepNode); + info.hash = TransactionIdHash; + info.match = TransactionIdCompare; + info.hcxt = CurrentMemoryContext; + hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE); + + lockDepNodeHash = hash_create("deadlock detection", + 64, &info, hashFlags); + + waitGraph = BuildLocalWaitForGraph(0, 0); + LoadRemoteEdges(waitGraph); + + /* + * FIXME: should be doable without additional lock now, could just do this + * in the hash, after building it? + */ + LWLockAcquire(&TmgmtShmemControl->lock, LW_SHARED); + for (curBackend = 0; curBackend < MaxBackends; curBackend++) + { + TmgmtBackendData *CurBackendData = &TmgmtShmemControl->sessions[curBackend]; + PGPROC *curProc; + LockDepNode *initialNode = NULL; + + curProc = &ProcGlobal->allProcs[curBackend]; + + /* Skip if unused (FIXME: prepared xacts!) */ + if (curProc->pid == 0) + { + continue; + } + + /* Skip if not a distributed transaction. */ + if (CurBackendData->transactionId.transactionId == 0) + { + continue; + } + + /* Skip if not originating from the interesting node */ + if (localNodeId != CurBackendData->transactionId.nodeId) + { + continue; + } + + /* + * It'd be nice if we could only start at transactions that are + * actually blocked, but we do not know that at this point. + */ + + initialNode = LookupDepNode(lockDepNodeHash, &CurBackendData->transactionId); + initialNode->initial_pid = curProc->pid; + } + LWLockRelease(&TmgmtShmemControl->lock); + + /* + * Flatten wait edges, which includes wait-edges between processes on each + * node, to a graph only including wait-for dependencies between + * distributed transactions.
+ */ + for (curEdgeNum = 0; curEdgeNum < waitGraph->numUsed; curEdgeNum++) + { + WaitEdge *curEdge = &waitGraph->edges[curEdgeNum]; + TmgmtTransactionId from; + TmgmtTransactionId to; + LockDepNode *fromNode; + LockDepNode *toNode; + + from.nodeId = curEdge->waitingNodeId; + from.transactionId = curEdge->waitingTransactionId; + from.timestamp = curEdge->waitingTransactionStamp; + + to.nodeId = curEdge->blockingNodeId; + to.transactionId = curEdge->blockingTransactionId; + to.timestamp = curEdge->blockingTransactionStamp; + + fromNode = LookupDepNode(lockDepNodeHash, &from); + toNode = LookupDepNode(lockDepNodeHash, &to); + + fromNode->deps = lappend(fromNode->deps, toNode); + } + + /* avoids the need to call hash_seq_term */ + hash_freeze(lockDepNodeHash); + + /* + * Find lock cycles by doing a graph traversal for each node that starts + * with one of the local ones. + */ + { + LockDepNode *curNode; + HASH_SEQ_STATUS initNodeSeq; + + hash_seq_init(&initNodeSeq, lockDepNodeHash); + while ((curNode = (LockDepNode *) hash_seq_search(&initNodeSeq)) != 0) + { + HASH_SEQ_STATUS resetVisitedSeq; + LockDepNode *resetVisitedNode; + List *todoList = NIL; + + /* Only start at transactions originating locally and blocked. */ + if (curNode->initial_pid == 0) + { + continue; + } + + elog(WARNING, "doing deadlock detection for %d", curNode->initial_pid); + + /* reset visited fields */ + hash_seq_init(&resetVisitedSeq, lockDepNodeHash); + while ((resetVisitedNode = (LockDepNode *) hash_seq_search( + &resetVisitedSeq)) != 0) + { + resetVisitedNode->visited = false; + } + + todoList = list_copy(curNode->deps); + + while (todoList) + { + LockDepNode *visitNode = linitial(todoList); + ListCell *recurseCell = NULL; + + todoList = list_delete_first(todoList); + + /* There's a deadlock if we found our original node */ + if (visitNode == curNode) + { + elog(WARNING, "found deadlock, killing: %d", curNode->initial_pid); + kill(curNode->initial_pid, SIGINT); + pg_usleep(100000); + kill(curNode->initial_pid, SIGINT); + PG_RETURN_BOOL(true); + } + + /* don't do redundant work (or cycle in indep.
deadlocks) */ + if (visitNode->visited) + { + continue; + } + visitNode->visited = true; + + foreach(recurseCell, visitNode->deps) + { + todoList = lappend(todoList, lfirst(recurseCell)); + } + } + } + } + + PG_RETURN_BOOL(false); +} + + +Datum +dump_local_wait_edges(PG_FUNCTION_ARGS) +{ + int32 sourceNodeId = PG_GETARG_INT32(0); + int32 localNodeId = PG_GETARG_INT32(1); + WaitGraph *waitGraph = NULL; + + waitGraph = BuildLocalWaitForGraph(sourceNodeId, localNodeId); + + ReturnWaitGraph(waitGraph, fcinfo); + + return (Datum) 0; +} + + +Datum +dump_all_wait_edges(PG_FUNCTION_ARGS) +{ + WaitGraph *waitGraph = NULL; + + waitGraph = BuildLocalWaitForGraph(0, 0); + LoadRemoteEdges(waitGraph); + + ReturnWaitGraph(waitGraph, fcinfo); + + return (Datum) 0; +} + + +static void +RecordWaitEdge(WaitGraph *waitGraph, + int localNodeId, bool blockingAlsoBlocked, + TmgmtBackendData *fromData, PGPROC *fromProc, + TmgmtBackendData *toData, PGPROC *toProc) +{ + WaitEdge *curEdge = AllocWaitEdge(waitGraph); + + curEdge->waitingPid = fromProc->pid; + if (fromData->transactionId.transactionId == 0) + { + curEdge->waitingNodeId = localNodeId; + curEdge->waitingTransactionId = 0; + curEdge->waitingTransactionStamp = 0; + } + else + { + curEdge->waitingNodeId = fromData->transactionId.nodeId; + curEdge->waitingTransactionId = fromData->transactionId.transactionId; + curEdge->waitingTransactionStamp = fromData->transactionId.timestamp; + } + + curEdge->blockingPid = toProc->pid; + if (toData->transactionId.transactionId == 0) + { + curEdge->blockingNodeId = localNodeId; + curEdge->blockingTransactionId = 0; + curEdge->blockingTransactionStamp = 0; + } + else + { + curEdge->blockingNodeId = toData->transactionId.nodeId; + curEdge->blockingTransactionId = toData->transactionId.transactionId; + curEdge->blockingTransactionStamp = toData->transactionId.timestamp; + } + curEdge->blockingAlsoBlocked = blockingAlsoBlocked; +} + + +static WaitGraph * +BuildLocalWaitForGraph(int sourceNodeId, int localNodeId) +{ + WaitGraph *waitGraph = (WaitGraph *) palloc(sizeof(WaitGraph)); + int curBackend; + int partitionNum; + List *todoList = NIL; + bool *visited = NULL; + + /* + * Try hard to avoid allocations while holding lock. Thus we pre-allocate + * space for locks in large batches - for common scenarios this should be + * more than enough space to build the list of wait edges without a single + * allocation. + * + * FIXME: Better todoList datatype, pg style list is a bad idea, way too + * many allocations. + */ + waitGraph->numAllocated = MaxBackends * 3; + waitGraph->numUsed = 0; + waitGraph->edges = (WaitEdge *) palloc(waitGraph->numAllocated * sizeof(WaitEdge)); + + visited = palloc0(sizeof(bool) * MaxBackends); + + /* + * Build lock-graph. We do so by first finding all procs which we are + * interested in (originating on our source system, and blocked). Once + * those are collected, do a depth-first search over all procs blocking + * those. To avoid redundantly visiting procs, keep track of which procs + * have already been visited in a pgproc-indexed visited[] array.
+ */ + + LWLockAcquire(&TmgmtShmemControl->lock, LW_SHARED); + for (partitionNum = 0; partitionNum < NUM_LOCK_PARTITIONS; partitionNum++) + { + LWLockAcquire(LockHashPartitionLockByIndex(partitionNum), LW_SHARED); + } + + /* build list of starting procs */ + for (curBackend = 0; curBackend < MaxBackends; curBackend++) + { + TmgmtBackendData *CurBackendData = &TmgmtShmemControl->sessions[curBackend]; + PGPROC *curProc; + + curProc = &ProcGlobal->allProcs[curBackend]; + + /* Skip if unused (FIXME: prepared xacts!) */ + if (curProc->pid == 0) + { + continue; + } + + /* Skip if not a distributed transaction. */ + if (CurBackendData->transactionId.transactionId == 0) + { + continue; + } + + /* Skip if not originating from the interesting node */ + if (sourceNodeId != CurBackendData->transactionId.nodeId) + { + continue; + } + + /* Skip if not blocked */ + if (curProc->links.next == NULL || curProc->waitLock == NULL) + { + continue; + } + + todoList = lappend(todoList, curProc); + } + + + while (todoList) + { + PGPROC *curProc = (PGPROC *) linitial(todoList); + TmgmtBackendData *curBackendData = + &TmgmtShmemControl->sessions[curProc->pgprocno]; + + /* pop from todo list */ + todoList = list_delete_first(todoList); + + /* avoid redundant (potentially cyclic!) iteration */ + if (visited[curProc->pgprocno]) + { + continue; + } + visited[curProc->pgprocno] = true; + + /* FIXME: deal with group locking */ + + /* FIXME: move body to different function */ + if (curProc->links.next != NULL && curProc->waitLock != NULL) + { + LOCK *lock = curProc->waitLock; + SHM_QUEUE *procLocks = &(lock->procLocks); + PROCLOCK *proclock; + int conflictMask; + int numLockModes; + LockMethod lockMethodTable; + + lockMethodTable = GetLocksMethodTable(lock); + numLockModes = lockMethodTable->numLockModes; + conflictMask = lockMethodTable->conflictTab[curProc->waitLockMode]; + + /* + * Record an edge for everyone already holding the lock in a + * conflicting manner ("hard edges" in postgres parlance). + */ + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, lockLink)); + + while (proclock) + { + PGPROC *leader; + TmgmtBackendData *blockingBackendData = NULL; + PGPROC *nextProc; + + nextProc = proclock->tag.myProc; + leader = nextProc->lockGroupLeader == NULL ? nextProc : + nextProc->lockGroupLeader; + + blockingBackendData = &TmgmtShmemControl->sessions[leader->pgprocno]; + + /* A proc never blocks itself or any other lock group member */ + if (leader != curProc) + { + int lockMethodOff; + + /* Have to check conflicts with every locktype held. */ + for (lockMethodOff = 1; lockMethodOff <= numLockModes; + lockMethodOff++) + { + if ((proclock->holdMask & LOCKBIT_ON(lockMethodOff)) && + (conflictMask & LOCKBIT_ON(lockMethodOff))) + { + bool blockingAlsoBlocked = nextProc->links.next != NULL && + nextProc->waitLock != NULL; + + RecordWaitEdge(waitGraph, + localNodeId, blockingAlsoBlocked, + curBackendData, curProc, + blockingBackendData, nextProc); + + if (blockingAlsoBlocked) + { + todoList = lappend(todoList, nextProc); + } + } + } + } + proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, + offsetof(PROCLOCK, lockLink)); + } + + + /* + * Record an edge for everyone in front of us in the wait-queue + * for the lock ("soft edges" in postgres parlance). Postgres can + * re-order the wait-queue, but that seems pretty hard to do in a + * distributed context.
+ */ + { + PROC_QUEUE *waitQueue = &(lock->waitProcs); + int queueSize = waitQueue->size; + PGPROC *nextProc = (PGPROC *) waitQueue->links.next; + + while (queueSize-- > 0) + { + PGPROC *leader; + + /* done when we reached ourselves */ + if (curProc == nextProc) + { + break; + } + + leader = nextProc->lockGroupLeader == NULL ? nextProc : + nextProc->lockGroupLeader; + + if ((LOCKBIT_ON(leader->waitLockMode) & conflictMask) != 0) + { + bool blockingAlsoBlocked = nextProc->links.next != NULL && + nextProc->waitLock != NULL; + TmgmtBackendData *blockingBackendData = NULL; + + blockingBackendData = + &TmgmtShmemControl->sessions[leader->pgprocno]; + + RecordWaitEdge(waitGraph, + localNodeId, blockingAlsoBlocked, + curBackendData, curProc, + blockingBackendData, nextProc); + + if (blockingAlsoBlocked) + { + todoList = lappend(todoList, nextProc); + } + } + + nextProc = (PGPROC *) nextProc->links.next; + } + } + } + } + + for (partitionNum = 0; partitionNum < NUM_LOCK_PARTITIONS; partitionNum++) + { + LWLockRelease(LockHashPartitionLockByIndex(partitionNum)); + } + + LWLockRelease(&TmgmtShmemControl->lock); + + return waitGraph; +} + + +static WaitEdge * +AllocWaitEdge(WaitGraph *waitGraph) +{ + /* ensure space for new edge */ + if (waitGraph->numAllocated == waitGraph->numUsed) + { + waitGraph->numAllocated *= 2; + waitGraph->edges = (WaitEdge *) + repalloc(waitGraph->edges, sizeof(WaitEdge) * + waitGraph->numAllocated); + } + + return &waitGraph->edges[waitGraph->numUsed++]; +} + + +static int64 +ParseIntField(PGresult *result, int rowIndex, int colIndex, bool *isNull) +{ + if (PQgetisnull(result, rowIndex, colIndex)) + { + if (!isNull) + { + ereport(ERROR, (errmsg("remote field unexpectedly is NULL"))); + } + *isNull = true; + return 0; + } + + if (isNull) + { + *isNull = false; + } + + return pg_strtouint64(PQgetvalue(result, rowIndex, colIndex), NULL, 10); +} + + +static TimestampTz +ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex, bool *isNull) +{ + Datum timestampDatum; + + if (PQgetisnull(result, rowIndex, colIndex)) + { + if (!isNull) + { + ereport(ERROR, (errmsg("remote field unexpectedly is NULL"))); + } + *isNull = true; + return 0; + } + + if (isNull) + { + *isNull = false; + } + + timestampDatum = + DirectFunctionCall3(timestamptz_in, CStringGetDatum(PQgetvalue(result, rowIndex, + colIndex)), + 0, -1); + return DatumGetTimestampTz(timestampDatum); +} + + +static void +ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc = NULL; + Tuplestorestate *tupstore = NULL; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + size_t curEdgeNum; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg( + "set-valued function called in context that cannot accept a set"))); + } + if (!(rsinfo->allowedModes & SFRM_Materialize)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not " \ + "allowed in this context"))); + } + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + { + elog(ERROR, "return type must be a row type"); + } + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + 
rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + MemoryContextSwitchTo(oldcontext); + + /* + * Columns: + * 00: waiting_pid + * 01: waiting_node_id + * 02: waiting_transaction_id + * 03: waiting_transactionstamp + * 04: blocking_pid + * 05: blocking_node_id + * 06: blocking_transaction_id + * 07: blocking_transactionstamp + * 08: blocking_also_blocked + */ + for (curEdgeNum = 0; curEdgeNum < waitGraph->numUsed; curEdgeNum++) + { + Datum values[NUM_DUMP_LOCKS_COLS]; + bool nulls[NUM_DUMP_LOCKS_COLS]; + WaitEdge *curEdge = &waitGraph->edges[curEdgeNum]; + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = Int32GetDatum(curEdge->waitingPid); + values[1] = Int32GetDatum(curEdge->waitingNodeId); + if (curEdge->waitingTransactionId != 0) + { + values[2] = Int64GetDatum(curEdge->waitingTransactionId); + values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp); + } + else + { + nulls[2] = true; + nulls[3] = true; + } + + values[4] = Int32GetDatum(curEdge->blockingPid); + values[5] = Int32GetDatum(curEdge->blockingNodeId); + if (curEdge->blockingTransactionId != 0) + { + values[6] = Int64GetDatum(curEdge->blockingTransactionId); + values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp); + } + else + { + nulls[6] = true; + nulls[7] = true; + } + values[8] = BoolGetDatum(curEdge->blockingAlsoBlocked); + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); +} + + +static void +WaitEdgeFromResult(WaitEdge *edge, PGresult *result, size_t rowIndex) +{ + bool isNull = false; + + /* + * Remote Columns: + * 00: waiting_pid + * 01: waiting_node_id + * 02: waiting_transaction_id + * 03: waiting_transactionstamp + * 04: blocking_pid + * 05: blocking_node_id + * 06: blocking_transaction_id + * 07: blocking_transactionstamp + * 08: blocking_also_blocked + */ + edge->waitingPid = ParseIntField(result, rowIndex, 0, NULL); + edge->waitingNodeId = ParseIntField(result, rowIndex, 1, NULL); + edge->waitingTransactionId = ParseIntField(result, rowIndex, 2, &isNull); + if (isNull) + { + edge->waitingTransactionId = 0; + } + edge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3, &isNull); + if (isNull) + { + edge->waitingTransactionStamp = 0; + } + + edge->blockingPid = ParseIntField(result, rowIndex, 4, NULL); + edge->blockingNodeId = ParseIntField(result, rowIndex, 5, &isNull); + edge->blockingTransactionId = ParseIntField(result, rowIndex, 6, &isNull); + if (isNull) + { + edge->blockingTransactionId = 0; + } + edge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7, &isNull); + if (isNull) + { + edge->blockingTransactionStamp = 0; + } + edge->blockingAlsoBlocked = ParseIntField(result, rowIndex, 8, &isNull) != 0; +} + + +static void +LoadRemoteEdges(WaitGraph *waitGraph) +{ + List *workerNodeList = ActiveWorkerNodeList(); + ListCell *workerNodeCell = NULL; + char *nodeUser = CitusExtensionOwnerName(); + List *connectionList = NIL; + ListCell *connectionCell = NULL; + + + /* open connections in parallel */ + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + MultiConnection *connection = NULL; + int connectionFlags = 0; + + connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, + nodeUser, NULL); + + connectionList 
= lappend(connectionList, connection); + } + + /* finish opening connections */ + FinishConnectionListEstablishment(connectionList); + + /* send commands in parallel */ + forboth(workerNodeCell, workerNodeList, connectionCell, connectionList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + int querySent = false; + char *command = NULL; + const char *params[2]; + + + params[0] = psprintf("%d", 0); /* FIXME: better id for master */ + params[1] = psprintf("%d", workerNode->nodeId); + command = "SELECT * FROM dump_local_wait_edges($1, $2)"; + + querySent = SendRemoteCommandParams(connection, command, 2, + NULL, params); + if (querySent == 0) + { + ReportConnectionError(connection, ERROR); + } + } + + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + PGresult *result = NULL; + bool raiseInterrupts = true; + int64 rowIndex = 0; + int64 rowCount = 0; + int64 colCount = 0; + + result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, ERROR); + } + +#define NUM_DUMP_LOCKS_COLS 9 + rowCount = PQntuples(result); + colCount = PQnfields(result); + + if (colCount != NUM_DUMP_LOCKS_COLS) + { + elog(ERROR, "hey mister"); + } + + for (rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + WaitEdge *curEdge = AllocWaitEdge(waitGraph); + + WaitEdgeFromResult(curEdge, result, rowIndex); + } + + PQclear(result); + ForgetResults(connection); + } +} + + +static uint32 +TransactionIdHash(const void *key, Size keysize) +{ + TmgmtTransactionId *entry = (TmgmtTransactionId *) key; + uint32 hash = 0; + + hash = hash_any((unsigned char *) &entry->nodeId, sizeof(int64)); + hash = hash_combine(hash, hash_any((unsigned char *) &entry->transactionId, + sizeof(int64))); + hash = hash_combine(hash, hash_any((unsigned char *) &entry->timestamp, + sizeof(TimestampTz))); + + return hash; +} + + +static int +TransactionIdCompare(const void *a, const void *b, Size keysize) +{ + TmgmtTransactionId *ta = (TmgmtTransactionId *) a; + TmgmtTransactionId *tb = (TmgmtTransactionId *) b; + + /* NB: Not used for sorting, just equality... */ + if (ta->nodeId != tb->nodeId || + ta->transactionId != tb->transactionId || + ta->timestamp != tb->timestamp) + { + return 1; + } + else + { + return 0; + } +} + + +static LockDepNode * +LookupDepNode(HTAB *lockDepNodeHash, TmgmtTransactionId *transactionId) +{ + bool found = false; + LockDepNode *node = (LockDepNode *) + hash_search(lockDepNodeHash, transactionId, HASH_ENTER, &found); + + if (!found) + { + node->deps = NIL; + node->initial_pid = false; + node->visited = false; + } + + return node; +} diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index da5652157..9ba302d93 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -22,6 +22,7 @@ #include "access/xact.h" #include "libpq/pqsignal.h" +#include "distributed/deadlock.h" #include "distributed/maintenanced.h" #include "distributed/metadata_cache.h" #include "postmaster/bgworker.h" @@ -258,6 +259,22 @@ CitusMaintenanceDaemonMain(Datum main_arg) * tasks should do their own time math about whether to re-run checks. 
*/ + { + Datum foundDeadlock; + + StartTransactionCommand(); + foundDeadlock = this_machine_kills_deadlocks(NULL); + CommitTransactionCommand(); + + /* + * Check sooner if we just found a deadlock. + */ + if (foundDeadlock) + { + timeout = 100; + } + } + /* * Wait until timeout, or until somebody wakes us up. */ diff --git a/src/include/distributed/deadlock.h b/src/include/distributed/deadlock.h new file mode 100644 index 000000000..cc8317c3b --- /dev/null +++ b/src/include/distributed/deadlock.h @@ -0,0 +1,15 @@ +/*------------------------------------------------------------------------- + * + * deadlock.h + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef DEADLOCK_H +#define DEADLOCK_H + +extern Datum this_machine_kills_deadlocks(PG_FUNCTION_ARGS); + +#endif /* DEADLOCK_H */ diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 5e30d5868..7ccccfc53 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -111,6 +111,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-2'; ALTER EXTENSION citus UPDATE TO '6.2-3'; ALTER EXTENSION citus UPDATE TO '6.2-4'; ALTER EXTENSION citus UPDATE TO '7.0-1'; +ALTER EXTENSION citus UPDATE TO '7.0-2'; -- show running version SHOW citus.version; citus.version diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 5f5c40c47..5753e19d9 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -111,6 +111,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-2'; ALTER EXTENSION citus UPDATE TO '6.2-3'; ALTER EXTENSION citus UPDATE TO '6.2-4'; ALTER EXTENSION citus UPDATE TO '7.0-1'; +ALTER EXTENSION citus UPDATE TO '7.0-2'; -- show running version SHOW citus.version; From cd0283f1c07eef19bf3c8fa64194aa7105689589 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Sun, 11 Jun 2017 18:39:01 -0700 Subject: [PATCH 3/3] WIP: Allow router to touch multiple shards in a transaction. For now, this is primarily to be able to test distributed deadlock detection.
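For example, the following sequence from multi_modifying_xacts, which previously failed with "no transaction participant matches ...", is now allowed to open transactions on additional worker nodes as it goes:

  BEGIN;
  INSERT INTO labs VALUES (6, 'Bell Labs');
  INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
  COMMIT;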
Todo: - Fix regression tests properly --- .../executor/multi_router_executor.c | 30 ++------------ .../expected/multi_modifying_xacts.out | 39 +++++++++++-------- .../expected/multi_mx_modifying_xacts.out | 6 +-- 3 files changed, 26 insertions(+), 49 deletions(-) diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index ed7e1a2f5..c8a831efc 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -78,8 +78,7 @@ static void AssignInsertTaskShardId(Query *jobQuery, List *taskList); static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); -static List * GetModifyConnections(List *taskPlacementList, bool markCritical, - bool startedInTransaction); +static List * GetModifyConnections(List *taskPlacementList, bool markCritical); static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, bool isModificationQuery, bool expectResults); static int64 ExecuteModifyTasks(List *taskList, bool expectResults, @@ -666,8 +665,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult char *queryString = task->queryString; bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC); - bool startedInTransaction = - InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA; if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD) { @@ -706,8 +703,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult * table) and start a transaction (when in a transaction). */ connectionList = GetModifyConnections(taskPlacementList, - taskRequiresTwoPhaseCommit, - startedInTransaction); + taskRequiresTwoPhaseCommit); /* prevent replicas of the same shard from diverging */ AcquireExecutorShardLock(task, operation); @@ -808,7 +804,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult * transaction in progress. */ static List * -GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTransactions) +GetModifyConnections(List *taskPlacementList, bool markCritical) { ListCell *taskPlacementCell = NULL; List *multiConnectionList = NIL; @@ -828,26 +824,6 @@ GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTrans */ multiConnection = StartPlacementConnection(connectionFlags, taskPlacement, NULL); - /* - * If already in a transaction, disallow expanding set of remote - * transactions. That prevents some forms of distributed deadlocks. 
- */ - if (noNewTransactions) - { - RemoteTransaction *transaction = &multiConnection->remoteTransaction; - - if (EnableDeadlockPrevention && - transaction->transactionState == REMOTE_TRANS_INVALID) - { - ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), - errmsg("no transaction participant matches %s:%d", - taskPlacement->nodeName, taskPlacement->nodePort), - errdetail("Transactions which modify distributed tables " - "may only target nodes affected by the " - "modification command which began the transaction."))); - } - } - if (markCritical) { MarkRemoteTransactionCritical(multiConnection); diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index a03010787..b7c1c9793 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -136,14 +136,15 @@ SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id; BEGIN; INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport'); -ERROR: no transaction participant matches localhost:57638 -DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction. COMMIT; -- unless we disable deadlock prevention BEGIN; SET citus.enable_deadlock_prevention TO off; INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport'); +ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200001" +DETAIL: Key (lab_id, name)=(6, Leslie Lamport) already exists. +CONTEXT: while executing command on localhost:57638 ABORT; -- SELECTs may occur after a modification: First check that selecting -- from the modified node works. @@ -152,7 +153,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs'); SELECT count(*) FROM researchers WHERE lab_id = 6; count ------- - 0 + 1 (1 row) ABORT; @@ -168,7 +169,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs'); SELECT count(*) FROM researchers WHERE lab_id = 6; count ------- - 0 + 1 (1 row) ABORT; @@ -193,9 +194,10 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs':: (2 rows) SELECT * FROM labs WHERE id = 6; - id | name -----+------ -(0 rows) + id | name +----+----------- + 6 | Bell Labs +(1 row) -- COPY can happen after single row INSERT BEGIN; @@ -256,9 +258,10 @@ DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists. COMMIT; -- verify rollback SELECT * FROM researchers WHERE lab_id = 6; - id | lab_id | name -----+--------+------ -(0 rows) + id | lab_id | name +----+--------+---------------- + 9 | 6 | Leslie Lamport +(1 row) SELECT count(*) FROM pg_dist_transaction; count @@ -283,9 +286,10 @@ DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists. 
COMMIT; -- verify rollback SELECT * FROM researchers WHERE lab_id = 6; - id | lab_id | name -----+--------+------ -(0 rows) + id | lab_id | name +----+--------+---------------- + 9 | 6 | Leslie Lamport +(1 row) SELECT count(*) FROM pg_dist_transaction; count @@ -301,9 +305,10 @@ COMMIT; SELECT * FROM researchers WHERE lab_id = 6; id | lab_id | name ----+--------+---------------------- + 9 | 6 | Leslie Lamport 17 | 6 | 'Bjarne Stroustrup' 18 | 6 | 'Dennis Ritchie' -(2 rows) +(3 rows) -- verify 2pc SELECT count(*) FROM pg_dist_transaction; @@ -360,9 +365,10 @@ ERROR: could not commit transaction on any active node SELECT * FROM researchers WHERE lab_id = 6; id | lab_id | name ----+--------+---------------------- + 9 | 6 | Leslie Lamport 17 | 6 | 'Bjarne Stroustrup' 18 | 6 | 'Dennis Ritchie' -(2 rows) +(3 rows) -- cleanup triggers and the function SELECT * from run_command_on_placements('researchers', 'drop trigger reject_large_researcher_id on %s') @@ -400,7 +406,7 @@ ALTER TABLE labs ADD COLUMN motto text; SELECT master_modify_multiple_shards('DELETE FROM labs'); master_modify_multiple_shards ------------------------------- - 7 + 8 (1 row) ALTER TABLE labs ADD COLUMN score float; @@ -880,7 +886,6 @@ SELECT create_distributed_table('hash_modifying_xacts', 'key'); BEGIN; INSERT INTO hash_modifying_xacts VALUES (1, 1); INSERT INTO reference_modifying_xacts VALUES (10, 10); -ERROR: no transaction participant matches localhost:57638 COMMIT; -- it is allowed when turning off deadlock prevention BEGIN; diff --git a/src/test/regress/expected/multi_mx_modifying_xacts.out b/src/test/regress/expected/multi_mx_modifying_xacts.out index 92d29bf82..59cb523ea 100644 --- a/src/test/regress/expected/multi_mx_modifying_xacts.out +++ b/src/test/regress/expected/multi_mx_modifying_xacts.out @@ -140,8 +140,6 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id; BEGIN; INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport'); -ERROR: no transaction participant matches localhost:57638 -DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction. COMMIT; -- have the same test on the other worker node \c - - - :worker_2_port @@ -163,8 +161,6 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id; BEGIN; INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport'); -ERROR: no transaction participant matches localhost:57638 -DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction. COMMIT; -- switch back to the worker node \c - - - :worker_1_port @@ -175,7 +171,7 @@ INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); SELECT count(*) FROM researchers_mx WHERE lab_id = 6; count ------- - 0 + 2 (1 row) ABORT;