diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index d58dbcaf1..dfe197be3 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -48,7 +48,6 @@ typedef struct ConnectionStatsSharedData char *sharedConnectionHashTrancheName; LWLock sharedConnectionHashLock; - ConditionVariable waitersConditionVariable; } ConnectionStatsSharedData; typedef struct SharedConnStatsHashKey @@ -85,6 +84,8 @@ typedef struct SharedConnStatsHashEntry */ int MaxSharedPoolSize = 0; +int ConnectionRetryTimout = 120 * MS_PER_SECOND; + /* the following two structs used for accessing shared memory */ static HTAB *SharedConnStatsHash = NULL; static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL; @@ -225,12 +226,57 @@ GetMaxSharedPoolSize(void) void WaitOrErrorForSharedConnection(const char *hostname, int port) { + int retryCount = 0; + + /* + * Sleep this amount before retrying, there is not much value retrying too often + * as the remote node is too busy. That's the reason we're retrying. + */ + double sleepTimeoutMsec = 1000; + + /* In practice, 0 disables the retry logic */ + int allowedRetryCount = ConnectionRetryTimout / sleepTimeoutMsec; + while (!TryToIncrementSharedConnectionCounter(hostname, port)) { - WaitForSharedConnection(); - } + int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH; - ConditionVariableCancelSleep(); + CHECK_FOR_INTERRUPTS(); + + int rc = WaitLatch(MyLatch, latchFlags, (long) sleepTimeoutMsec, + PG_WAIT_EXTENSION); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + { + ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); + } + else if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + + if (IsHoldOffCancellationReceived()) + { + /* in case the interrupts are hold, we still want to cancel */ + ereport(ERROR, (errmsg("canceling statement due to user request"))); + } + } + else if (rc & WL_TIMEOUT) + { + ++retryCount; + if (allowedRetryCount <= retryCount) + { + ereport(ERROR, (errmsg("citus.max_shared_pool_size number of connections " + "are already established to the node %s:%d," + "so cannot establish any more connections", + hostname, port), + errhint("consider increasing " + "citus.max_shared_pool_size or " + "citus.connection_retry_timeout"))); + } + } + } } @@ -368,8 +414,6 @@ DecrementSharedConnectionCounter(const char *hostname, int port) connectionEntry->connectionCount -= 1; UnLockConnectionSharedMemory(); - - WakeupWaiterBackendsForSharedConnection(); } @@ -395,19 +439,6 @@ UnLockConnectionSharedMemory(void) } -void -WakeupWaiterBackendsForSharedConnection(void) -{ - ConditionVariableBroadcast(&ConnectionStatsSharedState->waitersConditionVariable); -} - -void -WaitForSharedConnection(void) -{ - ConditionVariableSleep(&ConnectionStatsSharedState->waitersConditionVariable, PG_WAIT_EXTENSION); -} - - /* * InitializeSharedConnectionStats requests the necessary shared memory * from Postgres and sets up the shared memory startup hook. @@ -489,8 +520,6 @@ SharedConnectionStatsShmemInit(void) LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock, ConnectionStatsSharedState->sharedConnectionHashTrancheId); - - ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable); } /* allocate hash table */ diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index ca0176fac..7292e1872 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -482,6 +482,18 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + + DefineCustomIntVariable( + "citus.connection_retry_timeout", + gettext_noop("Sets the time to retry if connection establishement " + "fails because of reaching to citus.max_shared_pool_size."), + NULL, + &ConnectionRetryTimout, + 120 * MS_PER_SECOND, 0, MS_PER_HOUR, + PGC_USERSET, + GUC_UNIT_MS, + ConnectionRetryCheck, NULL, NULL); + DefineCustomBoolVariable( "citus.expire_cached_shards", gettext_noop("This GUC variable has been deprecated."), diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index abb13dfe6..78ea95183 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -16,8 +16,6 @@ extern int ConnectionRetryTimout; extern void InitializeSharedConnectionStats(void); -extern void WaitForSharedConnection(void); -extern void WakeupWaiterBackendsForSharedConnection(void); extern void RemoveAllSharedConnectionEntriesForNode(char *hostname, int port); extern int GetMaxSharedPoolSize(void); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);