From 338752d96e3f04a9592d8800fbe59eb512a61268 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 16 Nov 2021 11:47:30 +0100 Subject: [PATCH] Guard against hard wait event set errors Similar to https://github.com/citusdata/citus/pull/5158, but this time instead of the executor, use this in all the remaining places. --- .../connection/connection_management.c | 28 ++++++++++++++++-- .../distributed/connection/remote_commands.c | 29 ++++++++++++++++--- .../distributed/executor/adaptive_executor.c | 2 +- 3 files changed, 52 insertions(+), 7 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index c3a02bb6b..7b89b3e96 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -870,7 +870,19 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount) int eventMask = MultiConnectionStateEventMask(connectionState); - AddWaitEventToSet(waitEventSet, eventMask, sock, NULL, connectionState); + int waitEventSetIndex = + CitusAddWaitEventSetToSet(waitEventSet, eventMask, sock, + NULL, (void *) connectionState); + if (waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("connection establishment for node %s:%d failed", + connectionState->connection->hostname, + connectionState->connection->port), + errhint("Check both the local and remote server logs for the " + "connection establishment errors."))); + } + numEventsAdded++; if (waitCount) @@ -1020,7 +1032,19 @@ FinishConnectionListEstablishment(List *multiConnectionList) { /* connection state changed, reset the event mask */ uint32 eventMask = MultiConnectionStateEventMask(connectionState); - ModifyWaitEvent(waitEventSet, event->pos, eventMask, NULL); + bool success = + CitusModifyWaitEvent(waitEventSet, event->pos, + eventMask, NULL); + if (!success) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("connection establishment for node %s:%d " + "failed", connection->hostname, + connection->port), + errhint("Check both the local and remote server " + "logs for the connection establishment " + "errors."))); + } } /* diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 6511a675c..4c1aae6bf 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -906,8 +906,20 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) else if (sendStatus == 0) { /* done writing, only wait for read events */ - ModifyWaitEvent(waitEventSet, event->pos, WL_SOCKET_READABLE, - NULL); + bool success = + CitusModifyWaitEvent(waitEventSet, event->pos, + WL_SOCKET_READABLE, NULL); + if (!success) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("connection establishment for " + "node %s:%d failed", + connection->hostname, + connection->port), + errhint("Check both the local and remote " + "server logs for the connection " + "establishment errors."))); + } } } @@ -1052,8 +1064,17 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, * and writeability (server is ready to receive bytes). */ int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; - - AddWaitEventToSet(waitEventSet, eventMask, sock, NULL, (void *) connection); + int waitEventSetIndex = + CitusAddWaitEventSetToSet(waitEventSet, eventMask, sock, + NULL, (void *) connection); + if (waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("connection establishment for node %s:%d failed", + connection->hostname, connection->port), + errhint("Check both the local and remote server logs for the " + "connection establishment errors."))); + } } /* diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 452d9b9eb..959dc0d20 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -5434,7 +5434,7 @@ RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) if (!success) { ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("Modifying wait event for node %s:%d failed. " + errmsg("modifying wait event for node %s:%d failed. " "The wait event index was: %d", connection->hostname, connection->port, waitEventSetIndex)));