Revert "Use condition variable instead of sleep"

This reverts commit bc282e8ee53b78cb4c88c6e10ccc00b5da11898a.
preventConflictingFlags
Onder Kalaci 2020-04-01 08:59:46 +02:00
parent 96613c6b2b
commit 3961f4517b
3 changed files with 62 additions and 23 deletions

View File

@ -48,7 +48,6 @@ typedef struct ConnectionStatsSharedData
char *sharedConnectionHashTrancheName;
LWLock sharedConnectionHashLock;
ConditionVariable waitersConditionVariable;
} ConnectionStatsSharedData;
typedef struct SharedConnStatsHashKey
@ -85,6 +84,8 @@ typedef struct SharedConnStatsHashEntry
*/
int MaxSharedPoolSize = 0;
int ConnectionRetryTimout = 120 * MS_PER_SECOND;
/* the following two structs used for accessing shared memory */
static HTAB *SharedConnStatsHash = NULL;
static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL;
@ -225,12 +226,57 @@ GetMaxSharedPoolSize(void)
void
WaitOrErrorForSharedConnection(const char *hostname, int port)
{
int retryCount = 0;
/*
* Sleep this amount before retrying, there is not much value retrying too often
* as the remote node is too busy. That's the reason we're retrying.
*/
double sleepTimeoutMsec = 1000;
/* In practice, 0 disables the retry logic */
int allowedRetryCount = ConnectionRetryTimout / sleepTimeoutMsec;
while (!TryToIncrementSharedConnectionCounter(hostname, port))
{
WaitForSharedConnection();
}
int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
ConditionVariableCancelSleep();
CHECK_FOR_INTERRUPTS();
int rc = WaitLatch(MyLatch, latchFlags, (long) sleepTimeoutMsec,
PG_WAIT_EXTENSION);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
{
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
}
else if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
if (IsHoldOffCancellationReceived())
{
/* in case the interrupts are hold, we still want to cancel */
ereport(ERROR, (errmsg("canceling statement due to user request")));
}
}
else if (rc & WL_TIMEOUT)
{
++retryCount;
if (allowedRetryCount <= retryCount)
{
ereport(ERROR, (errmsg("citus.max_shared_pool_size number of connections "
"are already established to the node %s:%d,"
"so cannot establish any more connections",
hostname, port),
errhint("consider increasing "
"citus.max_shared_pool_size or "
"citus.connection_retry_timeout")));
}
}
}
}
@ -368,8 +414,6 @@ DecrementSharedConnectionCounter(const char *hostname, int port)
connectionEntry->connectionCount -= 1;
UnLockConnectionSharedMemory();
WakeupWaiterBackendsForSharedConnection();
}
@ -395,19 +439,6 @@ UnLockConnectionSharedMemory(void)
}
void
WakeupWaiterBackendsForSharedConnection(void)
{
ConditionVariableBroadcast(&ConnectionStatsSharedState->waitersConditionVariable);
}
void
WaitForSharedConnection(void)
{
ConditionVariableSleep(&ConnectionStatsSharedState->waitersConditionVariable, PG_WAIT_EXTENSION);
}
/*
* InitializeSharedConnectionStats requests the necessary shared memory
* from Postgres and sets up the shared memory startup hook.
@ -489,8 +520,6 @@ SharedConnectionStatsShmemInit(void)
LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock,
ConnectionStatsSharedState->sharedConnectionHashTrancheId);
ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable);
}
/* allocate hash table */

View File

@ -482,6 +482,18 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.connection_retry_timeout",
gettext_noop("Sets the time to retry if connection establishement "
"fails because of reaching to citus.max_shared_pool_size."),
NULL,
&ConnectionRetryTimout,
120 * MS_PER_SECOND, 0, MS_PER_HOUR,
PGC_USERSET,
GUC_UNIT_MS,
ConnectionRetryCheck, NULL, NULL);
DefineCustomBoolVariable(
"citus.expire_cached_shards",
gettext_noop("This GUC variable has been deprecated."),

View File

@ -16,8 +16,6 @@ extern int ConnectionRetryTimout;
extern void InitializeSharedConnectionStats(void);
extern void WaitForSharedConnection(void);
extern void WakeupWaiterBackendsForSharedConnection(void);
extern void RemoveAllSharedConnectionEntriesForNode(char *hostname, int port);
extern int GetMaxSharedPoolSize(void);
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);