mirror of https://github.com/citusdata/citus.git
Wait for I/O to finish after PQputCopyData
parent
f5361d52e7
commit
d1befa4df9
|
@ -497,6 +497,7 @@ PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes)
|
||||||
{
|
{
|
||||||
PGconn *pgConn = connection->pgConn;
|
PGconn *pgConn = connection->pgConn;
|
||||||
int copyState = 0;
|
int copyState = 0;
|
||||||
|
bool allowInterrupts = true;
|
||||||
|
|
||||||
if (PQstatus(pgConn) != CONNECTION_OK)
|
if (PQstatus(pgConn) != CONNECTION_OK)
|
||||||
{
|
{
|
||||||
|
@ -506,22 +507,23 @@ PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes)
|
||||||
Assert(PQisnonblocking(pgConn));
|
Assert(PQisnonblocking(pgConn));
|
||||||
|
|
||||||
copyState = PQputCopyData(pgConn, buffer, nbytes);
|
copyState = PQputCopyData(pgConn, buffer, nbytes);
|
||||||
|
if (copyState == -1)
|
||||||
if (copyState == 1)
|
|
||||||
{
|
|
||||||
/* successful */
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
else if (copyState == -1)
|
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
/*
|
||||||
bool allowInterrupts = true;
|
* PQputCopyData may have queued up part of the data even if it managed
|
||||||
|
* to send some of it succesfully. We provide back pressure by waiting
|
||||||
|
* until the socket is writable to prevent the internal libpq buffers
|
||||||
|
* from growing excessively.
|
||||||
|
*
|
||||||
|
* In the future, we could reduce the frequency of these pushbacks to
|
||||||
|
* achieve higher throughput.
|
||||||
|
*/
|
||||||
|
|
||||||
return FinishConnectionIO(connection, allowInterrupts);
|
return FinishConnectionIO(connection, allowInterrupts);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -535,6 +537,7 @@ PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg)
|
||||||
{
|
{
|
||||||
PGconn *pgConn = connection->pgConn;
|
PGconn *pgConn = connection->pgConn;
|
||||||
int copyState = 0;
|
int copyState = 0;
|
||||||
|
bool allowInterrupts = true;
|
||||||
|
|
||||||
if (PQstatus(pgConn) != CONNECTION_OK)
|
if (PQstatus(pgConn) != CONNECTION_OK)
|
||||||
{
|
{
|
||||||
|
@ -544,22 +547,15 @@ PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg)
|
||||||
Assert(PQisnonblocking(pgConn));
|
Assert(PQisnonblocking(pgConn));
|
||||||
|
|
||||||
copyState = PQputCopyEnd(pgConn, errormsg);
|
copyState = PQputCopyEnd(pgConn, errormsg);
|
||||||
|
if (copyState == -1)
|
||||||
if (copyState == 1)
|
|
||||||
{
|
|
||||||
/* successful */
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
else if (copyState == -1)
|
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
/* see PutRemoteCopyData() */
|
||||||
bool allowInterrupts = true;
|
|
||||||
return FinishConnectionIO(connection, allowInterrupts);
|
return FinishConnectionIO(connection, allowInterrupts);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
Loading…
Reference in New Issue