mirror of https://github.com/citusdata/citus.git
Guard against hard WaitEventSet errors
In short, add wrappers around Postgres' AddWaitEventToSet() and ModifyWaitEvent(). Both functions may throw hard errors, for example when the underlying socket for a connection has been closed by the remote server and the OS already reflects that, but Citus has not yet had a chance to learn about it. In that case, if the replication factor is greater than 1, Citus can fail over to other nodes to execute the query; even with replication factor 1, Citus can give much nicer errors. So CitusAddWaitEventSetToSet()/CitusModifyWaitEvent() simply put AddWaitEventToSet()/ModifyWaitEvent() into a PG_TRY/PG_CATCH block in order to catch any hard errors, and return this information to the caller.

pull/5158/head
parent 2ac3cc07eb
commit 86bd28b92c
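The mechanism boils down to trapping the error with PG_TRY()/PG_CATCH() and returning a sentinel value instead of letting the error abort the whole distributed execution. A condensed sketch of that pattern follows; it is an illustration rather than the committed code: the helper name TryAddWaitEventToSet is hypothetical, standard PostgreSQL extension headers are assumed, and the real wrappers in the diff below additionally emit a DEBUG1 message naming the worker node and track a per-session wait event index.

#include "postgres.h"

#include "storage/latch.h"

#define WAIT_EVENT_SET_INDEX_FAILED -2

/*
 * Call AddWaitEventToSet(), but turn a hard error (e.g. a socket the kernel
 * already considers closed) into a sentinel return value instead of
 * propagating the error to the top level.
 */
static int
TryAddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
					 Latch *latch, void *userData)
{
	/* volatile: read again after the longjmp out of PG_TRY() */
	volatile int index = WAIT_EVENT_SET_INDEX_FAILED;
	MemoryContext savedContext = CurrentMemoryContext;

	PG_TRY();
	{
		index = AddWaitEventToSet(set, events, fd, latch, userData);
	}
	PG_CATCH();
	{
		/* error processing may have switched memory contexts; switch back */
		MemoryContextSwitchTo(savedContext);

		/* swallow the error instead of re-throwing it */
		FlushErrorState();

		index = WAIT_EVENT_SET_INDEX_FAILED;
	}
	PG_END_TRY();

	return index;
}

A caller then treats index == WAIT_EVENT_SET_INDEX_FAILED as a lost connection and lets the connection state machine clean it up, which is what ProcessSessionsWithFailedWaitEventSetOperations() in the diff does.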
@@ -176,6 +176,8 @@
 #include "utils/timestamp.h"
 
 #define SLOW_START_DISABLED 0
+#define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1
+#define WAIT_EVENT_SET_INDEX_FAILED -2
 
 
 /*

@@ -656,6 +658,10 @@ static int UsableConnectionCount(WorkerPool *workerPool);
 static long NextEventTimeout(DistributedExecution *execution);
 static WaitEventSet * BuildWaitEventSet(List *sessionList);
 static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
+static int CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,
+									 Latch *latch, void *user_data);
+static bool CitusModifyWaitEvent(WaitEventSet *set, int pos, uint32 events,
+								 Latch *latch);
 static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
 static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session);
 static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool);

@@ -690,6 +696,8 @@ static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo,
 												Oid **parameterTypes,
 												const char ***parameterValues);
 static int GetEventSetSize(List *sessionList);
+static bool ProcessSessionsWithFailedWaitEventSetOperations(
+	DistributedExecution *execution);
 static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution);
 static int RebuildWaitEventSet(DistributedExecution *execution);
 static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int

@@ -2155,6 +2163,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
 	session->connection = connection;
 	session->workerPool = workerPool;
 	session->commandsSent = 0;
+	session->waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
 
 	dlist_init(&session->pendingTaskQueue);
 	dlist_init(&session->readyTaskQueue);

@@ -2318,6 +2327,7 @@ RunDistributedExecution(DistributedExecution *execution)
 			ManageWorkerPool(workerPool);
 		}
 
+		bool skipWaitEvents = false;
 		if (execution->remoteTaskList == NIL)
 		{
 			/*

@@ -2339,11 +2349,28 @@ RunDistributedExecution(DistributedExecution *execution)
 			}
 			eventSetSize = RebuildWaitEventSet(execution);
 			events = palloc0(eventSetSize * sizeof(WaitEvent));
+
+			skipWaitEvents =
+				ProcessSessionsWithFailedWaitEventSetOperations(execution);
 		}
 		else if (execution->waitFlagsChanged)
 		{
 			RebuildWaitEventSetFlags(execution->waitEventSet, execution->sessionList);
 			execution->waitFlagsChanged = false;
+
+			skipWaitEvents =
+				ProcessSessionsWithFailedWaitEventSetOperations(execution);
+		}
+
+		if (skipWaitEvents)
+		{
+			/*
+			 * Some operation on the wait event set has failed; retry,
+			 * as we have already removed the problematic connections.
+			 */
+			execution->rebuildWaitEventSet = true;
+
+			continue;
 		}
 
 		/* wait for I/O events */

@@ -2392,6 +2419,51 @@ RunDistributedExecution(DistributedExecution *execution)
 }
 
 
+/*
+ * ProcessSessionsWithFailedWaitEventSetOperations goes over the session list
+ * and processes sessions with failed wait event set operations.
+ *
+ * Failed sessions are not going to generate any further events, so it is our
+ * only chance to process the failure by calling into `ConnectionStateMachine`.
+ *
+ * The function returns true if any session failed.
+ */
+static bool
+ProcessSessionsWithFailedWaitEventSetOperations(DistributedExecution *execution)
+{
+	bool foundFailedSession = false;
+	WorkerSession *session = NULL;
+	foreach_ptr(session, execution->sessionList)
+	{
+		if (session->waitEventSetIndex == WAIT_EVENT_SET_INDEX_FAILED)
+		{
+			/*
+			 * We can only lose connections that were already connected;
+			 * others are regular failures.
+			 */
+			MultiConnection *connection = session->connection;
+			if (connection->connectionState == MULTI_CONNECTION_CONNECTED)
+			{
+				connection->connectionState = MULTI_CONNECTION_LOST;
+			}
+			else
+			{
+				connection->connectionState = MULTI_CONNECTION_FAILED;
+			}
+
+			ConnectionStateMachine(session);
+
+			session->waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
+
+			foundFailedSession = true;
+		}
+	}
+
+	return foundFailedSession;
+}
+
+
 /*
  * HasIncompleteConnectionEstablishment returns true if any of the connections
  * that has been initiated by the executor is in initilization stage.

@@ -5066,18 +5138,79 @@ BuildWaitEventSet(List *sessionList)
 			continue;
 		}
 
-		int waitEventSetIndex = AddWaitEventToSet(waitEventSet, connection->waitFlags,
-												  sock, NULL, (void *) session);
+		int waitEventSetIndex =
+			CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags, sock,
+									  NULL, (void *) session);
 		session->waitEventSetIndex = waitEventSetIndex;
 	}
 
-	AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
-	AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+	CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
+							  NULL);
+	CitusAddWaitEventSetToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
+							  NULL);
 
 	return waitEventSet;
 }
 
 
+/*
+ * CitusAddWaitEventSetToSet is a wrapper around Postgres' AddWaitEventToSet().
+ *
+ * AddWaitEventToSet() may throw hard errors. For example, when the underlying
+ * socket for a connection is closed by the remote server and that is already
+ * reflected by the OS, but Citus hasn't had a chance to get this information.
+ * In that case, if the replication factor is >1, Citus can fail over to other
+ * nodes for executing the query. Even if the replication factor is 1, Citus
+ * can give much nicer errors.
+ *
+ * So CitusAddWaitEventSetToSet simply puts AddWaitEventToSet into a
+ * PG_TRY/PG_CATCH block in order to catch any hard errors, and returns this
+ * information to the caller.
+ */
+static int
+CitusAddWaitEventSetToSet(WaitEventSet *set, uint32 events, pgsocket fd,
+						  Latch *latch, void *user_data)
+{
+	volatile int waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
+	MemoryContext savedContext = CurrentMemoryContext;
+
+	PG_TRY();
+	{
+		waitEventSetIndex =
+			AddWaitEventToSet(set, events, fd, latch, (void *) user_data);
+	}
+	PG_CATCH();
+	{
+		/*
+		 * We might be in an arbitrary memory context when the error is
+		 * thrown and we should get back to the one we had at PG_TRY()
+		 * time, especially because we are not re-throwing the error.
+		 */
+		MemoryContextSwitchTo(savedContext);
+
+		FlushErrorState();
+
+		if (user_data != NULL)
+		{
+			WorkerSession *workerSession = (WorkerSession *) user_data;
+
+			ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE),
+							 errmsg("Adding wait event for node %s:%d failed. "
+									"The socket was: %d",
+									workerSession->workerPool->nodeName,
+									workerSession->workerPool->nodePort, fd)));
+		}
+
+		/* let the callers know about the failure */
+		waitEventSetIndex = WAIT_EVENT_SET_INDEX_FAILED;
+	}
+	PG_END_TRY();
+
+	return waitEventSetIndex;
+}
+
+
 /*
  * GetEventSetSize returns the event set size for a list of sessions.
  */

@@ -5121,11 +5254,68 @@ RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList)
 			continue;
 		}
 
-		ModifyWaitEvent(waitEventSet, waitEventSetIndex, connection->waitFlags, NULL);
+		bool success =
+			CitusModifyWaitEvent(waitEventSet, waitEventSetIndex,
+								 connection->waitFlags, NULL);
+		if (!success)
+		{
+			ereport(DEBUG1, (errcode(ERRCODE_CONNECTION_FAILURE),
+							 errmsg("Modifying wait event for node %s:%d failed. "
+									"The wait event index was: %d",
+									connection->hostname, connection->port,
+									waitEventSetIndex)));
+
+			session->waitEventSetIndex = WAIT_EVENT_SET_INDEX_FAILED;
+		}
 	}
 }
 
 
+/*
+ * CitusModifyWaitEvent is a wrapper around Postgres' ModifyWaitEvent().
+ *
+ * ModifyWaitEvent may throw hard errors. For example, when the underlying
+ * socket for a connection is closed by the remote server and that is already
+ * reflected by the OS, but Citus hasn't had a chance to get this information.
+ * In that case, if the replication factor is >1, Citus can fail over to other
+ * nodes for executing the query. Even if the replication factor is 1, Citus
+ * can give much nicer errors.
+ *
+ * So CitusModifyWaitEvent simply puts ModifyWaitEvent into a PG_TRY/PG_CATCH
+ * block in order to catch any hard errors, and returns this information to
+ * the caller.
+ */
+static bool
+CitusModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
+{
+	volatile bool success = true;
+	MemoryContext savedContext = CurrentMemoryContext;
+
+	PG_TRY();
+	{
+		ModifyWaitEvent(set, pos, events, latch);
+	}
+	PG_CATCH();
+	{
+		/*
+		 * We might be in an arbitrary memory context when the error is
+		 * thrown and we should get back to the one we had at PG_TRY()
+		 * time, especially because we are not re-throwing the error.
+		 */
+		MemoryContextSwitchTo(savedContext);
+
+		FlushErrorState();
+
+		/* let the callers know about the failure */
+		success = false;
+	}
+	PG_END_TRY();
+
+	return success;
+}
+
+
 /*
  * SetLocalForceMaxQueryParallelization is simply a C interface for setting
  * the following: