refactor AfterXacthodtConnectionHandling (#3202)

pull/3150/head
SaitTalhaNisanci 2019-11-19 14:50:23 +03:00 committed by GitHub
parent 87f57eb92b
commit 306d159072
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 63 additions and 32 deletions

View File

@ -44,6 +44,9 @@ static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, const int
cachedConnectionCount);
static void ResetConnection(MultiConnection *connection);
static void DefaultCitusNoticeProcessor(void *arg, const char *message); static void DefaultCitusNoticeProcessor(void *arg, const char *message);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
static bool RemoteTransactionIdle(MultiConnection *connection); static bool RemoteTransactionIdle(MultiConnection *connection);
@ -136,7 +139,7 @@ InvalidateConnParamsHashEntries(void)
/* /*
* Perform connection management activity after the end of a transaction. Both * AfterXactConnectionHandling performs connection management activity after the end of a transaction. Both
* COMMIT and ABORT paths are handled here. * COMMIT and ABORT paths are handled here.
* *
* This is called by Citus' global transaction callback. * This is called by Citus' global transaction callback.
@ -1039,7 +1042,7 @@ FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry)
/* /*
* Close all remote connections if necessary anymore (i.e. not session * AfterXactHostConnectionHandling closes all remote connections if not necessary anymore (i.e. not session
* lifetime), or if in a failed state. * lifetime), or if in a failed state.
*/ */
static void static void
@ -1047,18 +1050,6 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
{ {
dlist_mutable_iter iter; dlist_mutable_iter iter;
int cachedConnectionCount = 0; int cachedConnectionCount = 0;
bool isCitusInitiatedBackend = false;
/*
* When we are in a backend that was created to service an internal connection
* from the coordinator or another worker, we disable connection caching to avoid
* escalating the number of cached connections. We can recognize such backends
* from their application name.
*/
if (application_name != NULL && strcmp(application_name, CITUS_APPLICATION_NAME) == 0)
{
isCitusInitiatedBackend = true;
}
dlist_foreach_modify(iter, entry->connections) dlist_foreach_modify(iter, entry->connections)
{ {
@ -1066,8 +1057,8 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
dlist_container(MultiConnection, connectionNode, iter.cur); dlist_container(MultiConnection, connectionNode, iter.cur);
/* /*
* To avoid code leaking connections we warn if connections are * To avoid leaking connections we warn if connections are
* still claimed exclusively. We can only do so if the transaction * still claimed exclusively. We can only do so if the transaction is
* committed, as it's normal that code didn't have chance to clean * committed, as it's normal that code didn't have chance to clean
* up after errors. * up after errors.
*/ */
@ -1077,14 +1068,8 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
(errmsg("connection claimed exclusively at transaction commit"))); (errmsg("connection claimed exclusively at transaction commit")));
} }
/*
* Preserve session lifespan connections if they are still healthy. if (ShouldShutdownConnection(connection, cachedConnectionCount))
*/
if (isCitusInitiatedBackend ||
cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
connection->forceCloseAtTransactionEnd ||
PQstatus(connection->pgConn) != CONNECTION_OK ||
!RemoteTransactionIdle(connection))
{ {
ShutdownConnection(connection); ShutdownConnection(connection);
@ -1095,14 +1080,10 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
} }
else else
{ {
/* reset per-transaction state */ /*
ResetRemoteTransaction(connection); * reset healthy session lifespan connections.
ResetShardPlacementAssociation(connection); */
ResetConnection(connection);
/* reset copy state */
connection->copyBytesWrittenSinceLastFlush = 0;
UnclaimConnection(connection);
cachedConnectionCount++; cachedConnectionCount++;
} }
@ -1110,6 +1091,56 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
} }
/*
* ShouldShutdownConnection returns true if either one of the followings is true:
* - The connection is citus initiated.
* - Current cached connections is already at MaxCachedConnectionPerWorker
* - Connection is forced to close at the end of transaction
* - Connection is not in OK state
* - A transaction is still in progress (usually because we are cancelling a distributed transaction)
*/
static bool
ShouldShutdownConnection(MultiConnection *connection, const int cachedConnectionCount)
{
bool isCitusInitiatedBackend = false;
/*
* When we are in a backend that was created to serve an internal connection
* from the coordinator or another worker, we disable connection caching to avoid
* escalating the number of cached connections. We can recognize such backends
* from their application name.
*/
if (application_name != NULL && strcmp(application_name, CITUS_APPLICATION_NAME) == 0)
{
isCitusInitiatedBackend = true;
}
return isCitusInitiatedBackend ||
cachedConnectionCount >= MaxCachedConnectionsPerWorker ||
connection->forceCloseAtTransactionEnd ||
PQstatus(connection->pgConn) != CONNECTION_OK ||
!RemoteTransactionIdle(connection);
}
/*
* ResetConnection preserves the given connection for later usage by
* resetting its states.
*/
static void
ResetConnection(MultiConnection *connection)
{
/* reset per-transaction state */
ResetRemoteTransaction(connection);
ResetShardPlacementAssociation(connection);
/* reset copy state */
connection->copyBytesWrittenSinceLastFlush = 0;
UnclaimConnection(connection);
}
/* /*
* RemoteTransactionIdle function returns true if we manually * RemoteTransactionIdle function returns true if we manually
* set flag on run_commands_on_session_level_connection_to_node to true to * set flag on run_commands_on_session_level_connection_to_node to true to