Merge pull request #3812 from citusdata/rebuid_when_socket_channges

Rebuild WaitEventSet if socket changes after calling  `PQconnectPoll `
pull/3813/head
Önder Kalacı 2020-05-01 09:54:52 +02:00 committed by GitHub
commit 30d7765d0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 25 additions and 7 deletions

View File

@ -208,7 +208,7 @@ typedef struct DistributedExecution
* Flag to indiciate that the set of connections we are interested * Flag to indiciate that the set of connections we are interested
* in has changed and waitEventSet needs to be rebuilt. * in has changed and waitEventSet needs to be rebuilt.
*/ */
bool connectionSetChanged; bool rebuildWaitEventSet;
/* /*
* Flag to indiciate that the set of wait events we are interested * Flag to indiciate that the set of wait events we are interested
@ -1064,7 +1064,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
execution->raiseInterrupts = true; execution->raiseInterrupts = true;
execution->connectionSetChanged = false; execution->rebuildWaitEventSet = false;
execution->waitFlagsChanged = false; execution->waitFlagsChanged = false;
execution->jobIdList = jobIdList; execution->jobIdList = jobIdList;
@ -2008,6 +2008,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
session->connection = connection; session->connection = connection;
session->workerPool = workerPool; session->workerPool = workerPool;
session->commandsSent = 0; session->commandsSent = 0;
dlist_init(&session->pendingTaskQueue); dlist_init(&session->pendingTaskQueue);
dlist_init(&session->readyTaskQueue); dlist_init(&session->readyTaskQueue);
@ -2142,7 +2143,7 @@ RunDistributedExecution(DistributedExecution *execution)
int eventSetSize = GetEventSetSize(execution->sessionList); int eventSetSize = GetEventSetSize(execution->sessionList);
/* always (re)build the wait event set the first time */ /* always (re)build the wait event set the first time */
execution->connectionSetChanged = true; execution->rebuildWaitEventSet = true;
while (execution->unfinishedTaskCount > 0 && !cancellationReceived) while (execution->unfinishedTaskCount > 0 && !cancellationReceived)
{ {
@ -2154,7 +2155,7 @@ RunDistributedExecution(DistributedExecution *execution)
ManageWorkerPool(workerPool); ManageWorkerPool(workerPool);
} }
if (execution->connectionSetChanged) if (execution->rebuildWaitEventSet)
{ {
if (events != NULL) if (events != NULL)
{ {
@ -2236,7 +2237,7 @@ RebuildWaitEventSet(DistributedExecution *execution)
} }
execution->waitEventSet = BuildWaitEventSet(execution->sessionList); execution->waitEventSet = BuildWaitEventSet(execution->sessionList);
execution->connectionSetChanged = false; execution->rebuildWaitEventSet = false;
execution->waitFlagsChanged = false; execution->waitFlagsChanged = false;
return GetEventSetSize(execution->sessionList); return GetEventSetSize(execution->sessionList);
@ -2482,7 +2483,7 @@ ManageWorkerPool(WorkerPool *workerPool)
} }
INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime); INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime);
execution->connectionSetChanged = true; execution->rebuildWaitEventSet = true;
} }
@ -2751,7 +2752,15 @@ ConnectionStateMachine(WorkerSession *session)
break; break;
} }
int beforePollSocket = PQsocket(connection->pgConn);
PostgresPollingStatusType pollMode = PQconnectPoll(connection->pgConn); PostgresPollingStatusType pollMode = PQconnectPoll(connection->pgConn);
if (beforePollSocket != PQsocket(connection->pgConn))
{
/* rebuild the wait events if PQconnectPoll() changed the socket */
execution->rebuildWaitEventSet = true;
}
if (pollMode == PGRES_POLLING_FAILED) if (pollMode == PGRES_POLLING_FAILED)
{ {
connection->connectionState = MULTI_CONNECTION_FAILED; connection->connectionState = MULTI_CONNECTION_FAILED;
@ -2759,10 +2768,16 @@ ConnectionStateMachine(WorkerSession *session)
else if (pollMode == PGRES_POLLING_READING) else if (pollMode == PGRES_POLLING_READING)
{ {
UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE); UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE);
/* we should have a valid socket */
Assert(PQsocket(connection->pgConn) != -1);
} }
else if (pollMode == PGRES_POLLING_WRITING) else if (pollMode == PGRES_POLLING_WRITING)
{ {
UpdateConnectionWaitFlags(session, WL_SOCKET_WRITEABLE); UpdateConnectionWaitFlags(session, WL_SOCKET_WRITEABLE);
/* we should have a valid socket */
Assert(PQsocket(connection->pgConn) != -1);
} }
else else
{ {
@ -2771,6 +2786,9 @@ ConnectionStateMachine(WorkerSession *session)
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
connection->connectionState = MULTI_CONNECTION_CONNECTED; connection->connectionState = MULTI_CONNECTION_CONNECTED;
/* we should have a valid socket */
Assert(PQsocket(connection->pgConn) != -1);
} }
break; break;
@ -2855,7 +2873,7 @@ ConnectionStateMachine(WorkerSession *session)
ShutdownConnection(connection); ShutdownConnection(connection);
/* remove connection from wait event set */ /* remove connection from wait event set */
execution->connectionSetChanged = true; execution->rebuildWaitEventSet = true;
/* /*
* Reset the transaction state machine since CloseConnection() * Reset the transaction state machine since CloseConnection()