mirror of https://github.com/citusdata/citus.git
Don't add procs multiple times in BuildWaitGraphForSourceNode
parent
734aeebc47
commit
bd6bf29983
|
@ -32,12 +32,14 @@
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PROCStack is a stack of PGPROC pointers used to perform a depth-first search
|
* PROCStack is a stack of PGPROC pointers used to perform a depth-first search
|
||||||
* through the lock graph.
|
* through the lock graph. It also keeps track of which processes have been
|
||||||
|
* added to the stack to avoid visiting the same process multiple times.
|
||||||
*/
|
*/
|
||||||
typedef struct PROCStack
|
typedef struct PROCStack
|
||||||
{
|
{
|
||||||
int procCount;
|
int procCount;
|
||||||
PGPROC **procs;
|
PGPROC **procs;
|
||||||
|
bool *procAdded;
|
||||||
} PROCStack;
|
} PROCStack;
|
||||||
|
|
||||||
|
|
||||||
|
@ -56,6 +58,7 @@ static void AddEdgesForWaitQueue(WaitGraph *waitGraph, PGPROC *waitingProc,
|
||||||
static void AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
|
static void AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
|
||||||
PROCStack *remaining);
|
PROCStack *remaining);
|
||||||
static WaitEdge * AllocWaitEdge(WaitGraph *waitGraph);
|
static WaitEdge * AllocWaitEdge(WaitGraph *waitGraph);
|
||||||
|
static void AddProcToVisit(PROCStack *remaining, PGPROC *proc);
|
||||||
static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc);
|
static bool IsSameLockGroup(PGPROC *leftProc, PGPROC *rightProc);
|
||||||
static bool IsConflictingLockMask(int holdMask, int conflictMask);
|
static bool IsConflictingLockMask(int holdMask, int conflictMask);
|
||||||
|
|
||||||
|
@ -392,11 +395,8 @@ BuildWaitGraphForSourceNode(int sourceNodeId)
|
||||||
{
|
{
|
||||||
WaitGraph *waitGraph = NULL;
|
WaitGraph *waitGraph = NULL;
|
||||||
int curBackend = 0;
|
int curBackend = 0;
|
||||||
bool visitedProcs[MaxBackends];
|
|
||||||
PROCStack remaining;
|
PROCStack remaining;
|
||||||
|
|
||||||
memset(visitedProcs, 0, MaxBackends);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Try hard to avoid allocations while holding lock. Thus we pre-allocate
|
* Try hard to avoid allocations while holding lock. Thus we pre-allocate
|
||||||
* space for locks in large batches - for common scenarios this should be
|
* space for locks in large batches - for common scenarios this should be
|
||||||
|
@ -410,6 +410,7 @@ BuildWaitGraphForSourceNode(int sourceNodeId)
|
||||||
waitGraph->edges = (WaitEdge *) palloc(waitGraph->allocatedSize * sizeof(WaitEdge));
|
waitGraph->edges = (WaitEdge *) palloc(waitGraph->allocatedSize * sizeof(WaitEdge));
|
||||||
|
|
||||||
remaining.procs = (PGPROC **) palloc(sizeof(PGPROC *) * MaxBackends);
|
remaining.procs = (PGPROC **) palloc(sizeof(PGPROC *) * MaxBackends);
|
||||||
|
remaining.procAdded = (bool *) palloc0(sizeof(bool *) * MaxBackends);
|
||||||
remaining.procCount = 0;
|
remaining.procCount = 0;
|
||||||
|
|
||||||
LockLockData();
|
LockLockData();
|
||||||
|
@ -418,8 +419,7 @@ BuildWaitGraphForSourceNode(int sourceNodeId)
|
||||||
* Build lock-graph. We do so by first finding all procs which we are
|
* Build lock-graph. We do so by first finding all procs which we are
|
||||||
* interested in (originating on our source system, and blocked). Once
|
* interested in (originating on our source system, and blocked). Once
|
||||||
* those are collected, do depth first search over all procs blocking
|
* those are collected, do depth first search over all procs blocking
|
||||||
* those. To avoid redundantly visiting procs, keep track of which procs
|
* those.
|
||||||
* already have been visited in a pgproc-indexed visitedProcs[] array.
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/* build list of starting procs */
|
/* build list of starting procs */
|
||||||
|
@ -453,26 +453,13 @@ BuildWaitGraphForSourceNode(int sourceNodeId)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
remaining.procs[remaining.procCount++] = currentProc;
|
AddProcToVisit(&remaining, currentProc);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (remaining.procCount > 0)
|
while (remaining.procCount > 0)
|
||||||
{
|
{
|
||||||
PGPROC *waitingProc = remaining.procs[--remaining.procCount];
|
PGPROC *waitingProc = remaining.procs[--remaining.procCount];
|
||||||
|
|
||||||
/*
|
|
||||||
* We might find a process again if multiple distributed transactions are
|
|
||||||
* waiting for it, but we add all edges on the first visit so we don't need
|
|
||||||
* to visit it again. This also avoids getting into an infinite loop in
|
|
||||||
* case of a local deadlock.
|
|
||||||
*/
|
|
||||||
if (visitedProcs[waitingProc->pgprocno])
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
visitedProcs[waitingProc->pgprocno] = true;
|
|
||||||
|
|
||||||
/* only blocked processes result in wait edges */
|
/* only blocked processes result in wait edges */
|
||||||
if (!IsProcessWaitingForLock(waitingProc))
|
if (!IsProcessWaitingForLock(waitingProc))
|
||||||
{
|
{
|
||||||
|
@ -643,7 +630,7 @@ AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
|
||||||
curEdge->isBlockingXactWaiting = IsProcessWaitingForLock(blockingProc);
|
curEdge->isBlockingXactWaiting = IsProcessWaitingForLock(blockingProc);
|
||||||
if (curEdge->isBlockingXactWaiting)
|
if (curEdge->isBlockingXactWaiting)
|
||||||
{
|
{
|
||||||
remaining->procs[remaining->procCount++] = blockingProc;
|
AddProcToVisit(remaining, blockingProc);
|
||||||
}
|
}
|
||||||
|
|
||||||
curEdge->waitingPid = waitingProc->pid;
|
curEdge->waitingPid = waitingProc->pid;
|
||||||
|
@ -705,6 +692,25 @@ AllocWaitEdge(WaitGraph *waitGraph)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AddProcToVisit adds a process to the stack of processes to visit
|
||||||
|
* in the depth-first search, unless it was already added.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
AddProcToVisit(PROCStack *remaining, PGPROC *proc)
|
||||||
|
{
|
||||||
|
if (remaining->procAdded[proc->pgprocno])
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert(remaining->procCount < MaxBackends);
|
||||||
|
|
||||||
|
remaining->procs[remaining->procCount++] = proc;
|
||||||
|
remaining->procAdded[proc->pgprocno] = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* IsProcessWaitingForLock returns whether a given process is waiting for a lock.
|
* IsProcessWaitingForLock returns whether a given process is waiting for a lock.
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue