Use ModifyWaitEvent when only wait flags changed

pull/2848/head
Marco Slot 2019-07-21 01:35:04 +02:00
parent cd2905ec23
commit 71ad5c095b
1 changed files with 62 additions and 14 deletions

View File

@ -386,6 +386,9 @@ typedef struct WorkerSession
* distributed transaction related commands such as BEGIN/COMMIT etc.
*/
uint64 commandsSent;
/* index in the wait event set */
int waitEventSetIndex;
} WorkerSession;
@ -540,6 +543,7 @@ static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution);
static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime);
static WaitEventSet * BuildWaitEventSet(List *sessionList);
static void UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool);
@ -1647,29 +1651,31 @@ RunDistributedExecution(DistributedExecution *execution)
ManageWorkerPool(workerPool);
}
if (execution->connectionSetChanged || execution->waitFlagsChanged)
if (execution->connectionSetChanged)
{
FreeWaitEventSet(execution->waitEventSet);
execution->waitEventSet = BuildWaitEventSet(execution->sessionList);
if (execution->connectionSetChanged)
{
/*
* The execution might take a while, so explicitly free at this point
* because we don't need anymore.
*/
pfree(events);
/*
* The execution might take a while, so explicitly free at this point
* because we don't need anymore.
*/
pfree(events);
/* recalculate (and allocate) since the sessions have changed */
eventSetSize = list_length(execution->sessionList) + 2;
/* recalculate (and allocate) since the sessions have changed */
eventSetSize = list_length(execution->sessionList) + 2;
events = palloc0(eventSetSize * sizeof(WaitEvent));
}
events = palloc0(eventSetSize * sizeof(WaitEvent));
execution->connectionSetChanged = false;
execution->waitFlagsChanged = false;
}
else if (execution->waitFlagsChanged)
{
UpdateWaitEventSetFlags(execution->waitEventSet, execution->sessionList);
execution->waitFlagsChanged = false;
}
/* wait for I/O events */
#if (PG_VERSION_NUM >= 100000)
@ -3413,6 +3419,7 @@ BuildWaitEventSet(List *sessionList)
WorkerSession *session = lfirst(sessionCell);
MultiConnection *connection = session->connection;
int socket = 0;
int waitEventSetIndex = 0;
if (connection->pgConn == NULL)
{
@ -3433,8 +3440,9 @@ BuildWaitEventSet(List *sessionList)
continue;
}
AddWaitEventToSet(waitEventSet, connection->waitFlags, socket, NULL,
(void *) session);
waitEventSetIndex = AddWaitEventToSet(waitEventSet, connection->waitFlags, socket,
NULL, (void *) session);
session->waitEventSetIndex = waitEventSetIndex;
}
AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
@ -3444,6 +3452,46 @@ BuildWaitEventSet(List *sessionList)
}
/*
* UpdateWaitEventSetFlags modifies the given waitEventSet with the wait flags
* for connections in the sessionList.
*/
static void
UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList)
{
ListCell *sessionCell = NULL;
foreach(sessionCell, sessionList)
{
WorkerSession *session = lfirst(sessionCell);
MultiConnection *connection = session->connection;
int socket = 0;
int waitEventSetIndex = session->waitEventSetIndex;
if (connection->pgConn == NULL)
{
/* connection died earlier in the transaction */
continue;
}
if (connection->waitFlags == 0)
{
/* not currently waiting for this connection */
continue;
}
socket = PQsocket(connection->pgConn);
if (socket == -1)
{
/* connection was closed */
continue;
}
ModifyWaitEvent(waitEventSet, waitEventSetIndex, connection->waitFlags, NULL);
}
}
/*
* SetLocalForceMaxQueryParallelization simply a C interface for
* setting the following: