diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 31af1ad47..eb10abda5 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -907,6 +907,13 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) */ (void) CurrentDatabaseName(); + /* + * ConnParams (AuthInfo and PoolInfo) gets a snapshot, which + * will blocks the remote connections to localhost. Hence we warm up + * the cache here so that after we start a new transaction, the entries + * will already be in the hash table, hence we won't be holding any snapshots. + */ + WarmUpConnParamsHash(); CommitTransactionCommand(); StartTransactionCommand(); } diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 7e5b8a54f..1d307e950 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -87,6 +87,7 @@ static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections, static void CloseNotReadyMultiConnectionStates(List *connectionStates); static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState); static void CitusPQFinish(MultiConnection *connection); +static ConnParamsHashEntry * FindOrCreateConnParamsEntry(ConnectionHashKey *key); /* * Initialize per-backend connection management infrastructure. @@ -1129,9 +1130,62 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize) static void StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key) { - bool found = false; static uint64 connectionId = 1; + ConnParamsHashEntry *entry = FindOrCreateConnParamsEntry(key); + + strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH); + connection->port = key->port; + strlcpy(connection->database, key->database, NAMEDATALEN); + strlcpy(connection->user, key->user, NAMEDATALEN); + + connection->pgConn = PQconnectStartParams((const char **) entry->keywords, + (const char **) entry->values, + false); + connection->connectionStart = GetCurrentTimestamp(); + connection->connectionId = connectionId++; + + /* + * To avoid issues with interrupts not getting caught all our connections + * are managed in a non-blocking manner. remote_commands.c provides + * wrappers emulating blocking behaviour. + */ + PQsetnonblocking(connection->pgConn, true); + + SetCitusNoticeReceiver(connection); +} + + +/* + * WarmUpConnParamsHash warms up the ConnParamsHash by loading all the + * conn params for active primary nodes. + */ +void +WarmUpConnParamsHash(void) +{ + List *workerNodeList = ActivePrimaryNodeList(AccessShareLock); + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + ConnectionHashKey key; + strlcpy(key.hostname, workerNode->workerName, MAX_NODE_LENGTH); + key.port = workerNode->workerPort; + strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN); + strlcpy(key.user, CurrentUserName(), NAMEDATALEN); + FindOrCreateConnParamsEntry(&key); + } +} + + +/* + * FindOrCreateConnParamsEntry searches ConnParamsHash for the given key, + * if it is not found, it is created. + */ +static ConnParamsHashEntry * +FindOrCreateConnParamsEntry(ConnectionHashKey *key) +{ + bool found = false; + /* search our cache for precomputed connection settings */ ConnParamsHashEntry *entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found); if (!found || !entry->isValid) @@ -1159,25 +1213,7 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key entry->isValid = true; } - strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH); - connection->port = key->port; - strlcpy(connection->database, key->database, NAMEDATALEN); - strlcpy(connection->user, key->user, NAMEDATALEN); - - connection->pgConn = PQconnectStartParams((const char **) entry->keywords, - (const char **) entry->values, - false); - connection->connectionStart = GetCurrentTimestamp(); - connection->connectionId = connectionId++; - - /* - * To avoid issues with interrupts not getting caught all our connections - * are managed in a non-blocking manner. remote_commands.c provides - * wrappers emulating blocking behaviour. - */ - PQsetnonblocking(connection->pgConn, true); - - SetCitusNoticeReceiver(connection); + return entry; } diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 710292dc2..34e7e4b67 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -261,4 +261,5 @@ extern bool IsCitusInitiatedRemoteBackend(void); extern double MillisecondsPassedSince(instr_time moment); extern long MillisecondsToTimeout(instr_time start, long msAfterStart); +extern void WarmUpConnParamsHash(void); #endif /* CONNECTION_MANAGMENT_H */