mirror of https://github.com/citusdata/citus.git
Warm up connections params hash (#4872)
ConnParams(AuthInfo and PoolInfo) gets a snapshot, which will block the
remote connectinos to localhost. And the release of snapshot will be
blocked by the snapshot. This leads to a deadlock.
We warm up the conn params hash before starting a new transaction so
that the entries will already be there when we start a new transaction.
Hence GetConnParams will not get a snapshot.
(cherry picked from commit b453563e88
)
pull/5110/head
parent
6640c76bde
commit
5759233f15
|
@ -923,6 +923,13 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
||||||
*/
|
*/
|
||||||
(void) CurrentDatabaseName();
|
(void) CurrentDatabaseName();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ConnParams (AuthInfo and PoolInfo) gets a snapshot, which
|
||||||
|
* will blocks the remote connections to localhost. Hence we warm up
|
||||||
|
* the cache here so that after we start a new transaction, the entries
|
||||||
|
* will already be in the hash table, hence we won't be holding any snapshots.
|
||||||
|
*/
|
||||||
|
WarmUpConnParamsHash();
|
||||||
CommitTransactionCommand();
|
CommitTransactionCommand();
|
||||||
StartTransactionCommand();
|
StartTransactionCommand();
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,6 +87,7 @@ static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections,
|
||||||
static void CloseNotReadyMultiConnectionStates(List *connectionStates);
|
static void CloseNotReadyMultiConnectionStates(List *connectionStates);
|
||||||
static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState);
|
static uint32 MultiConnectionStateEventMask(MultiConnectionPollState *connectionState);
|
||||||
static void CitusPQFinish(MultiConnection *connection);
|
static void CitusPQFinish(MultiConnection *connection);
|
||||||
|
static ConnParamsHashEntry * FindOrCreateConnParamsEntry(ConnectionHashKey *key);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Initialize per-backend connection management infrastructure.
|
* Initialize per-backend connection management infrastructure.
|
||||||
|
@ -1129,9 +1130,62 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize)
|
||||||
static void
|
static void
|
||||||
StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key)
|
StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key)
|
||||||
{
|
{
|
||||||
bool found = false;
|
|
||||||
static uint64 connectionId = 1;
|
static uint64 connectionId = 1;
|
||||||
|
|
||||||
|
ConnParamsHashEntry *entry = FindOrCreateConnParamsEntry(key);
|
||||||
|
|
||||||
|
strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH);
|
||||||
|
connection->port = key->port;
|
||||||
|
strlcpy(connection->database, key->database, NAMEDATALEN);
|
||||||
|
strlcpy(connection->user, key->user, NAMEDATALEN);
|
||||||
|
|
||||||
|
connection->pgConn = PQconnectStartParams((const char **) entry->keywords,
|
||||||
|
(const char **) entry->values,
|
||||||
|
false);
|
||||||
|
connection->connectionStart = GetCurrentTimestamp();
|
||||||
|
connection->connectionId = connectionId++;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* To avoid issues with interrupts not getting caught all our connections
|
||||||
|
* are managed in a non-blocking manner. remote_commands.c provides
|
||||||
|
* wrappers emulating blocking behaviour.
|
||||||
|
*/
|
||||||
|
PQsetnonblocking(connection->pgConn, true);
|
||||||
|
|
||||||
|
SetCitusNoticeReceiver(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* WarmUpConnParamsHash warms up the ConnParamsHash by loading all the
|
||||||
|
* conn params for active primary nodes.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
WarmUpConnParamsHash(void)
|
||||||
|
{
|
||||||
|
List *workerNodeList = ActivePrimaryNodeList(AccessShareLock);
|
||||||
|
WorkerNode *workerNode = NULL;
|
||||||
|
foreach_ptr(workerNode, workerNodeList)
|
||||||
|
{
|
||||||
|
ConnectionHashKey key;
|
||||||
|
strlcpy(key.hostname, workerNode->workerName, MAX_NODE_LENGTH);
|
||||||
|
key.port = workerNode->workerPort;
|
||||||
|
strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN);
|
||||||
|
strlcpy(key.user, CurrentUserName(), NAMEDATALEN);
|
||||||
|
FindOrCreateConnParamsEntry(&key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FindOrCreateConnParamsEntry searches ConnParamsHash for the given key,
|
||||||
|
* if it is not found, it is created.
|
||||||
|
*/
|
||||||
|
static ConnParamsHashEntry *
|
||||||
|
FindOrCreateConnParamsEntry(ConnectionHashKey *key)
|
||||||
|
{
|
||||||
|
bool found = false;
|
||||||
|
|
||||||
/* search our cache for precomputed connection settings */
|
/* search our cache for precomputed connection settings */
|
||||||
ConnParamsHashEntry *entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found);
|
ConnParamsHashEntry *entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found);
|
||||||
if (!found || !entry->isValid)
|
if (!found || !entry->isValid)
|
||||||
|
@ -1159,25 +1213,7 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key
|
||||||
entry->isValid = true;
|
entry->isValid = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
strlcpy(connection->hostname, key->hostname, MAX_NODE_LENGTH);
|
return entry;
|
||||||
connection->port = key->port;
|
|
||||||
strlcpy(connection->database, key->database, NAMEDATALEN);
|
|
||||||
strlcpy(connection->user, key->user, NAMEDATALEN);
|
|
||||||
|
|
||||||
connection->pgConn = PQconnectStartParams((const char **) entry->keywords,
|
|
||||||
(const char **) entry->values,
|
|
||||||
false);
|
|
||||||
connection->connectionStart = GetCurrentTimestamp();
|
|
||||||
connection->connectionId = connectionId++;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* To avoid issues with interrupts not getting caught all our connections
|
|
||||||
* are managed in a non-blocking manner. remote_commands.c provides
|
|
||||||
* wrappers emulating blocking behaviour.
|
|
||||||
*/
|
|
||||||
PQsetnonblocking(connection->pgConn, true);
|
|
||||||
|
|
||||||
SetCitusNoticeReceiver(connection);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -262,4 +262,5 @@ extern bool IsCitusInitiatedRemoteBackend(void);
|
||||||
extern double MillisecondsPassedSince(instr_time moment);
|
extern double MillisecondsPassedSince(instr_time moment);
|
||||||
extern long MillisecondsToTimeout(instr_time start, long msAfterStart);
|
extern long MillisecondsToTimeout(instr_time start, long msAfterStart);
|
||||||
|
|
||||||
|
extern void WarmUpConnParamsHash(void);
|
||||||
#endif /* CONNECTION_MANAGMENT_H */
|
#endif /* CONNECTION_MANAGMENT_H */
|
||||||
|
|
|
@ -431,7 +431,7 @@ SELECT * FROM partitioning_test WHERE id = 9 OR id = 10 ORDER BY 1;
|
||||||
-- create default partition
|
-- create default partition
|
||||||
CREATE TABLE partitioning_test_default PARTITION OF partitioning_test DEFAULT;
|
CREATE TABLE partitioning_test_default PARTITION OF partitioning_test DEFAULT;
|
||||||
\d+ partitioning_test
|
\d+ partitioning_test
|
||||||
Table "public.partitioning_test"
|
Table "public.partitioning_test"
|
||||||
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
|
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
id | integer | | | | plain | |
|
id | integer | | | | plain | |
|
||||||
|
@ -621,7 +621,7 @@ SELECT name, type FROM table_attrs WHERE relid = 'partitioning_test_2010'::regcl
|
||||||
-- test add PRIMARY KEY
|
-- test add PRIMARY KEY
|
||||||
-- add PRIMARY KEY to partitioned table - this will error out
|
-- add PRIMARY KEY to partitioned table - this will error out
|
||||||
ALTER TABLE partitioning_test ADD CONSTRAINT partitioning_primary PRIMARY KEY (id);
|
ALTER TABLE partitioning_test ADD CONSTRAINT partitioning_primary PRIMARY KEY (id);
|
||||||
ERROR: unique constraint on partitioned table must include all partitioning columns
|
ERROR: insufficient columns in PRIMARY KEY constraint definition
|
||||||
DETAIL: PRIMARY KEY constraint on table "partitioning_test" lacks column "time" which is part of the partition key.
|
DETAIL: PRIMARY KEY constraint on table "partitioning_test" lacks column "time" which is part of the partition key.
|
||||||
-- ADD PRIMARY KEY to partition
|
-- ADD PRIMARY KEY to partition
|
||||||
ALTER TABLE partitioning_test_2009 ADD CONSTRAINT partitioning_2009_primary PRIMARY KEY (id);
|
ALTER TABLE partitioning_test_2009 ADD CONSTRAINT partitioning_2009_primary PRIMARY KEY (id);
|
||||||
|
@ -1996,7 +1996,7 @@ SELECT fix_pre_citus10_partitioned_table_constraint_names('partitioning_test');
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT fix_pre_citus10_partitioned_table_constraint_names();
|
SELECT fix_pre_citus10_partitioned_table_constraint_names();
|
||||||
fix_pre_citus10_partitioned_table_constraint_names
|
fix_pre_citus10_partitioned_table_constraint_names
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
partitioning_test
|
partitioning_test
|
||||||
"schema-test"
|
"schema-test"
|
||||||
|
|
Loading…
Reference in New Issue