commit 6d79c087d1: merge cd0283f1c0 into 58947e0dcf
mirror of https://github.com/citusdata/citus.git
@@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
	6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
	6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
	6.2-1 6.2-2 6.2-3 6.2-4 \
-	7.0-1
+	7.0-1 7.0-2

# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))

@@ -141,6 +141,8 @@ $(EXTENSION)--6.2-4.sql: $(EXTENSION)--6.2-3.sql $(EXTENSION)--6.2-3--6.2-4.sql
	cat $^ > $@
$(EXTENSION)--7.0-1.sql: $(EXTENSION)--6.2-4.sql $(EXTENSION)--6.2-4--7.0-1.sql
	cat $^ > $@
+$(EXTENSION)--7.0-2.sql: $(EXTENSION)--7.0-1.sql $(EXTENSION)--7.0-1--7.0-2.sql
+	cat $^ > $@

NO_PGXS = 1
@@ -0,0 +1,42 @@
/* citus--7.0-1--7.0-2.sql */

SET search_path = 'pg_catalog';

CREATE FUNCTION assign_distributed_transaction_id(originNodeId int8, transactionId int8, transactionStamp timestamptz)
    RETURNS void
    LANGUAGE C STRICT
    AS 'MODULE_PATHNAME', $$assign_distributed_transaction_id$$;
COMMENT ON FUNCTION assign_distributed_transaction_id(originNodeId int8, transactionId int8, transactionStamp timestamptz)
    IS 'sets distributed transaction id';

CREATE FUNCTION pg_catalog.dump_local_wait_edges(
                    IN source_node_id int, IN local_node_id int,
                    OUT waiting_pid int4,
                    OUT waiting_node_id int4, OUT waiting_transaction_id int8, OUT waiting_transaction_stamp timestamptz,
                    OUT blocking_pid int,
                    OUT blocking_nodeid int4, OUT blocking_transactionid int8, OUT blocking_transactionstamp timestamptz,
                    OUT blocking_also_blocked bool)
RETURNS SETOF RECORD
LANGUAGE 'c'
STRICT
AS $$citus$$, $$dump_local_wait_edges$$;

CREATE FUNCTION pg_catalog.dump_all_wait_edges(
                    OUT pid int4,
                    OUT nodeId int8, OUT transactionId int8, OUT transactionStamp timestamptz,
                    OUT blocked_pid int,
                    OUT blocked_nodeid int8, OUT blocked_transactionid int8, OUT blocked_transactionstamp timestamptz,
                    OUT blocked_blocked bool)
RETURNS SETOF RECORD
LANGUAGE 'c'
STRICT
AS $$citus$$, $$dump_all_wait_edges$$;


CREATE FUNCTION pg_catalog.this_machine_kills_deadlocks()
RETURNS bool
LANGUAGE 'c'
STRICT
AS $$citus$$, $$this_machine_kills_deadlocks$$;

RESET search_path;
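[reviewer note] Once this script is applied, the two dump functions can be called directly from psql. A minimal sketch, with arguments as used by the coordinator-side graph collection below (source node id 0, and a worker node id as second argument; the worker node id value here is illustrative):

    SELECT * FROM dump_local_wait_edges(0, 2);
    SELECT * FROM dump_all_wait_edges();

The result columns follow the OUT parameters declared above.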
@@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
-default_version = '7.0-1'
+default_version = '7.0-2'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog
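[reviewer note] With default_version bumped, an existing installation picks the new script up through the standard extension upgrade path, which the regression tests further down also exercise:

    ALTER EXTENSION citus UPDATE TO '7.0-2';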
@@ -78,8 +78,7 @@ static void AssignInsertTaskShardId(Query *jobQuery, List *taskList);
static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
									bool expectResults);
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
-static List * GetModifyConnections(List *taskPlacementList, bool markCritical,
-								   bool startedInTransaction);
+static List * GetModifyConnections(List *taskPlacementList, bool markCritical);
static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
								 bool isModificationQuery, bool expectResults);
static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
@@ -666,8 +665,6 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult

	char *queryString = task->queryString;
	bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC);
-	bool startedInTransaction =
-		InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA;

	if (XactModificationLevel == XACT_MODIFICATION_MULTI_SHARD)
	{
@@ -706,8 +703,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
	 * table) and start a transaction (when in a transaction).
	 */
	connectionList = GetModifyConnections(taskPlacementList,
-										  taskRequiresTwoPhaseCommit,
-										  startedInTransaction);
+										  taskRequiresTwoPhaseCommit);

	/* prevent replicas of the same shard from diverging */
	AcquireExecutorShardLock(task, operation);
@@ -808,7 +804,7 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
 * transaction in progress.
 */
static List *
-GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTransactions)
+GetModifyConnections(List *taskPlacementList, bool markCritical)
{
	ListCell *taskPlacementCell = NULL;
	List *multiConnectionList = NIL;
@@ -828,26 +824,6 @@ GetModifyConnections(List *taskPlacementList, bool markCritical, bool noNewTrans
		 */
		multiConnection = StartPlacementConnection(connectionFlags, taskPlacement, NULL);

-		/*
-		 * If already in a transaction, disallow expanding set of remote
-		 * transactions. That prevents some forms of distributed deadlocks.
-		 */
-		if (noNewTransactions)
-		{
-			RemoteTransaction *transaction = &multiConnection->remoteTransaction;
-
-			if (EnableDeadlockPrevention &&
-				transaction->transactionState == REMOTE_TRANS_INVALID)
-			{
-				ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
-								errmsg("no transaction participant matches %s:%d",
-									   taskPlacement->nodeName, taskPlacement->nodePort),
-						 errdetail("Transactions which modify distributed tables "
-								   "may only target nodes affected by the "
-								   "modification command which began the transaction.")));
-			}
-		}
-
		if (markCritical)
		{
			MarkRemoteTransactionCritical(multiConnection);
@@ -197,6 +197,7 @@ void
StartupCitusBackend(void)
{
	InitializeMaintenanceDaemonBackend();
+	InitializeTransactionManagementBackend();
}
@@ -0,0 +1,926 @@
/*-------------------------------------------------------------------------
 *
 * deadlock.c
 *
 * Basic implementation of distributed deadlock detection. The algorithm used
 * is quite naive, in the hope that that's sufficient. The approach basically is:
 *
 * 1) regularly poll all workers if they've any sessions blocked longer than
 *    deadlock_timeout * fuzz_factor. This avoids triggering more expensive
 *    work if there's no potential for deadlocks. (currently unimplemented)
 *
 * 2) If 1) finds there's potential for deadlock, query each worker node for a
 *    wait-graph for transactions originating on the corresponding
 *    coordinator. This graph can include non-distributed transactions,
 *    transactions originating on other systems, etc., just not as the
 *    "starting points". All the nodes are identified by their distributed
 *    transaction id (inventing one for non-distributed transactions).
 *
 * 3) The coordinator combines the wait-graphs from each worker, and builds a
 *    wait-graph spanning all of these.
 *
 * 4) Perform cycle detection for each distributed transaction started on the
 *    local node. The algorithm used is a simple graph traversal that finds a
 *    deadlock when the starting node is reached again. As already visited
 *    nodes are skipped, this is O(edges) for a single distributed
 *    transaction. Meaning the total cost is O(edges *
 *    distributed_transactions), where distributed_transactions is limited by
 *    max_connections. The cost can be limited by only considering
 *    transactions that have been running for longer than deadlock_timeout
 *    (unimplemented).
 *    If necessary this could be implemented in O(transactions + edges).
 *
 * Copyright (c) 2017, Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
|
||||
|
||||
#include "funcapi.h"
|
||||
#include "libpq-fe.h"
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include "access/hash.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/transaction_management.h"
|
||||
#include "storage/proc.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/hsearch.h"
|
||||
|
||||
typedef struct LockDepNode
|
||||
{
|
||||
TmgmtTransactionId transactionId;
|
||||
List *deps;
|
||||
int initial_pid;
|
||||
bool visited;
|
||||
} LockDepNode;
|
||||
|
||||
/*
|
||||
* Describes an edge in a waiting-for graph of locks. This isn't used for
|
||||
* deadlock-checking itself, but to gather the information necessary to do so.
|
||||
* It can be acquired locally, or from a remote node.
|
||||
*
|
||||
* The datatypes here are a bit looser than strictly necessary, because
|
||||
* they're transported as the return type from an SQL function.
|
||||
*/
|
||||
typedef struct WaitEdge
|
||||
{
|
||||
int waitingPid;
|
||||
int waitingNodeId;
|
||||
int64 waitingTransactionId;
|
||||
TimestampTz waitingTransactionStamp;
|
||||
|
||||
int blockingPid;
|
||||
int blockingNodeId;
|
||||
int64 blockingTransactionId;
|
||||
TimestampTz blockingTransactionStamp;
|
||||
bool blockingAlsoBlocked;
|
||||
} WaitEdge;
|
||||
|
||||
/* FIXME: better name? More adjacency list than graph... */
|
||||
typedef struct WaitGraph
|
||||
{
|
||||
size_t numAllocated;
|
||||
size_t numUsed;
|
||||
WaitEdge *edges;
|
||||
} WaitGraph;
|
||||
|
||||
#define NUM_DUMP_LOCKS_COLS 9
|
||||
|
||||
/*
|
||||
* Columns:
|
||||
* 00: waiting_pid
|
||||
* 01: waiting_node_id
|
||||
* 02: waiting_transaction_id
|
||||
* 03: waiting_transactionstamp
|
||||
* 04: blocking_pid
|
||||
* 05: blocking_node_id
|
||||
* 06: blocking_transaction_id
|
||||
* 07: blocking_transactionstamp
|
||||
* 08: blocking_also_blocked
|
||||
*/
|
||||
|
||||
static WaitGraph * BuildLocalWaitForGraph(int sourceNodeId, int localNodeId);
|
||||
static WaitEdge * AllocWaitEdge(WaitGraph *waitGraph);
|
||||
|
||||
static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
|
||||
static void LoadRemoteEdges(WaitGraph *waitGraph);
|
||||
static void WaitEdgeFromResult(WaitEdge *edge, PGresult *result, size_t rowIndex);
|
||||
static int64 ParseIntField(PGresult *result, int rowIndex, int colIndex, bool *isNull);
|
||||
static TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex,
|
||||
bool *isNull);
|
||||
|
||||
static uint32 TransactionIdHash(const void *key, Size keysize);
|
||||
static int TransactionIdCompare(const void *a, const void *b, Size keysize);
|
||||
static LockDepNode * LookupDepNode(HTAB *lockDepNodeHash,
|
||||
TmgmtTransactionId *transactionId);
|
||||
|
||||
PG_FUNCTION_INFO_V1(this_machine_kills_deadlocks);
|
||||
PG_FUNCTION_INFO_V1(dump_local_wait_edges);
|
||||
PG_FUNCTION_INFO_V1(dump_all_wait_edges);
|
||||
|
||||
|
||||
Datum
|
||||
this_machine_kills_deadlocks(PG_FUNCTION_ARGS)
|
||||
{
|
||||
WaitGraph *waitGraph = NULL;
|
||||
HASHCTL info;
|
||||
uint32 hashFlags = 0;
|
||||
HTAB *lockDepNodeHash = NULL;
|
||||
int localNodeId = 0; /* FIXME: get correct local node id */
|
||||
int curBackend = 0;
|
||||
size_t curEdgeNum;
|
||||
|
||||
/* create (host,port,user,database) -> [connection] hash */
|
||||
memset(&info, 0, sizeof(info));
|
||||
info.keysize = sizeof(TmgmtTransactionId);
|
||||
info.entrysize = sizeof(LockDepNode);
|
||||
info.hash = TransactionIdHash;
|
||||
info.match = TransactionIdCompare;
|
||||
info.hcxt = CurrentMemoryContext;
|
||||
hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
|
||||
|
||||
lockDepNodeHash = hash_create("deadlock detection",
|
||||
64, &info, hashFlags);
|
||||
|
||||
waitGraph = BuildLocalWaitForGraph(0, 0);
|
||||
LoadRemoteEdges(waitGraph);
|
||||
|
||||
/*
|
||||
* FIXME: should be doable without additional lock now, could just do this
|
||||
* in the hash, after building it?
|
||||
*/
|
||||
LWLockAcquire(&TmgmtShmemControl->lock, LW_SHARED);
|
||||
for (curBackend = 0; curBackend < MaxBackends; curBackend++)
|
||||
{
|
||||
TmgmtBackendData *CurBackendData = &TmgmtShmemControl->sessions[curBackend];
|
||||
PGPROC *curProc;
|
||||
LockDepNode *initialNode = NULL;
|
||||
|
||||
curProc = &ProcGlobal->allProcs[curBackend];
|
||||
|
||||
/* Skip if unused (FIXME: prepared xacts!) */
|
||||
if (curProc->pid == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Skip if not a distributed transaction. */
|
||||
if (CurBackendData->transactionId.transactionId == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Skip if not not originating from interesting node */
|
||||
if (localNodeId != CurBackendData->transactionId.nodeId)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* It'd be nice if we could only start at transactions that are
|
||||
* actually blocked, but we do not know that at this point.
|
||||
*/
|
||||
|
||||
initialNode = LookupDepNode(lockDepNodeHash, &CurBackendData->transactionId);
|
||||
initialNode->initial_pid = curProc->pid;
|
||||
}
|
||||
LWLockRelease(&TmgmtShmemControl->lock);
|
||||
|
||||
/*
|
||||
* Flatten wait edges, which includes wait-edges between processes on each
|
||||
* node, to a graph only including wait-for dependencies between
|
||||
* distributed transactions.
|
||||
*/
|
||||
for (curEdgeNum = 0; curEdgeNum < waitGraph->numUsed; curEdgeNum++)
|
||||
{
|
||||
WaitEdge *curEdge = &waitGraph->edges[curEdgeNum];
|
||||
TmgmtTransactionId from;
|
||||
TmgmtTransactionId to;
|
||||
LockDepNode *fromNode;
|
||||
LockDepNode *toNode;
|
||||
|
||||
from.nodeId = curEdge->waitingNodeId;
|
||||
from.transactionId = curEdge->waitingTransactionId;
|
||||
from.timestamp = curEdge->waitingTransactionStamp;
|
||||
|
||||
to.nodeId = curEdge->blockingNodeId;
|
||||
to.transactionId = curEdge->blockingTransactionId;
|
||||
to.timestamp = curEdge->blockingTransactionStamp;
|
||||
|
||||
fromNode = LookupDepNode(lockDepNodeHash, &from);
|
||||
toNode = LookupDepNode(lockDepNodeHash, &to);
|
||||
|
||||
fromNode->deps = lappend(fromNode->deps, toNode);
|
||||
}
|
||||
|
||||
/* avoids the need need to call hash_seq_term */
|
||||
hash_freeze(lockDepNodeHash);
|
||||
|
||||
/*
|
||||
* Find lock cycles by doing a tree traversal for each node that starts
|
||||
* with one of the local ones.
|
||||
*/
|
||||
{
|
||||
LockDepNode *curNode;
|
||||
HASH_SEQ_STATUS initNodeSeq;
|
||||
|
||||
hash_seq_init(&initNodeSeq, lockDepNodeHash);
|
||||
while ((curNode = (LockDepNode *) hash_seq_search(&initNodeSeq)) != 0)
|
||||
{
|
||||
HASH_SEQ_STATUS resetVisitedSeq;
|
||||
LockDepNode *resetVisitedNode;
|
||||
List *todoList = NIL;
|
||||
|
||||
/* Only start at transactions originating locally and blocked. */
|
||||
if (curNode->initial_pid == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
elog(WARNING, "doing deadlock detection for %d", curNode->initial_pid);
|
||||
|
||||
/* reset visited fields */
|
||||
hash_seq_init(&resetVisitedSeq, lockDepNodeHash);
|
||||
while ((resetVisitedNode = (LockDepNode *) hash_seq_search(
|
||||
&resetVisitedSeq)) != 0)
|
||||
{
|
||||
resetVisitedNode->visited = false;
|
||||
}
|
||||
|
||||
todoList = list_copy(curNode->deps);
|
||||
|
||||
while (todoList)
|
||||
{
|
||||
LockDepNode *visitNode = linitial(todoList);
|
||||
ListCell *recurseCell = NULL;
|
||||
|
||||
todoList = list_delete_first(todoList);
|
||||
|
||||
/* There's a deadlock if we found our original node */
|
||||
if (visitNode == curNode)
|
||||
{
|
||||
elog(WARNING, "found deadlock, killing: %d", curNode->initial_pid);
|
||||
kill(curNode->initial_pid, SIGINT);
|
||||
pg_usleep(100000);
|
||||
kill(curNode->initial_pid, SIGINT);
|
||||
PG_RETURN_BOOL(true);
|
||||
}
|
||||
|
||||
/* don't do redundant work (or cycle in indep. deadlocks) */
|
||||
if (visitNode->visited)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
visitNode->visited = true;
|
||||
|
||||
foreach(recurseCell, visitNode->deps)
|
||||
{
|
||||
todoList = lappend(todoList, lfirst(recurseCell));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
PG_RETURN_BOOL(false);
|
||||
}
|
||||
|
||||
|
||||
Datum
dump_local_wait_edges(PG_FUNCTION_ARGS)
{
	int32 sourceNodeId = PG_GETARG_INT32(0);
	int32 localNodeId = PG_GETARG_INT32(1);
	WaitGraph *waitGraph = NULL;

	waitGraph = BuildLocalWaitForGraph(sourceNodeId, localNodeId);

	ReturnWaitGraph(waitGraph, fcinfo);

	return (Datum) 0;
}


Datum
dump_all_wait_edges(PG_FUNCTION_ARGS)
{
	WaitGraph *waitGraph = NULL;

	waitGraph = BuildLocalWaitForGraph(0, 0);
	LoadRemoteEdges(waitGraph);

	ReturnWaitGraph(waitGraph, fcinfo);

	return (Datum) 0;
}


static void
RecordWaitEdge(WaitGraph *waitGraph,
			   int localNodeId, bool blockingAlsoBlocked,
			   TmgmtBackendData *fromData, PGPROC *fromProc,
			   TmgmtBackendData *toData, PGPROC *toProc)
{
	WaitEdge *curEdge = AllocWaitEdge(waitGraph);

	curEdge->waitingPid = fromProc->pid;
	if (fromData->transactionId.transactionId == 0)
	{
		curEdge->waitingNodeId = localNodeId;
		curEdge->waitingTransactionId = 0;
		curEdge->waitingTransactionStamp = 0;
	}
	else
	{
		curEdge->waitingNodeId = fromData->transactionId.nodeId;
		curEdge->waitingTransactionId = fromData->transactionId.transactionId;
		curEdge->waitingTransactionStamp = fromData->transactionId.timestamp;
	}

	curEdge->blockingPid = toProc->pid;
	if (toData->transactionId.transactionId == 0)
	{
		curEdge->blockingNodeId = localNodeId;
		curEdge->blockingTransactionId = 0;
		curEdge->blockingTransactionStamp = 0;
	}
	else
	{
		curEdge->blockingNodeId = toData->transactionId.nodeId;
		curEdge->blockingTransactionId = toData->transactionId.transactionId;
		curEdge->blockingTransactionStamp = toData->transactionId.timestamp;
	}
	curEdge->blockingAlsoBlocked = blockingAlsoBlocked;
}


static WaitGraph *
BuildLocalWaitForGraph(int sourceNodeId, int localNodeId)
{
	WaitGraph *waitGraph = (WaitGraph *) palloc(sizeof(WaitGraph));
	int curBackend;
	int partitionNum;
	List *todoList = NIL;
	bool *visited = NULL;

	/*
	 * Try hard to avoid allocations while holding lock. Thus we pre-allocate
	 * space for locks in large batches - for common scenarios this should be
	 * more than enough space to build the list of wait edges without a single
	 * allocation.
	 *
	 * FIXME: Better todoList datatype, pg style list is a bad idea, way too
	 * many allocations.
	 */
	waitGraph->numAllocated = MaxBackends * 3;
	waitGraph->numUsed = 0;
	waitGraph->edges = (WaitEdge *) palloc(waitGraph->numAllocated * sizeof(WaitEdge));

	visited = palloc0(sizeof(bool) * MaxBackends);

	/*
	 * Build lock-graph. We do so by first finding all procs which we are
	 * interested in (originating on our source system, and blocked). Once
	 * those are collected, do depth first search over all procs blocking
	 * those. To avoid redundantly dropping procs, keep track of which procs
	 * already have been visited in a pgproc indexed visited[] array.
	 */

	LWLockAcquire(&TmgmtShmemControl->lock, LW_SHARED);
	for (partitionNum = 0; partitionNum < NUM_LOCK_PARTITIONS; partitionNum++)
	{
		LWLockAcquire(LockHashPartitionLockByIndex(partitionNum), LW_SHARED);
	}

	/* build list of starting procs */
	for (curBackend = 0; curBackend < MaxBackends; curBackend++)
	{
		TmgmtBackendData *CurBackendData = &TmgmtShmemControl->sessions[curBackend];
		PGPROC *curProc;

		curProc = &ProcGlobal->allProcs[curBackend];

		/* Skip if unused (FIXME: prepared xacts!) */
		if (curProc->pid == 0)
		{
			continue;
		}

		/* Skip if not a distributed transaction. */
		if (CurBackendData->transactionId.transactionId == 0)
		{
			continue;
		}

		/* Skip if not originating from interesting node */
		if (sourceNodeId != CurBackendData->transactionId.nodeId)
		{
			continue;
		}

		/* Skip if not blocked */
		if (curProc->links.next == NULL || curProc->waitLock == NULL)
		{
			continue;
		}

		todoList = lappend(todoList, curProc);
	}


	while (todoList)
	{
		PGPROC *curProc = (PGPROC *) linitial(todoList);
		TmgmtBackendData *curBackendData =
			&TmgmtShmemControl->sessions[curProc->pgprocno];

		/* pop from todo list */
		todoList = list_delete_first(todoList);

		/* avoid redundant (potentially cyclic!) iteration */
		if (visited[curProc->pgprocno])
		{
			continue;
		}
		visited[curProc->pgprocno] = true;

		/* FIXME: deal with group locking */

		/* FIXME: move body to different function */
		if (curProc->links.next != NULL && curProc->waitLock != NULL)
		{
			LOCK *lock = curProc->waitLock;
			SHM_QUEUE *procLocks = &(lock->procLocks);
			PROCLOCK *proclock;
			int conflictMask;
			int numLockModes;
			LockMethod lockMethodTable;

			lockMethodTable = GetLocksMethodTable(lock);
			numLockModes = lockMethodTable->numLockModes;
			conflictMask = lockMethodTable->conflictTab[curProc->waitLockMode];

			/*
			 * Record an edge for everyone already holding the lock in a
			 * conflicting manner ("hard edges" in postgres parlance).
			 */
			proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
												 offsetof(PROCLOCK, lockLink));

			while (proclock)
			{
				PGPROC *leader;
				TmgmtBackendData *blockingBackendData = NULL;
				PGPROC *nextProc;

				nextProc = proclock->tag.myProc;
				leader = nextProc->lockGroupLeader == NULL ? nextProc :
						 nextProc->lockGroupLeader;

				blockingBackendData = &TmgmtShmemControl->sessions[leader->pgprocno];

				/* A proc never blocks itself or any other lock group member */
				if (leader != curProc)
				{
					int lockMethodOff;

					/* Have to check conflicts with every locktype held. */
					for (lockMethodOff = 1; lockMethodOff <= numLockModes;
						 lockMethodOff++)
					{
						if ((proclock->holdMask & LOCKBIT_ON(lockMethodOff)) &&
							(conflictMask & LOCKBIT_ON(lockMethodOff)))
						{
							bool blockingAlsoBlocked = nextProc->links.next != NULL &&
													   nextProc->waitLock != NULL;

							RecordWaitEdge(waitGraph,
										   localNodeId, blockingAlsoBlocked,
										   curBackendData, curProc,
										   blockingBackendData, nextProc);

							if (blockingAlsoBlocked)
							{
								todoList = lappend(todoList, nextProc);
							}
						}
					}
				}
				proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
													 offsetof(PROCLOCK, lockLink));
			}


			/*
			 * Record an edge for everyone in front of us in the wait-queue
			 * for the lock ("soft edges" in postgres parlance). Postgres can
			 * re-order the wait-queue, but that seems pretty hard to do in a
			 * distributed context.
			 */
			{
				PROC_QUEUE *waitQueue = &(lock->waitProcs);
				int queueSize = waitQueue->size;
				PGPROC *nextProc = (PGPROC *) waitQueue->links.next;

				while (queueSize-- > 0)
				{
					PGPROC *leader;

					/* done when we reached ourselves */
					if (curProc == nextProc)
					{
						break;
					}

					leader = nextProc->lockGroupLeader == NULL ? nextProc :
							 nextProc->lockGroupLeader;

					if ((LOCKBIT_ON(leader->waitLockMode) & conflictMask) != 0)
					{
						bool blockingAlsoBlocked = nextProc->links.next != NULL &&
												   nextProc->waitLock != NULL;
						TmgmtBackendData *blockingBackendData = NULL;

						blockingBackendData =
							&TmgmtShmemControl->sessions[leader->pgprocno];

						RecordWaitEdge(waitGraph,
									   localNodeId, blockingAlsoBlocked,
									   curBackendData, curProc,
									   blockingBackendData, nextProc);

						if (blockingAlsoBlocked)
						{
							todoList = lappend(todoList, nextProc);
						}
					}

					nextProc = (PGPROC *) nextProc->links.next;
				}
			}
		}
	}

	for (partitionNum = 0; partitionNum < NUM_LOCK_PARTITIONS; partitionNum++)
	{
		LWLockRelease(LockHashPartitionLockByIndex(partitionNum));
	}

	LWLockRelease(&TmgmtShmemControl->lock);

	return waitGraph;
}


static WaitEdge *
AllocWaitEdge(WaitGraph *waitGraph)
{
	/* ensure space for new edge */
	if (waitGraph->numAllocated == waitGraph->numUsed)
	{
		waitGraph->numAllocated *= 2;
		waitGraph->edges = (WaitEdge *)
			repalloc(waitGraph->edges, sizeof(WaitEdge) *
					 waitGraph->numAllocated);
	}

	return &waitGraph->edges[waitGraph->numUsed++];
}


static int64
ParseIntField(PGresult *result, int rowIndex, int colIndex, bool *isNull)
{
	if (PQgetisnull(result, rowIndex, colIndex))
	{
		if (!isNull)
		{
			ereport(ERROR, (errmsg("remote field unexpectedly is NULL")));
		}
		*isNull = true;
		return 0;
	}

	if (isNull)
	{
		*isNull = false;
	}

	return pg_strtouint64(PQgetvalue(result, rowIndex, colIndex), NULL, 10);
}


static TimestampTz
ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex, bool *isNull)
{
	Datum timestampDatum;

	if (PQgetisnull(result, rowIndex, colIndex))
	{
		if (!isNull)
		{
			ereport(ERROR, (errmsg("remote field unexpectedly is NULL")));
		}
		*isNull = true;
		return 0;
	}

	if (isNull)
	{
		*isNull = false;
	}

	timestampDatum =
		DirectFunctionCall3(timestamptz_in, CStringGetDatum(PQgetvalue(result, rowIndex,
																	   colIndex)),
							0, -1);
	return DatumGetTimestampTz(timestampDatum);
}


static void
ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
{
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	TupleDesc tupdesc = NULL;
	Tuplestorestate *tupstore = NULL;
	MemoryContext per_query_ctx;
	MemoryContext oldcontext;
	size_t curEdgeNum;

	/* check to see if caller supports us returning a tuplestore */
	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
	{
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg(
					 "set-valued function called in context that cannot accept a set")));
	}
	if (!(rsinfo->allowedModes & SFRM_Materialize))
	{
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not " \
						"allowed in this context")));
	}

	/* Build a tuple descriptor for our result type */
	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
	{
		elog(ERROR, "return type must be a row type");
	}

	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	tupstore = tuplestore_begin_heap(true, false, work_mem);
	rsinfo->returnMode = SFRM_Materialize;
	rsinfo->setResult = tupstore;
	rsinfo->setDesc = tupdesc;
	MemoryContextSwitchTo(oldcontext);

	/*
	 * Columns:
	 * 00: waiting_pid
	 * 01: waiting_node_id
	 * 02: waiting_transaction_id
	 * 03: waiting_transactionstamp
	 * 04: blocking_pid
	 * 05: blocking_node_id
	 * 06: blocking_transaction_id
	 * 07: blocking_transactionstamp
	 * 08: blocking_also_blocked
	 */
	for (curEdgeNum = 0; curEdgeNum < waitGraph->numUsed; curEdgeNum++)
	{
		Datum values[NUM_DUMP_LOCKS_COLS];
		bool nulls[NUM_DUMP_LOCKS_COLS];
		WaitEdge *curEdge = &waitGraph->edges[curEdgeNum];

		memset(values, 0, sizeof(values));
		memset(nulls, 0, sizeof(nulls));

		values[0] = Int32GetDatum(curEdge->waitingPid);
		values[1] = Int32GetDatum(curEdge->waitingNodeId);
		if (curEdge->waitingTransactionId != 0)
		{
			values[2] = Int64GetDatum(curEdge->waitingTransactionId);
			values[3] = TimestampTzGetDatum(curEdge->waitingTransactionStamp);
		}
		else
		{
			nulls[2] = true;
			nulls[3] = true;
		}

		values[4] = Int32GetDatum(curEdge->blockingPid);
		values[5] = Int32GetDatum(curEdge->blockingNodeId);
		if (curEdge->blockingTransactionId != 0)
		{
			values[6] = Int64GetDatum(curEdge->blockingTransactionId);
			values[7] = TimestampTzGetDatum(curEdge->blockingTransactionStamp);
		}
		else
		{
			nulls[6] = true;
			nulls[7] = true;
		}
		values[8] = BoolGetDatum(curEdge->blockingAlsoBlocked);

		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
	}

	/* clean up and return the tuplestore */
	tuplestore_donestoring(tupstore);
}


static void
WaitEdgeFromResult(WaitEdge *edge, PGresult *result, size_t rowIndex)
{
	bool isNull = false;

	/*
	 * Remote Columns:
	 * 00: waiting_pid
	 * 01: waiting_node_id
	 * 02: waiting_transaction_id
	 * 03: waiting_transactionstamp
	 * 04: blocking_pid
	 * 05: blocking_node_id
	 * 06: blocking_transaction_id
	 * 07: blocking_transactionstamp
	 * 08: blocking_also_blocked
	 */
	edge->waitingPid = ParseIntField(result, rowIndex, 0, NULL);
	edge->waitingNodeId = ParseIntField(result, rowIndex, 1, NULL);
	edge->waitingTransactionId = ParseIntField(result, rowIndex, 2, &isNull);
	if (isNull)
	{
		edge->waitingTransactionId = 0;
	}
	edge->waitingTransactionStamp = ParseTimestampTzField(result, rowIndex, 3, &isNull);
	if (isNull)
	{
		edge->waitingTransactionStamp = 0;
	}

	edge->blockingPid = ParseIntField(result, rowIndex, 4, NULL);
	edge->blockingNodeId = ParseIntField(result, rowIndex, 5, &isNull);
	edge->blockingTransactionId = ParseIntField(result, rowIndex, 6, &isNull);
	if (isNull)
	{
		edge->blockingTransactionId = 0;
	}
	edge->blockingTransactionStamp = ParseTimestampTzField(result, rowIndex, 7, &isNull);
	if (isNull)
	{
		edge->blockingTransactionStamp = 0;
	}
	edge->blockingAlsoBlocked = ParseIntField(result, rowIndex, 8, &isNull) != 0;
}


static void
LoadRemoteEdges(WaitGraph *waitGraph)
{
	List *workerNodeList = ActiveWorkerNodeList();
	ListCell *workerNodeCell = NULL;
	char *nodeUser = CitusExtensionOwnerName();
	List *connectionList = NIL;
	ListCell *connectionCell = NULL;


	/* open connections in parallel */
	foreach(workerNodeCell, workerNodeList)
	{
		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
		char *nodeName = workerNode->workerName;
		int nodePort = workerNode->workerPort;
		MultiConnection *connection = NULL;
		int connectionFlags = 0;

		connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
													 nodeUser, NULL);

		connectionList = lappend(connectionList, connection);
	}

	/* finish opening connections */
	FinishConnectionListEstablishment(connectionList);

	/* send commands in parallel */
	forboth(workerNodeCell, workerNodeList, connectionCell, connectionList)
	{
		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
		MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
		int querySent = false;
		char *command = NULL;
		const char *params[2];


		params[0] = psprintf("%d", 0); /* FIXME: better id for master */
		params[1] = psprintf("%d", workerNode->nodeId);
		command = "SELECT * FROM dump_local_wait_edges($1, $2)";

		querySent = SendRemoteCommandParams(connection, command, 2,
											NULL, params);
		if (querySent == 0)
		{
			ReportConnectionError(connection, ERROR);
		}
	}

	foreach(connectionCell, connectionList)
	{
		MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
		PGresult *result = NULL;
		bool raiseInterrupts = true;
		int64 rowIndex = 0;
		int64 rowCount = 0;
		int64 colCount = 0;

		result = GetRemoteCommandResult(connection, raiseInterrupts);
		if (!IsResponseOK(result))
		{
			ReportResultError(connection, result, ERROR);
		}

#define NUM_DUMP_LOCKS_COLS 9
		rowCount = PQntuples(result);
		colCount = PQnfields(result);

		if (colCount != NUM_DUMP_LOCKS_COLS)
		{
			elog(ERROR, "unexpected number of columns in dump_local_wait_edges result");
		}

		for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
		{
			WaitEdge *curEdge = AllocWaitEdge(waitGraph);

			WaitEdgeFromResult(curEdge, result, rowIndex);
		}

		PQclear(result);
		ForgetResults(connection);
	}
}


static uint32
TransactionIdHash(const void *key, Size keysize)
{
	TmgmtTransactionId *entry = (TmgmtTransactionId *) key;
	uint32 hash = 0;

	hash = hash_any((unsigned char *) &entry->nodeId, sizeof(int64));
	hash = hash_combine(hash, hash_any((unsigned char *) &entry->transactionId,
									   sizeof(int64)));
	hash = hash_combine(hash, hash_any((unsigned char *) &entry->timestamp,
									   sizeof(TimestampTz)));

	return hash;
}


static int
TransactionIdCompare(const void *a, const void *b, Size keysize)
{
	TmgmtTransactionId *ta = (TmgmtTransactionId *) a;
	TmgmtTransactionId *tb = (TmgmtTransactionId *) b;

	/* NB: Not used for sorting, just equality... */
	if (ta->nodeId != tb->nodeId ||
		ta->transactionId != tb->transactionId ||
		ta->timestamp != tb->timestamp)
	{
		return 1;
	}
	else
	{
		return 0;
	}
}


static LockDepNode *
LookupDepNode(HTAB *lockDepNodeHash, TmgmtTransactionId *transactionId)
{
	bool found = false;
	LockDepNode *node = (LockDepNode *)
		hash_search(lockDepNodeHash, transactionId, HASH_ENTER, &found);

	if (!found)
	{
		node->deps = NIL;
		node->initial_pid = 0;
		node->visited = false;
	}

	return node;
}
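[reviewer note] The flattened edges can also be poked at from SQL. A rough illustration of step 4's cycle search from the file header: the C code above walks arbitrary-length cycles, while this sketch only catches two-transaction cycles. Note that in this revision the second column group of dump_all_wait_edges is named blocked_* but holds the blocking side, and each function call re-collects the graph, so the two sides of the join may not be perfectly consistent; fine for illustration:

    SELECT w1.pid AS pid_a, w2.pid AS pid_b
    FROM dump_all_wait_edges() w1
    JOIN dump_all_wait_edges() w2
      ON w1.transactionid = w2.blocked_transactionid
     AND w1.blocked_transactionid = w2.transactionid
    WHERE w1.transactionid < w2.transactionid;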
@@ -70,6 +70,7 @@ FinishRemoteTransactionBegin(struct MultiConnection *connection)
	RemoteTransaction *transaction = &connection->remoteTransaction;
	PGresult *result = NULL;
	const bool raiseErrors = true;
+	char *query = NULL;

	Assert(transaction->transactionState == REMOTE_TRANS_STARTING);
@@ -91,6 +92,33 @@ FinishRemoteTransactionBegin(struct MultiConnection *connection)
	{
		Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS);
	}
+
+	{
+		const char *tzstr = timestamptz_to_str(
+			MyTmgmtBackendData->transactionId.timestamp);
+
+		query = psprintf("SELECT assign_distributed_transaction_id("
+						 UINT64_FORMAT "," UINT64_FORMAT ", '%s')",
+						 MyTmgmtBackendData->transactionId.nodeId,
+						 MyTmgmtBackendData->transactionId.transactionId,
+						 tzstr);
+	}
+
+	/* FIXME: this is a bad idea performancewise */
+	if (!SendRemoteCommand(connection, query))
+	{
+		ReportConnectionError(connection, WARNING);
+		MarkRemoteTransactionFailed(connection, true);
+	}
+	result = GetRemoteCommandResult(connection, raiseErrors);
+	if (!IsResponseOK(result))
+	{
+		ReportResultError(connection, result, WARNING);
+		MarkRemoteTransactionFailed(connection, raiseErrors);
+	}
+
+	PQclear(result);
+	ForgetResults(connection);
}
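[reviewer note] For reference, the command this psprintf produces and sends to each participating worker at transaction begin looks like the following. The concrete values are illustrative only (node id 0 on the coordinator, a counter value, and the coordinator's transaction timestamp):

    SELECT assign_distributed_transaction_id(0, 7, '2017-07-12 13:10:00+02');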
@@ -263,7 +291,7 @@ RemoteTransactionCommit(MultiConnection *connection)


/*
- * StartRemoteTransactionAbort initiates abortin the transaction in a
+ * StartRemoteTransactionAbort initiates aborting the transaction in a
 * non-blocking manner.
 */
void
@@ -21,11 +21,15 @@

#include "access/xact.h"
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
+#include "distributed/metadata_cache.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/transaction_management.h"
#include "distributed/placement_connection.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
#include "utils/hsearch.h"
#include "utils/guc.h"
+#include "port/atomics.h"


CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
@@ -40,9 +44,6 @@ XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
/* list of connections that are part of the current coordinated transaction */
dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions);

-
-static bool subXactAbortAttempted = false;
-
/*
 * Should this coordinated transaction use 2PC? Set by
 * CoordinatedTransactionUse2PC(), e.g. if DDL was issued and
@@ -50,6 +51,10 @@ static bool subXactAbortAttempted = false;
 */
bool CoordinatedTransactionUses2PC = false;

+
+static bool subXactAbortAttempted = false;
+static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
+
/* transaction management functions */
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
@@ -58,6 +63,39 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction
/* remaining functions */
static void AdjustMaxPreparedTransactions(void);

+TmgmtShmemControlData *TmgmtShmemControl = NULL;
+TmgmtBackendData *MyTmgmtBackendData = NULL;
+
+PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
+
+Datum
+assign_distributed_transaction_id(PG_FUNCTION_ARGS)
+{
+	CheckCitusVersion(ERROR);
+
+	Assert(MyTmgmtBackendData);
+
+	/* FIXME: spinlock? */
+	MyTmgmtBackendData->transactionId.nodeId = PG_GETARG_INT64(0);
+	MyTmgmtBackendData->transactionId.transactionId = PG_GETARG_INT64(1);
+	MyTmgmtBackendData->transactionId.timestamp = PG_GETARG_TIMESTAMPTZ(2);
+
+	PG_RETURN_VOID();
+}
+
+
+static void
+UnsetDistributedTransactionId(void)
+{
+	if (MyTmgmtBackendData)
+	{
+		/* FIXME: spinlock? */
+		MyTmgmtBackendData->transactionId.nodeId = 0;
+		MyTmgmtBackendData->transactionId.transactionId = 0;
+		MyTmgmtBackendData->transactionId.timestamp = 0;
+	}
+}
+
+
/*
 * BeginCoordinatedTransaction begins a coordinated transaction. No
@@ -73,6 +111,20 @@ BeginCoordinatedTransaction(void)
	}

	CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
+
+	Assert(MyTmgmtBackendData);
+
+	/* FIXME: Spinlock? Consistency is nice ;) */
+	/* FIXME: determine proper local node id for MX etc */
+	MyTmgmtBackendData->transactionId.nodeId = 0;
+	MyTmgmtBackendData->transactionId.transactionId =
+		pg_atomic_fetch_add_u64(&TmgmtShmemControl->nextTransactionId, 1);
+	MyTmgmtBackendData->transactionId.timestamp = GetCurrentTimestamp();
+
+	elog(DEBUG3, "assigning xact: (%lu, %lu, %lu)",
+		 MyTmgmtBackendData->transactionId.nodeId,
+		 MyTmgmtBackendData->transactionId.transactionId,
+		 MyTmgmtBackendData->transactionId.timestamp);
}
@@ -117,6 +169,59 @@ CoordinatedTransactionUse2PC(void)
}

+
+static size_t
+TmgmtShmemSize(void)
+{
+	Size size = 0;
+
+	size = add_size(size, sizeof(TmgmtShmemControlData));
+	size = add_size(size, mul_size(sizeof(TmgmtBackendData), MaxBackends));
+
+	return size;
+}
+
+
+static void
+TmgmtShmemInit(void)
+{
+	bool alreadyInitialized = false;
+
+	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+
+	TmgmtShmemControl =
+		(TmgmtShmemControlData *) ShmemInitStruct("Transaction Management",
+												  TmgmtShmemSize(),
+												  &alreadyInitialized);
+	if (!alreadyInitialized)
+	{
+		/* initialize lwlock */
+		LWLockTranche *tranche = &TmgmtShmemControl->lockTranche;
+
+		/* start by zeroing out all the memory */
+		memset(TmgmtShmemControl, 0, TmgmtShmemSize());
+
+		TmgmtShmemControl->numSessions = MaxBackends;
+
+		/* initialize lock */
+		TmgmtShmemControl->trancheId = LWLockNewTrancheId();
+		tranche->array_base = &TmgmtShmemControl->lock;
+		tranche->array_stride = sizeof(LWLock);
+		tranche->name = "Distributed Transaction Management";
+		LWLockRegisterTranche(TmgmtShmemControl->trancheId, tranche);
+		LWLockInitialize(&TmgmtShmemControl->lock,
+						 TmgmtShmemControl->trancheId);
+
+		pg_atomic_init_u64(&TmgmtShmemControl->nextTransactionId, 7);
+	}
+	LWLockRelease(AddinShmemInitLock);
+
+	if (prev_shmem_startup_hook != NULL)
+	{
+		prev_shmem_startup_hook();
+	}
+}
+
+
void
InitializeTransactionManagement(void)
{
@@ -125,6 +230,26 @@ InitializeTransactionManagement(void)
	RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL);

	AdjustMaxPreparedTransactions();
+
+	/* allocate shared memory */
+	RequestAddinShmemSpace(TmgmtShmemSize());
+
+	prev_shmem_startup_hook = shmem_startup_hook;
+	shmem_startup_hook = TmgmtShmemInit;
+}
+
+
+void
+InitializeTransactionManagementBackend(void)
+{
+	/* Fill this backend's lock information */
+	LWLockAcquire(&TmgmtShmemControl->lock, LW_EXCLUSIVE);
+	MyTmgmtBackendData = &TmgmtShmemControl->sessions[MyProc->pgprocno];
+	MyTmgmtBackendData->databaseId = MyDatabaseId;
+
+	/* FIXME: get id usable for MX, where multiple nodes can start distributed transactions */
+	MyTmgmtBackendData->transactionId.nodeId = 0;
+	LWLockRelease(&TmgmtShmemControl->lock);
}
@@ -168,6 +293,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
			XactModificationLevel = XACT_MODIFICATION_NONE;
			dlist_init(&InProgressTransactions);
			CoordinatedTransactionUses2PC = false;
+
+			UnsetDistributedTransactionId();
		}
		break;
@@ -204,6 +331,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
			dlist_init(&InProgressTransactions);
			CoordinatedTransactionUses2PC = false;
			subXactAbortAttempted = false;
+
+			UnsetDistributedTransactionId();
		}
		break;
@@ -22,6 +22,7 @@

#include "access/xact.h"
#include "libpq/pqsignal.h"
+#include "distributed/deadlock.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_cache.h"
#include "postmaster/bgworker.h"
@@ -258,6 +259,22 @@ CitusMaintenanceDaemonMain(Datum main_arg)
		 * tasks should do their own time math about whether to re-run checks.
		 */

+		{
+			Datum foundDeadlock;
+
+			StartTransactionCommand();
+			foundDeadlock = this_machine_kills_deadlocks(NULL);
+			CommitTransactionCommand();
+
+			/*
+			 * Check sooner if we just found a deadlock.
+			 */
+			if (foundDeadlock)
+			{
+				timeout = 100;
+			}
+		}
+
		/*
		 * Wait until timeout, or until somebody wakes us up.
		 */
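[reviewer note] The maintenance daemon runs one detection pass per wakeup; for testing, the same pass can be triggered by hand on the coordinator. A sketch; the WARNING-level progress lines come from the elog calls in deadlock.c, and the result reports whether a deadlock was found and a backend signalled:

    SELECT this_machine_kills_deadlocks();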
@@ -0,0 +1,15 @@
/*-------------------------------------------------------------------------
 *
 * deadlock.h
 *
 * Copyright (c) 2017, Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#ifndef DEADLOCK_H
#define DEADLOCK_H

extern Datum this_machine_kills_deadlocks(PG_FUNCTION_ARGS);

#endif   /* DEADLOCK_H */
@@ -11,6 +11,9 @@

#include "lib/ilist.h"

+#include "datatype/timestamp.h"
+#include "storage/lwlock.h"
+
/* describes what kind of modifications have occurred in the current transaction */
typedef enum
{
@@ -52,6 +55,39 @@ typedef enum
	COMMIT_PROTOCOL_2PC = 2
} CommitProtocolType;

+
+/* FIXME: move to different header? */
+typedef struct TmgmtTransactionId
+{
+	uint64 nodeId;
+	uint64 transactionId;
+	TimestampTz timestamp;
+} TmgmtTransactionId;
+
+typedef struct TmgmtBackendData
+{
+	Oid databaseId;
+	TmgmtTransactionId transactionId;
+} TmgmtBackendData;
+
+
+typedef struct TmgmtShmemControlData
+{
+	int trancheId;
+	LWLockTranche lockTranche;
+	LWLock lock;
+
+	int numSessions;
+
+	pg_atomic_uint64 nextTransactionId;
+
+	TmgmtBackendData sessions[FLEXIBLE_ARRAY_MEMBER];
+} TmgmtShmemControlData;
+
+extern TmgmtBackendData *MyTmgmtBackendData;
+extern TmgmtShmemControlData *TmgmtShmemControl;
+
+
/* config variable managed via guc.c */
extern int MultiShardCommitProtocol;
@@ -76,6 +112,7 @@ extern void CoordinatedTransactionUse2PC(void);

/* initialization function(s) */
extern void InitializeTransactionManagement(void);
+extern void InitializeTransactionManagementBackend(void);


#endif   /* TRANSACTION_MANAGMENT_H */
@@ -111,6 +111,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-2';
ALTER EXTENSION citus UPDATE TO '6.2-3';
ALTER EXTENSION citus UPDATE TO '6.2-4';
ALTER EXTENSION citus UPDATE TO '7.0-1';
+ALTER EXTENSION citus UPDATE TO '7.0-2';
-- show running version
SHOW citus.version;
 citus.version
@@ -136,14 +136,15 @@ SELECT * FROM researchers, labs WHERE labs.id = researchers.lab_id;
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
ERROR: no transaction participant matches localhost:57638
DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
COMMIT;
-- unless we disable deadlock prevention
BEGIN;
SET citus.enable_deadlock_prevention TO off;
INSERT INTO labs VALUES (6, 'Bell Labs');
INSERT INTO researchers VALUES (9, 6, 'Leslie Lamport');
ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200001"
DETAIL: Key (lab_id, name)=(6, Leslie Lamport) already exists.
CONTEXT: while executing command on localhost:57638
ABORT;
-- SELECTs may occur after a modification: First check that selecting
-- from the modified node works.
@@ -152,7 +153,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6;
 count
-------
-     0
+     1
(1 row)

ABORT;
@@ -168,7 +169,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
SELECT count(*) FROM researchers WHERE lab_id = 6;
 count
-------
-     0
+     1
(1 row)

ABORT;
@@ -193,9 +194,10 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.labs'::
(2 rows)

SELECT * FROM labs WHERE id = 6;
- id | name
-----+------
-(0 rows)
+ id |   name
+----+-----------
+  6 | Bell Labs
+(1 row)

-- COPY can happen after single row INSERT
BEGIN;
@@ -256,9 +258,10 @@ DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists.
COMMIT;
-- verify rollback
SELECT * FROM researchers WHERE lab_id = 6;
- id | lab_id | name
-----+--------+------
-(0 rows)
+ id | lab_id |      name
+----+--------+----------------
+  9 |      6 | Leslie Lamport
+(1 row)

SELECT count(*) FROM pg_dist_transaction;
 count
@@ -283,9 +286,10 @@ DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists.
COMMIT;
-- verify rollback
SELECT * FROM researchers WHERE lab_id = 6;
- id | lab_id | name
-----+--------+------
-(0 rows)
+ id | lab_id |      name
+----+--------+----------------
+  9 |      6 | Leslie Lamport
+(1 row)

SELECT count(*) FROM pg_dist_transaction;
 count
@@ -301,9 +305,10 @@ COMMIT;
SELECT * FROM researchers WHERE lab_id = 6;
 id | lab_id |         name
----+--------+----------------------
+  9 |      6 | Leslie Lamport
 17 |      6 | 'Bjarne Stroustrup'
 18 |      6 | 'Dennis Ritchie'
-(2 rows)
+(3 rows)

-- verify 2pc
SELECT count(*) FROM pg_dist_transaction;
@@ -360,9 +365,10 @@ ERROR: could not commit transaction on any active node
SELECT * FROM researchers WHERE lab_id = 6;
 id | lab_id |         name
----+--------+----------------------
+  9 |      6 | Leslie Lamport
 17 |      6 | 'Bjarne Stroustrup'
 18 |      6 | 'Dennis Ritchie'
-(2 rows)
+(3 rows)

-- cleanup triggers and the function
SELECT * from run_command_on_placements('researchers', 'drop trigger reject_large_researcher_id on %s')
@@ -400,7 +406,7 @@ ALTER TABLE labs ADD COLUMN motto text;
SELECT master_modify_multiple_shards('DELETE FROM labs');
 master_modify_multiple_shards
-------------------------------
-                             7
+                             8
(1 row)

ALTER TABLE labs ADD COLUMN score float;
@@ -880,7 +886,6 @@ SELECT create_distributed_table('hash_modifying_xacts', 'key');
BEGIN;
INSERT INTO hash_modifying_xacts VALUES (1, 1);
INSERT INTO reference_modifying_xacts VALUES (10, 10);
-ERROR: no transaction participant matches localhost:57638
COMMIT;
-- it is allowed when turning off deadlock prevention
BEGIN;
@@ -140,8 +140,6 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id;
BEGIN;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
-ERROR: no transaction participant matches localhost:57638
-DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
COMMIT;
-- have the same test on the other worker node
\c - - - :worker_2_port
@@ -163,8 +161,6 @@ SELECT * FROM researchers_mx, labs_mx WHERE labs_mx.id = researchers_mx.lab_id;
BEGIN;
INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
INSERT INTO researchers_mx VALUES (9, 6, 'Leslie Lamport');
-ERROR: no transaction participant matches localhost:57638
-DETAIL: Transactions which modify distributed tables may only target nodes affected by the modification command which began the transaction.
COMMIT;
-- switch back to the worker node
\c - - - :worker_1_port
@@ -175,7 +171,7 @@ INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
SELECT count(*) FROM researchers_mx WHERE lab_id = 6;
 count
-------
-     0
+     2
(1 row)

ABORT;
@@ -111,6 +111,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-2';
ALTER EXTENSION citus UPDATE TO '6.2-3';
ALTER EXTENSION citus UPDATE TO '6.2-4';
ALTER EXTENSION citus UPDATE TO '7.0-1';
+ALTER EXTENSION citus UPDATE TO '7.0-2';

-- show running version
SHOW citus.version;