diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 4c8beb97c..6779ff764 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -208,7 +208,7 @@ typedef struct DistributedExecution * Flag to indiciate that the set of connections we are interested * in has changed and waitEventSet needs to be rebuilt. */ - bool connectionSetChanged; + bool rebuildWaitEventSet; /* * Flag to indiciate that the set of wait events we are interested @@ -1064,7 +1064,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, execution->raiseInterrupts = true; - execution->connectionSetChanged = false; + execution->rebuildWaitEventSet = false; execution->waitFlagsChanged = false; execution->jobIdList = jobIdList; @@ -2008,6 +2008,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection) session->connection = connection; session->workerPool = workerPool; session->commandsSent = 0; + dlist_init(&session->pendingTaskQueue); dlist_init(&session->readyTaskQueue); @@ -2142,7 +2143,7 @@ RunDistributedExecution(DistributedExecution *execution) int eventSetSize = GetEventSetSize(execution->sessionList); /* always (re)build the wait event set the first time */ - execution->connectionSetChanged = true; + execution->rebuildWaitEventSet = true; while (execution->unfinishedTaskCount > 0 && !cancellationReceived) { @@ -2154,7 +2155,7 @@ RunDistributedExecution(DistributedExecution *execution) ManageWorkerPool(workerPool); } - if (execution->connectionSetChanged) + if (execution->rebuildWaitEventSet) { if (events != NULL) { @@ -2236,7 +2237,7 @@ RebuildWaitEventSet(DistributedExecution *execution) } execution->waitEventSet = BuildWaitEventSet(execution->sessionList); - execution->connectionSetChanged = false; + execution->rebuildWaitEventSet = false; execution->waitFlagsChanged = false; return GetEventSetSize(execution->sessionList); @@ -2482,7 +2483,7 @@ ManageWorkerPool(WorkerPool *workerPool) } INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime); - execution->connectionSetChanged = true; + execution->rebuildWaitEventSet = true; } @@ -2751,7 +2752,15 @@ ConnectionStateMachine(WorkerSession *session) break; } + int beforePollSocket = PQsocket(connection->pgConn); PostgresPollingStatusType pollMode = PQconnectPoll(connection->pgConn); + + if (beforePollSocket != PQsocket(connection->pgConn)) + { + /* rebuild the wait events if PQconnectPoll() changed the socket */ + execution->rebuildWaitEventSet = true; + } + if (pollMode == PGRES_POLLING_FAILED) { connection->connectionState = MULTI_CONNECTION_FAILED; @@ -2759,10 +2768,16 @@ ConnectionStateMachine(WorkerSession *session) else if (pollMode == PGRES_POLLING_READING) { UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE); + + /* we should have a valid socket */ + Assert(PQsocket(connection->pgConn) != -1); } else if (pollMode == PGRES_POLLING_WRITING) { UpdateConnectionWaitFlags(session, WL_SOCKET_WRITEABLE); + + /* we should have a valid socket */ + Assert(PQsocket(connection->pgConn) != -1); } else { @@ -2771,6 +2786,9 @@ ConnectionStateMachine(WorkerSession *session) WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); connection->connectionState = MULTI_CONNECTION_CONNECTED; + + /* we should have a valid socket */ + Assert(PQsocket(connection->pgConn) != -1); } break; @@ -2855,7 +2873,7 @@ ConnectionStateMachine(WorkerSession *session) ShutdownConnection(connection); /* remove connection from wait event set */ - execution->connectionSetChanged = true; + execution->rebuildWaitEventSet = true; /* * Reset the transaction state machine since CloseConnection()