diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 6bfcbe753..ed4ad6b07 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -176,6 +176,8 @@
 #include "utils/timestamp.h"
 
 #define SLOW_START_DISABLED 0
+#define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1
+#define WAIT_EVENT_SET_INDEX_FAILED -2
 
 
 /*
@@ -656,6 +658,10 @@ static int UsableConnectionCount(WorkerPool *workerPool);
 static long NextEventTimeout(DistributedExecution *execution);
 static WaitEventSet * BuildWaitEventSet(List *sessionList);
 static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
+static int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,
+									 Latch *latch, void *user_data);
+static bool CitusModifyWaitEvent(WaitEventSet *set, int pos, uint32 events,
+								 Latch *latch);
 static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
 static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session);
 static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool);
@@ -690,6 +696,8 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo,
 												Oid **parameterTypes,
 												const char ***parameterValues);
 static int GetEventSetSize(List *sessionList);
+static bool ProcessSessionsWithFailedWaitEventSetOperations(
+	DistributedExecution *execution);
 static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution);
 static int RebuildWaitEventSet(DistributedExecution *execution);
 static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
@@ -2155,6 +2163,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
 	session->connection = connection;
 	session->workerPool = workerPool;
 	session->commandsSent = 0;
+	session->waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
 
 	dlist_init(&session->pendingTaskQueue);
 	dlist_init(&session->readyTaskQueue);
@@ -2318,6 +2327,7 @@ RunDistributedExecution(DistributedExecution *execution)
 				ManageWorkerPool(workerPool);
 			}
 
+			bool skipWaitEvents = false;
 			if (execution->remoteTaskList == NIL)
 			{
 				/*
@@ -2339,11 +2349,28 @@ RunDistributedExecution(DistributedExecution *execution)
 				}
 				eventSetSize = RebuildWaitEventSet(execution);
 				events = palloc0(eventSetSize * sizeof(WaitEvent));
+
+				skipWaitEvents =
+					ProcessSessionsWithFailedWaitEventSetOperations(execution);
 			}
 			else if (execution->waitFlagsChanged)
 			{
 				RebuildWaitEventSetFlags(execution->waitEventSet, execution->sessionList);
 				execution->waitFlagsChanged = false;
+
+				skipWaitEvents =
+					ProcessSessionsWithFailedWaitEventSetOperations(execution);
+			}
+
+			if (skipWaitEvents)
+			{
+				/*
+				 * An operation on the wait event set failed; retry, as we
+				 * have already removed the problematic connections.
+				 */
+				execution->rebuildWaitEventSet = true;
+
+				continue;
 			}
 
 			/* wait for I/O events */
@@ -2392,6 +2419,51 @@ RunDistributedExecution(DistributedExecution *execution)
 }
 
+/*
+ * ProcessSessionsWithFailedWaitEventSetOperations goes over the session list
+ * and processes sessions with failed wait event set operations.
+ *
+ * Failed sessions are not going to generate any further events, so it is our
+ * only chance to process the failure by calling into `ConnectionStateMachine`.
+ *
+ * The function returns true if any session failed.
+ */
+static bool
+ProcessSessionsWithFailedWaitEventSetOperations(DistributedExecution *execution)
+{
+	bool foundFailedSession = false;
+	WorkerSession *session = NULL;
+	foreach_ptr(session, execution->sessionList)
+	{
+		if (session->waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED)
+		{
+			/*
+			 * We can only lose connections that were already connected;
+			 * others are regular connection failures.
+			 */
+			MultiConnection *connection = session->connection;
+			if (connection->connectionState == MULTI_CONNECTION_CONNECTED)
+			{
+				connection->connectionState = MULTI_CONNECTION_LOST;
+			}
+			else
+			{
+				connection->connectionState = MULTI_CONNECTION_FAILED;
+			}
+
+			ConnectionStateMachine(session);
+
+			session->waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
+
+			foundFailedSession = true;
+		}
+	}
+
+	return foundFailedSession;
+}
+
+
 /*
  * HasIncompleteConnectionEstablishment returns true if any of the connections
  * that has been initiated by the executor is in initilization stage.
 */
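A note on the pattern in the hunk above (not part of the patch): a failed wait event set operation is recorded with the WAIT_EVENT_SET_INDEX_FAILED sentinel, and a later sweep over the session list handles exactly those sessions, distinguishing already-established connections ("lost") from pending ones ("failed"). A minimal standalone sketch of that sentinel-sweep idea, using a hypothetical Session struct in place of Citus' WorkerSession and ConnectionStateMachine():

/* sketch.c -- hypothetical types, compile with: cc sketch.c */
#include <stdbool.h>
#include <stdio.h>

#define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1
#define WAIT_EVENT_SET_INDEX_FAILED -2

typedef struct Session
{
	int waitEventSetIndex;
	bool connected;
} Session;

/* returns true if any session had a failed wait event set operation */
static bool
SweepFailedSessions(Session *sessions, int sessionCount)
{
	bool foundFailedSession = false;

	for (int i = 0; i < sessionCount; i++)
	{
		if (sessions[i].waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED)
		{
			/* an established connection is "lost", a pending one "failed" */
			printf("session %d considered %s\n", i,
				   sessions[i].connected ? "lost" : "failed");

			/* reset the sentinel so the session is not processed twice */
			sessions[i].waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
			foundFailedSession = true;
		}
	}

	return foundFailedSession;
}

int
main(void)
{
	Session sessions[] = { { WAIT_EVENT_SET_INDEX_FAILED, true },
						   { 0, true } };

	/* like the executor loop, the caller rebuilds and retries on failure */
	bool retry = SweepFailedSessions(sessions, 2);
	printf("rebuild and retry: %s\n", retry ? "yes" : "no");
	return 0;
}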
@@ -5066,18 +5138,79 @@ BuildWaitEventSet(List *sessionList)
 			continue;
 		}
 
-		int waitEventSetIndex = AddWaitEventToSet(waitEventSet, connection->waitFlags,
-												  sock, NULL, (void *) session);
+		int waitEventSetIndex =
+			CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags, sock,
+									  NULL, (void *) session);
 		session->waitEventSetIndex = waitEventSetIndex;
 	}
 
-	AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
-	AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+	CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
+							  NULL);
+	CitusAddWaitEventSetToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
+							  NULL);
 
 	return waitEventSet;
 }
 
 
+/*
+ * CitusAddWaitEventSetToSet is a wrapper around Postgres' AddWaitEventToSet().
+ *
+ * AddWaitEventToSet() may throw hard errors. For example, when the
+ * underlying socket for a connection has been closed by the remote
+ * server and the OS has already noticed, but Citus has not yet had a
+ * chance to learn about it. In that case, if the replication factor
+ * is greater than 1, Citus can fail over to other nodes to execute
+ * the query. Even if the replication factor is 1, Citus can give
+ * much nicer errors.
+ *
+ * So CitusAddWaitEventSetToSet simply puts AddWaitEventToSet() into a
+ * PG_TRY/PG_CATCH block in order to catch any hard errors, and
+ * returns this information to the caller.
+ */
+static int
+CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,
+						  Latch *latch, void *user_data)
+{
+	volatile int waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
+	MemoryContext savedContext = CurrentMemoryContext;
+
+	PG_TRY();
+	{
+		waitEventSetIndex =
+			AddWaitEventToSet(set, events, fd, latch, (void *) user_data);
+	}
+	PG_CATCH();
+	{
+		/*
+		 * We might be in an arbitrary memory context when the
+		 * error is thrown and we should get back to the one we
+		 * had at PG_TRY() time, especially because we are not
+		 * re-throwing the error.
+		 */
+		MemoryContextSwitchTo(savedContext);
+
+		FlushErrorState();
+
+		if (user_data != NULL)
+		{
+			WorkerSession *workerSession = (WorkerSession *) user_data;
+
+			ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE),
+							 errmsg("Adding wait event for node %s:%d failed. "
+									"The socket was: %d",
+									workerSession->workerPool->nodeName,
+									workerSession->workerPool->nodePort, fd)));
+		}
+
+		/* let the callers know about the failure */
+		waitEventSetIndex = WAIT_EVENT_SET_INDEX_FAILED;
+	}
+	PG_END_TRY();
+
+	return waitEventSetIndex;
+}
+
+
 /*
  * GetEventSetSize returns the event set size for a list of sessions.
  */
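The volatile qualifier on waitEventSetIndex above is load-bearing: PG_TRY/PG_CATCH is implemented with setjmp/longjmp, and a non-volatile automatic variable modified between setjmp() and longjmp() has an indeterminate value after the jump. A standalone plain-C sketch (independent of the patch) of exactly that hazard:

#include <setjmp.h>
#include <stdio.h>

static jmp_buf env;

int
main(void)
{
	/* without volatile, the value of index after longjmp() is indeterminate */
	volatile int index = -1;

	if (setjmp(env) == 0)
	{
		/* the "PG_TRY" block: modify the local, then jump out */
		index = -2;
		longjmp(env, 1);	/* stands in for ereport(ERROR, ...) */
	}
	else
	{
		/* the "PG_CATCH" block: reads the local after the jump */
		printf("index = %d\n", index);	/* reliably prints -2 */
	}

	return 0;
}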
" + "The socket was: %d", + workerSession->workerPool->nodeName, + workerSession->workerPool->nodePort, fd))); + } + + /* let the callers know about the failure */ + waitEventSetIndex = WAIT_EVENT_SET_INDEX_FAILED; + } + PG_END_TRY(); + + return waitEventSetIndex; +} + + /* * GetEventSetSize returns the event set size for a list of sessions. */ @@ -5121,11 +5254,68 @@ RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) continue; } - ModifyWaitEvent(waitEventSet, waitEventSetIndex, connection->waitFlags, NULL); + bool success = + CitusModifyWaitEvent(waitEventSet, waitEventSetIndex, + connection->waitFlags, NULL); + if (!success) + { + ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("Modifying wait event for node %s:%d failed. " + "The wait event index was: %d", + connection->hostname, connection->port, + waitEventSetIndex))); + + session->waitEventSetIndex = WAIT_EVENT_SET_INDEX_FAILED; + } } } +/* + * CitusModifyWaitEvent is a wrapper around Postgres' ModifyWaitEvent(). + * + * ModifyWaitEvent may throw hard errors. For example, when the underlying + * socket for a connection is closed by the remote server and already + * reflected by the OS, however Citus hasn't had a chance to get this + * information. In that case, if repliction factor is >1, Citus can + * failover to other nodes for executing the query. Even if replication + * factor = 1, Citus can give much nicer errors. + * + * So CitusModifyWaitEvent simply puts ModifyWaitEvent into a PG_TRY/PG_CATCH + * block in order to catch any hard errors, and returns this information to the + * caller. + */ +static bool +CitusModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch) +{ + volatile bool success = true; + MemoryContext savedContext = CurrentMemoryContext; + + PG_TRY(); + { + ModifyWaitEvent(set, pos, events, latch); + } + PG_CATCH(); + { + /* + * We might be in an arbitrary memory context when the + * error is thrown and we should get back to one we had + * at PG_TRY() time, especially because we are not + * re-throwing the error. + */ + MemoryContextSwitchTo(savedContext); + + FlushErrorState(); + + /* let the callers know about the failure */ + success = false; + } + PG_END_TRY(); + + return success; +} + + /* * SetLocalForceMaxQueryParallelization is simply a C interface for setting * the following: