mirror of https://github.com/citusdata/citus.git
Move copy related functions to multi_copy.c
parent
d5e003768e
commit
6d4e41b8bb
|
@ -261,6 +261,14 @@ static void CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConne
|
|||
static List * ConnectionList(HTAB *connectionHash);
|
||||
static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
|
||||
static void ReportCopyError(PGconn *connection, PGresult *result);
|
||||
static void CopySendData(OutputCopyState outputState, const void *databuf, int datasize);
|
||||
static void CopySendString(OutputCopyState outputState, const char *str);
|
||||
static void CopySendChar(OutputCopyState outputState, char c);
|
||||
static void CopySendInt32(OutputCopyState outputState, int32 val);
|
||||
static void CopySendInt16(OutputCopyState outputState, int16 val);
|
||||
static void CopyAttributeOutText(OutputCopyState outputState, char *string);
|
||||
static inline void CopyFlushOutput(OutputCopyState outputState, char *start,
|
||||
char *pointer);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -1060,3 +1068,266 @@ ReportCopyError(PGconn *connection, PGresult *result)
|
|||
ereport(ERROR, (errmsg("%s", remoteMessage)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* OutputRow serializes one row using the column output functions,
|
||||
* and appends the data to the row output state object's message buffer.
|
||||
* This function is modeled after the CopyOneRowTo() function in
|
||||
* commands/copy.c, but only implements a subset of that functionality.
|
||||
*/
|
||||
void
|
||||
OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
|
||||
OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions)
|
||||
{
|
||||
MemoryContext oldContext = NULL;
|
||||
uint32 columnIndex = 0;
|
||||
uint32 columnCount = 0;
|
||||
|
||||
/* reset previous tuple's output data, and the temporary memory context */
|
||||
resetStringInfo(rowOutputState->fe_msgbuf);
|
||||
MemoryContextReset(rowOutputState->rowcontext);
|
||||
|
||||
oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext);
|
||||
|
||||
if (rowOutputState->binary)
|
||||
{
|
||||
CopySendInt16(rowOutputState, rowDescriptor->natts);
|
||||
}
|
||||
|
||||
columnCount = (uint32) rowDescriptor->natts;
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
{
|
||||
Datum value = valueArray[columnIndex];
|
||||
bool isNull = isNullArray[columnIndex];
|
||||
bool lastColumn = false;
|
||||
|
||||
if (rowOutputState->binary)
|
||||
{
|
||||
if (!isNull)
|
||||
{
|
||||
FmgrInfo *outputFunctionPointer = &columnOutputFunctions[columnIndex];
|
||||
bytea *outputBytes = SendFunctionCall(outputFunctionPointer, value);
|
||||
|
||||
CopySendInt32(rowOutputState, VARSIZE(outputBytes) - VARHDRSZ);
|
||||
CopySendData(rowOutputState, VARDATA(outputBytes),
|
||||
VARSIZE(outputBytes) - VARHDRSZ);
|
||||
}
|
||||
else
|
||||
{
|
||||
CopySendInt32(rowOutputState, -1);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!isNull)
|
||||
{
|
||||
FmgrInfo *outputFunctionPointer = &columnOutputFunctions[columnIndex];
|
||||
char *columnText = OutputFunctionCall(outputFunctionPointer, value);
|
||||
|
||||
CopyAttributeOutText(rowOutputState, columnText);
|
||||
}
|
||||
else
|
||||
{
|
||||
CopySendString(rowOutputState, rowOutputState->null_print_client);
|
||||
}
|
||||
|
||||
lastColumn = ((columnIndex + 1) == columnCount);
|
||||
if (!lastColumn)
|
||||
{
|
||||
CopySendChar(rowOutputState, rowOutputState->delim[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!rowOutputState->binary)
|
||||
{
|
||||
/* append default line termination string depending on the platform */
|
||||
#ifndef WIN32
|
||||
CopySendChar(rowOutputState, '\n');
|
||||
#else
|
||||
CopySendString(rowOutputState, "\r\n");
|
||||
#endif
|
||||
}
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
}
|
||||
|
||||
|
||||
/* Append binary headers to the copy buffer in headerOutputState. */
|
||||
void
|
||||
CopySendBinaryHeaders(OutputCopyState headerOutputState)
|
||||
{
|
||||
const int32 zero = 0;
|
||||
|
||||
/* Signature */
|
||||
CopySendData(headerOutputState, BinarySignature, 11);
|
||||
|
||||
/* Flags field (no OIDs) */
|
||||
CopySendInt32(headerOutputState, zero);
|
||||
|
||||
/* No header extension */
|
||||
CopySendInt32(headerOutputState, zero);
|
||||
}
|
||||
|
||||
|
||||
/* Append binary footers to the copy buffer in footerOutputState. */
|
||||
void
|
||||
CopySendBinaryFooters(OutputCopyState footerOutputState)
|
||||
{
|
||||
int16 negative = -1;
|
||||
|
||||
CopySendInt16(footerOutputState, negative);
|
||||
}
|
||||
|
||||
|
||||
/* *INDENT-OFF* */
|
||||
/* Append data to the copy buffer in outputState */
|
||||
static void
|
||||
CopySendData(OutputCopyState outputState, const void *databuf, int datasize)
|
||||
{
|
||||
appendBinaryStringInfo(outputState->fe_msgbuf, databuf, datasize);
|
||||
}
|
||||
|
||||
|
||||
/* Append a striong to the copy buffer in outputState. */
|
||||
static void
|
||||
CopySendString(OutputCopyState outputState, const char *str)
|
||||
{
|
||||
appendBinaryStringInfo(outputState->fe_msgbuf, str, strlen(str));
|
||||
}
|
||||
|
||||
|
||||
/* Append a char to the copy buffer in outputState. */
|
||||
static void
|
||||
CopySendChar(OutputCopyState outputState, char c)
|
||||
{
|
||||
appendStringInfoCharMacro(outputState->fe_msgbuf, c);
|
||||
}
|
||||
|
||||
|
||||
/* Append an int32 to the copy buffer in outputState. */
|
||||
static void
|
||||
CopySendInt32(OutputCopyState outputState, int32 val)
|
||||
{
|
||||
uint32 buf = htonl((uint32) val);
|
||||
CopySendData(outputState, &buf, sizeof(buf));
|
||||
}
|
||||
|
||||
|
||||
/* Append an int16 to the copy buffer in outputState. */
|
||||
static void
|
||||
CopySendInt16(OutputCopyState outputState, int16 val)
|
||||
{
|
||||
uint16 buf = htons((uint16) val);
|
||||
CopySendData(outputState, &buf, sizeof(buf));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Send text representation of one column, with conversion and escaping.
|
||||
*
|
||||
* NB: This function is based on commands/copy.c and doesn't fully conform to
|
||||
* our coding style. The function should be kept in sync with copy.c.
|
||||
*/
|
||||
static void
|
||||
CopyAttributeOutText(OutputCopyState cstate, char *string)
|
||||
{
|
||||
char *pointer = NULL;
|
||||
char *start = NULL;
|
||||
char c = '\0';
|
||||
char delimc = cstate->delim[0];
|
||||
|
||||
if (cstate->need_transcoding)
|
||||
{
|
||||
pointer = pg_server_to_any(string, strlen(string), cstate->file_encoding);
|
||||
}
|
||||
else
|
||||
{
|
||||
pointer = string;
|
||||
}
|
||||
|
||||
/*
|
||||
* We have to grovel through the string searching for control characters
|
||||
* and instances of the delimiter character. In most cases, though, these
|
||||
* are infrequent. To avoid overhead from calling CopySendData once per
|
||||
* character, we dump out all characters between escaped characters in a
|
||||
* single call. The loop invariant is that the data from "start" to "pointer"
|
||||
* can be sent literally, but hasn't yet been.
|
||||
*
|
||||
* As all encodings here are safe, i.e. backend supported ones, we can
|
||||
* skip doing pg_encoding_mblen(), because in valid backend encodings,
|
||||
* extra bytes of a multibyte character never look like ASCII.
|
||||
*/
|
||||
start = pointer;
|
||||
while ((c = *pointer) != '\0')
|
||||
{
|
||||
if ((unsigned char) c < (unsigned char) 0x20)
|
||||
{
|
||||
/*
|
||||
* \r and \n must be escaped, the others are traditional. We
|
||||
* prefer to dump these using the C-like notation, rather than
|
||||
* a backslash and the literal character, because it makes the
|
||||
* dump file a bit more proof against Microsoftish data
|
||||
* mangling.
|
||||
*/
|
||||
switch (c)
|
||||
{
|
||||
case '\b':
|
||||
c = 'b';
|
||||
break;
|
||||
case '\f':
|
||||
c = 'f';
|
||||
break;
|
||||
case '\n':
|
||||
c = 'n';
|
||||
break;
|
||||
case '\r':
|
||||
c = 'r';
|
||||
break;
|
||||
case '\t':
|
||||
c = 't';
|
||||
break;
|
||||
case '\v':
|
||||
c = 'v';
|
||||
break;
|
||||
default:
|
||||
/* If it's the delimiter, must backslash it */
|
||||
if (c == delimc)
|
||||
break;
|
||||
/* All ASCII control chars are length 1 */
|
||||
pointer++;
|
||||
continue; /* fall to end of loop */
|
||||
}
|
||||
/* if we get here, we need to convert the control char */
|
||||
CopyFlushOutput(cstate, start, pointer);
|
||||
CopySendChar(cstate, '\\');
|
||||
CopySendChar(cstate, c);
|
||||
start = ++pointer; /* do not include char in next run */
|
||||
}
|
||||
else if (c == '\\' || c == delimc)
|
||||
{
|
||||
CopyFlushOutput(cstate, start, pointer);
|
||||
CopySendChar(cstate, '\\');
|
||||
start = pointer++; /* we include char in next run */
|
||||
}
|
||||
else
|
||||
{
|
||||
pointer++;
|
||||
}
|
||||
}
|
||||
|
||||
CopyFlushOutput(cstate, start, pointer);
|
||||
}
|
||||
|
||||
|
||||
/* *INDENT-ON* */
|
||||
/* Helper function to send pending copy output */
|
||||
static inline void
|
||||
CopyFlushOutput(OutputCopyState cstate, char *start, char *pointer)
|
||||
{
|
||||
if (pointer > start)
|
||||
{
|
||||
CopySendData(cstate, start, pointer - start);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@
|
|||
#include "catalog/pg_collation.h"
|
||||
#include "commands/copy.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/transmit.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
|
@ -45,7 +46,6 @@ bool BinaryWorkerCopyFormat = false; /* binary format for copying between work
|
|||
int PartitionBufferSize = 16384; /* total partitioning buffer size in KB */
|
||||
|
||||
/* Local variables */
|
||||
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
|
||||
static uint32 FileBufferSizeInBytes = 0; /* file buffer size to init later */
|
||||
|
||||
|
||||
|
@ -67,20 +67,8 @@ static int ColumnIndex(TupleDesc rowDescriptor, const char *columnName);
|
|||
static FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
|
||||
static OutputCopyState InitRowOutputState(void);
|
||||
static void ClearRowOutputState(OutputCopyState copyState);
|
||||
static void OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
|
||||
OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions);
|
||||
static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount);
|
||||
static void CopySendBinaryHeaders(OutputCopyState headerOutputState);
|
||||
static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount);
|
||||
static void CopySendBinaryFooters(OutputCopyState footerOutputState);
|
||||
static void CopySendData(OutputCopyState outputState, const void *databuf, int datasize);
|
||||
static void CopySendString(OutputCopyState outputState, const char *str);
|
||||
static void CopySendChar(OutputCopyState outputState, char c);
|
||||
static void CopySendInt32(OutputCopyState outputState, int32 val);
|
||||
static void CopySendInt16(OutputCopyState outputState, int16 val);
|
||||
static void CopyAttributeOutText(OutputCopyState outputState, char *string);
|
||||
static inline void CopyFlushOutput(OutputCopyState outputState,
|
||||
char *start, char *pointer);
|
||||
static uint32 RangePartitionId(Datum partitionValue, const void *context);
|
||||
static uint32 HashPartitionId(Datum partitionValue, const void *context);
|
||||
|
||||
|
@ -1004,90 +992,6 @@ ClearRowOutputState(OutputCopyState rowOutputState)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* OutputRow serializes one row using the column output functions,
|
||||
* and appends the data to the row output state object's message buffer.
|
||||
* This function is modeled after the CopyOneRowTo() function in
|
||||
* commands/copy.c, but only implements a subset of that functionality.
|
||||
*/
|
||||
static void
|
||||
OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
|
||||
OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions)
|
||||
{
|
||||
MemoryContext oldContext = NULL;
|
||||
uint32 columnIndex = 0;
|
||||
uint32 columnCount = 0;
|
||||
|
||||
/* reset previous tuple's output data, and the temporary memory context */
|
||||
resetStringInfo(rowOutputState->fe_msgbuf);
|
||||
MemoryContextReset(rowOutputState->rowcontext);
|
||||
|
||||
oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext);
|
||||
|
||||
if (rowOutputState->binary)
|
||||
{
|
||||
CopySendInt16(rowOutputState, rowDescriptor->natts);
|
||||
}
|
||||
|
||||
columnCount = (uint32) rowDescriptor->natts;
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
{
|
||||
Datum value = valueArray[columnIndex];
|
||||
bool isNull = isNullArray[columnIndex];
|
||||
bool lastColumn = false;
|
||||
|
||||
if (rowOutputState->binary)
|
||||
{
|
||||
if (!isNull)
|
||||
{
|
||||
FmgrInfo *outputFunctionPointer = &columnOutputFunctions[columnIndex];
|
||||
bytea *outputBytes = SendFunctionCall(outputFunctionPointer, value);
|
||||
|
||||
CopySendInt32(rowOutputState, VARSIZE(outputBytes) - VARHDRSZ);
|
||||
CopySendData(rowOutputState, VARDATA(outputBytes),
|
||||
VARSIZE(outputBytes) - VARHDRSZ);
|
||||
}
|
||||
else
|
||||
{
|
||||
CopySendInt32(rowOutputState, -1);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!isNull)
|
||||
{
|
||||
FmgrInfo *outputFunctionPointer = &columnOutputFunctions[columnIndex];
|
||||
char *columnText = OutputFunctionCall(outputFunctionPointer, value);
|
||||
|
||||
CopyAttributeOutText(rowOutputState, columnText);
|
||||
}
|
||||
else
|
||||
{
|
||||
CopySendString(rowOutputState, rowOutputState->null_print_client);
|
||||
}
|
||||
|
||||
lastColumn = ((columnIndex + 1) == columnCount);
|
||||
if (!lastColumn)
|
||||
{
|
||||
CopySendChar(rowOutputState, rowOutputState->delim[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!rowOutputState->binary)
|
||||
{
|
||||
/* append default line termination string depending on the platform */
|
||||
#ifndef WIN32
|
||||
CopySendChar(rowOutputState, '\n');
|
||||
#else
|
||||
CopySendString(rowOutputState, "\r\n");
|
||||
#endif
|
||||
}
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Write the header of postgres' binary serialization format to each partition file.
|
||||
* This function is used when binary_worker_copy_format is enabled.
|
||||
|
@ -1114,23 +1018,6 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount)
|
|||
}
|
||||
|
||||
|
||||
/* Append binary headers to the copy buffer in headerOutputState. */
|
||||
static void
|
||||
CopySendBinaryHeaders(OutputCopyState headerOutputState)
|
||||
{
|
||||
const int32 zero = 0;
|
||||
|
||||
/* Signature */
|
||||
CopySendData(headerOutputState, BinarySignature, 11);
|
||||
|
||||
/* Flags field (no OIDs) */
|
||||
CopySendInt32(headerOutputState, zero);
|
||||
|
||||
/* No header extension */
|
||||
CopySendInt32(headerOutputState, zero);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Write the footer of postgres' binary serialization format to each partition file.
|
||||
* This function is used when binary_worker_copy_format is enabled.
|
||||
|
@ -1157,168 +1044,6 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount)
|
|||
}
|
||||
|
||||
|
||||
/* Append binary footers to the copy buffer in footerOutputState. */
|
||||
static void
|
||||
CopySendBinaryFooters(OutputCopyState footerOutputState)
|
||||
{
|
||||
int16 negative = -1;
|
||||
|
||||
CopySendInt16(footerOutputState, negative);
|
||||
}
|
||||
|
||||
|
||||
/* *INDENT-OFF* */
|
||||
/* Append data to the copy buffer in outputState */
|
||||
static void
|
||||
CopySendData(OutputCopyState outputState, const void *databuf, int datasize)
|
||||
{
|
||||
appendBinaryStringInfo(outputState->fe_msgbuf, databuf, datasize);
|
||||
}
|
||||
|
||||
|
||||
/* Append a striong to the copy buffer in outputState. */
|
||||
static void
|
||||
CopySendString(OutputCopyState outputState, const char *str)
|
||||
{
|
||||
appendBinaryStringInfo(outputState->fe_msgbuf, str, strlen(str));
|
||||
}
|
||||
|
||||
|
||||
/* Append a char to the copy buffer in outputState. */
|
||||
static void
|
||||
CopySendChar(OutputCopyState outputState, char c)
|
||||
{
|
||||
appendStringInfoCharMacro(outputState->fe_msgbuf, c);
|
||||
}
|
||||
|
||||
|
||||
/* Append an int32 to the copy buffer in outputState. */
|
||||
static void
|
||||
CopySendInt32(OutputCopyState outputState, int32 val)
|
||||
{
|
||||
uint32 buf = htonl((uint32) val);
|
||||
CopySendData(outputState, &buf, sizeof(buf));
|
||||
}
|
||||
|
||||
|
||||
/* Append an int16 to the copy buffer in outputState. */
|
||||
static void
|
||||
CopySendInt16(OutputCopyState outputState, int16 val)
|
||||
{
|
||||
uint16 buf = htons((uint16) val);
|
||||
CopySendData(outputState, &buf, sizeof(buf));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Send text representation of one column, with conversion and escaping.
|
||||
*
|
||||
* NB: This function is based on commands/copy.c and doesn't fully conform to
|
||||
* our coding style. The function should be kept in sync with copy.c.
|
||||
*/
|
||||
static void
|
||||
CopyAttributeOutText(OutputCopyState cstate, char *string)
|
||||
{
|
||||
char *pointer = NULL;
|
||||
char *start = NULL;
|
||||
char c = '\0';
|
||||
char delimc = cstate->delim[0];
|
||||
|
||||
if (cstate->need_transcoding)
|
||||
{
|
||||
pointer = pg_server_to_any(string, strlen(string), cstate->file_encoding);
|
||||
}
|
||||
else
|
||||
{
|
||||
pointer = string;
|
||||
}
|
||||
|
||||
/*
|
||||
* We have to grovel through the string searching for control characters
|
||||
* and instances of the delimiter character. In most cases, though, these
|
||||
* are infrequent. To avoid overhead from calling CopySendData once per
|
||||
* character, we dump out all characters between escaped characters in a
|
||||
* single call. The loop invariant is that the data from "start" to "pointer"
|
||||
* can be sent literally, but hasn't yet been.
|
||||
*
|
||||
* As all encodings here are safe, i.e. backend supported ones, we can
|
||||
* skip doing pg_encoding_mblen(), because in valid backend encodings,
|
||||
* extra bytes of a multibyte character never look like ASCII.
|
||||
*/
|
||||
start = pointer;
|
||||
while ((c = *pointer) != '\0')
|
||||
{
|
||||
if ((unsigned char) c < (unsigned char) 0x20)
|
||||
{
|
||||
/*
|
||||
* \r and \n must be escaped, the others are traditional. We
|
||||
* prefer to dump these using the C-like notation, rather than
|
||||
* a backslash and the literal character, because it makes the
|
||||
* dump file a bit more proof against Microsoftish data
|
||||
* mangling.
|
||||
*/
|
||||
switch (c)
|
||||
{
|
||||
case '\b':
|
||||
c = 'b';
|
||||
break;
|
||||
case '\f':
|
||||
c = 'f';
|
||||
break;
|
||||
case '\n':
|
||||
c = 'n';
|
||||
break;
|
||||
case '\r':
|
||||
c = 'r';
|
||||
break;
|
||||
case '\t':
|
||||
c = 't';
|
||||
break;
|
||||
case '\v':
|
||||
c = 'v';
|
||||
break;
|
||||
default:
|
||||
/* If it's the delimiter, must backslash it */
|
||||
if (c == delimc)
|
||||
break;
|
||||
/* All ASCII control chars are length 1 */
|
||||
pointer++;
|
||||
continue; /* fall to end of loop */
|
||||
}
|
||||
/* if we get here, we need to convert the control char */
|
||||
CopyFlushOutput(cstate, start, pointer);
|
||||
CopySendChar(cstate, '\\');
|
||||
CopySendChar(cstate, c);
|
||||
start = ++pointer; /* do not include char in next run */
|
||||
}
|
||||
else if (c == '\\' || c == delimc)
|
||||
{
|
||||
CopyFlushOutput(cstate, start, pointer);
|
||||
CopySendChar(cstate, '\\');
|
||||
start = pointer++; /* we include char in next run */
|
||||
}
|
||||
else
|
||||
{
|
||||
pointer++;
|
||||
}
|
||||
}
|
||||
|
||||
CopyFlushOutput(cstate, start, pointer);
|
||||
}
|
||||
|
||||
|
||||
/* *INDENT-ON* */
|
||||
/* Helper function to send pending copy output */
|
||||
static inline void
|
||||
CopyFlushOutput(OutputCopyState cstate, char *start, char *pointer)
|
||||
{
|
||||
if (pointer > start)
|
||||
{
|
||||
CopySendData(cstate, start, pointer - start);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* Helper function that invokes a function with the default collation oid. */
|
||||
Datum
|
||||
CompareCall2(FmgrInfo *functionInfo, Datum leftArgument, Datum rightArgument)
|
||||
|
|
|
@ -20,6 +20,34 @@
|
|||
extern int CopyTransactionManager;
|
||||
|
||||
|
||||
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
|
||||
|
||||
/*
|
||||
* A smaller version of copy.c's CopyStateData, trimmed to the elements
|
||||
* necessary to copy out results. While it'd be a bit nicer to share code,
|
||||
* it'd require changing core postgres code.
|
||||
*/
|
||||
typedef struct OutputCopyStateData
|
||||
{
|
||||
StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
|
||||
* dest == COPY_NEW_FE in COPY FROM */
|
||||
int file_encoding; /* file or remote side's character encoding */
|
||||
bool need_transcoding; /* file encoding diff from server? */
|
||||
bool binary; /* binary format? */
|
||||
char *null_print; /* NULL marker string (server encoding!) */
|
||||
char *null_print_client; /* same converted to file encoding */
|
||||
char *delim; /* column delimiter (must be 1 byte) */
|
||||
|
||||
MemoryContext rowcontext; /* per-row evaluation context */
|
||||
} OutputCopyStateData;
|
||||
|
||||
typedef struct OutputCopyStateData *OutputCopyState;
|
||||
|
||||
extern void OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
|
||||
OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions);
|
||||
extern void CopySendBinaryHeaders(OutputCopyState headerOutputState);
|
||||
extern void CopySendBinaryFooters(OutputCopyState footerOutputState);
|
||||
|
||||
/* function declarations for copying into a distributed table */
|
||||
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
|
||||
|
||||
|
|
|
@ -79,28 +79,6 @@ typedef struct HashPartitionContext
|
|||
} HashPartitionContext;
|
||||
|
||||
|
||||
/*
|
||||
* A smaller version of copy.c's CopyStateData, trimmed to the elements
|
||||
* necessary to copy out results. While it'd be a bit nicer to share code,
|
||||
* it'd require changing core postgres code.
|
||||
*/
|
||||
typedef struct OutputCopyStateData
|
||||
{
|
||||
StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
|
||||
* dest == COPY_NEW_FE in COPY FROM */
|
||||
int file_encoding; /* file or remote side's character encoding */
|
||||
bool need_transcoding; /* file encoding diff from server? */
|
||||
bool binary; /* binary format? */
|
||||
char *null_print; /* NULL marker string (server encoding!) */
|
||||
char *null_print_client; /* same converted to file encoding */
|
||||
char *delim; /* column delimiter (must be 1 byte) */
|
||||
|
||||
MemoryContext rowcontext; /* per-row evaluation context */
|
||||
} OutputCopyStateData;
|
||||
|
||||
typedef struct OutputCopyStateData *OutputCopyState;
|
||||
|
||||
|
||||
/*
|
||||
* FileOutputStream helps buffer write operations to a file; these writes are
|
||||
* then regularly flushed to the underlying file. This structure differs from
|
||||
|
|
Loading…
Reference in New Issue