mirror of https://github.com/citusdata/citus.git
Move interrupt handling code from GetRemoteCommandResult to FinishConnectionIO.
Nearby commits will add additional interrupt handling functions, this way we don't have to duplicate the code.pull/1650/head
parent
7db61f148b
commit
02fa8cee0e
|
@ -21,6 +21,7 @@
|
||||||
/* GUC, determining whether statements sent to remote nodes are logged */
|
/* GUC, determining whether statements sent to remote nodes are logged */
|
||||||
bool LogRemoteCommands = false;
|
bool LogRemoteCommands = false;
|
||||||
|
|
||||||
|
static bool FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts);
|
||||||
|
|
||||||
/* simple helpers */
|
/* simple helpers */
|
||||||
|
|
||||||
|
@ -333,7 +334,7 @@ SendRemoteCommand(MultiConnection *connection, const char *command)
|
||||||
* an error.
|
* an error.
|
||||||
*
|
*
|
||||||
* If raiseInterrupts is false and an interrupt arrives that'd otherwise raise
|
* If raiseInterrupts is false and an interrupt arrives that'd otherwise raise
|
||||||
* an error, GetRemotecommandResult returns NULL, and the transaction is
|
* an error, GetRemoteCommandResult returns NULL, and the transaction is
|
||||||
* marked as having failed. While that's not a perfect way to signal failure,
|
* marked as having failed. While that's not a perfect way to signal failure,
|
||||||
* callers will usually treat that as an error, and it's easy to use.
|
* callers will usually treat that as an error, and it's easy to use.
|
||||||
*
|
*
|
||||||
|
@ -345,11 +346,8 @@ PGresult *
|
||||||
GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
|
GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
|
||||||
{
|
{
|
||||||
PGconn *pgConn = connection->pgConn;
|
PGconn *pgConn = connection->pgConn;
|
||||||
int socket = 0;
|
|
||||||
int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET;
|
|
||||||
bool wasNonblocking = false;
|
bool wasNonblocking = false;
|
||||||
PGresult *result = NULL;
|
PGresult *result = NULL;
|
||||||
bool failed = false;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Short circuit tests around the more expensive parts of this
|
* Short circuit tests around the more expensive parts of this
|
||||||
|
@ -361,97 +359,92 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
|
||||||
return PQgetResult(connection->pgConn);
|
return PQgetResult(connection->pgConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
socket = PQsocket(pgConn);
|
|
||||||
wasNonblocking = PQisnonblocking(pgConn);
|
wasNonblocking = PQisnonblocking(pgConn);
|
||||||
|
|
||||||
/* make sure not to block anywhere */
|
/* make sure not to block anywhere */
|
||||||
if (!wasNonblocking)
|
|
||||||
{
|
|
||||||
PQsetnonblocking(pgConn, true);
|
PQsetnonblocking(pgConn, true);
|
||||||
|
|
||||||
|
if (!FinishConnectionIO(connection, raiseInterrupts))
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (raiseInterrupts)
|
/* no IO should be necessary to get result */
|
||||||
{
|
Assert(!PQisBusy(pgConn));
|
||||||
CHECK_FOR_INTERRUPTS();
|
|
||||||
|
result = PQgetResult(connection->pgConn);
|
||||||
|
|
||||||
|
PQsetnonblocking(pgConn, wasNonblocking);
|
||||||
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* make sure command has been sent out */
|
|
||||||
while (!failed)
|
|
||||||
{
|
|
||||||
int rc = 0;
|
|
||||||
|
|
||||||
ResetLatch(MyLatch);
|
|
||||||
|
|
||||||
/* try to send all the data */
|
|
||||||
rc = PQflush(pgConn);
|
|
||||||
|
|
||||||
/* stop writing if all data has been sent, or there was none to send */
|
|
||||||
if (rc == 0)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* if sending failed, there's nothing more we can do */
|
|
||||||
if (rc == -1)
|
|
||||||
{
|
|
||||||
failed = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* this means we have to wait for data to go out */
|
|
||||||
Assert(rc == 1);
|
|
||||||
|
|
||||||
rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_WRITEABLE, socket, 0);
|
|
||||||
|
|
||||||
if (rc & WL_POSTMASTER_DEATH)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (rc & WL_LATCH_SET)
|
|
||||||
{
|
|
||||||
/* if allowed raise errors */
|
|
||||||
if (raiseInterrupts)
|
|
||||||
{
|
|
||||||
CHECK_FOR_INTERRUPTS();
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If raising errors allowed, or called within in a section with
|
* FinishConnectionIO performs pending IO for the connection, while accepting
|
||||||
* interrupts held, return NULL instead, and mark the transaction
|
* interrupts.
|
||||||
* as failed.
|
*
|
||||||
|
* See GetRemoteCommandResult() for documentation of interrupt handling
|
||||||
|
* behaviour.
|
||||||
|
*
|
||||||
|
* Returns true if IO was successfully completed, false otherwise.
|
||||||
*/
|
*/
|
||||||
if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending))
|
static bool
|
||||||
|
FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts)
|
||||||
{
|
{
|
||||||
connection->remoteTransaction.transactionFailed = true;
|
PGconn *pgConn = connection->pgConn;
|
||||||
failed = true;
|
int socket = PQsocket(pgConn);
|
||||||
break;
|
|
||||||
}
|
Assert(pgConn);
|
||||||
}
|
Assert(PQisnonblocking(pgConn));
|
||||||
|
|
||||||
|
if (raiseInterrupts)
|
||||||
|
{
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* wait for the result of the command to come in */
|
/* perform the necessary IO */
|
||||||
while (!failed)
|
while (true)
|
||||||
{
|
{
|
||||||
|
int sendStatus = 0;
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
|
int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET;
|
||||||
|
|
||||||
ResetLatch(MyLatch);
|
/* try to send all pending data */
|
||||||
|
sendStatus = PQflush(pgConn);
|
||||||
|
|
||||||
|
/* if sending failed, there's nothing more we can do */
|
||||||
|
if (sendStatus == -1)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
else if (sendStatus == 1)
|
||||||
|
{
|
||||||
|
waitFlags |= WL_SOCKET_WRITEABLE;
|
||||||
|
}
|
||||||
|
|
||||||
/* if reading fails, there's not much we can do */
|
/* if reading fails, there's not much we can do */
|
||||||
if (PQconsumeInput(pgConn) == 0)
|
if (PQconsumeInput(pgConn) == 0)
|
||||||
{
|
{
|
||||||
failed = true;
|
return false;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
if (PQisBusy(pgConn))
|
||||||
/* check if all the necessary data is now available */
|
|
||||||
if (!PQisBusy(pgConn))
|
|
||||||
{
|
{
|
||||||
result = PQgetResult(connection->pgConn);
|
waitFlags |= WL_SOCKET_READABLE;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((waitFlags & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0)
|
||||||
|
{
|
||||||
|
/* no IO necessary anymore, we're done */
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0,
|
||||||
|
PG_WAIT_EXTENSION);
|
||||||
|
#else
|
||||||
rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0);
|
rc = WaitLatchOrSocket(MyLatch, waitFlags | WL_SOCKET_READABLE, socket, 0);
|
||||||
|
#endif
|
||||||
|
|
||||||
if (rc & WL_POSTMASTER_DEATH)
|
if (rc & WL_POSTMASTER_DEATH)
|
||||||
{
|
{
|
||||||
|
@ -460,6 +453,8 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
|
||||||
|
|
||||||
if (rc & WL_LATCH_SET)
|
if (rc & WL_LATCH_SET)
|
||||||
{
|
{
|
||||||
|
ResetLatch(MyLatch);
|
||||||
|
|
||||||
/* if allowed raise errors */
|
/* if allowed raise errors */
|
||||||
if (raiseInterrupts)
|
if (raiseInterrupts)
|
||||||
{
|
{
|
||||||
|
@ -468,22 +463,16 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If raising errors allowed, or called within in a section with
|
* If raising errors allowed, or called within in a section with
|
||||||
* interrupts held, return NULL instead, and mark the transaction
|
* interrupts held, return instead, and mark the transaction as
|
||||||
* as failed.
|
* failed.
|
||||||
*/
|
*/
|
||||||
if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending))
|
if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending))
|
||||||
{
|
{
|
||||||
connection->remoteTransaction.transactionFailed = true;
|
connection->remoteTransaction.transactionFailed = true;
|
||||||
failed = true;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!wasNonblocking)
|
return false;
|
||||||
{
|
|
||||||
PQsetnonblocking(pgConn, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue