mirror of https://github.com/citusdata/citus.git
Add API for waiting for multiple connections
parent
9d93fb5551
commit
fca986f214
|
@ -23,7 +23,12 @@
|
||||||
/* GUC, determining whether statements sent to remote nodes are logged */
|
/* GUC, determining whether statements sent to remote nodes are logged */
|
||||||
bool LogRemoteCommands = false;
|
bool LogRemoteCommands = false;
|
||||||
|
|
||||||
|
|
||||||
static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts);
|
static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts);
|
||||||
|
static WaitEventSet * BuildWaitEventSet(MultiConnection **allConnections,
|
||||||
|
int totalConnectionCount,
|
||||||
|
int pendingConnectionsStartIndex);
|
||||||
|
|
||||||
|
|
||||||
/* simple helpers */
|
/* simple helpers */
|
||||||
|
|
||||||
|
@ -652,3 +657,235 @@ FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts)
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* WaitForAllConnections blocks until all connections in the list are no
|
||||||
|
* longer busy, meaning the pending command has either finished or failed.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
WaitForAllConnections(List *connectionList, bool raiseInterrupts)
|
||||||
|
{
|
||||||
|
int totalConnectionCount = list_length(connectionList);
|
||||||
|
int pendingConnectionsStartIndex = 0;
|
||||||
|
int connectionIndex = 0;
|
||||||
|
ListCell *connectionCell = NULL;
|
||||||
|
|
||||||
|
MultiConnection *allConnections[totalConnectionCount];
|
||||||
|
WaitEvent events[totalConnectionCount];
|
||||||
|
bool connectionReady[totalConnectionCount];
|
||||||
|
WaitEventSet *waitEventSet = NULL;
|
||||||
|
|
||||||
|
/* convert connection list to an array such that we can move items around */
|
||||||
|
foreach(connectionCell, connectionList)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
|
||||||
|
|
||||||
|
allConnections[connectionIndex] = connection;
|
||||||
|
connectionReady[connectionIndex] = false;
|
||||||
|
connectionIndex++;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* make an initial pass to check for failed and idle connections */
|
||||||
|
for (connectionIndex = pendingConnectionsStartIndex;
|
||||||
|
connectionIndex < totalConnectionCount; connectionIndex++)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = allConnections[connectionIndex];
|
||||||
|
|
||||||
|
if (PQstatus(connection->pgConn) == CONNECTION_BAD ||
|
||||||
|
!PQisBusy(connection->pgConn))
|
||||||
|
{
|
||||||
|
/* connection is already done; keep non-ready connections at the end */
|
||||||
|
allConnections[connectionIndex] =
|
||||||
|
allConnections[pendingConnectionsStartIndex];
|
||||||
|
pendingConnectionsStartIndex++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
PG_TRY();
|
||||||
|
{
|
||||||
|
bool rebuildWaitEventSet = true;
|
||||||
|
|
||||||
|
while (pendingConnectionsStartIndex < totalConnectionCount)
|
||||||
|
{
|
||||||
|
int eventIndex = 0;
|
||||||
|
int eventCount = 0;
|
||||||
|
long timeout = -1;
|
||||||
|
int pendingConnectionCount = totalConnectionCount -
|
||||||
|
pendingConnectionsStartIndex;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We cannot disable wait events as of postgres 9.6, so we rebuild the
|
||||||
|
* WaitEventSet whenever connections are ready.
|
||||||
|
*/
|
||||||
|
if (rebuildWaitEventSet)
|
||||||
|
{
|
||||||
|
if (waitEventSet != NULL)
|
||||||
|
{
|
||||||
|
FreeWaitEventSet(waitEventSet);
|
||||||
|
}
|
||||||
|
|
||||||
|
waitEventSet = BuildWaitEventSet(allConnections, totalConnectionCount,
|
||||||
|
pendingConnectionsStartIndex);
|
||||||
|
|
||||||
|
rebuildWaitEventSet = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* wait for I/O events */
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
eventCount = WaitEventSetWait(waitEventSet, timeout, events,
|
||||||
|
pendingConnectionCount, WAIT_EVENT_CLIENT_READ);
|
||||||
|
#else
|
||||||
|
eventCount = WaitEventSetWait(waitEventSet, timeout, events,
|
||||||
|
pendingConnectionCount);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* process I/O events */
|
||||||
|
for (; eventIndex < eventCount; eventIndex++)
|
||||||
|
{
|
||||||
|
WaitEvent *event = &events[eventIndex];
|
||||||
|
MultiConnection *connection = NULL;
|
||||||
|
bool connectionIsReady = false;
|
||||||
|
|
||||||
|
if (event->events & WL_POSTMASTER_DEATH)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event->events & WL_LATCH_SET)
|
||||||
|
{
|
||||||
|
ResetLatch(MyLatch);
|
||||||
|
|
||||||
|
if (raiseInterrupts)
|
||||||
|
{
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (InterruptHoldoffCount > 0 && (QueryCancelPending ||
|
||||||
|
ProcDiePending))
|
||||||
|
{
|
||||||
|
/* return immediately in case of cancellation */
|
||||||
|
FreeWaitEventSet(waitEventSet);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
connection = (MultiConnection *) event->user_data;
|
||||||
|
connectionIndex = event->pos + pendingConnectionsStartIndex;
|
||||||
|
|
||||||
|
if (event->events & WL_SOCKET_WRITEABLE)
|
||||||
|
{
|
||||||
|
int sendStatus = PQflush(connection->pgConn);
|
||||||
|
if (sendStatus == -1)
|
||||||
|
{
|
||||||
|
/* send failed, done with this connection */
|
||||||
|
connectionIsReady = true;
|
||||||
|
}
|
||||||
|
else if (sendStatus == 0)
|
||||||
|
{
|
||||||
|
/* done writing, only wait for read events */
|
||||||
|
ModifyWaitEvent(waitEventSet, connectionIndex, WL_SOCKET_READABLE,
|
||||||
|
NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event->events & WL_SOCKET_READABLE)
|
||||||
|
{
|
||||||
|
int receiveStatus = PQconsumeInput(connection->pgConn);
|
||||||
|
if (receiveStatus == 0)
|
||||||
|
{
|
||||||
|
/* receive failed, done with this connection */
|
||||||
|
connectionIsReady = true;
|
||||||
|
}
|
||||||
|
else if (!PQisBusy(connection->pgConn))
|
||||||
|
{
|
||||||
|
/* result was received */
|
||||||
|
connectionIsReady = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (connectionIsReady)
|
||||||
|
{
|
||||||
|
connectionReady[connectionIndex] = true;
|
||||||
|
rebuildWaitEventSet = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* move non-ready connections to the back of the array */
|
||||||
|
for (connectionIndex = pendingConnectionsStartIndex;
|
||||||
|
connectionIndex < totalConnectionCount; connectionIndex++)
|
||||||
|
{
|
||||||
|
if (connectionReady[connectionIndex])
|
||||||
|
{
|
||||||
|
allConnections[connectionIndex] =
|
||||||
|
allConnections[pendingConnectionsStartIndex];
|
||||||
|
pendingConnectionsStartIndex++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (waitEventSet != NULL)
|
||||||
|
{
|
||||||
|
FreeWaitEventSet(waitEventSet);
|
||||||
|
waitEventSet = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
PG_CATCH();
|
||||||
|
{
|
||||||
|
/* make sure the epoll file descriptor is always closed */
|
||||||
|
if (waitEventSet != NULL)
|
||||||
|
{
|
||||||
|
FreeWaitEventSet(waitEventSet);
|
||||||
|
waitEventSet = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
PG_RE_THROW();
|
||||||
|
}
|
||||||
|
PG_END_TRY();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BuildWaitEventSet creates a WaitEventSet for the given array of connections
|
||||||
|
* which can be used to wait for any of the sockets to become read-ready, or
|
||||||
|
* write-ready in case there is data to send.
|
||||||
|
*/
|
||||||
|
static WaitEventSet *
|
||||||
|
BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
|
||||||
|
int pendingConnectionsStartIndex)
|
||||||
|
{
|
||||||
|
int pendingConnectionCount = totalConnectionCount - pendingConnectionsStartIndex;
|
||||||
|
WaitEventSet *waitEventSet = NULL;
|
||||||
|
int connectionIndex = 0;
|
||||||
|
|
||||||
|
/* allocate pending connections + 2 for the signal latch and postmaster death */
|
||||||
|
waitEventSet = CreateWaitEventSet(CurrentMemoryContext, pendingConnectionCount + 2);
|
||||||
|
|
||||||
|
for (connectionIndex = pendingConnectionsStartIndex;
|
||||||
|
connectionIndex < totalConnectionCount; connectionIndex++)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = allConnections[connectionIndex];
|
||||||
|
int socket = PQsocket(connection->pgConn);
|
||||||
|
int eventMask = WL_SOCKET_READABLE;
|
||||||
|
|
||||||
|
int sendStatus = PQflush(connection->pgConn);
|
||||||
|
if (sendStatus == 1)
|
||||||
|
{
|
||||||
|
/* we have data to send, wake up when the socket is ready to write */
|
||||||
|
eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, (void *) connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Put the wait events for the signal latch and postmaster death at the end such that
|
||||||
|
* event index + pendingConnectionsStartIndex = the connection index in the array.
|
||||||
|
*/
|
||||||
|
AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
|
||||||
|
AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
|
||||||
|
|
||||||
|
return waitEventSet;
|
||||||
|
}
|
||||||
|
|
|
@ -52,5 +52,8 @@ extern bool PutRemoteCopyData(MultiConnection *connection, const char *buffer,
|
||||||
int nbytes);
|
int nbytes);
|
||||||
extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg);
|
extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg);
|
||||||
|
|
||||||
|
/* waiting for multiple command results */
|
||||||
|
extern void WaitForAllConnections(List *connectionList, bool raiseInterrupts);
|
||||||
|
|
||||||
|
|
||||||
#endif /* REMOTE_COMMAND_H */
|
#endif /* REMOTE_COMMAND_H */
|
||||||
|
|
Loading…
Reference in New Issue