mirror of https://github.com/citusdata/citus.git
Check WL_SOCKET_CLOSED
parent
5c95604154
commit
65816467f2
|
@ -66,7 +66,7 @@ static bool ShouldShutdownConnection(MultiConnection *connection, const int
|
||||||
static void ResetConnection(MultiConnection *connection);
|
static void ResetConnection(MultiConnection *connection);
|
||||||
static bool RemoteTransactionIdle(MultiConnection *connection);
|
static bool RemoteTransactionIdle(MultiConnection *connection);
|
||||||
static int EventSetSizeForConnectionList(List *connections);
|
static int EventSetSizeForConnectionList(List *connections);
|
||||||
|
static bool RemoteConnectionClosed(MultiConnection *connection);
|
||||||
|
|
||||||
/* types for async connection management */
|
/* types for async connection management */
|
||||||
enum MultiConnectionPhase
|
enum MultiConnectionPhase
|
||||||
|
@ -492,6 +492,11 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (RemoteConnectionClosed(connection))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -526,6 +531,39 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static bool
|
||||||
|
RemoteConnectionClosed(MultiConnection *connection)
|
||||||
|
{
|
||||||
|
bool socketClosed = false;
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||||
|
|
||||||
|
WaitEventSet *waitEventSet =
|
||||||
|
CreateWaitEventSet(CurrentMemoryContext, 1);
|
||||||
|
int sock = PQsocket(connection->pgConn);
|
||||||
|
WaitEvent *events = palloc0(sizeof(WaitEvent));
|
||||||
|
|
||||||
|
/* TODO: process return value of CitusAddWaitEventSetToSet */
|
||||||
|
CitusAddWaitEventSetToSet(waitEventSet, WL_SOCKET_CLOSED, sock,
|
||||||
|
NULL, (void *) connection);
|
||||||
|
|
||||||
|
long timeout = 0; /* do not wait at all */
|
||||||
|
int eventCount = WaitEventSetWait(waitEventSet, timeout, events,
|
||||||
|
1, WAIT_EVENT_CLIENT_READ);
|
||||||
|
if (eventCount > 0)
|
||||||
|
{
|
||||||
|
Assert(eventCount == 1);
|
||||||
|
|
||||||
|
WaitEvent *event = &events[0];
|
||||||
|
socketClosed = (event->events & WL_SOCKET_CLOSED);
|
||||||
|
}
|
||||||
|
|
||||||
|
FreeWaitEventSet(waitEventSet);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
return socketClosed;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ErrorIfMultipleMetadataConnectionExists throws an error if the
|
* ErrorIfMultipleMetadataConnectionExists throws an error if the
|
||||||
* input connection dlist contains more than one metadata connections.
|
* input connection dlist contains more than one metadata connections.
|
||||||
|
@ -1478,7 +1516,8 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
|
||||||
connection->requiresReplication ||
|
connection->requiresReplication ||
|
||||||
(MaxCachedConnectionLifetime >= 0 &&
|
(MaxCachedConnectionLifetime >= 0 &&
|
||||||
MillisecondsToTimeout(connection->connectionEstablishmentStart,
|
MillisecondsToTimeout(connection->connectionEstablishmentStart,
|
||||||
MaxCachedConnectionLifetime) <= 0);
|
MaxCachedConnectionLifetime) <= 0) ||
|
||||||
|
RemoteConnectionClosed(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue