From 593a3be1a31cf364c9df930ddbe74a759b259276 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Fri, 27 Mar 2020 09:48:28 +0100 Subject: [PATCH] Properly implement retry timeout --- .../connection/shared_connection_stats.c | 36 ++++++++++++++----- src/backend/distributed/shared_library_init.c | 33 +++++++++++++++-- .../distributed/shared_connection_stats.h | 1 + 3 files changed, 59 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index fa43056ce..e323f52c2 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -25,6 +25,7 @@ #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/shared_connection_stats.h" +#include "distributed/time_constants.h" #include "distributed/tuplestore.h" #include "utils/builtins.h" #include "utils/hashutils.h" @@ -86,12 +87,13 @@ typedef struct SharedConnStatsHashEntry /* * Controlled via a GUC, never access directly, use GetMaxSharedPoolSize(). - * "0" means adjust MaxSharedPoolSize automatically by using 2 * MaxConnections. + * "0" means adjust MaxSharedPoolSize automatically by using MaxConnections. * "-1" means do not apply connection throttling * Anything else means use that number */ int MaxSharedPoolSize = 0; +int ConnectionRetryTimout = 120 * MS_PER_SECOND; /* the following two structs used for accessing shared memory */ static HTAB *SharedConnStatsHash = NULL; @@ -183,7 +185,7 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) /* * GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled * via a GUC. - * "0" means adjust MaxSharedPoolSize automatically by using 2 * MaxConnections + * "0" means adjust MaxSharedPoolSize automatically by using MaxConnections * "-1" means do not apply connection throttling * Anything else means use that number */ @@ -192,7 +194,7 @@ GetMaxSharedPoolSize(void) { if (MaxSharedPoolSize == 0) { - return 2 * MaxConnections; + return MaxConnections; } return MaxSharedPoolSize; @@ -210,16 +212,25 @@ GetMaxSharedPoolSize(void) void WaitOrErrorForSharedConnection(const char *hostname, int port) { - int counter = 0; + int retryCount = 0; + + /* + * Sleep this amount before retrying, there is not much value retrying too often + * as the remote node is too busy. That's the reason we're retrying. + */ + double sleepTimeoutMsec = 1000; + + /* In practice, 0 disables the retry logic */ + int allowedRetryCount = ConnectionRetryTimout / sleepTimeoutMsec; while (!TryToIncrementSharedConnectionCounter(hostname, port)) { int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH; - double timeoutMsec = 100.0; CHECK_FOR_INTERRUPTS(); - int rc = WaitLatch(MyLatch, latchFlags, (long) timeoutMsec, PG_WAIT_EXTENSION); + int rc = WaitLatch(MyLatch, latchFlags, (long) sleepTimeoutMsec, + PG_WAIT_EXTENSION); /* emergency bailout if postmaster has died */ if (rc & WL_POSTMASTER_DEATH) @@ -239,15 +250,16 @@ WaitOrErrorForSharedConnection(const char *hostname, int port) } else if (rc & WL_TIMEOUT) { - ++counter; - if (counter == 10) + ++retryCount; + if (allowedRetryCount <= retryCount) { ereport(ERROR, (errmsg("citus.max_shared_pool_size number of connections " "are already established to the node %s:%d," "so cannot establish any more connections", hostname, port), errhint("consider increasing " - "citus.max_shared_pool_size"))); + "citus.max_shared_pool_size or " + "citus.connection_retry_timeout"))); } } } @@ -266,6 +278,12 @@ WaitOrErrorForSharedConnection(const char *hostname, int port) bool TryToIncrementSharedConnectionCounter(const char *hostname, int port) { + if (GetMaxSharedPoolSize() == -1) + { + /* connection throttling disabled */ + return true; + } + bool counterIncremented = false; SharedConnStatsHashKey connKey; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 8371b5886..831380d7d 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -101,6 +101,7 @@ static void NodeConninfoGucAssignHook(const char *newval, void *extra); static const char * MaxSharedPoolSizeGucShowHook(void); static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source); +static bool ConnectionRetryCheck(int *newval, void **extra, GucSource source); /* static variable to hold value of deprecated GUC variable */ static bool ExpireCachedShards = false; @@ -462,6 +463,18 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + + DefineCustomIntVariable( + "citus.connection_retry_timeout", + gettext_noop("Sets the time to retry if connection establishement " + "fails because of reaching to citus.max_shared_pool_size."), + NULL, + &ConnectionRetryTimout, + 120 * MS_PER_SECOND, 0, MS_PER_HOUR, + PGC_USERSET, + GUC_UNIT_MS, + ConnectionRetryCheck, NULL, NULL); + DefineCustomBoolVariable( "citus.expire_cached_shards", gettext_noop("This GUC variable has been deprecated."), @@ -934,8 +947,8 @@ RegisterCitusConfigVariables(void) "citus.max_shared_pool_size", gettext_noop("Sets the maximum number of connections allowed per worker node " "across all the backends from this node. Setting to -1 disables " - "connections throttling. Setting to 0 makes it auto-adjust, meaning" - "2 * max_connections."), + "connections throttling. Setting to 0 makes it auto-adjust, meaning " + "equal to max_connections."), gettext_noop("For single coordinator setups, the value should be adjusted " "to match the max_connections on the remote nodes. For Citus MX, " "the value should be tuned such that it is roughly " @@ -1576,3 +1589,19 @@ StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source) } #endif } + + +static bool +ConnectionRetryCheck(int *newval, void **extra, GucSource source) +{ + /* 0 disables connection_retry_timeout, so should be allowed */ + if (*newval <= NodeConnectionTimeout && *newval != 0) + { + GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED); + GUC_check_errdetail("citus.connection_retry_timeout cannot be smaller than " + "citus.node_connection_timeout."); + return false; + } + + return true; +} diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index c1267d2f9..b2265e678 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -12,6 +12,7 @@ #define SHARED_CONNECTION_STATS_H extern int MaxSharedPoolSize; +extern int ConnectionRetryTimout; extern void InitializeSharedConnectionStats(void);