Replace LwLock with spinLock

preventConflictingFlags
Onder Kalaci 2020-03-26 10:15:41 +01:00
parent ebb0ebc3c8
commit 188e571d3c
3 changed files with 100 additions and 17 deletions

View File

@ -80,7 +80,7 @@ static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections,
int *waitCount);
static void CloseNotReadyMultiConnectionStates(List *connectionStates);
static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState);
static void CitusPQFinish(MultiConnection *connection);
static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
@ -475,8 +475,7 @@ CloseConnection(MultiConnection *connection)
bool found;
/* close connection */
PQfinish(connection->pgConn);
connection->pgConn = NULL;
CitusPQFinish(connection);
strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
key.port = connection->port;
@ -556,8 +555,7 @@ ShutdownConnection(MultiConnection *connection)
{
SendCancelationRequest(connection);
}
PQfinish(connection->pgConn);
connection->pgConn = NULL;
CitusPQFinish(connection);
}
@ -922,9 +920,24 @@ CloseNotReadyMultiConnectionStates(List *connectionStates)
}
/* close connection, otherwise we take up resource on the other side */
CitusPQFinish(connection);
}
}
/*
* CitusPQFinish is a wrapper around
*/
static void
CitusPQFinish(MultiConnection *connection)
{
if (connection->pgConn != NULL)
{
DecrementSharedConnectionCounter(connection->hostname, connection->port);
}
PQfinish(connection->pgConn);
connection->pgConn = NULL;
}
}

View File

@ -45,7 +45,17 @@ typedef struct ConnectionStatsSharedData
{
int sharedConnectionHashTrancheId;
char *sharedConnectionHashTrancheName;
LWLock sharedConnectionHashLock;
/*
* We prefer mutex over LwLocks for two reasons:
* - The operations we perform while holding the lock is very tiny, and
* performance wise, mutex is encouraged by Postgres for such cases
* - We have to acquire the lock "atexit" callback, and LwLocks requires
* MyProc to be avaliable to acquire the lock. However, "atexit", it is
* not guranteed to have MyProc avaliable. On the other hand, "mutex" is
* independent from MyProc.
*/
slock_t mutex;
} ConnectionStatsSharedData;
typedef struct SharedConnStatsHashKey
@ -96,8 +106,8 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
/* local function declarations */
static void StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor);
static void LockConnectionSharedMemory(void);
static void UnLockConnectionSharedMemory(void);
static void LockConnectionSharedMemory(LWLockMode lockMode);
static void SharedConnectionStatsShmemInit(void);
static size_t SharedConnectionStatsShmemSize(void);
static int SharedConnectionHashCompare(const void *a, const void *b, Size keysize);
@ -141,8 +151,13 @@ StoreAllConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
Datum values[REMOTE_CONNECTION_STATS_COLUMNS];
bool isNulls[REMOTE_CONNECTION_STATS_COLUMNS];
/*
* TODO: We should not do all the iterations/operations while
* holding the spinlock.
*/
/* we're reading all distributed transactions, prevent new backends */
LockConnectionSharedMemory(LW_SHARED);
LockConnectionSharedMemory();
HASH_SEQ_STATUS status;
SharedConnStatsHashEntry *connectionEntry = NULL;
@ -249,14 +264,34 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
connKey.port = port;
connKey.databaseOid = MyDatabaseId;
LockConnectionSharedMemory(LW_EXCLUSIVE);
LockConnectionSharedMemory();
/*
* Note that while holding a spinlock, it would not allowed to use HASH_ENTER_NULL
* if the entries in SharedConnStatsHash were allocated via palloc (as palloc
* might throw OOM errors). However, in this case we're safe as the hash map is
* allocated in shared memory, which doesn't rely on palloc for memory allocation.
* This is already asserted in hash_search() by Postgres.
*/
bool entryFound = false;
SharedConnStatsHashEntry *connectionEntry =
hash_search(SharedConnStatsHash, &connKey, HASH_ENTER, &entryFound);
hash_search(SharedConnStatsHash, &connKey, HASH_ENTER_NULL, &entryFound);
/*
* It is possible to throw an error at this point, but that doesn't help us in anyway.
* Instead, we try our best, let the connection establishment continue by-passing the
* connection throttling.
*/
if (!connectionEntry)
{
UnLockConnectionSharedMemory();
return true;
}
if (!entryFound)
{
/* we successfully allocated the entry for the first time, so initialize it */
connectionEntry->connectionCount = 1;
counterIncremented = true;
@ -278,14 +313,49 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
}
/*
* DecrementSharedConnectionCounter decrements the shared counter
* for the given hostname and port.
*/
void
DecrementSharedConnectionCounter(const char *hostname, int port)
{
SharedConnStatsHashKey connKey;
strlcpy(connKey.hostname, hostname, MAX_NODE_LENGTH);
if (strlen(hostname) > MAX_NODE_LENGTH)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
}
connKey.port = port;
connKey.databaseOid = MyDatabaseId;
LockConnectionSharedMemory();
bool entryFound = false;
SharedConnStatsHashEntry *connectionEntry =
hash_search(SharedConnStatsHash, &connKey, HASH_FIND, &entryFound);
/* we should never decrement for non-existing connections */
Assert((connectionEntry && entryFound && connectionEntry->connectionCount > 0));
connectionEntry->connectionCount -= 1;
UnLockConnectionSharedMemory();
}
/*
* LockConnectionSharedMemory is a utility function that should be used when
* accessing to the SharedConnStatsHash, which is in the shared memory.
*/
static void
LockConnectionSharedMemory(LWLockMode lockMode)
LockConnectionSharedMemory()
{
LWLockAcquire(&ConnectionStatsSharedState->sharedConnectionHashLock, lockMode);
SpinLockAcquire(&ConnectionStatsSharedState->mutex);
}
@ -296,7 +366,7 @@ LockConnectionSharedMemory(LWLockMode lockMode)
static void
UnLockConnectionSharedMemory(void)
{
LWLockRelease(&ConnectionStatsSharedState->sharedConnectionHashLock);
SpinLockRelease(&ConnectionStatsSharedState->mutex);
}
@ -378,8 +448,7 @@ SharedConnectionStatsShmemInit(void)
LWLockRegisterTranche(ConnectionStatsSharedState->sharedConnectionHashTrancheId,
ConnectionStatsSharedState->sharedConnectionHashTrancheName);
LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock,
ConnectionStatsSharedState->sharedConnectionHashTrancheId);
SpinLockInit(&ConnectionStatsSharedState->mutex);
}
/* allocate hash table */

View File

@ -16,5 +16,6 @@ extern int MaxTrackedWorkerNodes;
extern void InitializeSharedConnectionStats(void);
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
extern void WaitOrErrorForSharedConnection(const char *hostname, int port);
extern void DecrementSharedConnectionCounter(const char *hostname, int port);
#endif /* SHARED_CONNECTION_STATS_H */