WIP: Detect connection socket close

version-15-socket
Onder Kalaci 2022-04-20 17:50:47 +02:00
parent bed3770d54
commit ed9bc63cdd
3 changed files with 21 additions and 0 deletions

View File

@ -1562,6 +1562,8 @@ CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,
volatile int waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED; volatile int waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
MemoryContext savedContext = CurrentMemoryContext; MemoryContext savedContext = CurrentMemoryContext;
events |= WL_SOCKET_CLOSED;
PG_TRY(); PG_TRY();
{ {
waitEventSetIndex = waitEventSetIndex =

View File

@ -2783,6 +2783,11 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC
ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
} }
if (event->events & WL_SOCKET_CLOSED)
{
ereport(WARNING, (errmsg("Connection closed detected")));
}
if (event->events & WL_LATCH_SET) if (event->events & WL_LATCH_SET)
{ {
ResetLatch(MyLatch); ResetLatch(MyLatch);
@ -2808,6 +2813,7 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC
WorkerSession *session = (WorkerSession *) event->user_data; WorkerSession *session = (WorkerSession *) event->user_data;
session->latestUnconsumedWaitEvents = event->events; session->latestUnconsumedWaitEvents = event->events;
ConnectionStateMachine(session); ConnectionStateMachine(session);
} }
} }
@ -4158,6 +4164,8 @@ UpdateConnectionWaitFlags(WorkerSession *session, int waitFlags)
MultiConnection *connection = session->connection; MultiConnection *connection = session->connection;
DistributedExecution *execution = session->workerPool->distributedExecution; DistributedExecution *execution = session->workerPool->distributedExecution;
waitFlags |= WL_SOCKET_CLOSED;
/* do not take any actions if the flags not changed */ /* do not take any actions if the flags not changed */
if (connection->waitFlags == waitFlags) if (connection->waitFlags == waitFlags)
{ {
@ -4202,6 +4210,11 @@ CheckConnectionReady(WorkerSession *session)
waitFlags = waitFlags | WL_SOCKET_WRITEABLE; waitFlags = waitFlags | WL_SOCKET_WRITEABLE;
} }
if ((session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED) != 0)
{
elog(WARNING, "DETECT SOCKET CLOSED");
}
if ((session->latestUnconsumedWaitEvents & WL_SOCKET_READABLE) != 0) if ((session->latestUnconsumedWaitEvents & WL_SOCKET_READABLE) != 0)
{ {
if (PQconsumeInput(connection->pgConn) == 0) if (PQconsumeInput(connection->pgConn) == 0)

View File

@ -62,6 +62,12 @@ RelationGetSmgr(Relation rel)
} }
/*
* Postgres 15+ supports detecting the remote connection closed. For earlier version,
* we simply ignore that.
*/
#define WL_SOCKET_CLOSED 0
#endif #endif
#if PG_VERSION_NUM >= PG_VERSION_14 #if PG_VERSION_NUM >= PG_VERSION_14