From 5759233f1559dbd1a8245abdd9616649718d963d Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Mon, 12 Apr 2021 13:08:38 +0300 Subject: [PATCH] Warm up connections params hash (#4872) ConnParams(AuthInfo and PoolInfo) gets a snapshot, which will block the remote connections to localhost. And the release of the snapshot will in turn be blocked by those remote connections. This leads to a deadlock. We warm up the conn params hash before starting a new transaction so that the entries will already be there when we start a new transaction. Hence GetConnParams will not get a snapshot. (cherry picked from commit b453563e88427a882096763efa97d91191c0532e) --- .../distributed/commands/utility_hook.c | 7 ++ .../connection/connection_management.c | 76 ++++++++++++++----- .../distributed/connection_management.h | 1 + .../regress/expected/multi_partitioning.out | 6 +- 4 files changed, 67 insertions(+), 23 deletions(-) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index adde39dbf..4c2e723db 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -923,6 +923,13 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) */ (void) CurrentDatabaseName(); + /* + * ConnParams (AuthInfo and PoolInfo) gets a snapshot, which + * will block the remote connections to localhost. Hence we warm up + * the cache here so that after we start a new transaction, the entries + * will already be in the hash table, hence we won't be holding any snapshots. 
+ */ + WarmUpConnParamsHash(); CommitTransactionCommand(); StartTransactionCommand(); } diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 397c47668..ff71a6b13 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -87,6 +87,7 @@ static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections, static void CloseNotReadyMultiConnectionStates(List *connectionStates); static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState); static void CitusPQFinish(MultiConnection *connection); +static ConnParamsHashEntry * FindOrCreateConnParamsEntry(ConnectionHashKey *key); /* * Initialize per-backend connection management infrastructure. @@ -1129,9 +1130,62 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize) static void StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key) { - bool found = false; static uint64 connectionId = 1; + ConnParamsHashEntry *entry = FindOrCreateConnParamsEntry(key); + + strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH); + connection->port = key->port; + strlcpy(connection->database, key->database, NAMEDATALEN); + strlcpy(connection->user, key->user, NAMEDATALEN); + + connection->pgConn = PQconnectStartParams((const char **) entry->keywords, + (const char **) entry->values, + false); + connection->connectionStart = GetCurrentTimestamp(); + connection->connectionId = connectionId++; + + /* + * To avoid issues with interrupts not getting caught all our connections + * are managed in a non-blocking manner. remote_commands.c provides + * wrappers emulating blocking behaviour. + */ + PQsetnonblocking(connection->pgConn, true); + + SetCitusNoticeReceiver(connection); +} + + +/* + * WarmUpConnParamsHash warms up the ConnParamsHash by loading all the + * conn params for active primary nodes. 
+ */ +void +WarmUpConnParamsHash(void) +{ + List *workerNodeList = ActivePrimaryNodeList(AccessShareLock); + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + ConnectionHashKey key; + strlcpy(key.hostname, workerNode->workerName, MAX_NODE_LENGTH); + key.port = workerNode->workerPort; + strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN); + strlcpy(key.user, CurrentUserName(), NAMEDATALEN); + FindOrCreateConnParamsEntry(&key); + } +} + + +/* + * FindOrCreateConnParamsEntry searches ConnParamsHash for the given key, + * if it is not found, it is created. + */ +static ConnParamsHashEntry * +FindOrCreateConnParamsEntry(ConnectionHashKey *key) +{ + bool found = false; + /* search our cache for precomputed connection settings */ ConnParamsHashEntry *entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found); if (!found || !entry->isValid) @@ -1159,25 +1213,7 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key entry->isValid = true; } - strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH); - connection->port = key->port; - strlcpy(connection->database, key->database, NAMEDATALEN); - strlcpy(connection->user, key->user, NAMEDATALEN); - - connection->pgConn = PQconnectStartParams((const char **) entry->keywords, - (const char **) entry->values, - false); - connection->connectionStart = GetCurrentTimestamp(); - connection->connectionId = connectionId++; - - /* - * To avoid issues with interrupts not getting caught all our connections - * are managed in a non-blocking manner. remote_commands.c provides - * wrappers emulating blocking behaviour. 
- */ - PQsetnonblocking(connection->pgConn, true); - - SetCitusNoticeReceiver(connection); + return entry; } diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 302ef0b65..cb0b91627 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -262,4 +262,5 @@ extern bool IsCitusInitiatedRemoteBackend(void); extern double MillisecondsPassedSince(instr_time moment); extern long MillisecondsToTimeout(instr_time start, long msAfterStart); +extern void WarmUpConnParamsHash(void); #endif /* CONNECTION_MANAGMENT_H */ diff --git a/src/test/regress/expected/multi_partitioning.out b/src/test/regress/expected/multi_partitioning.out index 1bc13247c..34e246a3f 100644 --- a/src/test/regress/expected/multi_partitioning.out +++ b/src/test/regress/expected/multi_partitioning.out @@ -431,7 +431,7 @@ SELECT * FROM partitioning_test WHERE id = 9 OR id = 10 ORDER BY 1; -- create default partition CREATE TABLE partitioning_test_default PARTITION OF partitioning_test DEFAULT; \d+ partitioning_test - Table "public.partitioning_test" + Table "public.partitioning_test" Column | Type | Collation | Nullable | Default | Storage | Stats target | Description --------------------------------------------------------------------- id | integer | | | | plain | | @@ -621,7 +621,7 @@ SELECT name, type FROM table_attrs WHERE relid = 'partitioning_test_2010'::regcl -- test add PRIMARY KEY -- add PRIMARY KEY to partitioned table - this will error out ALTER TABLE partitioning_test ADD CONSTRAINT partitioning_primary PRIMARY KEY (id); -ERROR: unique constraint on partitioned table must include all partitioning columns +ERROR: insufficient columns in PRIMARY KEY constraint definition DETAIL: PRIMARY KEY constraint on table "partitioning_test" lacks column "time" which is part of the partition key. 
-- ADD PRIMARY KEY to partition ALTER TABLE partitioning_test_2009 ADD CONSTRAINT partitioning_2009_primary PRIMARY KEY (id); @@ -1996,7 +1996,7 @@ SELECT fix_pre_citus10_partitioned_table_constraint_names('partitioning_test'); (1 row) SELECT fix_pre_citus10_partitioned_table_constraint_names(); - fix_pre_citus10_partitioned_table_constraint_names + fix_pre_citus10_partitioned_table_constraint_names --------------------------------------------------------------------- partitioning_test "schema-test"