From 188e571d3cbd99c5b7257b18d1c707fbc3e20e3b Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 26 Mar 2020 10:15:41 +0100 Subject: [PATCH] Replace LwLock with spinLock --- .../connection/connection_management.c | 27 ++++-- .../connection/shared_connection_stats.c | 89 ++++++++++++++++--- .../distributed/shared_connection_stats.h | 1 + 3 files changed, 100 insertions(+), 17 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 0b66ff815..080511a7b 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -80,7 +80,7 @@ static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount); static void CloseNotReadyMultiConnectionStates(List *connectionStates); static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState); - +static void CitusPQFinish(MultiConnection *connection); static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL; @@ -475,8 +475,7 @@ CloseConnection(MultiConnection *connection) bool found; /* close connection */ - PQfinish(connection->pgConn); - connection->pgConn = NULL; + CitusPQFinish(connection); strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH); key.port = connection->port; @@ -556,8 +555,7 @@ ShutdownConnection(MultiConnection *connection) { SendCancelationRequest(connection); } - PQfinish(connection->pgConn); - connection->pgConn = NULL; + CitusPQFinish(connection); } @@ -922,12 +920,27 @@ CloseNotReadyMultiConnectionStates(List *connectionStates) } /* close connection, otherwise we take up resource on the other side */ - PQfinish(connection->pgConn); - connection->pgConn = NULL; + CitusPQFinish(connection); } } +/* + * CitusPQFinish is a wrapper around PQfinish that also decrements the shared counter. + */ +static void +CitusPQFinish(MultiConnection *connection) +{ + if (connection->pgConn != NULL) + { + 
DecrementSharedConnectionCounter(connection->hostname, connection->port); + } + + PQfinish(connection->pgConn); + connection->pgConn = NULL; +} + + /* * Close connections on timeout in FinishConnectionListEstablishment * Synchronously finish connection establishment of an individual connection. diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index a3484e74d..6b0bbb435 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -45,7 +45,17 @@ typedef struct ConnectionStatsSharedData { int sharedConnectionHashTrancheId; char *sharedConnectionHashTrancheName; - LWLock sharedConnectionHashLock; + + /* + * We prefer mutex over LwLocks for two reasons: + * - The operations we perform while holding the lock are very tiny, and + * performance-wise, mutex is encouraged by Postgres for such cases + * - We have to acquire the lock in the "atexit" callback, and LwLocks require + * MyProc to be available to acquire the lock. However, in "atexit", MyProc + * is not guaranteed to be available. On the other hand, "mutex" is + * independent from MyProc. 
+ */ + slock_t mutex; } ConnectionStatsSharedData; typedef struct SharedConnStatsHashKey @@ -96,8 +106,8 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL; /* local function declarations */ static void StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); +static void LockConnectionSharedMemory(void); static void UnLockConnectionSharedMemory(void); -static void LockConnectionSharedMemory(LWLockMode lockMode); static void SharedConnectionStatsShmemInit(void); static size_t SharedConnectionStatsShmemSize(void); static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize); @@ -141,8 +151,13 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) Datum values[REMOTE_CONNECTION_STATS_COLUMNS]; bool isNulls[REMOTE_CONNECTION_STATS_COLUMNS]; + /* + * TODO: We should not do all the iterations/operations while + * holding the spinlock. + */ + /* we're reading all distributed transactions, prevent new backends */ - LockConnectionSharedMemory(LW_SHARED); + LockConnectionSharedMemory(); HASH_SEQ_STATUS status; SharedConnStatsHashEntry *connectionEntry = NULL; @@ -249,14 +264,34 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) connKey.port = port; connKey.databaseOid = MyDatabaseId; - LockConnectionSharedMemory(LW_EXCLUSIVE); + LockConnectionSharedMemory(); + /* + * Note that while holding a spinlock, it would not be allowed to use HASH_ENTER_NULL + * if the entries in SharedConnStatsHash were allocated via palloc (as palloc + * might throw OOM errors). However, in this case we're safe as the hash map is + * allocated in shared memory, which doesn't rely on palloc for memory allocation. + * This is already asserted in hash_search() by Postgres. 
+ */ bool entryFound = false; SharedConnStatsHashEntry *connectionEntry = - hash_search(SharedConnStatsHash, &connKey, HASH_ENTER, &entryFound); + hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound); + + /* + * It is possible to throw an error at this point, but that doesn't help us in any way. + * Instead, we try our best, let the connection establishment continue by-passing the + * connection throttling. + */ + if (!connectionEntry) + { + UnLockConnectionSharedMemory(); + + return true; + } if (!entryFound) { + /* we successfully allocated the entry for the first time, so initialize it */ connectionEntry->connectionCount = 1; counterIncremented = true; @@ -278,14 +313,49 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) } +/* + * DecrementSharedConnectionCounter decrements the shared counter + * for the given hostname and port. + */ +void +DecrementSharedConnectionCounter(const char *hostname, int port) +{ + SharedConnStatsHashKey connKey; + + strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH); + if (strlen(hostname) > MAX_NODE_LENGTH) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("hostname exceeds the maximum length of %d", + MAX_NODE_LENGTH))); + } + + connKey.port = port; + connKey.databaseOid = MyDatabaseId; + + LockConnectionSharedMemory(); + + bool entryFound = false; + SharedConnStatsHashEntry *connectionEntry = + hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound); + + /* we should never decrement for non-existing connections */ + Assert((connectionEntry && entryFound && connectionEntry->connectionCount > 0)); + + connectionEntry->connectionCount -= 1; + + UnLockConnectionSharedMemory(); +} + + /* * LockConnectionSharedMemory is a utility function that should be used when * accessing to the SharedConnStatsHash, which is in the shared memory. 
+ */ static void -LockConnectionSharedMemory(LWLockMode lockMode) +LockConnectionSharedMemory(void) { - LWLockAcquire(&ConnectionStatsSharedState->sharedConnectionHashLock, lockMode); + SpinLockAcquire(&ConnectionStatsSharedState->mutex); } @@ -296,7 +366,7 @@ LockConnectionSharedMemory(LWLockMode lockMode) static void UnLockConnectionSharedMemory(void) { - LWLockRelease(&ConnectionStatsSharedState->sharedConnectionHashLock); + SpinLockRelease(&ConnectionStatsSharedState->mutex); } @@ -378,8 +448,7 @@ SharedConnectionStatsShmemInit(void) LWLockRegisterTranche(ConnectionStatsSharedState->sharedConnectionHashTrancheId, ConnectionStatsSharedState->sharedConnectionHashTrancheName); - LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock, - ConnectionStatsSharedState->sharedConnectionHashTrancheId); + SpinLockInit(&ConnectionStatsSharedState->mutex); } /* allocate hash table */ diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index b82927ad6..8458bca8e 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -16,5 +16,6 @@ extern int MaxTrackedWorkerNodes; extern void InitializeSharedConnectionStats(void); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); extern void WaitOrErrorForSharedConnection(const char *hostname, int port); +extern void DecrementSharedConnectionCounter(const char *hostname, int port); #endif /* SHARED_CONNECTION_STATS_H */