mirror of https://github.com/citusdata/citus.git
Add interrupt aware PQputCopy{End,Data} wrappers.
parent
02fa8cee0e
commit
734921eca6
|
@ -380,6 +380,96 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* PutRemoteCopyData is a wrapper around PQputCopyData() that handles
|
||||
* interrupts.
|
||||
*
|
||||
* Returns false if PQputCopyData() failed, true otherwise.
|
||||
*/
|
||||
bool
|
||||
PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes)
|
||||
{
|
||||
PGconn *pgConn = connection->pgConn;
|
||||
bool wasNonblocking = false;
|
||||
int copyState = 0;
|
||||
bool success = false;
|
||||
|
||||
if (PQstatus(pgConn) != CONNECTION_OK)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
wasNonblocking = PQisnonblocking(pgConn);
|
||||
|
||||
PQsetnonblocking(pgConn, true);
|
||||
|
||||
copyState = PQputCopyData(pgConn, buffer, nbytes);
|
||||
|
||||
if (copyState == 1)
|
||||
{
|
||||
/* successful */
|
||||
success = true;
|
||||
}
|
||||
else if (copyState == -1)
|
||||
{
|
||||
success = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
bool allowInterrupts = true;
|
||||
success = FinishConnectionIO(connection, allowInterrupts);
|
||||
}
|
||||
|
||||
PQsetnonblocking(pgConn, wasNonblocking);
|
||||
return success;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PutRemoteCopyEnd is a wrapper around PQputCopyEnd() that handles
|
||||
* interrupts.
|
||||
*
|
||||
* Returns false if PQputCopyEnd() failed, true otherwise.
|
||||
*/
|
||||
bool
|
||||
PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg)
|
||||
{
|
||||
PGconn *pgConn = connection->pgConn;
|
||||
bool wasNonblocking = false;
|
||||
int copyState = 0;
|
||||
bool success = false;
|
||||
|
||||
if (PQstatus(pgConn) != CONNECTION_OK)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
wasNonblocking = PQisnonblocking(pgConn);
|
||||
|
||||
PQsetnonblocking(pgConn, true);
|
||||
|
||||
copyState = PQputCopyEnd(pgConn, errormsg);
|
||||
|
||||
if (copyState == 1)
|
||||
{
|
||||
/* successful */
|
||||
success = true;
|
||||
}
|
||||
else if (copyState == -1)
|
||||
{
|
||||
success = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
bool allowInterrupts = true;
|
||||
success = FinishConnectionIO(connection, allowInterrupts);
|
||||
}
|
||||
|
||||
PQsetnonblocking(pgConn, wasNonblocking);
|
||||
return success;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FinishConnectionIO performs pending IO for the connection, while accepting
|
||||
* interrupts.
|
||||
|
|
|
@ -46,6 +46,9 @@ extern int SendRemoteCommandParams(MultiConnection *connection, const char *comm
|
|||
const char *const *parameterValues);
|
||||
extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection,
|
||||
bool raiseInterrupts);
|
||||
extern bool PutRemoteCopyData(MultiConnection *connection, const char *buffer,
|
||||
int nbytes);
|
||||
extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg);
|
||||
|
||||
|
||||
#endif /* REMOTE_COMMAND_H */
|
||||
|
|
Loading…
Reference in New Issue