Merge pull request #5579 from citusdata/improve_metadata_conn

Improve metadata connection selection logic
pull/5606/head
Önder Kalacı 2022-01-07 10:42:23 +01:00 committed by GitHub
commit 9d858cb1da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 22 additions and 39 deletions

View File

@ -56,9 +56,7 @@ static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static void StartConnectionEstablishment(MultiConnection *connectionn,
ConnectionHashKey *key);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
#ifdef USE_ASSERT_CHECKING
static void AssertSingleMetadataConnectionExists(dlist_head *connections);
#endif
static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections);
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, const int
@ -420,6 +418,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
static MultiConnection *
FindAvailableConnection(dlist_head *connections, uint32 flags)
{
List *metadataConnectionCandidateList = NIL;
dlist_iter iter;
dlist_foreach(iter, connections)
{
@ -473,52 +473,40 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
{
/*
* The caller requested a metadata connection, and this is not the
* metadata connection. Still, this is a candidate for becoming a
* metadata connection.
*/
metadataConnectionCandidateList =
lappend(metadataConnectionCandidateList, connection);
continue;
}
else
{
/*
* Now that we found metadata connection. We do some sanity
* checks.
*/
#ifdef USE_ASSERT_CHECKING
AssertSingleMetadataConnectionExists(connections);
#endif
/*
* Connection is in use for an ongoing operation. Metadata
* connection cannot be claimed exclusively.
*/
if (connection->claimedExclusively)
{
ereport(ERROR, (errmsg("metadata connections cannot be "
"claimed exclusively")));
}
}
return connection;
}
if ((flags & REQUIRE_METADATA_CONNECTION) && !dlist_is_empty(connections))
if ((flags & REQUIRE_METADATA_CONNECTION) &&
list_length(metadataConnectionCandidateList) > 0)
{
/*
* Caller asked a metadata connection, and we couldn't find in the
* above list. So, we pick the first connection as the metadata
* connection.
* Caller asked a metadata connection, and we couldn't find a connection
* that has already been used for metadata operations.
*
* So, we pick the first connection as the metadata connection.
*/
MultiConnection *metadataConnection =
dlist_container(MultiConnection, connectionNode,
dlist_head_node(connections));
linitial(metadataConnectionCandidateList);
Assert(!metadataConnection->claimedExclusively);
/* remember that we use this connection for metadata operations */
metadataConnection->useForMetadataOperations = true;
#ifdef USE_ASSERT_CHECKING
AssertSingleMetadataConnectionExists(connections);
#endif
/*
* We cannot have multiple metadata connections. If we see
* this error, it is likely that there is a bug in connection
* management.
*/
ErrorIfMultipleMetadataConnectionExists(connections);
return metadataConnection;
}
@ -527,14 +515,12 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
}
#ifdef USE_ASSERT_CHECKING
/*
* AssertSingleMetadataConnectionExists throws an error if the
* ErrorIfMultipleMetadataConnectionExists throws an error if the
* input connection dlist contains more than one metadata connections.
*/
static void
AssertSingleMetadataConnectionExists(dlist_head *connections)
ErrorIfMultipleMetadataConnectionExists(dlist_head *connections)
{
bool foundMetadataConnection = false;
dlist_iter iter;
@ -556,9 +542,6 @@ AssertSingleMetadataConnectionExists(dlist_head *connections)
}
#endif /* USE_ASSERT_CHECKING */
/*
* CloseAllConnectionsAfterTransaction sets the forceClose flag of all the
* connections. This is mainly done when citus.node_conninfo changes.