Use condition variable instead of sleep

preventConflictingFlags
Onder Kalaci 2020-04-01 08:59:40 +02:00
parent cd4b9fd004
commit 96613c6b2b
3 changed files with 23 additions and 62 deletions

View File

@ -48,6 +48,7 @@ typedef struct ConnectionStatsSharedData
char *sharedConnectionHashTrancheName; char *sharedConnectionHashTrancheName;
LWLock sharedConnectionHashLock; LWLock sharedConnectionHashLock;
ConditionVariable waitersConditionVariable;
} ConnectionStatsSharedData; } ConnectionStatsSharedData;
typedef struct SharedConnStatsHashKey typedef struct SharedConnStatsHashKey
@ -84,8 +85,6 @@ typedef struct SharedConnStatsHashEntry
*/ */
int MaxSharedPoolSize = 0; int MaxSharedPoolSize = 0;
int ConnectionRetryTimout = 120 * MS_PER_SECOND;
/* the following two structs used for accessing shared memory */ /* the following two structs used for accessing shared memory */
static HTAB *SharedConnStatsHash = NULL; static HTAB *SharedConnStatsHash = NULL;
static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL; static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL;
@ -226,57 +225,12 @@ GetMaxSharedPoolSize(void)
void void
WaitOrErrorForSharedConnection(const char *hostname, int port) WaitOrErrorForSharedConnection(const char *hostname, int port)
{ {
int retryCount = 0;
/*
* Sleep this amount before retrying, there is not much value retrying too often
* as the remote node is too busy. That's the reason we're retrying.
*/
double sleepTimeoutMsec = 1000;
/* In practice, 0 disables the retry logic */
int allowedRetryCount = ConnectionRetryTimout / sleepTimeoutMsec;
while (!TryToIncrementSharedConnectionCounter(hostname, port)) while (!TryToIncrementSharedConnectionCounter(hostname, port))
{ {
int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH; WaitForSharedConnection();
CHECK_FOR_INTERRUPTS();
int rc = WaitLatch(MyLatch, latchFlags, (long) sleepTimeoutMsec,
PG_WAIT_EXTENSION);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
{
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
}
else if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
if (IsHoldOffCancellationReceived())
{
/* in case the interrupts are hold, we still want to cancel */
ereport(ERROR, (errmsg("canceling statement due to user request")));
}
}
else if (rc & WL_TIMEOUT)
{
++retryCount;
if (allowedRetryCount <= retryCount)
{
ereport(ERROR, (errmsg("citus.max_shared_pool_size number of connections "
"are already established to the node %s:%d,"
"so cannot establish any more connections",
hostname, port),
errhint("consider increasing "
"citus.max_shared_pool_size or "
"citus.connection_retry_timeout")));
}
}
} }
ConditionVariableCancelSleep();
} }
@ -414,6 +368,8 @@ DecrementSharedConnectionCounter(const char *hostname, int port)
connectionEntry->connectionCount -= 1; connectionEntry->connectionCount -= 1;
UnLockConnectionSharedMemory(); UnLockConnectionSharedMemory();
WakeupWaiterBackendsForSharedConnection();
} }
@ -439,6 +395,19 @@ UnLockConnectionSharedMemory(void)
} }
void
WakeupWaiterBackendsForSharedConnection(void)
{
ConditionVariableBroadcast(&ConnectionStatsSharedState->waitersConditionVariable);
}
void
WaitForSharedConnection(void)
{
ConditionVariableSleep(&ConnectionStatsSharedState->waitersConditionVariable, PG_WAIT_EXTENSION);
}
/* /*
* InitializeSharedConnectionStats requests the necessary shared memory * InitializeSharedConnectionStats requests the necessary shared memory
* from Postgres and sets up the shared memory startup hook. * from Postgres and sets up the shared memory startup hook.
@ -520,6 +489,8 @@ SharedConnectionStatsShmemInit(void)
LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock, LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock,
ConnectionStatsSharedState->sharedConnectionHashTrancheId); ConnectionStatsSharedState->sharedConnectionHashTrancheId);
ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable);
} }
/* allocate hash table */ /* allocate hash table */

View File

@ -482,18 +482,6 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD, GUC_STANDARD,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.connection_retry_timeout",
gettext_noop("Sets the time to retry if connection establishement "
"fails because of reaching to citus.max_shared_pool_size."),
NULL,
&ConnectionRetryTimout,
120 * MS_PER_SECOND, 0, MS_PER_HOUR,
PGC_USERSET,
GUC_UNIT_MS,
ConnectionRetryCheck, NULL, NULL);
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.expire_cached_shards", "citus.expire_cached_shards",
gettext_noop("This GUC variable has been deprecated."), gettext_noop("This GUC variable has been deprecated."),

View File

@ -16,6 +16,8 @@ extern int ConnectionRetryTimout;
extern void InitializeSharedConnectionStats(void); extern void InitializeSharedConnectionStats(void);
extern void WaitForSharedConnection(void);
extern void WakeupWaiterBackendsForSharedConnection(void);
extern void RemoveAllSharedConnectionEntriesForNode(char *hostname, int port); extern void RemoveAllSharedConnectionEntriesForNode(char *hostname, int port);
extern int GetMaxSharedPoolSize(void); extern int GetMaxSharedPoolSize(void);
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);