From b780b42c9354b928b4c11945331bc8307d14a76a Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Wed, 1 Apr 2020 10:42:18 +0200
Subject: [PATCH] Rely on condition variable again

Replace the latch-based polling loop in WaitOrErrorForSharedConnection()
with a condition variable that lives in the shared connection stats
state. A backend that cannot increment the shared connection counter
sleeps on the condition variable, and DecrementSharedConnectionCounter()
broadcasts on it after the counter has been decremented.

The wait no longer has a retry timeout, so remove the
citus.connection_retry_timeout GUC, its check hook and the
ConnectionRetryTimout variable (including its extern declaration).
---
 .../connection/shared_connection_stats.c      | 87 ++++++++-----------
 src/backend/distributed/shared_library_init.c | 28 ------
 .../distributed/shared_connection_stats.h     |  3 +-
 3 files changed, 36 insertions(+), 82 deletions(-)

diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c
index c2628daf6..d58dbcaf1 100644
--- a/src/backend/distributed/connection/shared_connection_stats.c
+++ b/src/backend/distributed/connection/shared_connection_stats.c
@@ -48,6 +48,7 @@ typedef struct ConnectionStatsSharedData
 	char *sharedConnectionHashTrancheName;
 
 	LWLock sharedConnectionHashLock;
+	ConditionVariable waitersConditionVariable;
 } ConnectionStatsSharedData;
 
 typedef struct SharedConnStatsHashKey
@@ -84,8 +85,6 @@ typedef struct SharedConnStatsHashEntry
  */
 int MaxSharedPoolSize = 0;
 
-int ConnectionRetryTimout = 120 * MS_PER_SECOND;
-
 /* the following two structs used for accessing shared memory */
 static HTAB *SharedConnStatsHash = NULL;
 static ConnectionStatsSharedData *ConnectionStatsSharedState = NULL;
@@ -226,60 +225,12 @@ GetMaxSharedPoolSize(void)
 void
 WaitOrErrorForSharedConnection(const char *hostname, int port)
 {
-	int retryCount = 0;
-
-	/*
-	 * Sleep this amount before retrying, there is not much value retrying too often
-	 * as the remote node is too busy. That's the reason we're retrying.
-	 */
-	double sleepTimeoutMsec = 100;
-
-	/* In practice, 0 disables the retry logic */
-	int allowedRetryCount = ConnectionRetryTimout / sleepTimeoutMsec;
-
 	while (!TryToIncrementSharedConnectionCounter(hostname, port))
 	{
-		int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
-
-		CHECK_FOR_INTERRUPTS();
-
-		int rc = WaitLatch(MyLatch, latchFlags, (long) sleepTimeoutMsec,
-						   PG_WAIT_EXTENSION);
-
-		/* emergency bailout if postmaster has died */
-		if (rc & WL_POSTMASTER_DEATH)
-		{
-			ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
-		}
-		else if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-
-			if (IsHoldOffCancellationReceived())
-			{
-				/* in case the interrupts are hold, we still want to cancel */
-				ereport(ERROR, (errmsg("canceling statement due to user request")));
-			}
-		}
-		else if (rc & WL_TIMEOUT)
-		{
-			++retryCount;
-			if (allowedRetryCount <= retryCount)
-			{
-				ereport(ERROR, (errmsg("citus.max_shared_pool_size number of connections "
-									   "are already established to the node %s:%d,"
-									   "so cannot establish any more connections",
-									   hostname, port),
-								errhint("consider increasing "
-										"citus.max_shared_pool_size or "
-										"citus.connection_retry_timeout")));
-			}
-
-			ereport(DEBUG4, (errmsg("connection to node %s:%d is retried for %d times",
-									hostname, port, retryCount)));
-		}
+		WaitForSharedConnection();
 	}
+
+	ConditionVariableCancelSleep();
 }
 
 
@@ -417,6 +368,8 @@ DecrementSharedConnectionCounter(const char *hostname, int port)
 	connectionEntry->connectionCount -= 1;
 
 	UnLockConnectionSharedMemory();
+
+	WakeupWaiterBackendsForSharedConnection();
 }
 
 
@@ -442,6 +395,32 @@ UnLockConnectionSharedMemory(void)
 }
 
 
+/*
+ * WakeupWaiterBackendsForSharedConnection wakes up all the backends that are
+ * sleeping on the shared connection condition variable, so that they can
+ * retry TryToIncrementSharedConnectionCounter().
+ */
+void
+WakeupWaiterBackendsForSharedConnection(void)
+{
+	ConditionVariableBroadcast(&ConnectionStatsSharedState->waitersConditionVariable);
+}
+
+
+/*
+ * WaitForSharedConnection puts the backend to sleep on the shared connection
+ * condition variable. Callers are expected to re-check their condition in a
+ * loop and to call ConditionVariableCancelSleep() once the loop is done, as
+ * WaitOrErrorForSharedConnection() does.
+ */
+void
+WaitForSharedConnection(void)
+{
+	ConditionVariableSleep(&ConnectionStatsSharedState->waitersConditionVariable,
+						   PG_WAIT_EXTENSION);
+}
+
+
 /*
  * InitializeSharedConnectionStats requests the necessary shared memory
  * from Postgres and sets up the shared memory startup hook.
@@ -523,6 +502,8 @@ SharedConnectionStatsShmemInit(void)
 
 		LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock,
 						 ConnectionStatsSharedState->sharedConnectionHashTrancheId);
+
+		ConditionVariableInit(&ConnectionStatsSharedState->waitersConditionVariable);
 	}
 
 	/* allocate hash table */
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 7292e1872..cd0864f72 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -103,7 +103,6 @@ static void NodeConninfoGucAssignHook(const char *newval, void *extra);
 static const char * MaxSharedPoolSizeGucShowHook(void);
 static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
 											 source);
-static bool ConnectionRetryCheck(int *newval, void **extra, GucSource source);
 
 /* static variable to hold value of deprecated GUC variable */
 static bool ExpireCachedShards = false;
@@ -482,18 +481,6 @@ RegisterCitusConfigVariables(void)
 		GUC_STANDARD,
 		NULL, NULL, NULL);
 
-
-	DefineCustomIntVariable(
-		"citus.connection_retry_timeout",
-		gettext_noop("Sets the time to retry if connection establishement "
-					 "fails because of reaching to citus.max_shared_pool_size."),
-		NULL,
-		&ConnectionRetryTimout,
-		120 * MS_PER_SECOND, 0, MS_PER_HOUR,
-		PGC_USERSET,
-		GUC_UNIT_MS,
-		ConnectionRetryCheck, NULL, NULL);
-
 	DefineCustomBoolVariable(
 		"citus.expire_cached_shards",
 		gettext_noop("This GUC variable has been deprecated."),
@@ -1609,18 +1596,3 @@ StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
 #endif
 }
 
-
-static bool
-ConnectionRetryCheck(int *newval, void **extra, GucSource source)
-{
-	/* 0 disables connection_retry_timeout, so should be allowed */
-	if (*newval <= NodeConnectionTimeout && *newval != 0)
-	{
-		GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED);
-		GUC_check_errdetail("citus.connection_retry_timeout cannot be smaller than "
-							"citus.node_connection_timeout.");
-		return false;
-	}
-
-	return true;
-}
diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h
index 78ea95183..abb13dfe6 100644
--- a/src/include/distributed/shared_connection_stats.h
+++ b/src/include/distributed/shared_connection_stats.h
@@ -16,6 +16,7 @@
-extern int ConnectionRetryTimout;
 
 extern void InitializeSharedConnectionStats(void);
+extern void WaitForSharedConnection(void);
+extern void WakeupWaiterBackendsForSharedConnection(void);
 extern void RemoveAllSharedConnectionEntriesForNode(char *hostname, int port);
 extern int GetMaxSharedPoolSize(void);
 extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);