diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 72b7e3ae4..ffab29354 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -386,6 +386,9 @@ typedef struct WorkerSession * distributed transaction related commands such as BEGIN/COMMIT etc. */ uint64 commandsSent; + + /* index in the wait event set */ + int waitEventSetIndex; } WorkerSession; @@ -540,6 +543,7 @@ static int UsableConnectionCount(WorkerPool *workerPool); static long NextEventTimeout(DistributedExecution *execution); static long MillisecondsBetweenTimestamps(TimestampTz startTime, TimestampTz endTime); static WaitEventSet * BuildWaitEventSet(List *sessionList); +static void UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList); static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session); static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool); @@ -1647,29 +1651,31 @@ RunDistributedExecution(DistributedExecution *execution) ManageWorkerPool(workerPool); } - if (execution->connectionSetChanged || execution->waitFlagsChanged) + if (execution->connectionSetChanged) { FreeWaitEventSet(execution->waitEventSet); execution->waitEventSet = BuildWaitEventSet(execution->sessionList); - if (execution->connectionSetChanged) - { - /* - * The execution might take a while, so explicitly free at this point - * because we don't need anymore. - */ - pfree(events); + /* + * The execution might take a while, so explicitly free at this point + * because we don't need anymore. + */ + pfree(events); - /* recalculate (and allocate) since the sessions have changed */ - eventSetSize = list_length(execution->sessionList) + 2; + /* recalculate (and allocate) since the sessions have changed */ + eventSetSize = list_length(execution->sessionList) + 2; - events = palloc0(eventSetSize * sizeof(WaitEvent)); - } + events = palloc0(eventSetSize * sizeof(WaitEvent)); execution->connectionSetChanged = false; execution->waitFlagsChanged = false; } + else if (execution->waitFlagsChanged) + { + UpdateWaitEventSetFlags(execution->waitEventSet, execution->sessionList); + execution->waitFlagsChanged = false; + } /* wait for I/O events */ #if (PG_VERSION_NUM >= 100000) @@ -3413,6 +3419,7 @@ BuildWaitEventSet(List *sessionList) WorkerSession *session = lfirst(sessionCell); MultiConnection *connection = session->connection; int socket = 0; + int waitEventSetIndex = 0; if (connection->pgConn == NULL) { @@ -3433,8 +3440,9 @@ BuildWaitEventSet(List *sessionList) continue; } - AddWaitEventToSet(waitEventSet, connection->waitFlags, socket, NULL, - (void *) session); + waitEventSetIndex = AddWaitEventToSet(waitEventSet, connection->waitFlags, socket, + NULL, (void *) session); + session->waitEventSetIndex = waitEventSetIndex; } AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL); @@ -3444,6 +3452,46 @@ BuildWaitEventSet(List *sessionList) } +/* + * UpdateWaitEventSetFlags modifies the given waitEventSet with the wait flags + * for connections in the sessionList. + */ +static void +UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList) +{ + ListCell *sessionCell = NULL; + + foreach(sessionCell, sessionList) + { + WorkerSession *session = lfirst(sessionCell); + MultiConnection *connection = session->connection; + int socket = 0; + int waitEventSetIndex = session->waitEventSetIndex; + + if (connection->pgConn == NULL) + { + /* connection died earlier in the transaction */ + continue; + } + + if (connection->waitFlags == 0) + { + /* not currently waiting for this connection */ + continue; + } + + socket = PQsocket(connection->pgConn); + if (socket == -1) + { + /* connection was closed */ + continue; + } + + ModifyWaitEvent(waitEventSet, waitEventSetIndex, connection->waitFlags, NULL); + } +} + + /* * SetLocalForceMaxQueryParallelization simply a C interface for * setting the following: