diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 0bd1f2680..f30b94563 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -497,6 +497,7 @@ PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes) { PGconn *pgConn = connection->pgConn; int copyState = 0; + bool allowInterrupts = true; if (PQstatus(pgConn) != CONNECTION_OK) { @@ -506,21 +507,22 @@ PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes) Assert(PQisnonblocking(pgConn)); copyState = PQputCopyData(pgConn, buffer, nbytes); - - if (copyState == 1) - { - /* successful */ - return true; - } - else if (copyState == -1) + if (copyState == -1) { return false; } - else - { - bool allowInterrupts = true; - return FinishConnectionIO(connection, allowInterrupts); - } + + /* + * PQputCopyData may have queued up part of the data even if it managed + * to send some of it succesfully. We provide back pressure by waiting + * until the socket is writable to prevent the internal libpq buffers + * from growing excessively. + * + * In the future, we could reduce the frequency of these pushbacks to + * achieve higher throughput. + */ + + return FinishConnectionIO(connection, allowInterrupts); } @@ -535,6 +537,7 @@ PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg) { PGconn *pgConn = connection->pgConn; int copyState = 0; + bool allowInterrupts = true; if (PQstatus(pgConn) != CONNECTION_OK) { @@ -544,21 +547,14 @@ PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg) Assert(PQisnonblocking(pgConn)); copyState = PQputCopyEnd(pgConn, errormsg); - - if (copyState == 1) - { - /* successful */ - return true; - } - else if (copyState == -1) + if (copyState == -1) { return false; } - else - { - bool allowInterrupts = true; - return FinishConnectionIO(connection, allowInterrupts); - } + + /* see PutRemoteCopyData() */ + + return FinishConnectionIO(connection, allowInterrupts); }