From c674bc86401b359c302363d864d1c9ef097cd5de Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 30 Jun 2017 18:20:54 -0700 Subject: [PATCH] Add interrupt aware PQputCopy{End,Data} wrappers. --- .../distributed/connection/remote_commands.c | 90 +++++++++++++++++++ src/include/distributed/remote_commands.h | 3 + 2 files changed, 93 insertions(+) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index ffa4625cf..8e3dcc669 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -414,6 +414,96 @@ GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts) } +/* + * PutRemoteCopyData is a wrapper around PQputCopyData() that handles + * interrupts. + * + * Returns false if PQputCopyData() failed, true otherwise. + */ +bool +PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes) +{ + PGconn *pgConn = connection->pgConn; + bool wasNonblocking = false; + int copyState = 0; + bool success = false; + + if (PQstatus(pgConn) != CONNECTION_OK) + { + return false; + } + + wasNonblocking = PQisnonblocking(pgConn); + + PQsetnonblocking(pgConn, true); + + copyState = PQputCopyData(pgConn, buffer, nbytes); + + if (copyState == 1) + { + /* successful */ + success = true; + } + else if (copyState == -1) + { + success = false; + } + else + { + bool allowInterrupts = true; + success = FinishConnectionIO(connection, allowInterrupts); + } + + PQsetnonblocking(pgConn, wasNonblocking); + return success; +} + + +/* + * PutRemoteCopyEnd is a wrapper around PQputCopyEnd() that handles + * interrupts. + * + * Returns false if PQputCopyEnd() failed, true otherwise. + */ +bool +PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg) +{ + PGconn *pgConn = connection->pgConn; + bool wasNonblocking = false; + int copyState = 0; + bool success = false; + + if (PQstatus(pgConn) != CONNECTION_OK) + { + return false; + } + + wasNonblocking = PQisnonblocking(pgConn); + + PQsetnonblocking(pgConn, true); + + copyState = PQputCopyEnd(pgConn, errormsg); + + if (copyState == 1) + { + /* successful */ + success = true; + } + else if (copyState == -1) + { + success = false; + } + else + { + bool allowInterrupts = true; + success = FinishConnectionIO(connection, allowInterrupts); + } + + PQsetnonblocking(pgConn, wasNonblocking); + return success; +} + + /* * FinishConnectionIO performs pending IO for the connection, while accepting * interrupts. diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 3799d9d97..6d3f2abb3 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -47,6 +47,9 @@ extern int SendRemoteCommandParams(MultiConnection *connection, const char *comm extern List * ReadFirstColumnAsText(struct pg_result *queryResult); extern struct pg_result * GetRemoteCommandResult(MultiConnection *connection, bool raiseInterrupts); +extern bool PutRemoteCopyData(MultiConnection *connection, const char *buffer, + int nbytes); +extern bool PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg); #endif /* REMOTE_COMMAND_H */