diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index e0a8f9175..d30a6e636 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -97,136 +97,6 @@ int CopyTransactionManager = TRANSACTION_MANAGER_1PC; -/* Data structures from copy.c, to keep track of COPY processing state */ -typedef enum CopyDest -{ - COPY_FILE, /* to/from file (or a piped program) */ - COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ - COPY_NEW_FE /* to/from frontend (3.0 protocol) */ -} CopyDest; - -typedef enum EolType -{ - EOL_UNKNOWN, - EOL_NL, - EOL_CR, - EOL_CRNL -} EolType; - -typedef struct CopyStateData -{ - /* low-level state data */ - CopyDest copy_dest; /* type of copy source/destination */ - FILE *copy_file; /* used if copy_dest == COPY_FILE */ - StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for - * dest == COPY_NEW_FE in COPY FROM */ - bool fe_eof; /* true if detected end of copy data */ - EolType eol_type; /* EOL type of input */ - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy to or from */ - QueryDesc *queryDesc; /* executable query to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDIN/STDOUT */ - bool is_program; /* is 'filename' a program to popen? */ - bool binary; /* binary format? */ - bool oids; /* include OIDs? */ - bool freeze; /* freeze rows on loading? */ - bool csv_mode; /* Comma Separated Value format? */ - bool header_line; /* CSV header line? */ - char *null_print; /* NULL marker string (server encoding!) 
*/ - int null_print_len; /* length of same */ - char *null_print_client; /* same converted to file encoding */ - char *delim; /* column delimiter (must be 1 byte) */ - char *quote; /* CSV quote char (must be 1 byte) */ - char *escape; /* CSV escape char (must be 1 byte) */ - List *force_quote; /* list of column names */ - bool force_quote_all; /* FORCE QUOTE *? */ - bool *force_quote_flags; /* per-column CSV FQ flags */ - List *force_notnull; /* list of column names */ - bool *force_notnull_flags; /* per-column CSV FNN flags */ -#if PG_VERSION_NUM >= 90400 - List *force_null; /* list of column names */ - bool *force_null_flags; /* per-column CSV FN flags */ -#endif - bool convert_selectively; /* do selective binary conversion? */ - List *convert_select; /* list of column names (can be NIL) */ - bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ - - /* these are just for error messages, see CopyFromErrorCallback */ - const char *cur_relname; /* table name for error messages */ - int cur_lineno; /* line number for error messages */ - const char *cur_attname; /* current att for error messages */ - const char *cur_attval; /* current att value for error messages */ - - /* - * Working state for COPY TO/FROM - */ - MemoryContext copycontext; /* per-copy execution context */ - - /* - * Working state for COPY TO - */ - FmgrInfo *out_functions; /* lookup info for output functions */ - MemoryContext rowcontext; /* per-row evaluation context */ - - /* - * Working state for COPY FROM - */ - AttrNumber num_defaults; - bool file_has_oids; - FmgrInfo oid_in_function; - Oid oid_typioparam; - FmgrInfo *in_functions; /* array of input functions for each attrs */ - Oid *typioparams; /* array of element types for in_functions */ - int *defmap; /* array of default att numbers */ - ExprState **defexprs; /* array of default att expressions */ - bool volatile_defexprs; /* is any of defexprs volatile? 
*/ - List *range_table; - - /* - * These variables are used to reduce overhead in textual COPY FROM. - * - * attribute_buf holds the separated, de-escaped text for each field of - * the current line. The CopyReadAttributes functions return arrays of - * pointers into this buffer. We avoid palloc/pfree overhead by re-using - * the buffer on each cycle. - */ - StringInfoData attribute_buf; - - /* field raw data pointers found by COPY FROM */ - - int max_fields; - char **raw_fields; - - /* - * Similarly, line_buf holds the whole input line being processed. The - * input cycle is first to read the whole line into line_buf, convert it - * to server encoding there, and then extract the individual attribute - * fields into attribute_buf. line_buf is preserved unmodified so that we - * can display it in error messages if appropriate. - */ - StringInfoData line_buf; - bool line_buf_converted; /* converted to server encoding? */ - bool line_buf_valid; /* contains the row being processed? */ - - /* - * Finally, raw_buf holds raw data read from the data source (file or - * client connection). CopyReadLine parses this data sufficiently to - * locate line boundaries, then transfers the data to line_buf and - * converts it. Note: we guarantee that there is a \0 at - * raw_buf[raw_buf_len]. 
- */ -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ - char *raw_buf; - int raw_buf_index; /* next byte to process */ - int raw_buf_len; /* total # of bytes stored */ -} CopyStateData; - - /* ShardConnections represents a set of connections for each placement of a shard */ typedef struct ShardConnections { @@ -255,8 +125,6 @@ static void OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections, int64 shardId); static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId); -static void AppendColumnNames(StringInfo command, List *columnList); -static void AppendCopyOptions(StringInfo command, List *copyOptionList); static void CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections); static List * ConnectionList(HTAB *connectionHash); static void EndRemoteCopy(List *connectionList, bool stopOnFailure); @@ -288,6 +156,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) HTAB *shardConnectionHash = NULL; List *connectionList = NIL; MemoryContext tupleContext = NULL; + MemoryContext outputContext = NULL; CopyState copyState = NULL; TupleDesc tupleDescriptor = NULL; uint32 columnCount = 0; @@ -302,6 +171,9 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) int shardCount = 0; uint64 processedRowCount = 0; ErrorContextCallback errorCallback; + ShardConnections *shardConnections = NULL; + OutputCopyState rowOutputState = NULL; + FmgrInfo *columnOutputFunctions = NULL; /* disallow COPY to/from file or program except for superusers */ if (copyStatement->filename != NULL && !superuser()) @@ -404,12 +276,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) copyStatement->attlist, copyStatement->options); - if (copyState->binary) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Copy in binary mode is not currently supported"))); - } - /* set up callback to identify error line number */ errorCallback.callback = CopyFromErrorCallback; 
errorCallback.arg = (void *) copyState; @@ -429,6 +295,19 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); + outputContext = AllocSetContextCreate(CurrentMemoryContext, + "COPY Output Memory Context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, true); + + rowOutputState = (OutputCopyState) palloc0(sizeof(OutputCopyStateData)); + rowOutputState->binary = true; + rowOutputState->fe_msgbuf = makeStringInfo(); + rowOutputState->rowcontext = outputContext; + /* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */ PG_TRY(); { @@ -438,9 +317,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) Datum partitionColumnValue = 0; ShardInterval *shardInterval = NULL; int64 shardId = 0; - ShardConnections *shardConnections = NULL; bool found = false; - StringInfo lineBuf = NULL; MemoryContext oldContext = NULL; oldContext = MemoryContextSwitchTo(tupleContext); @@ -494,20 +371,28 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) shardConnections->connectionList = NIL; OpenCopyTransactions(copyStatement, shardConnections, shardId); + + CopySendBinaryHeaders(rowOutputState); + CopyRowToPlacements(rowOutputState->fe_msgbuf, shardConnections); } - /* get the (truncated) line buffer */ - lineBuf = &copyState->line_buf; - lineBuf->data[lineBuf->len++] = '\n'; + OutputRow(columnValues, columnNulls, tupleDescriptor, rowOutputState, + columnOutputFunctions); /* Replicate row to all shard placements */ - CopyRowToPlacements(lineBuf, shardConnections); + CopyRowToPlacements(rowOutputState->fe_msgbuf, shardConnections); processedRowCount += 1; MemoryContextReset(tupleContext); } + if (shardConnections != NULL) + { + CopySendBinaryFooters(rowOutputState); + CopyRowToPlacements(rowOutputState->fe_msgbuf, shardConnections); + } + connectionList = ConnectionList(shardConnectionHash);
EndRemoteCopy(connectionList, true); @@ -839,93 +724,12 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId) appendStringInfo(command, "COPY %s_%ld ", qualifiedName, shardId); - if (copyStatement->attlist != NIL) - { - AppendColumnNames(command, copyStatement->attlist); - } - - appendStringInfoString(command, "FROM STDIN"); - - if (copyStatement->options) - { - appendStringInfoString(command, " WITH "); - - AppendCopyOptions(command, copyStatement->options); - } + appendStringInfoString(command, "FROM STDIN WITH (FORMAT BINARY)"); return command; } -/* - * AppendCopyOptions deparses a list of CopyStmt options and appends them to command. - */ -static void -AppendCopyOptions(StringInfo command, List *copyOptionList) -{ - ListCell *optionCell = NULL; - char separator = '('; - - foreach(optionCell, copyOptionList) - { - DefElem *option = (DefElem *) lfirst(optionCell); - - if (strcmp(option->defname, "header") == 0 && defGetBoolean(option)) - { - /* worker should not skip header again */ - continue; - } - - appendStringInfo(command, "%c%s ", separator, option->defname); - - if (strcmp(option->defname, "force_not_null") == 0 || - strcmp(option->defname, "force_null") == 0) - { - if (!option->arg) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg( - "argument to option \"%s\" must be a list of column names", - option->defname))); - } - else - { - AppendColumnNames(command, (List *) option->arg); - } - } - else - { - appendStringInfo(command, "'%s'", defGetString(option)); - } - - separator = ','; - } - - appendStringInfoChar(command, ')'); -} - - -/* - * AppendColumnList deparses a list of column names into a StringInfo. 
- */ -static void -AppendColumnNames(StringInfo command, List *columnList) -{ - ListCell *attributeCell = NULL; - char separator = '('; - - foreach(attributeCell, columnList) - { - char *columnName = strVal(lfirst(attributeCell)); - appendStringInfo(command, "%c%s", separator, quote_identifier(columnName)); - separator = ','; - } - - appendStringInfoChar(command, ')'); -} - - /* * CopyRowToPlacements copies a row to a list of placements for a shard. */ @@ -1070,6 +874,44 @@ ReportCopyError(PGconn *connection, PGresult *result) } +/* + * ColumnOutputFunctions walks over a table's columns, and finds each column's + * type information. The function then resolves each type's output function, + * and stores and returns these output functions in an array. + */ +FmgrInfo * +ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat) +{ + uint32 columnCount = (uint32) rowDescriptor->natts; + FmgrInfo *columnOutputFunctions = palloc0(columnCount * sizeof(FmgrInfo)); + + uint32 columnIndex = 0; + for (columnIndex = 0; columnIndex < columnCount; columnIndex++) + { + FmgrInfo *currentOutputFunction = &columnOutputFunctions[columnIndex]; + Form_pg_attribute currentColumn = rowDescriptor->attrs[columnIndex]; + Oid columnTypeId = currentColumn->atttypid; + Oid outputFunctionId = InvalidOid; + bool typeVariableLength = false; + + if (binaryFormat) + { + getTypeBinaryOutputInfo(columnTypeId, &outputFunctionId, &typeVariableLength); + } + else + { + getTypeOutputInfo(columnTypeId, &outputFunctionId, &typeVariableLength); + } + + Assert(currentColumn->attisdropped == false); + + fmgr_info(outputFunctionId, currentOutputFunction); + } + + return columnOutputFunctions; +} + + /* * OutputRow serializes one row using the column output functions, * and appends the data to the row output state object's message buffer. 
@@ -1160,6 +1002,8 @@ CopySendBinaryHeaders(OutputCopyState headerOutputState) { const int32 zero = 0; + resetStringInfo(headerOutputState->fe_msgbuf); + /* Signature */ CopySendData(headerOutputState, BinarySignature, 11); @@ -1177,6 +1021,7 @@ CopySendBinaryFooters(OutputCopyState footerOutputState) { int16 negative = -1; + resetStringInfo(footerOutputState->fe_msgbuf); CopySendInt16(footerOutputState, negative); } diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index 4c8a7f47e..e0d2c9786 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -64,7 +64,6 @@ static void FilterAndPartitionTable(const char *filterQuery, FileOutputStream *partitionFileArray, uint32 fileCount); static int ColumnIndex(TupleDesc rowDescriptor, const char *columnName); -static FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); static OutputCopyState InitRowOutputState(void); static void ClearRowOutputState(OutputCopyState copyState); static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount); @@ -868,44 +867,6 @@ ColumnIndex(TupleDesc rowDescriptor, const char *columnName) } -/* - * ColumnOutputFunctions walks over a table's columns, and finds each column's - * type information. The function then resolves each type's output function, - * and stores and returns these output functions in an array. 
- */ -static FmgrInfo * -ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat) -{ - uint32 columnCount = (uint32) rowDescriptor->natts; - FmgrInfo *columnOutputFunctions = palloc0(columnCount * sizeof(FmgrInfo)); - - uint32 columnIndex = 0; - for (columnIndex = 0; columnIndex < columnCount; columnIndex++) - { - FmgrInfo *currentOutputFunction = &columnOutputFunctions[columnIndex]; - Form_pg_attribute currentColumn = rowDescriptor->attrs[columnIndex]; - Oid columnTypeId = currentColumn->atttypid; - Oid outputFunctionId = InvalidOid; - bool typeVariableLength = false; - - if (binaryFormat) - { - getTypeBinaryOutputInfo(columnTypeId, &outputFunctionId, &typeVariableLength); - } - else - { - getTypeOutputInfo(columnTypeId, &outputFunctionId, &typeVariableLength); - } - - Assert(currentColumn->attisdropped == false); - - fmgr_info(outputFunctionId, currentOutputFunction); - } - - return columnOutputFunctions; -} - - /* * InitRowOutputState creates and initializes a copy state object. This object * is internal to the copy command's implementation in Postgres; and we refactor diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 49c36a779..483d65601 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -43,6 +43,7 @@ typedef struct OutputCopyStateData typedef struct OutputCopyStateData *OutputCopyState; +extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern void OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions); extern void CopySendBinaryHeaders(OutputCopyState headerOutputState);