From 6d4e41b8bb6c65ff05fe06f899e556ba71e6de19 Mon Sep 17 00:00:00 2001 From: Metin Doslu Date: Wed, 16 Mar 2016 16:35:39 -0700 Subject: [PATCH] Move copy related functions to multi_copy.c --- src/backend/distributed/commands/multi_copy.c | 271 +++++++++++++++++ .../worker/worker_partition_protocol.c | 277 +----------------- src/include/distributed/multi_copy.h | 28 ++ src/include/distributed/worker_protocol.h | 22 -- 4 files changed, 300 insertions(+), 298 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 2e6c0ee14..e0a8f9175 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -261,6 +261,14 @@ static void CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConne static List * ConnectionList(HTAB *connectionHash); static void EndRemoteCopy(List *connectionList, bool stopOnFailure); static void ReportCopyError(PGconn *connection, PGresult *result); +static void CopySendData(OutputCopyState outputState, const void *databuf, int datasize); +static void CopySendString(OutputCopyState outputState, const char *str); +static void CopySendChar(OutputCopyState outputState, char c); +static void CopySendInt32(OutputCopyState outputState, int32 val); +static void CopySendInt16(OutputCopyState outputState, int16 val); +static void CopyAttributeOutText(OutputCopyState outputState, char *string); +static inline void CopyFlushOutput(OutputCopyState outputState, char *start, + char *pointer); /* @@ -1060,3 +1068,266 @@ ReportCopyError(PGconn *connection, PGresult *result) ereport(ERROR, (errmsg("%s", remoteMessage))); } } + + +/* + * OutputRow serializes one row using the column output functions, + * and appends the data to the row output state object's message buffer. + * This function is modeled after the CopyOneRowTo() function in + * commands/copy.c, but only implements a subset of that functionality. + */ +void +OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, + OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions) +{ + MemoryContext oldContext = NULL; + uint32 columnIndex = 0; + uint32 columnCount = 0; + + /* reset previous tuple's output data, and the temporary memory context */ + resetStringInfo(rowOutputState->fe_msgbuf); + MemoryContextReset(rowOutputState->rowcontext); + + oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext); + + if (rowOutputState->binary) + { + CopySendInt16(rowOutputState, rowDescriptor->natts); + } + + columnCount = (uint32) rowDescriptor->natts; + for (columnIndex = 0; columnIndex < columnCount; columnIndex++) + { + Datum value = valueArray[columnIndex]; + bool isNull = isNullArray[columnIndex]; + bool lastColumn = false; + + if (rowOutputState->binary) + { + if (!isNull) + { + FmgrInfo *outputFunctionPointer = &columnOutputFunctions[columnIndex]; + bytea *outputBytes = SendFunctionCall(outputFunctionPointer, value); + + CopySendInt32(rowOutputState, VARSIZE(outputBytes) - VARHDRSZ); + CopySendData(rowOutputState, VARDATA(outputBytes), + VARSIZE(outputBytes) - VARHDRSZ); + } + else + { + CopySendInt32(rowOutputState, -1); + } + } + else + { + if (!isNull) + { + FmgrInfo *outputFunctionPointer = &columnOutputFunctions[columnIndex]; + char *columnText = OutputFunctionCall(outputFunctionPointer, value); + + CopyAttributeOutText(rowOutputState, columnText); + } + else + { + CopySendString(rowOutputState, rowOutputState->null_print_client); + } + + lastColumn = ((columnIndex + 1) == columnCount); + if (!lastColumn) + { + CopySendChar(rowOutputState, rowOutputState->delim[0]); + } + } + } + + if (!rowOutputState->binary) + { + /* append default line termination string depending on the platform */ +#ifndef WIN32 + CopySendChar(rowOutputState, '\n'); +#else + CopySendString(rowOutputState, "\r\n"); +#endif + } + + MemoryContextSwitchTo(oldContext); +} + + +/* Append binary headers to the copy buffer in headerOutputState. */ +void +CopySendBinaryHeaders(OutputCopyState headerOutputState) +{ + const int32 zero = 0; + + /* Signature */ + CopySendData(headerOutputState, BinarySignature, 11); + + /* Flags field (no OIDs) */ + CopySendInt32(headerOutputState, zero); + + /* No header extension */ + CopySendInt32(headerOutputState, zero); +} + + +/* Append binary footers to the copy buffer in footerOutputState. */ +void +CopySendBinaryFooters(OutputCopyState footerOutputState) +{ + int16 negative = -1; + + CopySendInt16(footerOutputState, negative); +} + + +/* *INDENT-OFF* */ +/* Append data to the copy buffer in outputState */ +static void +CopySendData(OutputCopyState outputState, const void *databuf, int datasize) +{ + appendBinaryStringInfo(outputState->fe_msgbuf, databuf, datasize); +} + + +/* Append a striong to the copy buffer in outputState. */ +static void +CopySendString(OutputCopyState outputState, const char *str) +{ + appendBinaryStringInfo(outputState->fe_msgbuf, str, strlen(str)); +} + + +/* Append a char to the copy buffer in outputState. */ +static void +CopySendChar(OutputCopyState outputState, char c) +{ + appendStringInfoCharMacro(outputState->fe_msgbuf, c); +} + + +/* Append an int32 to the copy buffer in outputState. */ +static void +CopySendInt32(OutputCopyState outputState, int32 val) +{ + uint32 buf = htonl((uint32) val); + CopySendData(outputState, &buf, sizeof(buf)); +} + + +/* Append an int16 to the copy buffer in outputState. */ +static void +CopySendInt16(OutputCopyState outputState, int16 val) +{ + uint16 buf = htons((uint16) val); + CopySendData(outputState, &buf, sizeof(buf)); +} + + +/* + * Send text representation of one column, with conversion and escaping. + * + * NB: This function is based on commands/copy.c and doesn't fully conform to + * our coding style. The function should be kept in sync with copy.c. + */ +static void +CopyAttributeOutText(OutputCopyState cstate, char *string) +{ + char *pointer = NULL; + char *start = NULL; + char c = '\0'; + char delimc = cstate->delim[0]; + + if (cstate->need_transcoding) + { + pointer = pg_server_to_any(string, strlen(string), cstate->file_encoding); + } + else + { + pointer = string; + } + + /* + * We have to grovel through the string searching for control characters + * and instances of the delimiter character. In most cases, though, these + * are infrequent. To avoid overhead from calling CopySendData once per + * character, we dump out all characters between escaped characters in a + * single call. The loop invariant is that the data from "start" to "pointer" + * can be sent literally, but hasn't yet been. + * + * As all encodings here are safe, i.e. backend supported ones, we can + * skip doing pg_encoding_mblen(), because in valid backend encodings, + * extra bytes of a multibyte character never look like ASCII. + */ + start = pointer; + while ((c = *pointer) != '\0') + { + if ((unsigned char) c < (unsigned char) 0x20) + { + /* + * \r and \n must be escaped, the others are traditional. We + * prefer to dump these using the C-like notation, rather than + * a backslash and the literal character, because it makes the + * dump file a bit more proof against Microsoftish data + * mangling. + */ + switch (c) + { + case '\b': + c = 'b'; + break; + case '\f': + c = 'f'; + break; + case '\n': + c = 'n'; + break; + case '\r': + c = 'r'; + break; + case '\t': + c = 't'; + break; + case '\v': + c = 'v'; + break; + default: + /* If it's the delimiter, must backslash it */ + if (c == delimc) + break; + /* All ASCII control chars are length 1 */ + pointer++; + continue; /* fall to end of loop */ + } + /* if we get here, we need to convert the control char */ + CopyFlushOutput(cstate, start, pointer); + CopySendChar(cstate, '\\'); + CopySendChar(cstate, c); + start = ++pointer; /* do not include char in next run */ + } + else if (c == '\\' || c == delimc) + { + CopyFlushOutput(cstate, start, pointer); + CopySendChar(cstate, '\\'); + start = pointer++; /* we include char in next run */ + } + else + { + pointer++; + } + } + + CopyFlushOutput(cstate, start, pointer); +} + + +/* *INDENT-ON* */ +/* Helper function to send pending copy output */ +static inline void +CopyFlushOutput(OutputCopyState cstate, char *start, char *pointer) +{ + if (pointer > start) + { + CopySendData(cstate, start, pointer - start); + } +} diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index c5a7065dc..4c8a7f47e 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -29,6 +29,7 @@ #include "catalog/pg_collation.h" #include "commands/copy.h" #include "commands/defrem.h" +#include "distributed/multi_copy.h" #include "distributed/resource_lock.h" #include "distributed/transmit.h" #include "distributed/worker_protocol.h" @@ -45,7 +46,6 @@ bool BinaryWorkerCopyFormat = false; /* binary format for copying between work int PartitionBufferSize = 16384; /* total partitioning buffer size in KB */ /* Local variables */ -static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; static uint32 FileBufferSizeInBytes = 0; /* file buffer size to init later */ @@ -67,20 +67,8 @@ static int ColumnIndex(TupleDesc rowDescriptor, const char *columnName); static FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); static OutputCopyState InitRowOutputState(void); static void ClearRowOutputState(OutputCopyState copyState); -static void OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, - OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions); static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount); -static void CopySendBinaryHeaders(OutputCopyState headerOutputState); static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount); -static void CopySendBinaryFooters(OutputCopyState footerOutputState); -static void CopySendData(OutputCopyState outputState, const void *databuf, int datasize); -static void CopySendString(OutputCopyState outputState, const char *str); -static void CopySendChar(OutputCopyState outputState, char c); -static void CopySendInt32(OutputCopyState outputState, int32 val); -static void CopySendInt16(OutputCopyState outputState, int16 val); -static void CopyAttributeOutText(OutputCopyState outputState, char *string); -static inline void CopyFlushOutput(OutputCopyState outputState, - char *start, char *pointer); static uint32 RangePartitionId(Datum partitionValue, const void *context); static uint32 HashPartitionId(Datum partitionValue, const void *context); @@ -1004,90 +992,6 @@ ClearRowOutputState(OutputCopyState rowOutputState) } -/* - * OutputRow serializes one row using the column output functions, - * and appends the data to the row output state object's message buffer. - * This function is modeled after the CopyOneRowTo() function in - * commands/copy.c, but only implements a subset of that functionality. - */ -static void -OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, - OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions) -{ - MemoryContext oldContext = NULL; - uint32 columnIndex = 0; - uint32 columnCount = 0; - - /* reset previous tuple's output data, and the temporary memory context */ - resetStringInfo(rowOutputState->fe_msgbuf); - MemoryContextReset(rowOutputState->rowcontext); - - oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext); - - if (rowOutputState->binary) - { - CopySendInt16(rowOutputState, rowDescriptor->natts); - } - - columnCount = (uint32) rowDescriptor->natts; - for (columnIndex = 0; columnIndex < columnCount; columnIndex++) - { - Datum value = valueArray[columnIndex]; - bool isNull = isNullArray[columnIndex]; - bool lastColumn = false; - - if (rowOutputState->binary) - { - if (!isNull) - { - FmgrInfo *outputFunctionPointer = &columnOutputFunctions[columnIndex]; - bytea *outputBytes = SendFunctionCall(outputFunctionPointer, value); - - CopySendInt32(rowOutputState, VARSIZE(outputBytes) - VARHDRSZ); - CopySendData(rowOutputState, VARDATA(outputBytes), - VARSIZE(outputBytes) - VARHDRSZ); - } - else - { - CopySendInt32(rowOutputState, -1); - } - } - else - { - if (!isNull) - { - FmgrInfo *outputFunctionPointer = &columnOutputFunctions[columnIndex]; - char *columnText = OutputFunctionCall(outputFunctionPointer, value); - - CopyAttributeOutText(rowOutputState, columnText); - } - else - { - CopySendString(rowOutputState, rowOutputState->null_print_client); - } - - lastColumn = ((columnIndex + 1) == columnCount); - if (!lastColumn) - { - CopySendChar(rowOutputState, rowOutputState->delim[0]); - } - } - } - - if (!rowOutputState->binary) - { - /* append default line termination string depending on the platform */ -#ifndef WIN32 - CopySendChar(rowOutputState, '\n'); -#else - CopySendString(rowOutputState, "\r\n"); -#endif - } - - MemoryContextSwitchTo(oldContext); -} - - /* * Write the header of postgres' binary serialization format to each partition file. * This function is used when binary_worker_copy_format is enabled. @@ -1114,23 +1018,6 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount) } -/* Append binary headers to the copy buffer in headerOutputState. */ -static void -CopySendBinaryHeaders(OutputCopyState headerOutputState) -{ - const int32 zero = 0; - - /* Signature */ - CopySendData(headerOutputState, BinarySignature, 11); - - /* Flags field (no OIDs) */ - CopySendInt32(headerOutputState, zero); - - /* No header extension */ - CopySendInt32(headerOutputState, zero); -} - - /* * Write the footer of postgres' binary serialization format to each partition file. * This function is used when binary_worker_copy_format is enabled. @@ -1157,168 +1044,6 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount) } -/* Append binary footers to the copy buffer in footerOutputState. */ -static void -CopySendBinaryFooters(OutputCopyState footerOutputState) -{ - int16 negative = -1; - - CopySendInt16(footerOutputState, negative); -} - - -/* *INDENT-OFF* */ -/* Append data to the copy buffer in outputState */ -static void -CopySendData(OutputCopyState outputState, const void *databuf, int datasize) -{ - appendBinaryStringInfo(outputState->fe_msgbuf, databuf, datasize); -} - - -/* Append a striong to the copy buffer in outputState. */ -static void -CopySendString(OutputCopyState outputState, const char *str) -{ - appendBinaryStringInfo(outputState->fe_msgbuf, str, strlen(str)); -} - - -/* Append a char to the copy buffer in outputState. */ -static void -CopySendChar(OutputCopyState outputState, char c) -{ - appendStringInfoCharMacro(outputState->fe_msgbuf, c); -} - - -/* Append an int32 to the copy buffer in outputState. */ -static void -CopySendInt32(OutputCopyState outputState, int32 val) -{ - uint32 buf = htonl((uint32) val); - CopySendData(outputState, &buf, sizeof(buf)); -} - - -/* Append an int16 to the copy buffer in outputState. */ -static void -CopySendInt16(OutputCopyState outputState, int16 val) -{ - uint16 buf = htons((uint16) val); - CopySendData(outputState, &buf, sizeof(buf)); -} - - -/* - * Send text representation of one column, with conversion and escaping. - * - * NB: This function is based on commands/copy.c and doesn't fully conform to - * our coding style. The function should be kept in sync with copy.c. - */ -static void -CopyAttributeOutText(OutputCopyState cstate, char *string) -{ - char *pointer = NULL; - char *start = NULL; - char c = '\0'; - char delimc = cstate->delim[0]; - - if (cstate->need_transcoding) - { - pointer = pg_server_to_any(string, strlen(string), cstate->file_encoding); - } - else - { - pointer = string; - } - - /* - * We have to grovel through the string searching for control characters - * and instances of the delimiter character. In most cases, though, these - * are infrequent. To avoid overhead from calling CopySendData once per - * character, we dump out all characters between escaped characters in a - * single call. The loop invariant is that the data from "start" to "pointer" - * can be sent literally, but hasn't yet been. - * - * As all encodings here are safe, i.e. backend supported ones, we can - * skip doing pg_encoding_mblen(), because in valid backend encodings, - * extra bytes of a multibyte character never look like ASCII. - */ - start = pointer; - while ((c = *pointer) != '\0') - { - if ((unsigned char) c < (unsigned char) 0x20) - { - /* - * \r and \n must be escaped, the others are traditional. We - * prefer to dump these using the C-like notation, rather than - * a backslash and the literal character, because it makes the - * dump file a bit more proof against Microsoftish data - * mangling. - */ - switch (c) - { - case '\b': - c = 'b'; - break; - case '\f': - c = 'f'; - break; - case '\n': - c = 'n'; - break; - case '\r': - c = 'r'; - break; - case '\t': - c = 't'; - break; - case '\v': - c = 'v'; - break; - default: - /* If it's the delimiter, must backslash it */ - if (c == delimc) - break; - /* All ASCII control chars are length 1 */ - pointer++; - continue; /* fall to end of loop */ - } - /* if we get here, we need to convert the control char */ - CopyFlushOutput(cstate, start, pointer); - CopySendChar(cstate, '\\'); - CopySendChar(cstate, c); - start = ++pointer; /* do not include char in next run */ - } - else if (c == '\\' || c == delimc) - { - CopyFlushOutput(cstate, start, pointer); - CopySendChar(cstate, '\\'); - start = pointer++; /* we include char in next run */ - } - else - { - pointer++; - } - } - - CopyFlushOutput(cstate, start, pointer); -} - - -/* *INDENT-ON* */ -/* Helper function to send pending copy output */ -static inline void -CopyFlushOutput(OutputCopyState cstate, char *start, char *pointer) -{ - if (pointer > start) - { - CopySendData(cstate, start, pointer - start); - } -} - - /* Helper function that invokes a function with the default collation oid. */ Datum CompareCall2(FmgrInfo *functionInfo, Datum leftArgument, Datum rightArgument) diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 279b8c165..49c36a779 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -20,6 +20,34 @@ extern int CopyTransactionManager; +static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; + +/* + * A smaller version of copy.c's CopyStateData, trimmed to the elements + * necessary to copy out results. While it'd be a bit nicer to share code, + * it'd require changing core postgres code. + */ +typedef struct OutputCopyStateData +{ + StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for + * dest == COPY_NEW_FE in COPY FROM */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool binary; /* binary format? */ + char *null_print; /* NULL marker string (server encoding!) */ + char *null_print_client; /* same converted to file encoding */ + char *delim; /* column delimiter (must be 1 byte) */ + + MemoryContext rowcontext; /* per-row evaluation context */ +} OutputCopyStateData; + +typedef struct OutputCopyStateData *OutputCopyState; + +extern void OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, + OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions); +extern void CopySendBinaryHeaders(OutputCopyState headerOutputState); +extern void CopySendBinaryFooters(OutputCopyState footerOutputState); + /* function declarations for copying into a distributed table */ extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag); diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index 7b5328377..4231cf6f6 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -79,28 +79,6 @@ typedef struct HashPartitionContext } HashPartitionContext; -/* - * A smaller version of copy.c's CopyStateData, trimmed to the elements - * necessary to copy out results. While it'd be a bit nicer to share code, - * it'd require changing core postgres code. - */ -typedef struct OutputCopyStateData -{ - StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for - * dest == COPY_NEW_FE in COPY FROM */ - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - bool binary; /* binary format? */ - char *null_print; /* NULL marker string (server encoding!) */ - char *null_print_client; /* same converted to file encoding */ - char *delim; /* column delimiter (must be 1 byte) */ - - MemoryContext rowcontext; /* per-row evaluation context */ -} OutputCopyStateData; - -typedef struct OutputCopyStateData *OutputCopyState; - - /* * FileOutputStream helps buffer write operations to a file; these writes are * then regularly flushed to the underlying file. This structure differs from