diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 3c9bf07f5..221757a5e 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -21,6 +21,7 @@ /* GUC, determining whether statements sent to remote nodes are logged */ bool LogRemoteCommands = false; +static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts); /* simple helpers */ @@ -333,7 +334,7 @@ SendRemoteCommand(MultiConnection *connection, const char *command) * an error. * * If raiseInterrupts is false and an interrupt arrives that'd otherwise raise - * an error, GetRemotecommandResult returns NULL, and the transaction is + * an error, GetRemoteCommandResult returns NULL, and the transaction is * marked as having failed. While that's not a perfect way to signal failure, * callers will usually treat that as an error, and it's easy to use. * @@ -345,11 +346,8 @@ PGresult * GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) { PGconn *pgConn = connection->pgConn; - int socket = 0; - int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET; bool wasNonblocking = false; PGresult *result = NULL; - bool failed = false; /* * Short circuit tests around the more expensive parts of this @@ -361,97 +359,92 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) return PQgetResult(connection->pgConn); } - socket = PQsocket(pgConn); wasNonblocking = PQisnonblocking(pgConn); /* make sure not to block anywhere */ - if (!wasNonblocking) + PQsetnonblocking(pgConn, true); + + if (!FinishConnectionIO(connection, raiseInterrupts)) { - PQsetnonblocking(pgConn, true); + return NULL; } + /* no IO should be necessary to get result */ + Assert(!PQisBusy(pgConn)); + + result = PQgetResult(connection->pgConn); + + PQsetnonblocking(pgConn, wasNonblocking); + + return result; +} + + +/* + * FinishConnectionIO performs pending IO for the connection, while accepting + * interrupts. + * + * See GetRemoteCommandResult() for documentation of interrupt handling + * behaviour. + * + * Returns true if IO was successfully completed, false otherwise. + */ +static bool +FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts) +{ + PGconn *pgConn = connection->pgConn; + int socket = PQsocket(pgConn); + + Assert(pgConn); + Assert(PQisnonblocking(pgConn)); + if (raiseInterrupts) { CHECK_FOR_INTERRUPTS(); } - /* make sure command has been sent out */ - while (!failed) + /* perform the necessary IO */ + while (true) { + int sendStatus = 0; int rc = 0; + int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET; - ResetLatch(MyLatch); - - /* try to send all the data */ - rc = PQflush(pgConn); - - /* stop writing if all data has been sent, or there was none to send */ - if (rc == 0) - { - break; - } + /* try to send all pending data */ + sendStatus = PQflush(pgConn); /* if sending failed, there's nothing more we can do */ - if (rc == -1) + if (sendStatus == -1) { - failed = true; - break; + return false; } - - /* this means we have to wait for data to go out */ - Assert(rc == 1); - - rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_WRITEABLE, socket, 0); - - if (rc & WL_POSTMASTER_DEATH) + else if (sendStatus == 1) { - ereport(ERROR, (errmsg("postmaster was shut down, exiting"))); + waitFlags |= WL_SOCKET_WRITEABLE; } - if (rc & WL_LATCH_SET) - { - /* if allowed raise errors */ - if (raiseInterrupts) - { - CHECK_FOR_INTERRUPTS(); - } - - /* - * If raising errors allowed, or called within in a section with - * interrupts held, return NULL instead, and mark the transaction - * as failed. - */ - if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending)) - { - connection->remoteTransaction.transactionFailed = true; - failed = true; - break; - } - } - } - - /* wait for the result of the command to come in */ - while (!failed) - { - int rc = 0; - - ResetLatch(MyLatch); - /* if reading fails, there's not much we can do */ if (PQconsumeInput(pgConn) == 0) { - failed = true; - break; + return false; } - - /* check if all the necessary data is now available */ - if (!PQisBusy(pgConn)) + if (PQisBusy(pgConn)) { - result = PQgetResult(connection->pgConn); - break; + waitFlags |= WL_SOCKET_READABLE; } + if ((waitFlags & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0) + { + /* no IO necessary anymore, we're done */ + return true; + } + +#if (PG_VERSION_NUM >= 100000) + rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0, + PG_WAIT_EXTENSION); +#else rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0); +#endif if (rc & WL_POSTMASTER_DEATH) { @@ -460,6 +453,8 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) if (rc & WL_LATCH_SET) { + ResetLatch(MyLatch); + /* if allowed raise errors */ if (raiseInterrupts) { @@ -468,22 +463,16 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) /* * If raising errors allowed, or called within in a section with - * interrupts held, return NULL instead, and mark the transaction - * as failed. + * interrupts held, return instead, and mark the transaction as + * failed. */ if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending)) { connection->remoteTransaction.transactionFailed = true; - failed = true; break; } } } - if (!wasNonblocking) - { - PQsetnonblocking(pgConn, false); - } - - return result; + return false; }