diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 577378803..6c4535e4e 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -1562,6 +1562,8 @@ CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd, volatile int waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED; MemoryContext savedContext = CurrentMemoryContext; + events |= WL_SOCKET_CLOSED; + PG_TRY(); { waitEventSetIndex = diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index a0e16876a..94fb4fa0d 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -2783,6 +2783,11 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); } + if (event->events & WL_SOCKET_CLOSED) + { + ereport(WARNING, (errmsg("Connection closed detected"))); + } + if (event->events & WL_LATCH_SET) { ResetLatch(MyLatch); @@ -2808,6 +2813,7 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC WorkerSession *session = (WorkerSession *) event->user_data; session->latestUnconsumedWaitEvents = event->events; + ConnectionStateMachine(session); } } @@ -4158,6 +4164,8 @@ UpdateConnectionWaitFlags(WorkerSession *session, int waitFlags) MultiConnection *connection = session->connection; DistributedExecution *execution = session->workerPool->distributedExecution; + waitFlags |= WL_SOCKET_CLOSED; + /* do not take any actions if the flags not changed */ if (connection->waitFlags == waitFlags) { @@ -4202,6 +4210,11 @@ CheckConnectionReady(WorkerSession *session) waitFlags = waitFlags | WL_SOCKET_WRITEABLE; } + if ((session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED) != 0) + { + elog(WARNING, "DETECT SOCKET CLOSED"); + } + if ((session->latestUnconsumedWaitEvents & WL_SOCKET_READABLE) != 0) { if (PQconsumeInput(connection->pgConn) == 0) diff --git a/src/include/pg_version_compat.h b/src/include/pg_version_compat.h index f551085a7..b217cb8f9 100644 --- a/src/include/pg_version_compat.h +++ b/src/include/pg_version_compat.h @@ -62,6 +62,12 @@ RelationGetSmgr(Relation rel) } +/* + * Postgres 15+ supports detecting the remote connection closed. For earlier version, + * we simply ignore that. + */ +#define WL_SOCKET_CLOSED 0 + #endif #if PG_VERSION_NUM >= PG_VERSION_14