Use LwLocks again

preventConflictingFlags
Onder Kalaci 2020-03-27 11:20:50 +01:00
parent 593a3be1a3
commit 3d242ff5f9
3 changed files with 71 additions and 35 deletions

View File

@ -55,6 +55,8 @@ static bool ShouldShutdownConnection(MultiConnection *connection, const int
cachedConnectionCount); cachedConnectionCount);
static void ResetConnection(MultiConnection *connection); static void ResetConnection(MultiConnection *connection);
static void DefaultCitusNoticeProcessor(void *arg, const char *message); static void DefaultCitusNoticeProcessor(void *arg, const char *message);
static void RegisterConnectionCleanup(void);
static void CitusCleanupConnectionsAtExit(int code, Datum arg);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
static bool RemoteTransactionIdle(MultiConnection *connection); static bool RemoteTransactionIdle(MultiConnection *connection);
static int EventSetSizeForConnectionList(List *connections); static int EventSetSizeForConnectionList(List *connections);
@ -347,10 +349,42 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
ResetShardPlacementAssociation(connection); ResetShardPlacementAssociation(connection);
RegisterConnectionCleanup();
return connection; return connection;
} }
/*
* RegisterConnectionCleanup cleans up any resources left at the end of the
* session. We prefer to cleanup before shared memory exit to make sure that
* this session properly releases anything hold in the shared memory.
*/
static void
RegisterConnectionCleanup(void)
{
static bool registeredCleanup = false;
if (registeredCleanup == false)
{
before_shmem_exit(CitusCleanupConnectionsAtExit, 0);
registeredCleanup = true;
}
}
/*
* CitusCleanupConnectionsAtExit is called before_shmem_exit() of the
* backend for the purposes of any clean-up needed.
*/
static void
CitusCleanupConnectionsAtExit(int code, Datum arg)
{
/* properly close all the cached connections */
ShutdownAllConnections();
}
/* /*
* FindAvailableConnection searches the given list of connections for one that * FindAvailableConnection searches the given list of connections for one that
* is not claimed exclusively. * is not claimed exclusively.

View File

@ -47,16 +47,18 @@ typedef struct ConnectionStatsSharedData
int sharedConnectionHashTrancheId; int sharedConnectionHashTrancheId;
char *sharedConnectionHashTrancheName; char *sharedConnectionHashTrancheName;
/* LWLock sharedConnectionHashLock;
* We prefer mutex over LwLocks for two reasons:
* - The operations we perform while holding the lock is very tiny, and /* / * */
* performance wise, mutex is encouraged by Postgres for such cases /* * We prefer mutex over LwLocks for two reasons: */
* - We have to acquire the lock "atexit" callback, and LwLocks requires /* * - The operations we perform while holding the lock is very tiny, and */
* MyProc to be avaliable to acquire the lock. However, "atexit", it is /* * performance wise, mutex is encouraged by Postgres for such cases */
* not guranteed to have MyProc avaliable. On the other hand, "mutex" is /* * - We have to acquire the lock "atexit" callback, and LwLocks requires */
* independent from MyProc. /* * MyProc to be avaliable to acquire the lock. However, "atexit", it is */
*/ /* * not guranteed to have MyProc avaliable. On the other hand, "mutex" is */
slock_t mutex; /* * independent from MyProc. */
/* * / */
/* slock_t mutex; */
} ConnectionStatsSharedData; } ConnectionStatsSharedData;
typedef struct SharedConnStatsHashKey typedef struct SharedConnStatsHashKey
@ -105,7 +107,7 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
/* local function declarations */ /* local function declarations */
static void StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); static void StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor);
static void LockConnectionSharedMemory(void); static void LockConnectionSharedMemory(LWLockMode lockMode);
static void UnLockConnectionSharedMemory(void); static void UnLockConnectionSharedMemory(void);
static void SharedConnectionStatsShmemInit(void); static void SharedConnectionStatsShmemInit(void);
static size_t SharedConnectionStatsShmemSize(void); static size_t SharedConnectionStatsShmemSize(void);
@ -156,7 +158,7 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
*/ */
/* we're reading all distributed transactions, prevent new backends */ /* we're reading all distributed transactions, prevent new backends */
LockConnectionSharedMemory(); LockConnectionSharedMemory(LW_SHARED);
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
SharedConnStatsHashEntry *connectionEntry = NULL; SharedConnStatsHashEntry *connectionEntry = NULL;
@ -298,7 +300,7 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
connKey.port = port; connKey.port = port;
connKey.databaseOid = MyDatabaseId; connKey.databaseOid = MyDatabaseId;
LockConnectionSharedMemory(); LockConnectionSharedMemory(LW_EXCLUSIVE);
/* /*
* Note that while holding a spinlock, it would not allowed to use HASH_ENTER_NULL * Note that while holding a spinlock, it would not allowed to use HASH_ENTER_NULL
@ -356,6 +358,12 @@ DecrementSharedConnectionCounter(const char *hostname, int port)
{ {
SharedConnStatsHashKey connKey; SharedConnStatsHashKey connKey;
if (GetMaxSharedPoolSize() == -1)
{
/* connection throttling disabled */
return;
}
strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH) if (strlen(hostname) > MAX_NODE_LENGTH)
{ {
@ -367,7 +375,7 @@ DecrementSharedConnectionCounter(const char *hostname, int port)
connKey.port = port; connKey.port = port;
connKey.databaseOid = MyDatabaseId; connKey.databaseOid = MyDatabaseId;
LockConnectionSharedMemory(); LockConnectionSharedMemory(LW_EXCLUSIVE);
bool entryFound = false; bool entryFound = false;
SharedConnStatsHashEntry *connectionEntry = SharedConnStatsHashEntry *connectionEntry =
@ -387,9 +395,11 @@ DecrementSharedConnectionCounter(const char *hostname, int port)
* accessing to the SharedConnStatsHash, which is in the shared memory. * accessing to the SharedConnStatsHash, which is in the shared memory.
*/ */
static void static void
LockConnectionSharedMemory() LockConnectionSharedMemory(LWLockMode lockMode)
{ {
SpinLockAcquire(&ConnectionStatsSharedState->mutex); LWLockAcquire(&ConnectionStatsSharedState->sharedConnectionHashLock, lockMode);
/* SpinLockAcquire(&ConnectionStatsSharedState->mutex); */
} }
@ -400,7 +410,9 @@ LockConnectionSharedMemory()
static void static void
UnLockConnectionSharedMemory(void) UnLockConnectionSharedMemory(void)
{ {
SpinLockRelease(&ConnectionStatsSharedState->mutex); LWLockRelease(&ConnectionStatsSharedState->sharedConnectionHashLock);
/* SpinLockRelease(&ConnectionStatsSharedState->mutex); */
} }
@ -432,7 +444,8 @@ SharedConnectionStatsShmemSize(void)
Size size = 0; Size size = 0;
size = add_size(size, sizeof(ConnectionStatsSharedData)); size = add_size(size, sizeof(ConnectionStatsSharedData));
size = add_size(size, mul_size(sizeof(LWLock), MaxWorkerNodesTracked));
/* size = add_size(size, mul_size(sizeof(LWLock), MaxWorkerNodesTracked)); */
Size hashSize = hash_estimate_size(MaxWorkerNodesTracked, Size hashSize = hash_estimate_size(MaxWorkerNodesTracked,
sizeof(SharedConnStatsHashEntry)); sizeof(SharedConnStatsHashEntry));
@ -482,7 +495,10 @@ SharedConnectionStatsShmemInit(void)
LWLockRegisterTranche(ConnectionStatsSharedState->sharedConnectionHashTrancheId, LWLockRegisterTranche(ConnectionStatsSharedState->sharedConnectionHashTrancheId,
ConnectionStatsSharedState->sharedConnectionHashTrancheName); ConnectionStatsSharedState->sharedConnectionHashTrancheName);
SpinLockInit(&ConnectionStatsSharedState->mutex); LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock,
ConnectionStatsSharedState->sharedConnectionHashTrancheId);
/* SpinLockInit(&ConnectionStatsSharedState->mutex); */
} }
/* allocate hash table */ /* allocate hash table */

View File

@ -72,6 +72,7 @@
#include "distributed/adaptive_executor.h" #include "distributed/adaptive_executor.h"
#include "port/atomics.h" #include "port/atomics.h"
#include "postmaster/postmaster.h" #include "postmaster/postmaster.h"
#include "storage/ipc.h"
#include "optimizer/planner.h" #include "optimizer/planner.h"
#include "optimizer/paths.h" #include "optimizer/paths.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
@ -88,7 +89,6 @@ static char *CitusVersion = CITUS_VERSION;
void _PG_init(void); void _PG_init(void);
static void CitusBackendAtExit(void);
static void ResizeStackToMaximumDepth(void); static void ResizeStackToMaximumDepth(void);
static void multi_log_hook(ErrorData *edata); static void multi_log_hook(ErrorData *edata);
static void CreateRequiredDirectories(void); static void CreateRequiredDirectories(void);
@ -275,8 +275,6 @@ _PG_init(void)
InitializeCitusQueryStats(); InitializeCitusQueryStats();
InitializeSharedConnectionStats(); InitializeSharedConnectionStats();
atexit(CitusBackendAtExit);
/* enable modification of pg_catalog tables during pg_upgrade */ /* enable modification of pg_catalog tables during pg_upgrade */
if (IsBinaryUpgrade) if (IsBinaryUpgrade)
{ {
@ -286,18 +284,6 @@ _PG_init(void)
} }
/*
* CitusBackendAtExit is called atexit of the backend for the purposes of
* any clean-up needed.
*/
static void
CitusBackendAtExit(void)
{
/* properly close all the cached connections */
ShutdownAllConnections();
}
/* /*
* Stack size increase during high memory load may cause unexpected crashes. * Stack size increase during high memory load may cause unexpected crashes.
* With this alloca call, we are increasing stack size explicitly, so that if * With this alloca call, we are increasing stack size explicitly, so that if
@ -956,7 +942,7 @@ RegisterCitusConfigVariables(void)
"become overwhelmed by too many incoming connections"), "become overwhelmed by too many incoming connections"),
&MaxSharedPoolSize, &MaxSharedPoolSize,
0, -1, INT_MAX, 0, -1, INT_MAX,
PGC_SIGHUP, PGC_SIGHUP, /* TODO: is PGC_POSTMASTER more convinient? */
GUC_STANDARD, GUC_STANDARD,
NULL, NULL, MaxSharedPoolSizeGucShowHook); NULL, NULL, MaxSharedPoolSizeGucShowHook);