mirror of https://github.com/citusdata/citus.git
Rely on condition variable again
parent
8a123fb8e8
commit
b780b42c93
|
@ -48,6 +48,7 @@ typedef struct ConnectionStatsSharedData
|
|||
char *sharedConnectionHashTrancheName;
|
||||
|
||||
LWLock sharedConnectionHashLock;
|
||||
ConditionVariable waitersConditionVariable;
|
||||
} ConnectionStatsSharedData;
|
||||
|
||||
typedef struct SharedConnStatsHashKey
|
||||
|
@ -84,8 +85,6 @@ typedef struct SharedConnStatsHashEntry
|
|||
*/
|
||||
int MaxSharedPoolSize = 0;
|
||||
|
||||
int ConnectionRetryTimout = 120 * MS_PER_SECOND;
|
||||
|
||||
/* the following two structs used for accessing shared memory */
|
||||
static HTAB *SharedConnStatsHash = NULL;
|
||||
static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL;
|
||||
|
@ -226,60 +225,12 @@ GetMaxSharedPoolSize(void)
|
|||
void
|
||||
WaitOrErrorForSharedConnection(const char *hostname, int port)
|
||||
{
|
||||
int retryCount = 0;
|
||||
|
||||
/*
|
||||
* Sleep this amount before retrying, there is not much value retrying too often
|
||||
* as the remote node is too busy. That's the reason we're retrying.
|
||||
*/
|
||||
double sleepTimeoutMsec = 100;
|
||||
|
||||
/* In practice, 0 disables the retry logic */
|
||||
int allowedRetryCount = ConnectionRetryTimout / sleepTimeoutMsec;
|
||||
|
||||
while (!TryToIncrementSharedConnectionCounter(hostname, port))
|
||||
{
|
||||
int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
int rc = WaitLatch(MyLatch, latchFlags, (long) sleepTimeoutMsec,
|
||||
PG_WAIT_EXTENSION);
|
||||
|
||||
/* emergency bailout if postmaster has died */
|
||||
if (rc & WL_POSTMASTER_DEATH)
|
||||
{
|
||||
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
|
||||
}
|
||||
else if (rc & WL_LATCH_SET)
|
||||
{
|
||||
ResetLatch(MyLatch);
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
if (IsHoldOffCancellationReceived())
|
||||
{
|
||||
/* in case the interrupts are hold, we still want to cancel */
|
||||
ereport(ERROR, (errmsg("canceling statement due to user request")));
|
||||
}
|
||||
}
|
||||
else if (rc & WL_TIMEOUT)
|
||||
{
|
||||
++retryCount;
|
||||
if (allowedRetryCount <= retryCount)
|
||||
{
|
||||
ereport(ERROR, (errmsg("citus.max_shared_pool_size number of connections "
|
||||
"are already established to the node %s:%d,"
|
||||
"so cannot establish any more connections",
|
||||
hostname, port),
|
||||
errhint("consider increasing "
|
||||
"citus.max_shared_pool_size or "
|
||||
"citus.connection_retry_timeout")));
|
||||
WaitForSharedConnection();
|
||||
}
|
||||
|
||||
ereport(DEBUG4, (errmsg("connection to node %s:%d is retried for %d times",
|
||||
hostname, port, retryCount)));
|
||||
}
|
||||
}
|
||||
ConditionVariableCancelSleep();
|
||||
}
|
||||
|
||||
|
||||
|
@ -417,6 +368,8 @@ DecrementSharedConnectionCounter(const char *hostname, int port)
|
|||
connectionEntry->connectionCount -= 1;
|
||||
|
||||
UnLockConnectionSharedMemory();
|
||||
|
||||
WakeupWaiterBackendsForSharedConnection();
|
||||
}
|
||||
|
||||
|
||||
|
@ -442,6 +395,19 @@ UnLockConnectionSharedMemory(void)
|
|||
}
|
||||
|
||||
|
||||
void
|
||||
WakeupWaiterBackendsForSharedConnection(void)
|
||||
{
|
||||
ConditionVariableBroadcast(&ConnectionStatsSharedState->waitersConditionVariable);
|
||||
}
|
||||
|
||||
void
|
||||
WaitForSharedConnection(void)
|
||||
{
|
||||
ConditionVariableSleep(&ConnectionStatsSharedState->waitersConditionVariable, PG_WAIT_EXTENSION);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InitializeSharedConnectionStats requests the necessary shared memory
|
||||
* from Postgres and sets up the shared memory startup hook.
|
||||
|
@ -523,6 +489,8 @@ SharedConnectionStatsShmemInit(void)
|
|||
|
||||
LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock,
|
||||
ConnectionStatsSharedState->sharedConnectionHashTrancheId);
|
||||
|
||||
ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable);
|
||||
}
|
||||
|
||||
/* allocate hash table */
|
||||
|
|
|
@ -103,7 +103,6 @@ static void NodeConninfoGucAssignHook(const char *newval, void *extra);
|
|||
static const char * MaxSharedPoolSizeGucShowHook(void);
|
||||
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
|
||||
source);
|
||||
static bool ConnectionRetryCheck(int *newval, void **extra, GucSource source);
|
||||
|
||||
/* static variable to hold value of deprecated GUC variable */
|
||||
static bool ExpireCachedShards = false;
|
||||
|
@ -482,18 +481,6 @@ RegisterCitusConfigVariables(void)
|
|||
GUC_STANDARD,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.connection_retry_timeout",
|
||||
gettext_noop("Sets the time to retry if connection establishement "
|
||||
"fails because of reaching to citus.max_shared_pool_size."),
|
||||
NULL,
|
||||
&ConnectionRetryTimout,
|
||||
120 * MS_PER_SECOND, 0, MS_PER_HOUR,
|
||||
PGC_USERSET,
|
||||
GUC_UNIT_MS,
|
||||
ConnectionRetryCheck, NULL, NULL);
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"citus.expire_cached_shards",
|
||||
gettext_noop("This GUC variable has been deprecated."),
|
||||
|
@ -1609,18 +1596,3 @@ StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
|
|||
#endif
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
ConnectionRetryCheck(int *newval, void **extra, GucSource source)
|
||||
{
|
||||
/* 0 disables connection_retry_timeout, so should be allowed */
|
||||
if (*newval <= NodeConnectionTimeout && *newval != 0)
|
||||
{
|
||||
GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED);
|
||||
GUC_check_errdetail("citus.connection_retry_timeout cannot be smaller than "
|
||||
"citus.node_connection_timeout.");
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -16,6 +16,8 @@ extern int ConnectionRetryTimout;
|
|||
|
||||
|
||||
extern void InitializeSharedConnectionStats(void);
|
||||
extern void WaitForSharedConnection(void);
|
||||
extern void WakeupWaiterBackendsForSharedConnection(void);
|
||||
extern void RemoveAllSharedConnectionEntriesForNode(char *hostname, int port);
|
||||
extern int GetMaxSharedPoolSize(void);
|
||||
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
|
||||
|
|
Loading…
Reference in New Issue