mirror of https://github.com/citusdata/citus.git
Merge pull request #2514 from citusdata/fix_total_procs
Ensure to use initialized MaxBackends (pull/2516/head)
commit cb119f4f73
@@ -25,6 +25,7 @@
 #include "distributed/remote_commands.h"
 #include "distributed/transaction_identifier.h"
 #include "nodes/execnodes.h"
+#include "postmaster/autovacuum.h" /* to access autovacuum_max_workers */
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/lwlock.h"
@@ -512,6 +513,7 @@ BackendManagementShmemInit(void)
 	if (!alreadyInitialized)
 	{
 		int backendIndex = 0;
+		int totalProcs = 0;
 		char *trancheName = "Backend Management Tranche";

 #if (PG_VERSION_NUM >= 100000)
@@ -557,7 +559,8 @@ BackendManagementShmemInit(void)
 		 * We also initialize initiatorNodeIdentifier to -1, which can never be
 		 * used as a node id.
 		 */
-		for (backendIndex = 0; backendIndex < TotalProcs; ++backendIndex)
+		totalProcs = TotalProcCount();
+		for (backendIndex = 0; backendIndex < totalProcs; ++backendIndex)
 		{
 			backendManagementShmemData->backends[backendIndex].citusBackend.
 				initiatorNodeIdentifier = -1;
@@ -582,14 +585,62 @@ static size_t
 BackendManagementShmemSize(void)
 {
 	Size size = 0;
+	int totalProcs = TotalProcCount();

 	size = add_size(size, sizeof(BackendManagementShmemData));
-	size = add_size(size, mul_size(sizeof(BackendData), TotalProcs));
+	size = add_size(size, mul_size(sizeof(BackendData), totalProcs));

 	return size;
 }


+/*
+ * TotalProcCount returns the total number of processes that could run via the
+ * current postgres server. See the details in the function comments.
+ *
+ * There is one thing we should warn readers about. Citus enforces being loaded
+ * as the first extension in shared_preload_libraries. However, if any other
+ * extension overrides MaxConnections, autovacuum_max_workers or
+ * max_worker_processes, our reasoning in this function may not work as expected.
+ * Given that this is not a usual pattern for extensions, we consider Citus'
+ * behaviour good enough for now.
+ */
+int
+TotalProcCount(void)
+{
+	int maxBackends = 0;
+	int totalProcs = 0;
+
+#ifdef WIN32
+
+	/* autovacuum_max_workers is not PGDLLIMPORT, so use a high estimate for windows */
+	int estimatedMaxAutovacuumWorkers = 30;
+	maxBackends =
+		MaxConnections + estimatedMaxAutovacuumWorkers + 1 + max_worker_processes;
+#else
+
+	/*
+	 * We're simply imitating PostgreSQL's InitializeMaxBackends(). Given that all
+	 * the items used here are PGC_POSTMASTER, it should be safe to access them
+	 * at any time during execution, even before InitializeMaxBackends() is called.
+	 */
+	maxBackends = MaxConnections + autovacuum_max_workers + 1 + max_worker_processes;
+#endif
+
+	/*
+	 * We prefer to maintain space for auxiliary procs and prepared transactions in
+	 * the backend space because they could be blocking processes and our current
+	 * implementation of distributed deadlock detection could process them
+	 * as regular backends. In the future, we could consider changing the deadlock
+	 * detection algorithm to ignore auxiliary procs and prepared transactions and
+	 * save some space.
+	 */
+	totalProcs = maxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts;
+
+	return totalProcs;
+}
+
+
 /*
  * InitializeBackendData initialises MyBackendData to the shared memory segment
  * belonging to the current backend.
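For context: the removed TotalProcs macro read MaxBackends, which PostgreSQL only assigns in InitializeMaxBackends() after shared_preload_libraries have been loaded, so extension startup code could observe MaxBackends == 0 and under-size shared memory. Below is a minimal standalone sketch of the replacement computation; the function name is hypothetical, and it assumes the pre-PG12 formula shown in the diff (the +1 slot is the autovacuum launcher).

#include "postgres.h"
#include "miscadmin.h"             /* MaxConnections, max_worker_processes */
#include "access/twophase.h"       /* max_prepared_xacts */
#include "postmaster/autovacuum.h" /* autovacuum_max_workers */
#include "storage/proc.h"          /* NUM_AUXILIARY_PROCS */

/*
 * Sketch of the commit's computation (name hypothetical). Every input is a
 * PGC_POSTMASTER GUC or a compile-time constant, so the result is stable and
 * available even before InitializeMaxBackends() has set MaxBackends.
 */
static int
TotalProcCountSketch(void)
{
	/* same formula InitializeMaxBackends() uses; the +1 is the autovacuum launcher */
	int maxBackends = MaxConnections + autovacuum_max_workers + 1 +
					  max_worker_processes;

	return maxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts;
}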
@@ -389,6 +389,7 @@ BuildLocalWaitGraph(void)
 	WaitGraph *waitGraph = NULL;
 	int curBackend = 0;
 	PROCStack remaining;
+	int totalProcs = TotalProcCount();

 	/*
 	 * Try hard to avoid allocations while holding lock. Thus we pre-allocate
@@ -398,12 +399,12 @@ BuildLocalWaitGraph(void)
 	 */
 	waitGraph = (WaitGraph *) palloc0(sizeof(WaitGraph));
 	waitGraph->localNodeId = GetLocalGroupId();
-	waitGraph->allocatedSize = TotalProcs * 3;
+	waitGraph->allocatedSize = totalProcs * 3;
 	waitGraph->edgeCount = 0;
 	waitGraph->edges = (WaitEdge *) palloc(waitGraph->allocatedSize * sizeof(WaitEdge));

-	remaining.procs = (PGPROC **) palloc(sizeof(PGPROC *) * TotalProcs);
-	remaining.procAdded = (bool *) palloc0(sizeof(bool *) * TotalProcs);
+	remaining.procs = (PGPROC **) palloc(sizeof(PGPROC *) * totalProcs);
+	remaining.procAdded = (bool *) palloc0(sizeof(bool *) * totalProcs);
 	remaining.procCount = 0;

 	LockLockData();
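For context: the two hunks above pre-allocate every array before LockLockData() takes the lock-manager partition locks, so that no palloc() runs while the whole lock manager is blocked. A condensed restatement of the pattern, assuming UnlockLockData() is the matching release call (the traversal itself is elided; sizes follow the diff):

int totalProcs = TotalProcCount();
WaitGraph *waitGraph = (WaitGraph *) palloc0(sizeof(WaitGraph));
PROCStack remaining;

/* allocate up front; three edges per proc is the initial capacity guess */
waitGraph->allocatedSize = totalProcs * 3;
waitGraph->edges = (WaitEdge *) palloc(waitGraph->allocatedSize * sizeof(WaitEdge));
remaining.procs = (PGPROC **) palloc(sizeof(PGPROC *) * totalProcs);
remaining.procAdded = (bool *) palloc0(sizeof(bool *) * totalProcs);
remaining.procCount = 0;

LockLockData();
/* ... inspect ProcGlobal->allProcs[0 .. totalProcs - 1]; no palloc() here ... */
UnlockLockData();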
@@ -416,7 +417,7 @@ BuildLocalWaitGraph(void)
 	 */

 	/* build list of starting procs */
-	for (curBackend = 0; curBackend < TotalProcs; curBackend++)
+	for (curBackend = 0; curBackend < totalProcs; curBackend++)
 	{
 		PGPROC *currentProc = &ProcGlobal->allProcs[curBackend];
 		BackendData currentBackendData;
@@ -762,7 +763,7 @@ AddProcToVisit(PROCStack *remaining, PGPROC *proc)
 		return;
 	}

-	Assert(remaining->procCount < TotalProcs);
+	Assert(remaining->procCount < TotalProcCount());

 	remaining->procs[remaining->procCount++] = proc;
 	remaining->procAdded[proc->pgprocno] = true;
@@ -22,9 +22,6 @@
 #include "storage/s_lock.h"


-#define TotalProcs (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts)
-
-
 /*
  * CitusInitiatedBackend keeps some information about the backends that are
  * initiated by Citus.
@@ -58,6 +55,7 @@ typedef struct BackendData


 extern void InitializeBackendManagement(void);
+extern int TotalProcCount(void);
 extern void InitializeBackendData(void);
 extern void LockBackendSharedMemory(LWLockMode lockMode);
 extern void UnlockBackendSharedMemory(void);
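A minimal usage sketch of the new declaration (hypothetical caller, not from the commit): unlike the removed macro, TotalProcCount() can back a shared-memory size estimate that runs from _PG_init(), before MaxBackends is initialized.

#include "postgres.h"
#include "distributed/backend_data.h"
#include "storage/shmem.h" /* mul_size */

/* hypothetical size estimator, safe to call before InitializeMaxBackends() */
static Size
BackendSlotArraySize(void)
{
	return mul_size(sizeof(BackendData), TotalProcCount());
}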