diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 080511a7b..aa2b4f359 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -55,6 +55,8 @@ static bool ShouldShutdownConnection(MultiConnection *connection, const int cachedConnectionCount); static void ResetConnection(MultiConnection *connection); static void DefaultCitusNoticeProcessor(void *arg, const char *message); +static void RegisterConnectionCleanup(void); +static void CitusCleanupConnectionsAtExit(int code, Datum arg); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); static bool RemoteTransactionIdle(MultiConnection *connection); static int EventSetSizeForConnectionList(List *connections); @@ -347,10 +349,42 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, ResetShardPlacementAssociation(connection); + RegisterConnectionCleanup(); + return connection; } +/* + * RegisterConnectionCleanup cleans up any resources left at the end of the + * session. We prefer to cleanup before shared memory exit to make sure that + * this session properly releases anything hold in the shared memory. + */ +static void +RegisterConnectionCleanup(void) +{ + static bool registeredCleanup = false; + if (registeredCleanup == false) + { + before_shmem_exit(CitusCleanupConnectionsAtExit, 0); + + registeredCleanup = true; + } +} + + +/* + * CitusCleanupConnectionsAtExit is called before_shmem_exit() of the + * backend for the purposes of any clean-up needed. + */ +static void +CitusCleanupConnectionsAtExit(int code, Datum arg) +{ + /* properly close all the cached connections */ + ShutdownAllConnections(); +} + + /* * FindAvailableConnection searches the given list of connections for one that * is not claimed exclusively. diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index e323f52c2..3fb201343 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -47,16 +47,18 @@ typedef struct ConnectionStatsSharedData int sharedConnectionHashTrancheId; char *sharedConnectionHashTrancheName; - /* - * We prefer mutex over LwLocks for two reasons: - * - The operations we perform while holding the lock is very tiny, and - * performance wise, mutex is encouraged by Postgres for such cases - * - We have to acquire the lock "atexit" callback, and LwLocks requires - * MyProc to be avaliable to acquire the lock. However, "atexit", it is - * not guranteed to have MyProc avaliable. On the other hand, "mutex" is - * independent from MyProc. - */ - slock_t mutex; + LWLock sharedConnectionHashLock; + +/* / * */ +/* * We prefer mutex over LwLocks for two reasons: */ +/* * - The operations we perform while holding the lock is very tiny, and */ +/* * performance wise, mutex is encouraged by Postgres for such cases */ +/* * - We have to acquire the lock "atexit" callback, and LwLocks requires */ +/* * MyProc to be avaliable to acquire the lock. However, "atexit", it is */ +/* * not guranteed to have MyProc avaliable. On the other hand, "mutex" is */ +/* * independent from MyProc. */ +/* * / */ +/* slock_t mutex; */ } ConnectionStatsSharedData; typedef struct SharedConnStatsHashKey @@ -105,7 +107,7 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; /* local function declarations */ static void StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); -static void LockConnectionSharedMemory(void); +static void LockConnectionSharedMemory(LWLockMode lockMode); static void UnLockConnectionSharedMemory(void); static void SharedConnectionStatsShmemInit(void); static size_t SharedConnectionStatsShmemSize(void); @@ -156,7 +158,7 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) */ /* we're reading all distributed transactions, prevent new backends */ - LockConnectionSharedMemory(); + LockConnectionSharedMemory(LW_SHARED); HASH_SEQ_STATUS status; SharedConnStatsHashEntry *connectionEntry = NULL; @@ -298,7 +300,7 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) connKey.port = port; connKey.databaseOid = MyDatabaseId; - LockConnectionSharedMemory(); + LockConnectionSharedMemory(LW_EXCLUSIVE); /* * Note that while holding a spinlock, it would not allowed to use HASH_ENTER_NULL @@ -356,6 +358,12 @@ DecrementSharedConnectionCounter(const char *hostname, int port) { SharedConnStatsHashKey connKey; + if (GetMaxSharedPoolSize() == -1) + { + /* connection throttling disabled */ + return; + } + strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); if (strlen(hostname) > MAX_NODE_LENGTH) { @@ -367,7 +375,7 @@ DecrementSharedConnectionCounter(const char *hostname, int port) connKey.port = port; connKey.databaseOid = MyDatabaseId; - LockConnectionSharedMemory(); + LockConnectionSharedMemory(LW_EXCLUSIVE); bool entryFound = false; SharedConnStatsHashEntry *connectionEntry = @@ -387,9 +395,11 @@ DecrementSharedConnectionCounter(const char *hostname, int port) * accessing to the SharedConnStatsHash, which is in the shared memory. */ static void -LockConnectionSharedMemory() +LockConnectionSharedMemory(LWLockMode lockMode) { - SpinLockAcquire(&ConnectionStatsSharedState->mutex); + LWLockAcquire(&ConnectionStatsSharedState->sharedConnectionHashLock, lockMode); + + /* SpinLockAcquire(&ConnectionStatsSharedState->mutex); */ } @@ -400,7 +410,9 @@ LockConnectionSharedMemory() static void UnLockConnectionSharedMemory(void) { - SpinLockRelease(&ConnectionStatsSharedState->mutex); + LWLockRelease(&ConnectionStatsSharedState->sharedConnectionHashLock); + + /* SpinLockRelease(&ConnectionStatsSharedState->mutex); */ } @@ -432,7 +444,8 @@ SharedConnectionStatsShmemSize(void) Size size = 0; size = add_size(size, sizeof(ConnectionStatsSharedData)); - size = add_size(size, mul_size(sizeof(LWLock), MaxWorkerNodesTracked)); + + /* size = add_size(size, mul_size(sizeof(LWLock), MaxWorkerNodesTracked)); */ Size hashSize = hash_estimate_size(MaxWorkerNodesTracked, sizeof(SharedConnStatsHashEntry)); @@ -482,7 +495,10 @@ SharedConnectionStatsShmemInit(void) LWLockRegisterTranche(ConnectionStatsSharedState->sharedConnectionHashTrancheId, ConnectionStatsSharedState->sharedConnectionHashTrancheName); - SpinLockInit(&ConnectionStatsSharedState->mutex); + LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock, + ConnectionStatsSharedState->sharedConnectionHashTrancheId); + + /* SpinLockInit(&ConnectionStatsSharedState->mutex); */ } /* allocate hash table */ diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 831380d7d..3e6ba708e 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -72,6 +72,7 @@ #include "distributed/adaptive_executor.h" #include "port/atomics.h" #include "postmaster/postmaster.h" +#include "storage/ipc.h" #include "optimizer/planner.h" #include "optimizer/paths.h" #include "tcop/tcopprot.h" @@ -88,7 +89,6 @@ static char *CitusVersion = CITUS_VERSION; void _PG_init(void); -static void CitusBackendAtExit(void); static void ResizeStackToMaximumDepth(void); static void multi_log_hook(ErrorData *edata); static void CreateRequiredDirectories(void); @@ -275,8 +275,6 @@ _PG_init(void) InitializeCitusQueryStats(); InitializeSharedConnectionStats(); - atexit(CitusBackendAtExit); - /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) { @@ -286,18 +284,6 @@ _PG_init(void) } -/* - * CitusBackendAtExit is called atexit of the backend for the purposes of - * any clean-up needed. - */ -static void -CitusBackendAtExit(void) -{ - /* properly close all the cached connections */ - ShutdownAllConnections(); -} - - /* * Stack size increase during high memory load may cause unexpected crashes. * With this alloca call, we are increasing stack size explicitly, so that if @@ -956,7 +942,7 @@ RegisterCitusConfigVariables(void) "become overwhelmed by too many incoming connections"), &MaxSharedPoolSize, 0, -1, INT_MAX, - PGC_SIGHUP, + PGC_SIGHUP, /* TODO: is PGC_POSTMASTER more convinient? */ GUC_STANDARD, NULL, NULL, MaxSharedPoolSizeGucShowHook);