Properly implement retry timeout

preventConflictingFlags
Onder Kalaci 2020-03-27 09:48:28 +01:00
parent bed023c03d
commit 593a3be1a3
3 changed files with 59 additions and 11 deletions

View File

@ -25,6 +25,7 @@
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/shared_connection_stats.h" #include "distributed/shared_connection_stats.h"
#include "distributed/time_constants.h"
#include "distributed/tuplestore.h" #include "distributed/tuplestore.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/hashutils.h" #include "utils/hashutils.h"
@ -86,12 +87,13 @@ typedef struct SharedConnStatsHashEntry
/* /*
* Controlled via a GUC, never access directly, use GetMaxSharedPoolSize(). * Controlled via a GUC, never access directly, use GetMaxSharedPoolSize().
* "0" means adjust MaxSharedPoolSize automatically by using 2 * MaxConnections. * "0" means adjust MaxSharedPoolSize automatically by using MaxConnections.
* "-1" means do not apply connection throttling * "-1" means do not apply connection throttling
* Anything else means use that number * Anything else means use that number
*/ */
int MaxSharedPoolSize = 0; int MaxSharedPoolSize = 0;
int ConnectionRetryTimout = 120 * MS_PER_SECOND;
/* the following two structs used for accessing shared memory */ /* the following two structs used for accessing shared memory */
static HTAB *SharedConnStatsHash = NULL; static HTAB *SharedConnStatsHash = NULL;
@ -183,7 +185,7 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
/* /*
* GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled * GetMaxSharedPoolSize is a wrapper around MaxSharedPoolSize which is controlled
* via a GUC. * via a GUC.
* "0" means adjust MaxSharedPoolSize automatically by using 2 * MaxConnections * "0" means adjust MaxSharedPoolSize automatically by using MaxConnections
* "-1" means do not apply connection throttling * "-1" means do not apply connection throttling
* Anything else means use that number * Anything else means use that number
*/ */
@ -192,7 +194,7 @@ GetMaxSharedPoolSize(void)
{ {
if (MaxSharedPoolSize == 0) if (MaxSharedPoolSize == 0)
{ {
return 2 * MaxConnections; return MaxConnections;
} }
return MaxSharedPoolSize; return MaxSharedPoolSize;
@ -210,16 +212,25 @@ GetMaxSharedPoolSize(void)
void void
WaitOrErrorForSharedConnection(const char *hostname, int port) WaitOrErrorForSharedConnection(const char *hostname, int port)
{ {
int counter = 0; int retryCount = 0;
/*
* Sleep this amount before retrying, there is not much value retrying too often
* as the remote node is too busy. That's the reason we're retrying.
*/
double sleepTimeoutMsec = 1000;
/* In practice, 0 disables the retry logic */
int allowedRetryCount = ConnectionRetryTimout / sleepTimeoutMsec;
while (!TryToIncrementSharedConnectionCounter(hostname, port)) while (!TryToIncrementSharedConnectionCounter(hostname, port))
{ {
int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH; int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
double timeoutMsec = 100.0;
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
int rc = WaitLatch(MyLatch, latchFlags, (long) timeoutMsec, PG_WAIT_EXTENSION); int rc = WaitLatch(MyLatch, latchFlags, (long) sleepTimeoutMsec,
PG_WAIT_EXTENSION);
/* emergency bailout if postmaster has died */ /* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH) if (rc & WL_POSTMASTER_DEATH)
@ -239,15 +250,16 @@ WaitOrErrorForSharedConnection(const char *hostname, int port)
} }
else if (rc & WL_TIMEOUT) else if (rc & WL_TIMEOUT)
{ {
++counter; ++retryCount;
if (counter == 10) if (allowedRetryCount <= retryCount)
{ {
ereport(ERROR, (errmsg("citus.max_shared_pool_size number of connections " ereport(ERROR, (errmsg("citus.max_shared_pool_size number of connections "
"are already established to the node %s:%d," "are already established to the node %s:%d,"
"so cannot establish any more connections", "so cannot establish any more connections",
hostname, port), hostname, port),
errhint("consider increasing " errhint("consider increasing "
"citus.max_shared_pool_size"))); "citus.max_shared_pool_size or "
"citus.connection_retry_timeout")));
} }
} }
} }
@ -266,6 +278,12 @@ WaitOrErrorForSharedConnection(const char *hostname, int port)
bool bool
TryToIncrementSharedConnectionCounter(const char *hostname, int port) TryToIncrementSharedConnectionCounter(const char *hostname, int port)
{ {
if (GetMaxSharedPoolSize() == -1)
{
/* connection throttling disabled */
return true;
}
bool counterIncremented = false; bool counterIncremented = false;
SharedConnStatsHashKey connKey; SharedConnStatsHashKey connKey;

View File

@ -101,6 +101,7 @@ static void NodeConninfoGucAssignHook(const char *newval, void *extra);
static const char * MaxSharedPoolSizeGucShowHook(void); static const char * MaxSharedPoolSizeGucShowHook(void);
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
source); source);
static bool ConnectionRetryCheck(int *newval, void **extra, GucSource source);
/* static variable to hold value of deprecated GUC variable */ /* static variable to hold value of deprecated GUC variable */
static bool ExpireCachedShards = false; static bool ExpireCachedShards = false;
@ -462,6 +463,18 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD, GUC_STANDARD,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.connection_retry_timeout",
gettext_noop("Sets the time to retry if connection establishement "
"fails because of reaching to citus.max_shared_pool_size."),
NULL,
&ConnectionRetryTimout,
120 * MS_PER_SECOND, 0, MS_PER_HOUR,
PGC_USERSET,
GUC_UNIT_MS,
ConnectionRetryCheck, NULL, NULL);
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.expire_cached_shards", "citus.expire_cached_shards",
gettext_noop("This GUC variable has been deprecated."), gettext_noop("This GUC variable has been deprecated."),
@ -934,8 +947,8 @@ RegisterCitusConfigVariables(void)
"citus.max_shared_pool_size", "citus.max_shared_pool_size",
gettext_noop("Sets the maximum number of connections allowed per worker node " gettext_noop("Sets the maximum number of connections allowed per worker node "
"across all the backends from this node. Setting to -1 disables " "across all the backends from this node. Setting to -1 disables "
"connections throttling. Setting to 0 makes it auto-adjust, meaning" "connections throttling. Setting to 0 makes it auto-adjust, meaning "
"2 * max_connections."), "equal to max_connections."),
gettext_noop("For single coordinator setups, the value should be adjusted " gettext_noop("For single coordinator setups, the value should be adjusted "
"to match the max_connections on the remote nodes. For Citus MX, " "to match the max_connections on the remote nodes. For Citus MX, "
"the value should be tuned such that it is roughly " "the value should be tuned such that it is roughly "
@ -1576,3 +1589,19 @@ StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
} }
#endif #endif
} }
/*
 * ConnectionRetryCheck is the GUC check hook for citus.connection_retry_timeout.
 *
 * A value of 0 is always accepted, since it disables the retry logic. Any
 * other value is accepted only when it is strictly greater than
 * NodeConnectionTimeout (citus.node_connection_timeout); a value that is
 * equal to or smaller than it is rejected with an error detail.
 *
 * newval points to the proposed setting. The extra and source arguments are
 * part of the check hook signature and are not used here.
 *
 * Returns true when the proposed value is acceptable, false otherwise.
 */
static bool
ConnectionRetryCheck(int *newval, void **extra, GucSource source)
{
	/* 0 disables connection_retry_timeout, so should be allowed */
	if (*newval == 0)
	{
		return true;
	}

	/*
	 * The comparison is inclusive: a value equal to the node connection
	 * timeout is rejected too, hence the "must be greater than" wording.
	 */
	if (*newval <= NodeConnectionTimeout)
	{
		GUC_check_errcode(ERRCODE_FEATURE_NOT_SUPPORTED);
		GUC_check_errdetail("citus.connection_retry_timeout must be greater than "
							"citus.node_connection_timeout.");
		return false;
	}

	return true;
}

View File

@ -12,6 +12,7 @@
#define SHARED_CONNECTION_STATS_H #define SHARED_CONNECTION_STATS_H
extern int MaxSharedPoolSize; extern int MaxSharedPoolSize;
extern int ConnectionRetryTimout;
extern void InitializeSharedConnectionStats(void); extern void InitializeSharedConnectionStats(void);