diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 221757a5e..a118994ff 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -380,6 +380,96 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) } +/* + * PutRemoteCopyData is a wrapper around PQputCopyData() that handles + * interrupts. + * + * Returns false if PQputCopyData() failed, true otherwise. + */ +bool +PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes) +{ + PGconn *pgConn = connection->pgConn; + bool wasNonblocking = false; + int copyState = 0; + bool success = false; + + if (PQstatus(pgConn) != CONNECTION_OK) + { + return false; + } + + wasNonblocking = PQisnonblocking(pgConn); + + PQsetnonblocking(pgConn, true); + + copyState = PQputCopyData(pgConn, buffer, nbytes); + + if (copyState == 1) + { + /* successful */ + success = true; + } + else if (copyState == -1) + { + success = false; + } + else + { + bool allowInterrupts = true; + success = FinishConnectionIO(connection, allowInterrupts); + } + + PQsetnonblocking(pgConn, wasNonblocking); + return success; +} + + +/* + * PutRemoteCopyEnd is a wrapper around PQputCopyEnd() that handles + * interrupts. + * + * Returns false if PQputCopyEnd() failed, true otherwise. + */ +bool +PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg) +{ + PGconn *pgConn = connection->pgConn; + bool wasNonblocking = false; + int copyState = 0; + bool success = false; + + if (PQstatus(pgConn) != CONNECTION_OK) + { + return false; + } + + wasNonblocking = PQisnonblocking(pgConn); + + PQsetnonblocking(pgConn, true); + + copyState = PQputCopyEnd(pgConn, errormsg); + + if (copyState == 1) + { + /* successful */ + success = true; + } + else if (copyState == -1) + { + success = false; + } + else + { + bool allowInterrupts = true; + success = FinishConnectionIO(connection, allowInterrupts); + } + + PQsetnonblocking(pgConn, wasNonblocking); + return success; +} + + /* * FinishConnectionIO performs pending IO for the connection, while accepting * interrupts. diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 178dc2f7a..cb8c9dcd5 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -46,6 +46,9 @@ extern int SendRemoteCommandParams(MultiConnection *connection, const char *comm const char *const *parameterValues); extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts); +extern bool PutRemoteCopyData(MultiConnection *connection, const char *buffer, + int nbytes); +extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg); #endif /* REMOTE_COMMAND_H */