Guard against hard wait event set errors

Similar to https://github.com/citusdata/citus/pull/5158, but this
time instead of the executor, use this in all the remaining places.
pull/5466/head
Onder Kalaci 2021-11-16 11:47:30 +01:00 committed by Marco Slot
parent 953951007c
commit 338752d96e
3 changed files with 52 additions and 7 deletions

View File

@ -870,7 +870,19 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount)
int eventMask = MultiConnectionStateEventMask(connectionState); int eventMask = MultiConnectionStateEventMask(connectionState);
AddWaitEventToSet(waitEventSet, eventMask, sock, NULL, connectionState); int waitEventSetIndex =
CitusAddWaitEventSetToSet(waitEventSet, eventMask, sock,
NULL, (void *) connectionState);
if (waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection establishment for node %s:%d failed",
connectionState->connection->hostname,
connectionState->connection->port),
errhint("Check both the local and remote server logs for the "
"connection establishment errors.")));
}
numEventsAdded++; numEventsAdded++;
if (waitCount) if (waitCount)
@ -1020,7 +1032,19 @@ FinishConnectionListEstablishment(List *multiConnectionList)
{ {
/* connection state changed, reset the event mask */ /* connection state changed, reset the event mask */
uint32 eventMask = MultiConnectionStateEventMask(connectionState); uint32 eventMask = MultiConnectionStateEventMask(connectionState);
ModifyWaitEvent(waitEventSet, event->pos, eventMask, NULL); bool success =
CitusModifyWaitEvent(waitEventSet, event->pos,
eventMask, NULL);
if (!success)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection establishment for node %s:%d "
"failed", connection->hostname,
connection->port),
errhint("Check both the local and remote server "
"logs for the connection establishment "
"errors.")));
}
} }
/* /*

View File

@ -906,8 +906,20 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts)
else if (sendStatus == 0) else if (sendStatus == 0)
{ {
/* done writing, only wait for read events */ /* done writing, only wait for read events */
ModifyWaitEvent(waitEventSet, event->pos, WL_SOCKET_READABLE, bool success =
NULL); CitusModifyWaitEvent(waitEventSet, event->pos,
WL_SOCKET_READABLE, NULL);
if (!success)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection establishment for "
"node %s:%d failed",
connection->hostname,
connection->port),
errhint("Check both the local and remote "
"server logs for the connection "
"establishment errors.")));
}
} }
} }
@ -1052,8 +1064,17 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
* and writeability (server is ready to receive bytes). * and writeability (server is ready to receive bytes).
*/ */
int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
int waitEventSetIndex =
AddWaitEventToSet(waitEventSet, eventMask, sock, NULL, (void *) connection); CitusAddWaitEventSetToSet(waitEventSet, eventMask, sock,
NULL, (void *) connection);
if (waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection establishment for node %s:%d failed",
connection->hostname, connection->port),
errhint("Check both the local and remote server logs for the "
"connection establishment errors.")));
}
} }
/* /*

View File

@ -5434,7 +5434,7 @@ RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList)
if (!success) if (!success)
{ {
ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE), ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("Modifying wait event for node %s:%d failed. " errmsg("modifying wait event for node %s:%d failed. "
"The wait event index was: %d", "The wait event index was: %d",
connection->hostname, connection->port, connection->hostname, connection->port,
waitEventSetIndex))); waitEventSetIndex)));