Send rows to workers in binary format

pull/366/head
Metin Doslu 2016-03-16 17:56:07 -07:00
parent 6d4e41b8bb
commit e0559ceaad
3 changed files with 72 additions and 265 deletions

View File

@ -97,136 +97,6 @@
int CopyTransactionManager = TRANSACTION_MANAGER_1PC; int CopyTransactionManager = TRANSACTION_MANAGER_1PC;
/* Data structures from copy.c, to keep track of COPY processing state */
typedef enum CopyDest
{
COPY_FILE, /* to/from file (or a piped program) */
COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
COPY_NEW_FE /* to/from frontend (3.0 protocol) */
} CopyDest;
typedef enum EolType
{
EOL_UNKNOWN,
EOL_NL,
EOL_CR,
EOL_CRNL
} EolType;
typedef struct CopyStateData
{
/* low-level state data */
CopyDest copy_dest; /* type of copy source/destination */
FILE *copy_file; /* used if copy_dest == COPY_FILE */
StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
* dest == COPY_NEW_FE in COPY FROM */
bool fe_eof; /* true if detected end of copy data */
EolType eol_type; /* EOL type of input */
int file_encoding; /* file or remote side's character encoding */
bool need_transcoding; /* file encoding diff from server? */
bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
/* parameters from the COPY command */
Relation rel; /* relation to copy to or from */
QueryDesc *queryDesc; /* executable query to copy from */
List *attnumlist; /* integer list of attnums to copy */
char *filename; /* filename, or NULL for STDIN/STDOUT */
bool is_program; /* is 'filename' a program to popen? */
bool binary; /* binary format? */
bool oids; /* include OIDs? */
bool freeze; /* freeze rows on loading? */
bool csv_mode; /* Comma Separated Value format? */
bool header_line; /* CSV header line? */
char *null_print; /* NULL marker string (server encoding!) */
int null_print_len; /* length of same */
char *null_print_client; /* same converted to file encoding */
char *delim; /* column delimiter (must be 1 byte) */
char *quote; /* CSV quote char (must be 1 byte) */
char *escape; /* CSV escape char (must be 1 byte) */
List *force_quote; /* list of column names */
bool force_quote_all; /* FORCE QUOTE *? */
bool *force_quote_flags; /* per-column CSV FQ flags */
List *force_notnull; /* list of column names */
bool *force_notnull_flags; /* per-column CSV FNN flags */
#if PG_VERSION_NUM >= 90400
List *force_null; /* list of column names */
bool *force_null_flags; /* per-column CSV FN flags */
#endif
bool convert_selectively; /* do selective binary conversion? */
List *convert_select; /* list of column names (can be NIL) */
bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
/* these are just for error messages, see CopyFromErrorCallback */
const char *cur_relname; /* table name for error messages */
int cur_lineno; /* line number for error messages */
const char *cur_attname; /* current att for error messages */
const char *cur_attval; /* current att value for error messages */
/*
* Working state for COPY TO/FROM
*/
MemoryContext copycontext; /* per-copy execution context */
/*
* Working state for COPY TO
*/
FmgrInfo *out_functions; /* lookup info for output functions */
MemoryContext rowcontext; /* per-row evaluation context */
/*
* Working state for COPY FROM
*/
AttrNumber num_defaults;
bool file_has_oids;
FmgrInfo oid_in_function;
Oid oid_typioparam;
FmgrInfo *in_functions; /* array of input functions for each attrs */
Oid *typioparams; /* array of element types for in_functions */
int *defmap; /* array of default att numbers */
ExprState **defexprs; /* array of default att expressions */
bool volatile_defexprs; /* is any of defexprs volatile? */
List *range_table;
/*
* These variables are used to reduce overhead in textual COPY FROM.
*
* attribute_buf holds the separated, de-escaped text for each field of
* the current line. The CopyReadAttributes functions return arrays of
* pointers into this buffer. We avoid palloc/pfree overhead by re-using
* the buffer on each cycle.
*/
StringInfoData attribute_buf;
/* field raw data pointers found by COPY FROM */
int max_fields;
char **raw_fields;
/*
* Similarly, line_buf holds the whole input line being processed. The
* input cycle is first to read the whole line into line_buf, convert it
* to server encoding there, and then extract the individual attribute
* fields into attribute_buf. line_buf is preserved unmodified so that we
* can display it in error messages if appropriate.
*/
StringInfoData line_buf;
bool line_buf_converted; /* converted to server encoding? */
bool line_buf_valid; /* contains the row being processed? */
/*
* Finally, raw_buf holds raw data read from the data source (file or
* client connection). CopyReadLine parses this data sufficiently to
* locate line boundaries, then transfers the data to line_buf and
* converts it. Note: we guarantee that there is a \0 at
* raw_buf[raw_buf_len].
*/
#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
char *raw_buf;
int raw_buf_index; /* next byte to process */
int raw_buf_len; /* total # of bytes stored */
} CopyStateData;
/* ShardConnections represents a set of connections for each placement of a shard */ /* ShardConnections represents a set of connections for each placement of a shard */
typedef struct ShardConnections typedef struct ShardConnections
{ {
@ -255,8 +125,6 @@ static void OpenCopyTransactions(CopyStmt *copyStatement,
ShardConnections *shardConnections, ShardConnections *shardConnections,
int64 shardId); int64 shardId);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId); static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
static void AppendColumnNames(StringInfo command, List *columnList);
static void AppendCopyOptions(StringInfo command, List *copyOptionList);
static void CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections); static void CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections);
static List * ConnectionList(HTAB *connectionHash); static List * ConnectionList(HTAB *connectionHash);
static void EndRemoteCopy(List *connectionList, bool stopOnFailure); static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
@ -288,6 +156,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
HTAB *shardConnectionHash = NULL; HTAB *shardConnectionHash = NULL;
List *connectionList = NIL; List *connectionList = NIL;
MemoryContext tupleContext = NULL; MemoryContext tupleContext = NULL;
MemoryContext outputContext = NULL;
CopyState copyState = NULL; CopyState copyState = NULL;
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
uint32 columnCount = 0; uint32 columnCount = 0;
@ -302,6 +171,9 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
int shardCount = 0; int shardCount = 0;
uint64 processedRowCount = 0; uint64 processedRowCount = 0;
ErrorContextCallback errorCallback; ErrorContextCallback errorCallback;
ShardConnections *shardConnections = NULL;
OutputCopyState rowOutputState = NULL;
FmgrInfo *columnOutputFunctions = NULL;
/* disallow COPY to/from file or program except for superusers */ /* disallow COPY to/from file or program except for superusers */
if (copyStatement->filename != NULL && !superuser()) if (copyStatement->filename != NULL && !superuser())
@ -404,12 +276,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
copyStatement->attlist, copyStatement->attlist,
copyStatement->options); copyStatement->options);
if (copyState->binary)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Copy in binary mode is not currently supported")));
}
/* set up callback to identify error line number */ /* set up callback to identify error line number */
errorCallback.callback = CopyFromErrorCallback; errorCallback.callback = CopyFromErrorCallback;
errorCallback.arg = (void *) copyState; errorCallback.arg = (void *) copyState;
@ -429,6 +295,19 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MAXSIZE);
outputContext = AllocSetContextCreate(CurrentMemoryContext,
"COPY Output Memory Context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, true);
rowOutputState = (OutputCopyState) palloc0(sizeof(OutputCopyStateData));
rowOutputState->binary = true;
rowOutputState->fe_msgbuf = makeStringInfo();
rowOutputState->rowcontext = outputContext;
/* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */ /* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */
PG_TRY(); PG_TRY();
{ {
@ -438,9 +317,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
Datum partitionColumnValue = 0; Datum partitionColumnValue = 0;
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
int64 shardId = 0; int64 shardId = 0;
ShardConnections *shardConnections = NULL;
bool found = false; bool found = false;
StringInfo lineBuf = NULL;
MemoryContext oldContext = NULL; MemoryContext oldContext = NULL;
oldContext = MemoryContextSwitchTo(tupleContext); oldContext = MemoryContextSwitchTo(tupleContext);
@ -494,20 +371,28 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
shardConnections->connectionList = NIL; shardConnections->connectionList = NIL;
OpenCopyTransactions(copyStatement, shardConnections, shardId); OpenCopyTransactions(copyStatement, shardConnections, shardId);
CopySendBinaryHeaders(rowOutputState);
CopyRowToPlacements(rowOutputState->fe_msgbuf, shardConnections);
} }
/* get the (truncated) line buffer */ OutputRow(columnValues, columnNulls, tupleDescriptor, rowOutputState,
lineBuf = &copyState->line_buf; columnOutputFunctions);
lineBuf->data[lineBuf->len++] = '\n';
/* Replicate row to all shard placements */ /* Replicate row to all shard placements */
CopyRowToPlacements(lineBuf, shardConnections); CopyRowToPlacements(rowOutputState->fe_msgbuf, shardConnections);
processedRowCount += 1; processedRowCount += 1;
MemoryContextReset(tupleContext); MemoryContextReset(tupleContext);
} }
if (shardConnections != NULL)
{
CopySendBinaryFooters(rowOutputState);
CopyRowToPlacements(rowOutputState->fe_msgbuf, shardConnections);
}
connectionList = ConnectionList(shardConnectionHash); connectionList = ConnectionList(shardConnectionHash);
EndRemoteCopy(connectionList, true); EndRemoteCopy(connectionList, true);
@ -839,93 +724,12 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId)
appendStringInfo(command, "COPY %s_%ld ", qualifiedName, shardId); appendStringInfo(command, "COPY %s_%ld ", qualifiedName, shardId);
if (copyStatement->attlist != NIL) appendStringInfoString(command, "FROM STDIN WITH (FORMAT BINARY)");
{
AppendColumnNames(command, copyStatement->attlist);
}
appendStringInfoString(command, "FROM STDIN");
if (copyStatement->options)
{
appendStringInfoString(command, " WITH ");
AppendCopyOptions(command, copyStatement->options);
}
return command; return command;
} }
/*
* AppendCopyOptions deparses a list of CopyStmt options and appends them to command.
*/
static void
AppendCopyOptions(StringInfo command, List *copyOptionList)
{
ListCell *optionCell = NULL;
char separator = '(';
foreach(optionCell, copyOptionList)
{
DefElem *option = (DefElem *) lfirst(optionCell);
if (strcmp(option->defname, "header") == 0 && defGetBoolean(option))
{
/* worker should not skip header again */
continue;
}
appendStringInfo(command, "%c%s ", separator, option->defname);
if (strcmp(option->defname, "force_not_null") == 0 ||
strcmp(option->defname, "force_null") == 0)
{
if (!option->arg)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg(
"argument to option \"%s\" must be a list of column names",
option->defname)));
}
else
{
AppendColumnNames(command, (List *) option->arg);
}
}
else
{
appendStringInfo(command, "'%s'", defGetString(option));
}
separator = ',';
}
appendStringInfoChar(command, ')');
}
/*
* AppendColumnList deparses a list of column names into a StringInfo.
*/
static void
AppendColumnNames(StringInfo command, List *columnList)
{
ListCell *attributeCell = NULL;
char separator = '(';
foreach(attributeCell, columnList)
{
char *columnName = strVal(lfirst(attributeCell));
appendStringInfo(command, "%c%s", separator, quote_identifier(columnName));
separator = ',';
}
appendStringInfoChar(command, ')');
}
/* /*
* CopyRowToPlacements copies a row to a list of placements for a shard. * CopyRowToPlacements copies a row to a list of placements for a shard.
*/ */
@ -1070,6 +874,44 @@ ReportCopyError(PGconn *connection, PGresult *result)
} }
/*
* ColumnOutputFunctions walks over a table's columns, and finds each column's
* type information. The function then resolves each type's output function,
* and stores and returns these output functions in an array.
*/
FmgrInfo *
ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
{
uint32 columnCount = (uint32) rowDescriptor->natts;
FmgrInfo *columnOutputFunctions = palloc0(columnCount * sizeof(FmgrInfo));
uint32 columnIndex = 0;
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
FmgrInfo *currentOutputFunction = &columnOutputFunctions[columnIndex];
Form_pg_attribute currentColumn = rowDescriptor->attrs[columnIndex];
Oid columnTypeId = currentColumn->atttypid;
Oid outputFunctionId = InvalidOid;
bool typeVariableLength = false;
if (binaryFormat)
{
getTypeBinaryOutputInfo(columnTypeId, &outputFunctionId, &typeVariableLength);
}
else
{
getTypeOutputInfo(columnTypeId, &outputFunctionId, &typeVariableLength);
}
Assert(currentColumn->attisdropped == false);
fmgr_info(outputFunctionId, currentOutputFunction);
}
return columnOutputFunctions;
}
/* /*
* OutputRow serializes one row using the column output functions, * OutputRow serializes one row using the column output functions,
* and appends the data to the row output state object's message buffer. * and appends the data to the row output state object's message buffer.
@ -1160,6 +1002,8 @@ CopySendBinaryHeaders(OutputCopyState headerOutputState)
{ {
const int32 zero = 0; const int32 zero = 0;
resetStringInfo(headerOutputState->fe_msgbuf);
/* Signature */ /* Signature */
CopySendData(headerOutputState, BinarySignature, 11); CopySendData(headerOutputState, BinarySignature, 11);
@ -1177,6 +1021,7 @@ CopySendBinaryFooters(OutputCopyState footerOutputState)
{ {
int16 negative = -1; int16 negative = -1;
resetStringInfo(footerOutputState->fe_msgbuf);
CopySendInt16(footerOutputState, negative); CopySendInt16(footerOutputState, negative);
} }

View File

@ -64,7 +64,6 @@ static void FilterAndPartitionTable(const char *filterQuery,
FileOutputStream *partitionFileArray, FileOutputStream *partitionFileArray,
uint32 fileCount); uint32 fileCount);
static int ColumnIndex(TupleDesc rowDescriptor, const char *columnName); static int ColumnIndex(TupleDesc rowDescriptor, const char *columnName);
static FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
static OutputCopyState InitRowOutputState(void); static OutputCopyState InitRowOutputState(void);
static void ClearRowOutputState(OutputCopyState copyState); static void ClearRowOutputState(OutputCopyState copyState);
static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount); static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount);
@ -868,44 +867,6 @@ ColumnIndex(TupleDesc rowDescriptor, const char *columnName)
} }
/*
* ColumnOutputFunctions walks over a table's columns, and finds each column's
* type information. The function then resolves each type's output function,
* and stores and returns these output functions in an array.
*/
static FmgrInfo *
ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
{
uint32 columnCount = (uint32) rowDescriptor->natts;
FmgrInfo *columnOutputFunctions = palloc0(columnCount * sizeof(FmgrInfo));
uint32 columnIndex = 0;
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
FmgrInfo *currentOutputFunction = &columnOutputFunctions[columnIndex];
Form_pg_attribute currentColumn = rowDescriptor->attrs[columnIndex];
Oid columnTypeId = currentColumn->atttypid;
Oid outputFunctionId = InvalidOid;
bool typeVariableLength = false;
if (binaryFormat)
{
getTypeBinaryOutputInfo(columnTypeId, &outputFunctionId, &typeVariableLength);
}
else
{
getTypeOutputInfo(columnTypeId, &outputFunctionId, &typeVariableLength);
}
Assert(currentColumn->attisdropped == false);
fmgr_info(outputFunctionId, currentOutputFunction);
}
return columnOutputFunctions;
}
/* /*
* InitRowOutputState creates and initializes a copy state object. This object * InitRowOutputState creates and initializes a copy state object. This object
* is internal to the copy command's implementation in Postgres; and we refactor * is internal to the copy command's implementation in Postgres; and we refactor

View File

@ -43,6 +43,7 @@ typedef struct OutputCopyStateData
typedef struct OutputCopyStateData *OutputCopyState; typedef struct OutputCopyStateData *OutputCopyState;
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
extern void OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, extern void OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions); OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions);
extern void CopySendBinaryHeaders(OutputCopyState headerOutputState); extern void CopySendBinaryHeaders(OutputCopyState headerOutputState);