Wait for commit/abort/prepare results asynchronously

pull/1481/head
Marco Slot 2017-08-10 20:05:16 +03:00
parent fca986f214
commit fdff210ef7
1 changed files with 18 additions and 4 deletions

View File

@ -521,6 +521,7 @@ void
RemoteTransactionsBeginIfNecessary(List *connectionList) RemoteTransactionsBeginIfNecessary(List *connectionList)
{ {
ListCell *connectionCell = NULL; ListCell *connectionCell = NULL;
bool raiseInterrupts = true;
/* /*
* Don't do anything if not in a coordinated transaction. That allows the * Don't do anything if not in a coordinated transaction. That allows the
@ -554,7 +555,8 @@ RemoteTransactionsBeginIfNecessary(List *connectionList)
StartRemoteTransactionBegin(connection); StartRemoteTransactionBegin(connection);
} }
/* XXX: Should perform network IO for all connections in a non-blocking manner */ raiseInterrupts = true;
WaitForAllConnections(connectionList, raiseInterrupts);
/* get result of all the BEGINs */ /* get result of all the BEGINs */
foreach(connectionCell, connectionList) foreach(connectionCell, connectionList)
@ -661,6 +663,8 @@ void
CoordinatedRemoteTransactionsPrepare(void) CoordinatedRemoteTransactionsPrepare(void)
{ {
dlist_iter iter; dlist_iter iter;
bool raiseInterrupts = false;
List *connectionList = NIL;
/* issue PREPARE TRANSACTION; to all relevant remote nodes */ /* issue PREPARE TRANSACTION; to all relevant remote nodes */
@ -680,9 +684,11 @@ CoordinatedRemoteTransactionsPrepare(void)
} }
StartRemoteTransactionPrepare(connection); StartRemoteTransactionPrepare(connection);
connectionList = lappend(connectionList, connection);
} }
/* XXX: Should perform network IO for all connections in a non-blocking manner */ raiseInterrupts = true;
WaitForAllConnections(connectionList, raiseInterrupts);
/* Wait for result */ /* Wait for result */
dlist_foreach(iter, &InProgressTransactions) dlist_foreach(iter, &InProgressTransactions)
@ -715,6 +721,8 @@ void
CoordinatedRemoteTransactionsCommit(void) CoordinatedRemoteTransactionsCommit(void)
{ {
dlist_iter iter; dlist_iter iter;
List *connectionList = NIL;
bool raiseInterrupts = false;
/* /*
* Before starting to commit on any of the nodes - after which we can't * Before starting to commit on any of the nodes - after which we can't
@ -744,9 +752,11 @@ CoordinatedRemoteTransactionsCommit(void)
} }
StartRemoteTransactionCommit(connection); StartRemoteTransactionCommit(connection);
connectionList = lappend(connectionList, connection);
} }
/* XXX: Should perform network IO for all connections in a non-blocking manner */ raiseInterrupts = false;
WaitForAllConnections(connectionList, raiseInterrupts);
/* wait for the replies to the commands to come in */ /* wait for the replies to the commands to come in */
dlist_foreach(iter, &InProgressTransactions) dlist_foreach(iter, &InProgressTransactions)
@ -780,6 +790,8 @@ void
CoordinatedRemoteTransactionsAbort(void) CoordinatedRemoteTransactionsAbort(void)
{ {
dlist_iter iter; dlist_iter iter;
List *connectionList = NIL;
bool raiseInterrupts = false;
/* asynchronously send ROLLBACK [PREPARED] */ /* asynchronously send ROLLBACK [PREPARED] */
dlist_foreach(iter, &InProgressTransactions) dlist_foreach(iter, &InProgressTransactions)
@ -797,9 +809,11 @@ CoordinatedRemoteTransactionsAbort(void)
} }
StartRemoteTransactionAbort(connection); StartRemoteTransactionAbort(connection);
connectionList = lappend(connectionList, connection);
} }
/* XXX: Should perform network IO for all connections in a non-blocking manner */ raiseInterrupts = false;
WaitForAllConnections(connectionList, raiseInterrupts);
/* and wait for the results */ /* and wait for the results */
dlist_foreach(iter, &InProgressTransactions) dlist_foreach(iter, &InProgressTransactions)