Combine CopyDataFromConnection & MultiClientCopyData

merge-copy-data-from-conn-to-file-functions
Philip Dubé 2019-12-24 00:35:16 +00:00
parent 2349f838a1
commit cca1091486
4 changed files with 86 additions and 119 deletions

View File

@ -98,9 +98,6 @@ static void ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo,
Datum *resultIdArray, Datum *resultIdArray,
int resultCount); int resultCount);
static uint64 FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId); static uint64 FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId);
static CopyStatus CopyDataFromConnection(MultiConnection *connection,
FileCompat *fileCompat,
uint64 *bytesReceived);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(read_intermediate_result); PG_FUNCTION_INFO_V1(read_intermediate_result);
@ -933,82 +930,3 @@ FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId)
return totalBytesWritten; return totalBytesWritten;
} }
/*
* CopyDataFromConnection reads a row of copy data from connection and writes it
* to the given file.
*/
static CopyStatus
CopyDataFromConnection(MultiConnection *connection, FileCompat *fileCompat,
uint64 *bytesReceived)
{
/*
* Consume input to handle the case where previous copy operation might have
* received zero bytes.
*/
int consumed = PQconsumeInput(connection->pgConn);
if (consumed == 0)
{
return CLIENT_COPY_FAILED;
}
/* receive copy data message in an asynchronous manner */
char *receiveBuffer = NULL;
bool asynchronous = true;
int receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous);
while (receiveLength > 0)
{
/* received copy data; append these data to file */
errno = 0;
int bytesWritten = FileWriteCompat(fileCompat, receiveBuffer,
receiveLength, PG_WAIT_IO);
if (bytesWritten != receiveLength)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not append to file: %m")));
}
*bytesReceived += receiveLength;
PQfreemem(receiveBuffer);
receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous);
}
if (receiveLength == 0)
{
/* we cannot read more data without blocking */
return CLIENT_COPY_MORE;
}
else if (receiveLength == -1)
{
/* received copy done message */
bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
ExecStatusType resultStatus = PQresultStatus(result);
CopyStatus copyStatus = 0;
if (resultStatus == PGRES_COMMAND_OK)
{
copyStatus = CLIENT_COPY_DONE;
}
else
{
copyStatus = CLIENT_COPY_FAILED;
ReportResultError(connection, result, WARNING);
}
PQclear(result);
ForgetResults(connection);
return copyStatus;
}
else
{
Assert(receiveLength == -2);
ReportConnectionError(connection, WARNING);
return CLIENT_COPY_FAILED;
}
}

View File

@ -16,6 +16,7 @@
#include "fmgr.h" #include "fmgr.h"
#include "libpq-fe.h" #include "libpq-fe.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
@ -46,6 +47,13 @@ static PostgresPollingStatusType ClientPollingStatusArray[MAX_CONNECTION_COUNT];
/* Local functions forward declarations */ /* Local functions forward declarations */
static int FileDescriptorWriteCallback(void *file, char *buffer, int length);
static int FileWriteCompatCallback(void *file, char *buffer, int length);
static CopyStatus CopyDataFromConnectionWithCallbackForWrite(MultiConnection *connection,
void *file,
int (*write)(void *, char *,
int),
uint64 *bytesReceived);
static bool ClientConnectionReady(MultiConnection *connection, static bool ClientConnectionReady(MultiConnection *connection,
PostgresPollingStatusType pollingStatus); PostgresPollingStatusType pollingStatus);
@ -508,18 +516,64 @@ MultiClientQueryStatus(int32 connectionId)
} }
/*
* FileDescriptorWriteCallback writes data to a file descriptor.
* For use with CopyDataFromConnectionWithWriteCallback.
*/
static int
FileDescriptorWriteCallback(void *file, char *buffer, int length)
{
return write(*(int32 *) file, buffer, length);
}
/* MultiClientCopyData copies data from the file. */ /* MultiClientCopyData copies data from the file. */
CopyStatus CopyStatus
MultiClientCopyData(int32 connectionId, int32 fileDescriptor, uint64 *returnBytesReceived) MultiClientCopyData(int32 connectionId, int32 fileDescriptor, uint64 *bytesReceived)
{ {
char *receiveBuffer = NULL;
const int asynchronous = 1;
CopyStatus copyStatus = CLIENT_INVALID_COPY;
Assert(connectionId != INVALID_CONNECTION_ID); Assert(connectionId != INVALID_CONNECTION_ID);
MultiConnection *connection = ClientConnectionArray[connectionId]; MultiConnection *connection = ClientConnectionArray[connectionId];
Assert(connection != NULL); Assert(connection != NULL);
return CopyDataFromConnectionWithCallbackForWrite(
connection, &fileDescriptor, FileDescriptorWriteCallback, bytesReceived);
}
/*
* FileWriteCompatCallback writes data to a FileCompat.
* For use with CopyDataFromConnectionWithWriteCallback.
*/
static int
FileWriteCompatCallback(void *file, char *buffer, int length)
{
return FileWriteCompat(file, buffer, length, PG_WAIT_IO);
}
/*
* CopyDataFromConnection reads a row of copy data from connection and writes it
* to the given file.
*/
CopyStatus
CopyDataFromConnection(MultiConnection *connection, FileCompat *fileCompat,
uint64 *bytesReceived)
{
return CopyDataFromConnectionWithCallbackForWrite(
connection, fileCompat, FileWriteCompatCallback, bytesReceived);
}
/*
* CopyDataFromConnection reads a row of copy data from connection and writes it
* to the given file.
*/
static CopyStatus
CopyDataFromConnectionWithCallbackForWrite(MultiConnection *connection,
void *file,
int (*writer)(void *, char *, int),
uint64 *bytesReceived)
{
/* /*
* Consume input to handle the case where previous copy operation might have * Consume input to handle the case where previous copy operation might have
* received zero bytes. * received zero bytes.
@ -527,44 +581,44 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor, uint64 *returnByte
int consumed = PQconsumeInput(connection->pgConn); int consumed = PQconsumeInput(connection->pgConn);
if (consumed == 0) if (consumed == 0)
{ {
ereport(WARNING, (errmsg("could not read data from worker node")));
return CLIENT_COPY_FAILED; return CLIENT_COPY_FAILED;
} }
/* receive copy data message in an asynchronous manner */ /* receive copy data message in an asynchronous manner */
char *receiveBuffer = NULL;
bool asynchronous = true;
int receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous); int receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous);
while (receiveLength > 0) while (receiveLength > 0)
{ {
/* received copy data; append these data to file */ /* received copy data; append these data to file */
errno = 0; errno = 0;
if (returnBytesReceived) int bytesWritten = writer(file, receiveBuffer, receiveLength);
{ if (bytesWritten != receiveLength)
*returnBytesReceived += receiveLength;
}
int appended = write(fileDescriptor, receiveBuffer, receiveLength);
if (appended != receiveLength)
{ {
/* if write didn't set errno, assume problem is no disk space */ /* if write didn't set errno, assume problem is no disk space */
if (errno == 0) if (errno == 0)
{ {
errno = ENOSPC; errno = ENOSPC;
} }
ereport(FATAL, (errcode_for_file_access(),
errmsg("could not append to copied file: %m"))); ereport(ERROR, (errcode_for_file_access(),
errmsg("could not append to file: %m")));
}
if (bytesReceived != NULL)
{
*bytesReceived += receiveLength;
} }
PQfreemem(receiveBuffer); PQfreemem(receiveBuffer);
receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous); receiveLength = PQgetCopyData(connection->pgConn, &receiveBuffer, asynchronous);
} }
/* we now check the last received length returned by copy data */
if (receiveLength == 0) if (receiveLength == 0)
{ {
/* we cannot read more data without blocking */ /* we cannot read more data without blocking */
copyStatus = CLIENT_COPY_MORE; return CLIENT_COPY_MORE;
} }
else if (receiveLength == -1) else if (receiveLength == -1)
{ {
@ -572,12 +626,9 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor, uint64 *returnByte
bool raiseInterrupts = true; bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
ExecStatusType resultStatus = PQresultStatus(result); ExecStatusType resultStatus = PQresultStatus(result);
CopyStatus copyStatus = CLIENT_COPY_DONE;
if (resultStatus == PGRES_COMMAND_OK) if (resultStatus != PGRES_COMMAND_OK)
{
copyStatus = CLIENT_COPY_DONE;
}
else
{ {
copyStatus = CLIENT_COPY_FAILED; copyStatus = CLIENT_COPY_FAILED;
@ -585,22 +636,17 @@ MultiClientCopyData(int32 connectionId, int32 fileDescriptor, uint64 *returnByte
} }
PQclear(result); PQclear(result);
}
else if (receiveLength == -2)
{
/* received an error */
copyStatus = CLIENT_COPY_FAILED;
ReportConnectionError(connection, WARNING);
}
/* if copy out completed, make sure we drain all results from libpq */
if (receiveLength < 0)
{
ForgetResults(connection); ForgetResults(connection);
}
return copyStatus; return copyStatus;
}
else
{
Assert(receiveLength == -2);
ReportConnectionError(connection, WARNING);
return CLIENT_COPY_FAILED;
}
} }

View File

@ -16,6 +16,7 @@
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/version_compat.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#ifdef HAVE_POLL_H #ifdef HAVE_POLL_H
@ -123,6 +124,8 @@ extern ResultStatus MultiClientResultStatus(int32 connectionId);
extern QueryStatus MultiClientQueryStatus(int32 connectionId); extern QueryStatus MultiClientQueryStatus(int32 connectionId);
extern CopyStatus MultiClientCopyData(int32 connectionId, int32 fileDescriptor, extern CopyStatus MultiClientCopyData(int32 connectionId, int32 fileDescriptor,
uint64 *returnBytesReceived); uint64 *returnBytesReceived);
extern CopyStatus CopyDataFromConnection(MultiConnection *connection,
FileCompat *fileCompat, uint64 *bytesReceived);
extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryResult, extern BatchQueryStatus MultiClientBatchResult(int32 connectionId, void **queryResult,
int *rowCount, int *columnCount); int *rowCount, int *columnCount);
extern char * MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex); extern char * MultiClientGetValue(void *queryResult, int rowIndex, int columnIndex);