From cca10914869a7f5e6385ded4fdca7c4edacda630 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 24 Dec 2019 00:35:16 +0000 Subject: [PATCH] Combine CopyDataFromConnection & MultiClientCopyData --- .../executor/intermediate_results.c | 82 ------------ .../executor/multi_client_executor.c | 120 ++++++++++++------ .../distributed/multi_client_executor.h | 3 + .../expected/multi_sql_function.out.modified | 0 4 files changed, 86 insertions(+), 119 deletions(-) create mode 100644 src/test/regress/expected/multi_sql_function.out.modified diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index b8d3c2c62..1bd19e5f8 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -98,9 +98,6 @@ static void ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, Datum *resultIdArray, int resultCount); static uint64 FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId); -static CopyStatus CopyDataFromConnection(MultiConnection *connection, - FileCompat *fileCompat, - uint64 *bytesReceived); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(read_intermediate_result); @@ -933,82 +930,3 @@ FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId) return totalBytesWritten; } - - -/* - * CopyDataFromConnection reads a row of copy data from connection and writes it - * to the given file. - */ -static CopyStatus -CopyDataFromConnection(MultiConnection *connection, FileCompat *fileCompat, - uint64 *bytesReceived) -{ - /* - * Consume input to handle the case where previous copy operation might have - * received zero bytes. - */ - int consumed = PQconsumeInput(connection->pgConn); - if (consumed == 0) - { - return CLIENT_COPY_FAILED; - } - - /* receive copy data message in an asynchronous manner */ - char *receiveBuffer = NULL; - bool asynchronous = true; - int receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous); - while (receiveLength > 0) - { - /* received copy data; append these data to file */ - errno = 0; - - int bytesWritten = FileWriteCompat(fileCompat, receiveBuffer, - receiveLength, PG_WAIT_IO); - if (bytesWritten != receiveLength) - { - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not append to file: %m"))); - } - - *bytesReceived += receiveLength; - PQfreemem(receiveBuffer); - receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous); - } - - if (receiveLength == 0) - { - /* we cannot read more data without blocking */ - return CLIENT_COPY_MORE; - } - else if (receiveLength == -1) - { - /* received copy done message */ - bool raiseInterrupts = true; - PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); - ExecStatusType resultStatus = PQresultStatus(result); - CopyStatus copyStatus = 0; - - if (resultStatus == PGRES_COMMAND_OK) - { - copyStatus = CLIENT_COPY_DONE; - } - else - { - copyStatus = CLIENT_COPY_FAILED; - - ReportResultError(connection, result, WARNING); - } - - PQclear(result); - ForgetResults(connection); - - return copyStatus; - } - else - { - Assert(receiveLength == -2); - ReportConnectionError(connection, WARNING); - - return CLIENT_COPY_FAILED; - } -} diff --git a/src/backend/distributed/executor/multi_client_executor.c b/src/backend/distributed/executor/multi_client_executor.c index e6918ce20..0080066ef 100644 --- a/src/backend/distributed/executor/multi_client_executor.c +++ b/src/backend/distributed/executor/multi_client_executor.c @@ -16,6 +16,7 @@ #include "fmgr.h" #include "libpq-fe.h" #include "miscadmin.h" +#include "pgstat.h" #include "commands/dbcommands.h" #include "distributed/metadata_cache.h" @@ -46,6 +47,13 @@ static PostgresPollingStatusType ClientPollingStatusArray[MAX_CONNECTION_COUNT]; /* Local functions forward declarations */ +static int FileDescriptorWriteCallback(void *file, char *buffer, int length); +static int FileWriteCompatCallback(void *file, char *buffer, int length); +static CopyStatus CopyDataFromConnectionWithCallbackForWrite(MultiConnection *connection, + void *file, + int (*write)(void *, char *, + int), + uint64 *bytesReceived); static bool ClientConnectionReady(MultiConnection *connection, PostgresPollingStatusType pollingStatus); @@ -508,18 +516,64 @@ MultiClientQueryStatus(int32 connectionId) } +/* + * FileDescriptorWriteCallback writes data to a file descriptor. + * For use with CopyDataFromConnectionWithWriteCallback. + */ +static int +FileDescriptorWriteCallback(void *file, char *buffer, int length) +{ + return write(*(int32 *) file, buffer, length); +} + + /* MultiClientCopyData copies data from the file. */ CopyStatus -MultiClientCopyData(int32 connectionId, int32 fileDescriptor, uint64 *returnBytesReceived) +MultiClientCopyData(int32 connectionId, int32 fileDescriptor, uint64 *bytesReceived) { - char *receiveBuffer = NULL; - const int asynchronous = 1; - CopyStatus copyStatus = CLIENT_INVALID_COPY; - Assert(connectionId != INVALID_CONNECTION_ID); MultiConnection *connection = ClientConnectionArray[connectionId]; Assert(connection != NULL); + return CopyDataFromConnectionWithCallbackForWrite( + connection, &fileDescriptor, FileDescriptorWriteCallback, bytesReceived); +} + + +/* + * FileWriteCompatCallback writes data to a FileCompat. + * For use with CopyDataFromConnectionWithWriteCallback. + */ +static int +FileWriteCompatCallback(void *file, char *buffer, int length) +{ + return FileWriteCompat(file, buffer, length, PG_WAIT_IO); +} + + +/* + * CopyDataFromConnection reads a row of copy data from connection and writes it + * to the given file. + */ +CopyStatus +CopyDataFromConnection(MultiConnection *connection, FileCompat *fileCompat, + uint64 *bytesReceived) +{ + return CopyDataFromConnectionWithCallbackForWrite( + connection, fileCompat, FileWriteCompatCallback, bytesReceived); +} + + +/* + * CopyDataFromConnection reads a row of copy data from connection and writes it + * to the given file. + */ +static CopyStatus +CopyDataFromConnectionWithCallbackForWrite(MultiConnection *connection, + void *file, + int (*writer)(void *, char *, int), + uint64 *bytesReceived) +{ /* * Consume input to handle the case where previous copy operation might have * received zero bytes. @@ -527,44 +581,44 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor, uint64 *returnByte int consumed = PQconsumeInput(connection->pgConn); if (consumed == 0) { - ereport(WARNING, (errmsg("could not read data from worker node"))); return CLIENT_COPY_FAILED; } /* receive copy data message in an asynchronous manner */ + char *receiveBuffer = NULL; + bool asynchronous = true; int receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous); while (receiveLength > 0) { /* received copy data; append these data to file */ errno = 0; - if (returnBytesReceived) - { - *returnBytesReceived += receiveLength; - } - - int appended = write(fileDescriptor, receiveBuffer, receiveLength); - if (appended != receiveLength) + int bytesWritten = writer(file, receiveBuffer, receiveLength); + if (bytesWritten != receiveLength) { /* if write didn't set errno, assume problem is no disk space */ if (errno == 0) { errno = ENOSPC; } - ereport(FATAL, (errcode_for_file_access(), - errmsg("could not append to copied file: %m"))); + + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not append to file: %m"))); + } + + if (bytesReceived != NULL) + { + *bytesReceived += receiveLength; } PQfreemem(receiveBuffer); - receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous); } - /* we now check the last received length returned by copy data */ if (receiveLength == 0) { /* we cannot read more data without blocking */ - copyStatus = CLIENT_COPY_MORE; + return CLIENT_COPY_MORE; } else if (receiveLength == -1) { @@ -572,12 +626,9 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor, uint64 *returnByte bool raiseInterrupts = true; PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); ExecStatusType resultStatus = PQresultStatus(result); + CopyStatus copyStatus = CLIENT_COPY_DONE; - if (resultStatus == PGRES_COMMAND_OK) - { - copyStatus = CLIENT_COPY_DONE; - } - else + if (resultStatus != PGRES_COMMAND_OK) { copyStatus = CLIENT_COPY_FAILED; @@ -585,22 +636,17 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor, uint64 *returnByte } PQclear(result); - } - else if (receiveLength == -2) - { - /* received an error */ - copyStatus = CLIENT_COPY_FAILED; - - ReportConnectionError(connection, WARNING); - } - - /* if copy out completed, make sure we drain all results from libpq */ - if (receiveLength < 0) - { ForgetResults(connection); - } - return copyStatus; + return copyStatus; + } + else + { + Assert(receiveLength == -2); + ReportConnectionError(connection, WARNING); + + return CLIENT_COPY_FAILED; + } } diff --git a/src/include/distributed/multi_client_executor.h b/src/include/distributed/multi_client_executor.h index 764b168cb..ab3220f54 100644 --- a/src/include/distributed/multi_client_executor.h +++ b/src/include/distributed/multi_client_executor.h @@ -16,6 +16,7 @@ #include "distributed/connection_management.h" +#include "distributed/version_compat.h" #include "nodes/pg_list.h" #ifdef HAVE_POLL_H @@ -123,6 +124,8 @@ extern ResultStatus MultiClientResultStatus(int32 connectionId); extern QueryStatus MultiClientQueryStatus(int32 connectionId); extern CopyStatus MultiClientCopyData(int32 connectionId, int32 fileDescriptor, uint64 *returnBytesReceived); +extern CopyStatus CopyDataFromConnection(MultiConnection *connection, + FileCompat *fileCompat, uint64 *bytesReceived); extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryResult, int *rowCount, int *columnCount); extern char * MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex); diff --git a/src/test/regress/expected/multi_sql_function.out.modified b/src/test/regress/expected/multi_sql_function.out.modified new file mode 100644 index 000000000..e69de29bb