Merge pull request #1481 from citusdata/async_commit

Wait for commit/abort/prepare results asynchronously
pull/1452/head
Marco Slot 2017-08-11 00:17:14 +02:00 committed by GitHub
commit c4cd3a6e06
3 changed files with 258 additions and 4 deletions

View File

@ -23,7 +23,12 @@
/* GUC, determining whether statements sent to remote nodes are logged */
bool LogRemoteCommands = false;
static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts);
static WaitEventSet * BuildWaitEventSet(MultiConnection **allConnections,
int totalConnectionCount,
int pendingConnectionsStartIndex);
/* simple helpers */
@ -652,3 +657,235 @@ FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts)
return false;
}
/*
* WaitForAllConnections blocks until all connections in the list are no
* longer busy, meaning the pending command has either finished or failed.
*/
void
WaitForAllConnections(List *connectionList, bool raiseInterrupts)
{
int totalConnectionCount = list_length(connectionList);
int pendingConnectionsStartIndex = 0;
int connectionIndex = 0;
ListCell *connectionCell = NULL;
MultiConnection *allConnections[totalConnectionCount];
WaitEvent events[totalConnectionCount];
bool connectionReady[totalConnectionCount];
WaitEventSet *waitEventSet = NULL;
/* convert connection list to an array such that we can move items around */
foreach(connectionCell, connectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
allConnections[connectionIndex] = connection;
connectionReady[connectionIndex] = false;
connectionIndex++;
}
/* make an initial pass to check for failed and idle connections */
for (connectionIndex = pendingConnectionsStartIndex;
connectionIndex < totalConnectionCount; connectionIndex++)
{
MultiConnection *connection = allConnections[connectionIndex];
if (PQstatus(connection->pgConn) == CONNECTION_BAD ||
!PQisBusy(connection->pgConn))
{
/* connection is already done; keep non-ready connections at the end */
allConnections[connectionIndex] =
allConnections[pendingConnectionsStartIndex];
pendingConnectionsStartIndex++;
}
}
PG_TRY();
{
bool rebuildWaitEventSet = true;
while (pendingConnectionsStartIndex < totalConnectionCount)
{
int eventIndex = 0;
int eventCount = 0;
long timeout = -1;
int pendingConnectionCount = totalConnectionCount -
pendingConnectionsStartIndex;
/*
* We cannot disable wait events as of postgres 9.6, so we rebuild the
* WaitEventSet whenever connections are ready.
*/
if (rebuildWaitEventSet)
{
if (waitEventSet != NULL)
{
FreeWaitEventSet(waitEventSet);
}
waitEventSet = BuildWaitEventSet(allConnections, totalConnectionCount,
pendingConnectionsStartIndex);
rebuildWaitEventSet = false;
}
/* wait for I/O events */
#if (PG_VERSION_NUM >= 100000)
eventCount = WaitEventSetWait(waitEventSet, timeout, events,
pendingConnectionCount, WAIT_EVENT_CLIENT_READ);
#else
eventCount = WaitEventSetWait(waitEventSet, timeout, events,
pendingConnectionCount);
#endif
/* process I/O events */
for (; eventIndex < eventCount; eventIndex++)
{
WaitEvent *event = &events[eventIndex];
MultiConnection *connection = NULL;
bool connectionIsReady = false;
if (event->events & WL_POSTMASTER_DEATH)
{
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
}
if (event->events & WL_LATCH_SET)
{
ResetLatch(MyLatch);
if (raiseInterrupts)
{
CHECK_FOR_INTERRUPTS();
}
if (InterruptHoldoffCount > 0 && (QueryCancelPending ||
ProcDiePending))
{
/* return immediately in case of cancellation */
FreeWaitEventSet(waitEventSet);
return;
}
continue;
}
connection = (MultiConnection *) event->user_data;
connectionIndex = event->pos + pendingConnectionsStartIndex;
if (event->events & WL_SOCKET_WRITEABLE)
{
int sendStatus = PQflush(connection->pgConn);
if (sendStatus == -1)
{
/* send failed, done with this connection */
connectionIsReady = true;
}
else if (sendStatus == 0)
{
/* done writing, only wait for read events */
ModifyWaitEvent(waitEventSet, connectionIndex, WL_SOCKET_READABLE,
NULL);
}
}
if (event->events & WL_SOCKET_READABLE)
{
int receiveStatus = PQconsumeInput(connection->pgConn);
if (receiveStatus == 0)
{
/* receive failed, done with this connection */
connectionIsReady = true;
}
else if (!PQisBusy(connection->pgConn))
{
/* result was received */
connectionIsReady = true;
}
}
if (connectionIsReady)
{
connectionReady[connectionIndex] = true;
rebuildWaitEventSet = true;
}
}
/* move non-ready connections to the back of the array */
for (connectionIndex = pendingConnectionsStartIndex;
connectionIndex < totalConnectionCount; connectionIndex++)
{
if (connectionReady[connectionIndex])
{
allConnections[connectionIndex] =
allConnections[pendingConnectionsStartIndex];
pendingConnectionsStartIndex++;
}
}
}
if (waitEventSet != NULL)
{
FreeWaitEventSet(waitEventSet);
waitEventSet = NULL;
}
}
PG_CATCH();
{
/* make sure the epoll file descriptor is always closed */
if (waitEventSet != NULL)
{
FreeWaitEventSet(waitEventSet);
waitEventSet = NULL;
}
PG_RE_THROW();
}
PG_END_TRY();
}
/*
* BuildWaitEventSet creates a WaitEventSet for the given array of connections
* which can be used to wait for any of the sockets to become read-ready, or
* write-ready in case there is data to send.
*/
static WaitEventSet *
BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
int pendingConnectionsStartIndex)
{
int pendingConnectionCount = totalConnectionCount - pendingConnectionsStartIndex;
WaitEventSet *waitEventSet = NULL;
int connectionIndex = 0;
/* allocate pending connections + 2 for the signal latch and postmaster death */
waitEventSet = CreateWaitEventSet(CurrentMemoryContext, pendingConnectionCount + 2);
for (connectionIndex = pendingConnectionsStartIndex;
connectionIndex < totalConnectionCount; connectionIndex++)
{
MultiConnection *connection = allConnections[connectionIndex];
int socket = PQsocket(connection->pgConn);
int eventMask = WL_SOCKET_READABLE;
int sendStatus = PQflush(connection->pgConn);
if (sendStatus == 1)
{
/* we have data to send, wake up when the socket is ready to write */
eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
}
AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, (void *) connection);
}
/*
* Put the wait events for the signal latch and postmaster death at the end such that
* event index + pendingConnectionsStartIndex = the connection index in the array.
*/
AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
return waitEventSet;
}

View File

@ -521,6 +521,7 @@ void
RemoteTransactionsBeginIfNecessary(List *connectionList)
{
ListCell *connectionCell = NULL;
bool raiseInterrupts = true;
/*
* Don't do anything if not in a coordinated transaction. That allows the
@ -554,7 +555,8 @@ RemoteTransactionsBeginIfNecessary(List *connectionList)
StartRemoteTransactionBegin(connection);
}
/* XXX: Should perform network IO for all connections in a non-blocking manner */
raiseInterrupts = true;
WaitForAllConnections(connectionList, raiseInterrupts);
/* get result of all the BEGINs */
foreach(connectionCell, connectionList)
@ -661,6 +663,8 @@ void
CoordinatedRemoteTransactionsPrepare(void)
{
dlist_iter iter;
bool raiseInterrupts = false;
List *connectionList = NIL;
/* issue PREPARE TRANSACTION; to all relevant remote nodes */
@ -680,9 +684,11 @@ CoordinatedRemoteTransactionsPrepare(void)
}
StartRemoteTransactionPrepare(connection);
connectionList = lappend(connectionList, connection);
}
/* XXX: Should perform network IO for all connections in a non-blocking manner */
raiseInterrupts = true;
WaitForAllConnections(connectionList, raiseInterrupts);
/* Wait for result */
dlist_foreach(iter, &InProgressTransactions)
@ -715,6 +721,8 @@ void
CoordinatedRemoteTransactionsCommit(void)
{
dlist_iter iter;
List *connectionList = NIL;
bool raiseInterrupts = false;
/*
* Before starting to commit on any of the nodes - after which we can't
@ -744,9 +752,11 @@ CoordinatedRemoteTransactionsCommit(void)
}
StartRemoteTransactionCommit(connection);
connectionList = lappend(connectionList, connection);
}
/* XXX: Should perform network IO for all connections in a non-blocking manner */
raiseInterrupts = false;
WaitForAllConnections(connectionList, raiseInterrupts);
/* wait for the replies to the commands to come in */
dlist_foreach(iter, &InProgressTransactions)
@ -780,6 +790,8 @@ void
CoordinatedRemoteTransactionsAbort(void)
{
dlist_iter iter;
List *connectionList = NIL;
bool raiseInterrupts = false;
/* asynchronously send ROLLBACK [PREPARED] */
dlist_foreach(iter, &InProgressTransactions)
@ -797,9 +809,11 @@ CoordinatedRemoteTransactionsAbort(void)
}
StartRemoteTransactionAbort(connection);
connectionList = lappend(connectionList, connection);
}
/* XXX: Should perform network IO for all connections in a non-blocking manner */
raiseInterrupts = false;
WaitForAllConnections(connectionList, raiseInterrupts);
/* and wait for the results */
dlist_foreach(iter, &InProgressTransactions)

View File

@ -52,5 +52,8 @@ extern bool PutRemoteCopyData(MultiConnection *connection, const char *buffer,
int nbytes);
extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg);
/* waiting for multiple command results */
extern void WaitForAllConnections(List *connectionList, bool raiseInterrupts);
#endif /* REMOTE_COMMAND_H */