mirror of https://github.com/citusdata/citus.git
Merge pull request #5158 from citusdata/heap_on_master
Guard against hard WaitEventSet errors (pull/5171/head)
commit 272f4d7ce5
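In a nutshell, the commit works like this: AddWaitEventToSet() and ModifyWaitEvent() can throw a hard error (ereport(ERROR)) when the kernel already considers a socket dead, so each call is wrapped in PG_TRY/PG_CATCH and the failure is turned into a return value the executor can act on. Below is a minimal sketch of that guard pattern, assuming a Postgres backend context; DangerousOperation() is a hypothetical stand-in for any call that may throw.

#include "postgres.h"

extern void DangerousOperation(void);   /* hypothetical: may ereport(ERROR) */

static bool
GuardedOperation(void)
{
    /* `volatile` because PG_TRY() is based on sigsetjmp() */
    volatile bool success = true;
    MemoryContext savedContext = CurrentMemoryContext;

    PG_TRY();
    {
        DangerousOperation();
    }
    PG_CATCH();
    {
        /* return to the context we had at PG_TRY() time, since we
         * swallow the error instead of re-throwing it */
        MemoryContextSwitchTo(savedContext);
        FlushErrorState();

        success = false;    /* surface the failure as a return value */
    }
    PG_END_TRY();

    return success;
}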
@@ -176,6 +176,8 @@
 #include "utils/timestamp.h"

 #define SLOW_START_DISABLED 0
+#define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1
+#define WAIT_EVENT_SET_INDEX_FAILED -2


 /*
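The two new sentinels sit below the valid range, since AddWaitEventToSet() returns non-negative positions. A hypothetical helper, not part of the commit, that makes the three possible states explicit:

/* assumes the two #defines above are in scope */
static inline bool
WaitEventSetIndexIsValid(int waitEventSetIndex)
{
    /* valid positions are >= 0; -1 means "not added yet", -2 means "add/modify failed" */
    return waitEventSetIndex != WAIT_EVENT_SET_INDEX_NOT_INITIALIZED &&
           waitEventSetIndex != WAIT_EVENT_SET_INDEX_FAILED;
}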
@@ -656,6 +658,10 @@ static int UsableConnectionCount(WorkerPool *workerPool);
 static long NextEventTimeout(DistributedExecution *execution);
 static WaitEventSet * BuildWaitEventSet(List *sessionList);
 static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
+static int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,
+                                     Latch *latch, void *user_data);
+static bool CitusModifyWaitEvent(WaitEventSet *set, int pos, uint32 events,
+                                 Latch *latch);
 static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
 static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session);
 static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool);
@@ -690,6 +696,8 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo,
                                                 Oid **parameterTypes,
                                                 const char ***parameterValues);
 static int GetEventSetSize(List *sessionList);
+static bool ProcessSessionsWithFailedWaitEventSetOperations(
+    DistributedExecution *execution);
 static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution);
 static int RebuildWaitEventSet(DistributedExecution *execution);
 static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
@@ -2155,6 +2163,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
     session->connection = connection;
     session->workerPool = workerPool;
     session->commandsSent = 0;
+    session->waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;

     dlist_init(&session->pendingTaskQueue);
     dlist_init(&session->readyTaskQueue);
@@ -2318,6 +2327,7 @@ RunDistributedExecution(DistributedExecution *execution)
             ManageWorkerPool(workerPool);
         }

+        bool skipWaitEvents = false;
         if (execution->remoteTaskList == NIL)
         {
             /*
@@ -2339,11 +2349,28 @@ RunDistributedExecution(DistributedExecution *execution)
             }
             eventSetSize = RebuildWaitEventSet(execution);
             events = palloc0(eventSetSize * sizeof(WaitEvent));
+
+            skipWaitEvents =
+                ProcessSessionsWithFailedWaitEventSetOperations(execution);
         }
         else if (execution->waitFlagsChanged)
         {
             RebuildWaitEventSetFlags(execution->waitEventSet, execution->sessionList);
             execution->waitFlagsChanged = false;
+
+            skipWaitEvents =
+                ProcessSessionsWithFailedWaitEventSetOperations(execution);
         }
+
+        if (skipWaitEvents)
+        {
+            /*
+             * Some operation on the wait event set failed; retry,
+             * since we have already removed the problematic connections.
+             */
+            execution->rebuildWaitEventSet = true;
+
+            continue;
+        }

         /* wait for I/O events */
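Condensed control flow of the hunk above, as a sketch rather than the actual executor loop; RebuildOrAdjustWaitEventSet() and WaitForAndProcessEvents() are hypothetical stand-ins for the surrounding RunDistributedExecution() logic:

extern void RebuildOrAdjustWaitEventSet(DistributedExecution *execution);  /* hypothetical */
extern void WaitForAndProcessEvents(DistributedExecution *execution);      /* hypothetical */

static void
ExecutionLoopSketch(DistributedExecution *execution)
{
    for (;;)
    {
        /* (re)build the set, or just adjust flags; either path may mark
         * sessions with WAIT_EVENT_SET_INDEX_FAILED */
        RebuildOrAdjustWaitEventSet(execution);

        bool skipWaitEvents =
            ProcessSessionsWithFailedWaitEventSetOperations(execution);
        if (skipWaitEvents)
        {
            /* the failed connections are already torn down; build a
             * fresh wait event set and retry */
            execution->rebuildWaitEventSet = true;
            continue;
        }

        WaitForAndProcessEvents(execution);  /* WaitEventSetWait() and friends */
    }
}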
@@ -2392,6 +2419,51 @@ RunDistributedExecution(DistributedExecution *execution)
 }


+/*
+ * ProcessSessionsWithFailedWaitEventSetOperations goes over the session list
+ * and processes sessions with failed wait event set operations.
+ *
+ * Failed sessions are not going to generate any further events, so this is
+ * our only chance to process the failure by calling into
+ * `ConnectionStateMachine`.
+ *
+ * The function returns true if any session failed.
+ */
+static bool
+ProcessSessionsWithFailedWaitEventSetOperations(DistributedExecution *execution)
+{
+    bool foundFailedSession = false;
+    WorkerSession *session = NULL;
+    foreach_ptr(session, execution->sessionList)
+    {
+        if (session->waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED)
+        {
+            /*
+             * Only connections that were already connected can be lost;
+             * anything else is a regular connection failure.
+             */
+            MultiConnection *connection = session->connection;
+            if (connection->connectionState == MULTI_CONNECTION_CONNECTED)
+            {
+                connection->connectionState = MULTI_CONNECTION_LOST;
+            }
+            else
+            {
+                connection->connectionState = MULTI_CONNECTION_FAILED;
+            }
+
+            ConnectionStateMachine(session);
+
+            session->waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
+
+            foundFailedSession = true;
+        }
+    }
+
+    return foundFailedSession;
+}
+
+
 /*
  * HasIncompleteConnectionEstablishment returns true if any of the connections
  * that have been initiated by the executor is still in the initialization stage.
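The LOST/FAILED distinction above lets ConnectionStateMachine() tell a dropped established connection apart from one that never finished connecting. A hypothetical one-liner (not in the commit, assuming Citus' MultiConnectionState enum) capturing the mapping:

/* illustration only: map a wait-event-set failure onto a connection state */
static MultiConnectionState
ConnectionStateAfterWaitEventFailure(const MultiConnection *connection)
{
    return (connection->connectionState == MULTI_CONNECTION_CONNECTED)
           ? MULTI_CONNECTION_LOST
           : MULTI_CONNECTION_FAILED;
}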
@@ -5066,18 +5138,79 @@ BuildWaitEventSet(List *sessionList)
             continue;
         }

-        int waitEventSetIndex = AddWaitEventToSet(waitEventSet, connection->waitFlags,
-                                                  sock, NULL, (void *) session);
+        int waitEventSetIndex =
+            CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags, sock,
+                                      NULL, (void *) session);
         session->waitEventSetIndex = waitEventSetIndex;
     }

-    AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
-    AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+    CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
+                              NULL);
+    CitusAddWaitEventSetToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
+                              NULL);

     return waitEventSet;
 }


+/*
+ * CitusAddWaitEventSetToSet is a wrapper around Postgres' AddWaitEventToSet().
+ *
+ * AddWaitEventToSet() may throw hard errors, for example when the
+ * underlying socket for a connection has been closed by the remote
+ * server and the OS already knows about it, but Citus has not yet had
+ * a chance to learn about it. In that case, if the replication factor
+ * is greater than 1, Citus can fail over to other nodes to execute the
+ * query. Even with a replication factor of 1, Citus can give a much
+ * nicer error.
+ *
+ * So CitusAddWaitEventSetToSet simply puts AddWaitEventToSet into a
+ * PG_TRY/PG_CATCH block in order to catch any hard errors, and
+ * returns this information to the caller.
+ */
+static int
+CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,
+                          Latch *latch, void *user_data)
+{
+    volatile int waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
+    MemoryContext savedContext = CurrentMemoryContext;
+
+    PG_TRY();
+    {
+        waitEventSetIndex =
+            AddWaitEventToSet(set, events, fd, latch, (void *) user_data);
+    }
+    PG_CATCH();
+    {
+        /*
+         * We might be in an arbitrary memory context when the
+         * error is thrown, and we should get back to the one we had
+         * at PG_TRY() time, especially because we are not
+         * re-throwing the error.
+         */
+        MemoryContextSwitchTo(savedContext);
+
+        FlushErrorState();
+
+        if (user_data != NULL)
+        {
+            WorkerSession *workerSession = (WorkerSession *) user_data;
+
+            ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE),
+                             errmsg("Adding wait event for node %s:%d failed. "
+                                    "The socket was: %d",
+                                    workerSession->workerPool->nodeName,
+                                    workerSession->workerPool->nodePort, fd)));
+        }
+
+        /* let the callers know about the failure */
+        waitEventSetIndex = WAIT_EVENT_SET_INDEX_FAILED;
+    }
+    PG_END_TRY();
+
+    return waitEventSetIndex;
+}
+
+
 /*
  * GetEventSetSize returns the event set size for a list of sessions.
  */
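A hypothetical caller sketch (not in the commit, assuming Citus' WorkerSession and MultiConnection types) showing the wrapper's contract: it never throws, so the caller compares the returned index against the sentinel and leaves cleanup to ProcessSessionsWithFailedWaitEventSetOperations():

static bool
AddSessionSocketToSet(WaitEventSet *waitEventSet, WorkerSession *session)
{
    MultiConnection *connection = session->connection;
    pgsocket sock = PQsocket(connection->pgConn);

    session->waitEventSetIndex =
        CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags, sock,
                                  NULL, (void *) session);

    /* false: no event will ever fire for this session; the next pass of
     * ProcessSessionsWithFailedWaitEventSetOperations() cleans it up */
    return session->waitEventSetIndex != WAIT_EVENT_SET_INDEX_FAILED;
}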
@@ -5121,11 +5254,68 @@ RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList)
             continue;
         }

-        ModifyWaitEvent(waitEventSet, waitEventSetIndex, connection->waitFlags, NULL);
+        bool success =
+            CitusModifyWaitEvent(waitEventSet, waitEventSetIndex,
+                                 connection->waitFlags, NULL);
+        if (!success)
+        {
+            ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE),
+                             errmsg("Modifying wait event for node %s:%d failed. "
+                                    "The wait event index was: %d",
+                                    connection->hostname, connection->port,
+                                    waitEventSetIndex)));
+
+            session->waitEventSetIndex = WAIT_EVENT_SET_INDEX_FAILED;
+        }
     }
 }


+/*
+ * CitusModifyWaitEvent is a wrapper around Postgres' ModifyWaitEvent().
+ *
+ * ModifyWaitEvent may throw hard errors, for example when the underlying
+ * socket for a connection has been closed by the remote server and the
+ * OS already knows about it, but Citus has not yet had a chance to learn
+ * about it. In that case, if the replication factor is greater than 1,
+ * Citus can fail over to other nodes to execute the query. Even with a
+ * replication factor of 1, Citus can give a much nicer error.
+ *
+ * So CitusModifyWaitEvent simply puts ModifyWaitEvent into a PG_TRY/PG_CATCH
+ * block in order to catch any hard errors, and returns this information to the
+ * caller.
+ */
+static bool
+CitusModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
+{
+    volatile bool success = true;
+    MemoryContext savedContext = CurrentMemoryContext;
+
+    PG_TRY();
+    {
+        ModifyWaitEvent(set, pos, events, latch);
+    }
+    PG_CATCH();
+    {
+        /*
+         * We might be in an arbitrary memory context when the
+         * error is thrown, and we should get back to the one we had
+         * at PG_TRY() time, especially because we are not
+         * re-throwing the error.
+         */
+        MemoryContextSwitchTo(savedContext);
+
+        FlushErrorState();
+
+        /* let the callers know about the failure */
+        success = false;
+    }
+    PG_END_TRY();
+
+    return success;
+}
+
+
 /*
  * SetLocalForceMaxQueryParallelization is simply a C interface for setting
  * the following:
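Both wrappers declare their result variable volatile. That is required because PG_TRY()/PG_CATCH() are implemented with sigsetjmp()/siglongjmp(): a non-volatile local modified between the setjmp and the longjmp has an indeterminate value after the jump. A small standalone C program (not from the commit) demonstrating why:

#include <setjmp.h>
#include <stdio.h>

static jmp_buf env;

int
main(void)
{
    /* without `volatile`, the value observed after longjmp() would be
     * indeterminate, because it was modified after setjmp() returned */
    volatile int success = 1;

    if (setjmp(env) == 0)
    {
        success = 0;
        longjmp(env, 1);    /* stands in for a thrown elog/ereport error */
    }

    printf("success = %d\n", success);  /* reliably prints 0 thanks to volatile */
    return 0;
}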