Check WL_SOCKET_CLOSED

pull/6259/head
Onder Kalaci 2022-08-29 09:49:08 +02:00
parent c14bf3a660
commit d377dce852
1 changed files with 135 additions and 17 deletions

View File

@ -57,8 +57,10 @@ static uint32 ConnectionHashHash(const void *key, Size keysize);
static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static void StartConnectionEstablishment(MultiConnection *connectionn,
ConnectionHashKey *key);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections);
static MultiConnection * FindAvailableConnection(List *connections, uint32 flags);
static List * EnsureAndGetHealthyConnections(dlist_head *connections, uint32 flags);
static bool RemoteSocketClosed(MultiConnection *connection);
static void ErrorIfMultipleMetadataConnectionExists(List *connections);
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, const int
@ -67,7 +69,6 @@ static void ResetConnection(MultiConnection *connection);
static bool RemoteTransactionIdle(MultiConnection *connection);
static int EventSetSizeForConnectionList(List *connections);
/* types for async connection management */
enum MultiConnectionPhase
{
@ -334,8 +335,11 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
/* if desired, check whether there's a usable connection */
if (!(flags & FORCE_NEW_CONNECTION))
{
List *healthyConnections = EnsureAndGetHealthyConnections(entry->connections,
flags);
/* check connection cache for a connection that's not already in use */
MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
MultiConnection *connection = FindAvailableConnection(healthyConnections, flags);
if (connection)
{
return connection;
@ -427,16 +431,13 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
* If no connection is available, FindAvailableConnection returns NULL.
*/
static MultiConnection *
FindAvailableConnection(dlist_head *connections, uint32 flags)
FindAvailableConnection(List *connections, uint32 flags)
{
List *metadataConnectionCandidateList = NIL;
dlist_iter iter;
dlist_foreach(iter, connections)
MultiConnection *connection = NULL;
foreach_ptr(connection, connections)
{
MultiConnection *connection =
dlist_container(MultiConnection, connectionNode, iter.cur);
if (flags & OUTSIDE_TRANSACTION)
{
/* don't return connections that are used in transactions */
@ -527,19 +528,133 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
/*
* ErrorIfMultipleMetadataConnectionExists throws an error if the
* input connection dlist contains more than one metadata connections.
* EnsureAndGetHealthyConnections is a helper function that goes over the
* input connections, and:
* - Errors for the connections that are not healthy but still part of a
* remote transaction
* - Skips connections that are not healthy, marks them to be closed
* at the end of the transaction such that they do not linger around
*
* And, finally returns the list (unless already errored out) of connections
* that can be reused.
*/
static void
ErrorIfMultipleMetadataConnectionExists(dlist_head *connections)
static List *
EnsureAndGetHealthyConnections(dlist_head *connections, uint32 flags)
{
bool foundMetadataConnection = false;
List *healthyConnections = NIL;
dlist_iter iter;
dlist_foreach(iter, connections)
{
MultiConnection *connection =
dlist_container(MultiConnection, connectionNode, iter.cur);
bool healthyConnection = true;
if (connection->pgConn == NULL || PQsocket(connection->pgConn) == -1 ||
PQstatus(connection->pgConn) != CONNECTION_OK ||
connection->connectionState != MULTI_CONNECTION_CONNECTED ||
RemoteSocketClosed(connection))
{
/* connection is not healthy */
healthyConnection = false;
}
RemoteTransaction *transaction = &connection->remoteTransaction;
bool inRemoteTrasaction =
transaction->transactionState != REMOTE_TRANS_NOT_STARTED;
if (inRemoteTrasaction)
{
PGTransactionStatusType status = PQtransactionStatus(connection->pgConn);
/* if the connection is in a bad state, so is the transaction's state */
if (status == PQTRANS_INERROR || status == PQTRANS_UNKNOWN)
{
transaction->transactionFailed = true;
}
}
if (healthyConnection)
{
healthyConnections = lappend(healthyConnections, connection);
}
else if ((inRemoteTrasaction && !(flags & OUTSIDE_TRANSACTION)) &&
(transaction->transactionCritical ||
!dlist_is_empty(&connection->referencedPlacements)))
{
ReportConnectionError(connection, ERROR);
/*
* If a connection is executing a critical transaction or accessed any
* placements, we should not continue the execution.
*/
ereport(WARNING, (errmsg("failure on connection marked as essential: %s:%d",
connection->hostname, connection->port)));
}
else
{
/*
* If a connection is not usable and we do not need to throw an error,
* close at the end of the transaction.
*/
connection->forceCloseAtTransactionEnd = true;
}
}
return healthyConnections;
}
/*
* RemoteSocketClosed returns true if the remote socket is closed.
*
* Note that we rely on WL_SOCKET_CLOSED, which is only available
* for PG 15+ and for kernels that support the feature.
*/
static bool
RemoteSocketClosed(MultiConnection *connection)
{
bool socketClosed = false;
#if PG_VERSION_NUM >= PG_VERSION_15
WaitEventSet *waitEventSet =
CreateWaitEventSet(CurrentMemoryContext, 1);
int sock = PQsocket(connection->pgConn);
WaitEvent *events = palloc0(sizeof(WaitEvent));
/* TODO: process return value of CitusAddWaitEventSetToSet */
CitusAddWaitEventSetToSet(waitEventSet, WL_SOCKET_CLOSED, sock,
NULL, (void *) connection);
long timeout = 0; /* do not wait at all */
int eventCount = WaitEventSetWait(waitEventSet, timeout, events,
1, WAIT_EVENT_CLIENT_READ);
if (eventCount > 0)
{
Assert(eventCount == 1);
WaitEvent *event = &events[0];
socketClosed = (event->events & WL_SOCKET_CLOSED);
}
FreeWaitEventSet(waitEventSet);
#endif
return socketClosed;
}
/*
* ErrorIfMultipleMetadataConnectionExists throws an error if the
* input connection dlist contains more than one metadata connections.
*/
static void
ErrorIfMultipleMetadataConnectionExists(List *connections)
{
bool foundMetadataConnection = false;
MultiConnection *connection = NULL;
foreach_ptr(connection, connections)
{
if (connection->useForMetadataOperations)
{
if (foundMetadataConnection)
@ -617,7 +732,9 @@ ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName,
}
int flags = 0;
MultiConnection *connection = FindAvailableConnection(entry->connections, flags);
List *healthyConnections = EnsureAndGetHealthyConnections(entry->connections, flags);
MultiConnection *connection = FindAvailableConnection(healthyConnections, flags);
return connection;
}
@ -1478,7 +1595,8 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
connection->requiresReplication ||
(MaxCachedConnectionLifetime >= 0 &&
MillisecondsToTimeout(connection->connectionEstablishmentStart,
MaxCachedConnectionLifetime) <= 0);
MaxCachedConnectionLifetime) <= 0) ||
RemoteSocketClosed(connection);
}