remove_socket_check
Onder Kalaci 2022-11-18 14:00:10 +01:00
parent 8e5ba45b74
commit 754d6e201e
1 changed files with 62 additions and 30 deletions

View File

@ -243,6 +243,7 @@ typedef struct DistributedExecution
* and "man 3 siglongjmp" for more context. * and "man 3 siglongjmp" for more context.
*/ */
WaitEventSet *waitEventSet; WaitEventSet *waitEventSet;
WaitEvent *events;
/* /*
* The number of connections we aim to open per worker. * The number of connections we aim to open per worker.
@ -680,9 +681,9 @@ static void CheckConnectionTimeout(WorkerPool *workerPool);
static void MarkEstablishingSessionsTimedOut(WorkerPool *workerPool); static void MarkEstablishingSessionsTimedOut(WorkerPool *workerPool);
static int UsableConnectionCount(WorkerPool *workerPool); static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution); static long NextEventTimeout(DistributedExecution *execution);
static WaitEventSet * BuildWaitEventSet(List *sessionList); static WaitEventSet * BuildWaitEventSet(List *sessionList, bool s);
static void AddSessionToWaitEventSet(WorkerSession *session, static void AddSessionToWaitEventSet(WorkerSession *session,
WaitEventSet *waitEventSet); WaitEventSet *waitEventSet, bool s);
static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session);
@ -720,12 +721,12 @@ static int GetEventSetSize(List *sessionList);
static bool ProcessSessionsWithFailedWaitEventSetOperations( static bool ProcessSessionsWithFailedWaitEventSetOperations(
DistributedExecution *execution); DistributedExecution *execution);
static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution); static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution);
static int RebuildWaitEventSet(DistributedExecution *execution); static int RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly);
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
eventCount, bool *cancellationReceived); eventCount, bool *cancellationReceived);
#if PG_VERSION_NUM >= PG_VERSION_15 #if PG_VERSION_NUM >= PG_VERSION_15
static bool ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount); static void ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount);
#endif #endif
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime); static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime); static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
@ -2492,6 +2493,7 @@ RemoteSocketClosedForNewSession(WorkerSession *session)
{ {
bool socketClosed = false; bool socketClosed = false;
return false;
#if PG_VERSION_NUM >= PG_VERSION_15 #if PG_VERSION_NUM >= PG_VERSION_15
if (!WaitEventSetCanReportClosed()) if (!WaitEventSetCanReportClosed())
@ -2516,7 +2518,7 @@ RemoteSocketClosedForNewSession(WorkerSession *session)
WaitEventSet *waitEventSet = WaitEventSet *waitEventSet =
CreateWaitEventSet(CurrentMemoryContext, eventSetSize); CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
AddSessionToWaitEventSet(session, waitEventSet); AddSessionToWaitEventSet(session, waitEventSet, true);
/* always good to wait for postmaster death */ /* always good to wait for postmaster death */
CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
@ -2524,8 +2526,7 @@ RemoteSocketClosedForNewSession(WorkerSession *session)
int eventCount = WaitEventSetWait(waitEventSet, timeout, events, int eventCount = WaitEventSetWait(waitEventSet, timeout, events,
eventSetSize, WAIT_EVENT_CLIENT_READ); eventSetSize, WAIT_EVENT_CLIENT_READ);
socketClosed = ProcessWaitEventsForSocketClosed(events, eventCount);
ProcessWaitEventsForSocketClosed(events, eventCount);
/* we can at most receive a single event, which is WL_SOCKET_CLOSED */ /* we can at most receive a single event, which is WL_SOCKET_CLOSED */
Assert(eventCount <= 1); Assert(eventCount <= 1);
@ -2632,8 +2633,6 @@ SequentialRunDistributedExecution(DistributedExecution *execution)
void void
RunDistributedExecution(DistributedExecution *execution) RunDistributedExecution(DistributedExecution *execution)
{ {
WaitEvent *events = NULL;
AssignTasksToConnectionsOrWorkerPool(execution); AssignTasksToConnectionsOrWorkerPool(execution);
PG_TRY(); PG_TRY();
@ -2690,17 +2689,16 @@ RunDistributedExecution(DistributedExecution *execution)
} }
else if (execution->rebuildWaitEventSet) else if (execution->rebuildWaitEventSet)
{ {
if (events != NULL) if (execution->events != NULL)
{ {
/* /*
* The execution might take a while, so explicitly free at this point * The execution might take a while, so explicitly free at this point
* because we don't need anymore. * because we don't need anymore.
*/ */
pfree(events); pfree(execution->events);
events = NULL; execution->events = NULL;
} }
eventSetSize = RebuildWaitEventSet(execution); eventSetSize = RebuildWaitEventSet(execution, false);
events = palloc0(eventSetSize * sizeof(WaitEvent));
skipWaitEvents = skipWaitEvents =
ProcessSessionsWithFailedWaitEventSetOperations(execution); ProcessSessionsWithFailedWaitEventSetOperations(execution);
@ -2727,14 +2725,14 @@ RunDistributedExecution(DistributedExecution *execution)
/* wait for I/O events */ /* wait for I/O events */
long timeout = NextEventTimeout(execution); long timeout = NextEventTimeout(execution);
int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, execution->events,
eventSetSize, WAIT_EVENT_CLIENT_READ); eventSetSize, WAIT_EVENT_CLIENT_READ);
ProcessWaitEvents(execution, events, eventCount, &cancellationReceived); ProcessWaitEvents(execution, execution->events, eventCount, &cancellationReceived);
} }
if (events != NULL) if (execution->events != NULL)
{ {
pfree(events); pfree(execution->events);
} }
if (execution->waitEventSet != NULL) if (execution->waitEventSet != NULL)
@ -2844,7 +2842,7 @@ HasIncompleteConnectionEstablishment(DistributedExecution *execution)
* It returns the new event set size. * It returns the new event set size.
*/ */
static int static int
RebuildWaitEventSet(DistributedExecution *execution) RebuildWaitEventSet(DistributedExecution *execution, bool socketClosedOnly)
{ {
if (execution->waitEventSet != NULL) if (execution->waitEventSet != NULL)
{ {
@ -2852,7 +2850,8 @@ RebuildWaitEventSet(DistributedExecution *execution)
execution->waitEventSet = NULL; execution->waitEventSet = NULL;
} }
execution->waitEventSet = BuildWaitEventSet(execution->sessionList); execution->waitEventSet = BuildWaitEventSet(execution->sessionList, socketClosedOnly);
execution->events = palloc0(GetEventSetSize(execution->sessionList) * sizeof(WaitEvent));
execution->rebuildWaitEventSet = false; execution->rebuildWaitEventSet = false;
execution->waitFlagsChanged = false; execution->waitFlagsChanged = false;
@ -2916,7 +2915,7 @@ ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int eventC
* If WL_SOCKET_CLOSED is found, the function sets the underlying connection's * If WL_SOCKET_CLOSED is found, the function sets the underlying connection's
* state as MULTI_CONNECTION_LOST. * state as MULTI_CONNECTION_LOST.
*/ */
static bool static void
ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount) ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount)
{ {
int eventIndex = 0; int eventIndex = 0;
@ -2936,11 +2935,11 @@ ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount)
if (session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED) if (session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED)
{ {
return true; session->connection->connectionState = MULTI_CONNECTION_LOST;
} }
} }
return false; return ;
} }
@ -3024,7 +3023,6 @@ ManageWorkerPool(WorkerPool *workerPool)
} }
INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime); INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime);
execution->rebuildWaitEventSet = true;
} }
@ -3347,6 +3345,7 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
ereport(DEBUG4, (errmsg("opening %d new connections to %s:%d", newConnectionCount, ereport(DEBUG4, (errmsg("opening %d new connections to %s:%d", newConnectionCount,
workerPool->nodeName, workerPool->nodePort))); workerPool->nodeName, workerPool->nodePort)));
List *newConnectionsList = NIL;
for (int connectionIndex = 0; connectionIndex < newConnectionCount; connectionIndex++) for (int connectionIndex = 0; connectionIndex < newConnectionCount; connectionIndex++)
{ {
/* experimental: just to see the perf benefits of caching connections */ /* experimental: just to see the perf benefits of caching connections */
@ -3406,6 +3405,22 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
/* create a session for the connection */ /* create a session for the connection */
WorkerSession *session = FindOrCreateWorkerSession(workerPool, connection); WorkerSession *session = FindOrCreateWorkerSession(workerPool, connection);
newConnectionsList = lappend(newConnectionsList, session);
}
bool socketClosedOnly = true;
int size = RebuildWaitEventSet(workerPool->distributedExecution, socketClosedOnly);
long timeout = 0;/* don't wait */
int eventCount = WaitEventSetWait(workerPool->distributedExecution->waitEventSet, timeout, workerPool->distributedExecution->events, size, WAIT_EVENT_CLIENT_READ);
ProcessWaitEventsForSocketClosed(workerPool->distributedExecution->events, eventCount);
workerPool->distributedExecution->waitFlagsChanged = true;
WorkerSession *session = NULL;
foreach_ptr(session, workerPool->sessionList)
{
/* immediately run the state machine to handle potential failure */ /* immediately run the state machine to handle potential failure */
ConnectionStateMachine(session); ConnectionStateMachine(session);
} }
@ -3666,6 +3681,13 @@ ConnectionStateMachine(WorkerSession *session)
MultiConnection *connection = session->connection; MultiConnection *connection = session->connection;
MultiConnectionState currentState; MultiConnectionState currentState;
#if PG_VERSION_NUM >= PG_VERSION_15
if ((session->latestUnconsumedWaitEvents & WL_SOCKET_CLOSED) != 0)
{
connection->connectionState = MULTI_CONNECTION_LOST;
}
#endif
do { do {
currentState = connection->connectionState; currentState = connection->connectionState;
@ -5518,7 +5540,7 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution)
* write-ready. * write-ready.
*/ */
static WaitEventSet * static WaitEventSet *
BuildWaitEventSet(List *sessionList) BuildWaitEventSet(List *sessionList, bool socketClosedOnly)
{ {
/* additional 2 is for postmaster and latch */ /* additional 2 is for postmaster and latch */
int eventSetSize = GetEventSetSize(sessionList); int eventSetSize = GetEventSetSize(sessionList);
@ -5529,7 +5551,7 @@ BuildWaitEventSet(List *sessionList)
WorkerSession *session = NULL; WorkerSession *session = NULL;
foreach_ptr(session, sessionList) foreach_ptr(session, sessionList)
{ {
AddSessionToWaitEventSet(session, waitEventSet); AddSessionToWaitEventSet(session, waitEventSet, socketClosedOnly);
} }
CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, CitusAddWaitEventSetToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
@ -5547,7 +5569,7 @@ BuildWaitEventSet(List *sessionList)
* to the waitEventSet. * to the waitEventSet.
*/ */
static void static void
AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet) AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet, bool socketClosedOnly)
{ {
MultiConnection *connection = session->connection; MultiConnection *connection = session->connection;
@ -5557,7 +5579,7 @@ AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet)
return; return;
} }
if (connection->waitFlags == 0) if (connection->waitFlags == 0 && !socketClosedOnly)
{ {
/* not currently waiting for this connection */ /* not currently waiting for this connection */
return; return;
@ -5570,9 +5592,19 @@ AddSessionToWaitEventSet(WorkerSession *session, WaitEventSet *waitEventSet)
return; return;
} }
int waitEventSetIndex = int waitEventSetIndex = WAIT_EVENT_SET_INDEX_NOT_INITIALIZED;
CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags, sock,
if (socketClosedOnly)
{
waitEventSetIndex = CitusAddWaitEventSetToSet(waitEventSet, WL_SOCKET_CLOSED , sock,
NULL, (void *) session); NULL, (void *) session);
}
else
{
waitEventSetIndex = CitusAddWaitEventSetToSet(waitEventSet, connection->waitFlags , sock,
NULL, (void *) session);
}
session->waitEventSetIndex = waitEventSetIndex; session->waitEventSetIndex = waitEventSetIndex;
/* /*