Warm up connections params hash (#4872)

ConnParams (AuthInfo and PoolInfo) gets a snapshot, which will block the
remote connections to localhost. And the release of the snapshot will be
blocked by the snapshot. This leads to a deadlock.

We warm up the conn params hash before starting a new transaction so
that the entries will already be there when we start a new transaction.
Hence GetConnParams will not get a snapshot.
pull/4898/head
SaitTalhaNisanci 2021-04-12 13:08:38 +03:00 committed by GitHub
parent a1a394dbc9
commit b453563e88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 20 deletions

View File

@ -907,6 +907,13 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
*/
(void) CurrentDatabaseName();
/*
* ConnParams (AuthInfo and PoolInfo) gets a snapshot, which
* will block the remote connections to localhost. Hence we warm up
* the cache here so that after we start a new transaction, the entries
* will already be in the hash table, hence we won't be holding any snapshots.
*/
WarmUpConnParamsHash();
CommitTransactionCommand();
StartTransactionCommand();
}

View File

@ -87,6 +87,7 @@ static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections,
static void CloseNotReadyMultiConnectionStates(List *connectionStates);
static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState);
static void CitusPQFinish(MultiConnection *connection);
/* searches ConnParamsHash for key and creates the entry if it is not found */
static ConnParamsHashEntry * FindOrCreateConnParamsEntry(ConnectionHashKey *key);
/*
* Initialize per-backend connection management infrastructure.
@ -1129,9 +1130,62 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize)
/*
 * StartConnectionEstablishment starts a libpq connection attempt for the
 * node described by key and records the attempt in connection.
 *
 * The keywords/values handed to PQconnectStartParams come from the
 * ConnParamsHash entry for key, which FindOrCreateConnParamsEntry finds or
 * creates. The hostname, port, database and user of key are copied into
 * connection, and the connection is stamped with its start time and an id
 * taken from a function-local static counter.
 */
static void
StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key)
{
	/* static, so ids keep increasing across calls within this process */
	static uint64 connectionId = 1;

	ConnParamsHashEntry *entry = FindOrCreateConnParamsEntry(key);

	strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH);
	connection->port = key->port;
	strlcpy(connection->database, key->database, NAMEDATALEN);
	strlcpy(connection->user, key->user, NAMEDATALEN);

	connection->pgConn = PQconnectStartParams((const char **) entry->keywords,
											  (const char **) entry->values,
											  false);
	connection->connectionStart = GetCurrentTimestamp();
	connection->connectionId = connectionId++;

	/*
	 * To avoid issues with interrupts not getting caught all our connections
	 * are managed in a non-blocking manner. remote_commands.c provides
	 * wrappers emulating blocking behaviour.
	 */
	PQsetnonblocking(connection->pgConn, true);

	SetCitusNoticeReceiver(connection);
}
/*
 * WarmUpConnParamsHash warms up the ConnParamsHash by loading all the
 * conn params for active primary nodes.
 *
 * For every node returned by ActivePrimaryNodeList a key is built from the
 * node's name and port plus the current database and user names, and
 * FindOrCreateConnParamsEntry is called with it so that the entry exists in
 * the hash afterwards. The returned entries themselves are not used here.
 */
void
WarmUpConnParamsHash(void)
{
	List *workerNodeList = ActivePrimaryNodeList(AccessShareLock);

	WorkerNode *workerNode = NULL;
	foreach_ptr(workerNode, workerNodeList)
	{
		ConnectionHashKey key;

		/*
		 * Zero the whole key first: strlcpy only writes up to the terminating
		 * NUL, so without this the tail of each string buffer, any padding and
		 * any member not assigned below would be indeterminate stack bytes
		 * that end up being passed to hash_search as part of the key.
		 */
		memset(&key, 0, sizeof(key));

		strlcpy(key.hostname, workerNode->workerName, MAX_NODE_LENGTH);
		key.port = workerNode->workerPort;
		strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN);
		strlcpy(key.user, CurrentUserName(), NAMEDATALEN);

		/* called only for its side effect of populating ConnParamsHash */
		FindOrCreateConnParamsEntry(&key);
	}
}
/*
* FindOrCreateConnParamsEntry searches ConnParamsHash for the given key,
* if it is not found, it is created.
*/
static ConnParamsHashEntry *
FindOrCreateConnParamsEntry(ConnectionHashKey *key)
{
bool found = false;
/* search our cache for precomputed connection settings */
ConnParamsHashEntry *entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found);
if (!found || !entry->isValid)
@ -1159,25 +1213,7 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key
entry->isValid = true;
}
strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH);
connection->port = key->port;
strlcpy(connection->database, key->database, NAMEDATALEN);
strlcpy(connection->user, key->user, NAMEDATALEN);
connection->pgConn = PQconnectStartParams((const char **) entry->keywords,
(const char **) entry->values,
false);
connection->connectionStart = GetCurrentTimestamp();
connection->connectionId = connectionId++;
/*
* To avoid issues with interrupts not getting caught all our connections
* are managed in a non-blocking manner. remote_commands.c provides
* wrappers emulating blocking behaviour.
*/
PQsetnonblocking(connection->pgConn, true);
SetCitusNoticeReceiver(connection);
return entry;
}

View File

@ -261,4 +261,5 @@ extern bool IsCitusInitiatedRemoteBackend(void);
extern double MillisecondsPassedSince(instr_time moment);
extern long MillisecondsToTimeout(instr_time start, long msAfterStart);
/* loads the conn params for all active primary nodes into ConnParamsHash */
extern void WarmUpConnParamsHash(void);
#endif /* CONNECTION_MANAGMENT_H */