From 77c397e9ae1ecf09c0814b7a2543d6b7f773491b Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 30 Apr 2020 09:56:18 +0200 Subject: [PATCH] Rebuild wait event sets after PQconnectPoll() if socket changes The reason is that PQconnectPoll() may change the underlying socket. If we don't rebuild the wait event set, the low level APIs (such as epoll_ctl()) may fail due to invalid sockets. Instead, rebuilding ensures that we'll use accurate/active sockets. --- .../distributed/executor/adaptive_executor.c | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 4c8beb97c..6779ff764 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -208,7 +208,7 @@ typedef struct DistributedExecution * Flag to indiciate that the set of connections we are interested * in has changed and waitEventSet needs to be rebuilt. */ - bool connectionSetChanged; + bool rebuildWaitEventSet; /* * Flag to indiciate that the set of wait events we are interested @@ -1064,7 +1064,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, execution->raiseInterrupts = true; - execution->connectionSetChanged = false; + execution->rebuildWaitEventSet = false; execution->waitFlagsChanged = false; execution->jobIdList = jobIdList; @@ -2008,6 +2008,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection) session->connection = connection; session->workerPool = workerPool; session->commandsSent = 0; + dlist_init(&session->pendingTaskQueue); dlist_init(&session->readyTaskQueue); @@ -2142,7 +2143,7 @@ RunDistributedExecution(DistributedExecution *execution) int eventSetSize = GetEventSetSize(execution->sessionList); /* always (re)build the wait event set the first time */ - execution->connectionSetChanged = true; + execution->rebuildWaitEventSet = true; while (execution->unfinishedTaskCount > 0 && !cancellationReceived) { @@ -2154,7 +2155,7 @@ RunDistributedExecution(DistributedExecution *execution) ManageWorkerPool(workerPool); } - if (execution->connectionSetChanged) + if (execution->rebuildWaitEventSet) { if (events != NULL) { @@ -2236,7 +2237,7 @@ RebuildWaitEventSet(DistributedExecution *execution) } execution->waitEventSet = BuildWaitEventSet(execution->sessionList); - execution->connectionSetChanged = false; + execution->rebuildWaitEventSet = false; execution->waitFlagsChanged = false; return GetEventSetSize(execution->sessionList); @@ -2482,7 +2483,7 @@ ManageWorkerPool(WorkerPool *workerPool) } INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime); - execution->connectionSetChanged = true; + execution->rebuildWaitEventSet = true; } @@ -2751,7 +2752,15 @@ ConnectionStateMachine(WorkerSession *session) break; } + int beforePollSocket = PQsocket(connection->pgConn); PostgresPollingStatusType pollMode = PQconnectPoll(connection->pgConn); + + if (beforePollSocket != PQsocket(connection->pgConn)) + { + /* rebuild the wait events if PQconnectPoll() changed the socket */ + execution->rebuildWaitEventSet = true; + } + if (pollMode == PGRES_POLLING_FAILED) { connection->connectionState = MULTI_CONNECTION_FAILED; @@ -2759,10 +2768,16 @@ ConnectionStateMachine(WorkerSession *session) else if (pollMode == PGRES_POLLING_READING) { UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE); + + /* we should have a valid socket */ + Assert(PQsocket(connection->pgConn) != -1); } else if (pollMode == PGRES_POLLING_WRITING) { UpdateConnectionWaitFlags(session, WL_SOCKET_WRITEABLE); + + /* we should have a valid socket */ + Assert(PQsocket(connection->pgConn) != -1); } else { @@ -2771,6 +2786,9 @@ ConnectionStateMachine(WorkerSession *session) WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); connection->connectionState = MULTI_CONNECTION_CONNECTED; + + /* we should have a valid socket */ + Assert(PQsocket(connection->pgConn) != -1); } break; @@ -2855,7 +2873,7 @@ ConnectionStateMachine(WorkerSession *session) ShutdownConnection(connection); /* remove connection from wait event set */ - execution->connectionSetChanged = true; + execution->rebuildWaitEventSet = true; /* * Reset the transaction state machine since CloseConnection()