diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 13b52790a..7b89b3e96 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -870,7 +870,19 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount) int eventMask = MultiConnectionStateEventMask(connectionState); - AddWaitEventToSet(waitEventSet, eventMask, sock, NULL, connectionState); + int waitEventSetIndex = + CitusAddWaitEventSetToSet(waitEventSet, eventMask, sock, + NULL, (void *) connectionState); + if (waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("connection establishment for node %s:%d failed", + connectionState->connection->hostname, + connectionState->connection->port), + errhint("Check both the local and remote server logs for the " + "connection establishment errors."))); + } + numEventsAdded++; if (waitCount) @@ -1020,7 +1032,19 @@ FinishConnectionListEstablishment(List *multiConnectionList) { /* connection state changed, reset the event mask */ uint32 eventMask = MultiConnectionStateEventMask(connectionState); - ModifyWaitEvent(waitEventSet, event->pos, eventMask, NULL); + bool success = + CitusModifyWaitEvent(waitEventSet, event->pos, + eventMask, NULL); + if (!success) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("connection establishment for node %s:%d " + "failed", connection->hostname, + connection->port), + errhint("Check both the local and remote server " + "logs for the connection establishment " + "errors."))); + } } /* @@ -1521,3 +1545,95 @@ MarkConnectionConnected(MultiConnection *connection) INSTR_TIME_SET_CURRENT(connection->connectionEstablishmentEnd); } } + + +/* + * CitusAddWaitEventSetToSet is a wrapper around Postgres' AddWaitEventToSet(). + * + * AddWaitEventToSet() may throw hard errors. 
For example, when the + * underlying socket for a connection is closed by the remote server + * and already reflected by the OS, however Citus hasn't had a chance + * to get this information. In that case, if replication factor is >1, + * Citus can failover to other nodes for executing the query. Even if + * replication factor = 1, Citus can give much nicer errors. + * + * So CitusAddWaitEventSetToSet simply puts AddWaitEventToSet into a + * PG_TRY/PG_CATCH block in order to catch any hard errors, and + * returns this information to the caller. + */ +int +CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd, + Latch *latch, void *user_data) +{ + volatile int waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED; + MemoryContext savedContext = CurrentMemoryContext; + + PG_TRY(); + { + waitEventSetIndex = + AddWaitEventToSet(set, events, fd, latch, (void *) user_data); + } + PG_CATCH(); + { + /* + * We might be in an arbitrary memory context when the + * error is thrown and we should get back to one we had + * at PG_TRY() time, especially because we are not + * re-throwing the error. + */ + MemoryContextSwitchTo(savedContext); + + FlushErrorState(); + + /* let the callers know about the failure */ + waitEventSetIndex = WAIT_EVENT_SET_INDEX_FAILED; + } + PG_END_TRY(); + + return waitEventSetIndex; +} + + +/* + * CitusModifyWaitEvent is a wrapper around Postgres' ModifyWaitEvent(). + * + * ModifyWaitEvent may throw hard errors. For example, when the underlying + * socket for a connection is closed by the remote server and already + * reflected by the OS, however Citus hasn't had a chance to get this + * information. In that case, if replication factor is >1, Citus can + * failover to other nodes for executing the query. Even if replication + * factor = 1, Citus can give much nicer errors.
+ * + * So CitusModifyWaitEvent simply puts ModifyWaitEvent into a PG_TRY/PG_CATCH + * block in order to catch any hard errors, and returns this information to the + * caller. + */ +bool +CitusModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch) +{ + volatile bool success = true; + MemoryContext savedContext = CurrentMemoryContext; + + PG_TRY(); + { + ModifyWaitEvent(set, pos, events, latch); + } + PG_CATCH(); + { + /* + * We might be in an arbitrary memory context when the + * error is thrown and we should get back to one we had + * at PG_TRY() time, especially because we are not + * re-throwing the error. + */ + MemoryContextSwitchTo(savedContext); + + FlushErrorState(); + + /* let the callers know about the failure */ + success = false; + } + PG_END_TRY(); + + return success; +} diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 6511a675c..4c1aae6bf 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -906,8 +906,20 @@ WaitForAllConnections(List *connectionList, bool raiseInterrupts) else if (sendStatus == 0) { /* done writing, only wait for read events */ - ModifyWaitEvent(waitEventSet, event->pos, WL_SOCKET_READABLE, - NULL); + bool success = + CitusModifyWaitEvent(waitEventSet, event->pos, + WL_SOCKET_READABLE, NULL); + if (!success) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("connection establishment for " + "node %s:%d failed", + connection->hostname, + connection->port), + errhint("Check both the local and remote " + "server logs for the connection " + "establishment errors."))); + } } } @@ -1052,8 +1064,17 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount, * and writeability (server is ready to receive bytes). 
*/ int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; - - AddWaitEventToSet(waitEventSet, eventMask, sock, NULL, (void *) connection); + int waitEventSetIndex = + CitusAddWaitEventSetToSet(waitEventSet, eventMask, sock, + NULL, (void *) connection); + if (waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("connection establishment for node %s:%d failed", + connection->hostname, connection->port), + errhint("Check both the local and remote server logs for the " + "connection establishment errors."))); + } } /* diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 1f04751bb..959dc0d20 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -178,8 +178,6 @@ #include "utils/timestamp.h" #define SLOW_START_DISABLED 0 -#define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1 -#define WAIT_EVENT_SET_INDEX_FAILED -2 /* @@ -678,10 +676,6 @@ static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); static WaitEventSet * BuildWaitEventSet(List *sessionList); static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); -static int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd, - Latch *latch, void *user_data); -static bool CitusModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, - Latch *latch); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool); @@ -5367,6 +5361,19 @@ BuildWaitEventSet(List *sessionList) CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags, sock, NULL, (void *) session); session->waitEventSetIndex = waitEventSetIndex; + + /* + 
* Report a failure to add to the wait event set only as a debug message, + * because this is too detailed for users. + */ + if (session->waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED) + { + ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("Adding wait event for node %s:%d failed. " + "The socket was: %d", + session->workerPool->nodeName, + session->workerPool->nodePort, sock))); + } } CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, @@ -5378,64 +5385,6 @@ BuildWaitEventSet(List *sessionList) } -/* - * CitusAddWaitEventSetToSet is a wrapper around Postgres' AddWaitEventToSet(). - * - * AddWaitEventToSet() may throw hard errors. For example, when the - * underlying socket for a connection is closed by the remote server - * and already reflected by the OS, however Citus hasn't had a chance - * to get this information. In that case, if replication factor is >1, - * Citus can failover to other nodes for executing the query. Even if - * replication factor = 1, Citus can give much nicer errors. - * - * So CitusAddWaitEventSetToSet simply puts ModifyWaitEvent into a - * PG_TRY/PG_CATCH block in order to catch any hard errors, and - * returns this information to the caller. - */ -static int -CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd, - Latch *latch, void *user_data) -{ - volatile int waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED; - MemoryContext savedContext = CurrentMemoryContext; - - PG_TRY(); - { - waitEventSetIndex = - AddWaitEventToSet(set, events, fd, latch, (void *) user_data); - } - PG_CATCH(); - { - /* - * We might be in an arbitrary memory context when the - * error is thrown and we should get back to one we had - * at PG_TRY() time, especially because we are not - * re-throwing the error.
- */ - MemoryContextSwitchTo(savedContext); - - FlushErrorState(); - - if (user_data != NULL) - { - WorkerSession *workerSession = (WorkerSession *) user_data; - - ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("Adding wait event for node %s:%d failed. " - "The socket was: %d", - workerSession->workerPool->nodeName, - workerSession->workerPool->nodePort, fd))); - } - - /* let the callers know about the failure */ - waitEventSetIndex = WAIT_EVENT_SET_INDEX_FAILED; - } - PG_END_TRY(); - - return waitEventSetIndex; -} - - /* * GetEventSetSize returns the event set size for a list of sessions. */ @@ -5485,7 +5434,7 @@ RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) if (!success) { ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("Modifying wait event for node %s:%d failed. " + errmsg("modifying wait event for node %s:%d failed. " "The wait event index was: %d", connection->hostname, connection->port, waitEventSetIndex))); @@ -5496,51 +5445,6 @@ RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) } -/* - * CitusModifyWaitEvent is a wrapper around Postgres' ModifyWaitEvent(). - * - * ModifyWaitEvent may throw hard errors. For example, when the underlying - * socket for a connection is closed by the remote server and already - * reflected by the OS, however Citus hasn't had a chance to get this - * information. In that case, if replication factor is >1, Citus can - * failover to other nodes for executing the query. Even if replication - * factor = 1, Citus can give much nicer errors. - * - * So CitusModifyWaitEvent simply puts ModifyWaitEvent into a PG_TRY/PG_CATCH - * block in order to catch any hard errors, and returns this information to the - * caller. 
- */ -static bool -CitusModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch) -{ - volatile bool success = true; - MemoryContext savedContext = CurrentMemoryContext; - - PG_TRY(); - { - ModifyWaitEvent(set, pos, events, latch); - } - PG_CATCH(); - { - /* - * We might be in an arbitrary memory context when the - * error is thrown and we should get back to one we had - * at PG_TRY() time, especially because we are not - * re-throwing the error. - */ - MemoryContextSwitchTo(savedContext); - - FlushErrorState(); - - /* let the callers know about the failure */ - success = false; - } - PG_END_TRY(); - - return success; -} - - /* * SetLocalForceMaxQueryParallelization is simply a C interface for setting * the following: diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index ad575cfe5..2e31bc9da 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -18,6 +18,7 @@ #include "lib/ilist.h" #include "pg_config.h" #include "portability/instr_time.h" +#include "storage/latch.h" #include "utils/guc.h" #include "utils/hsearch.h" #include "utils/timestamp.h" @@ -34,6 +35,10 @@ /* application name used for internal connections in rebalancer */ #define CITUS_REBALANCER_NAME "citus_rebalancer" +/* deal with waiteventset errors */ +#define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1 +#define WAIT_EVENT_SET_INDEX_FAILED -2 + /* forward declare, to avoid forcing large headers on everyone */ struct pg_conn; /* target of the PGconn typedef */ struct MemoryContextData; @@ -284,6 +289,13 @@ extern bool IsCitusInternalBackend(void); extern bool IsRebalancerInternalBackend(void); extern void MarkConnectionConnected(MultiConnection *connection); +/* waiteventset utilities */ +extern int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd, + Latch *latch, void *user_data); + +extern bool CitusModifyWaitEvent(WaitEventSet *set, int pos, 
uint32 events, + Latch *latch); + /* time utilities */ extern double MillisecondsPassedSince(instr_time moment); extern long MillisecondsToTimeout(instr_time start, long msAfterStart);