diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 4cc3f738c..d84865889 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -125,18 +125,18 @@ static void OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections, int64 shardId); static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId); -static void CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections); +static void SendCopyDataToPlacements(StringInfo lineBuf, + ShardConnections *shardConnections); static List * ConnectionList(HTAB *connectionHash); static void EndRemoteCopy(List *connectionList, bool stopOnFailure); static void ReportCopyError(PGconn *connection, PGresult *result); -static void CopySendData(OutputCopyState outputState, const void *databuf, int datasize); -static void CopySendString(OutputCopyState outputState, const char *str); -static void CopySendChar(OutputCopyState outputState, char c); -static void CopySendInt32(OutputCopyState outputState, int32 val); -static void CopySendInt16(OutputCopyState outputState, int16 val); -static void CopyAttributeOutText(OutputCopyState outputState, char *string); -static inline void CopyFlushOutput(OutputCopyState outputState, char *start, - char *pointer); +static void CopySendData(CopyOutState outputState, const void *databuf, int datasize); +static void CopySendString(CopyOutState outputState, const char *str); +static void CopySendChar(CopyOutState outputState, char c); +static void CopySendInt32(CopyOutState outputState, int32 val); +static void CopySendInt16(CopyOutState outputState, int16 val); +static void CopyAttributeOutText(CopyOutState outputState, char *string); +static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer); /* @@ -171,7 +171,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) uint64 processedRowCount = 0; ErrorContextCallback errorCallback; ShardConnections *shardConnections = NULL; - OutputCopyState rowOutputState = NULL; + CopyOutState copyOutState = NULL; FmgrInfo *columnOutputFunctions = NULL; /* disallow COPY to/from file or program except for superusers */ @@ -294,13 +294,12 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - rowOutputState = (OutputCopyState) palloc0(sizeof(OutputCopyStateData)); - rowOutputState->binary = true; - rowOutputState->fe_msgbuf = makeStringInfo(); - rowOutputState->rowcontext = tupleContext; + copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); + copyOutState->binary = true; + copyOutState->fe_msgbuf = makeStringInfo(); + copyOutState->rowcontext = tupleContext; - columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, - rowOutputState->binary); + columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary); /* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */ PG_TRY(); @@ -366,15 +365,14 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) OpenCopyTransactions(copyStatement, shardConnections, shardId); - CopySendBinaryHeaders(rowOutputState); - CopyRowToPlacements(rowOutputState->fe_msgbuf, shardConnections); + CopySendBinaryHeaders(copyOutState); + SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections); } - OutputRow(columnValues, columnNulls, tupleDescriptor, rowOutputState, - columnOutputFunctions); - /* Replicate row to all shard placements */ - CopyRowToPlacements(rowOutputState->fe_msgbuf, shardConnections); + CopySendRow(columnValues, columnNulls, tupleDescriptor, copyOutState, + columnOutputFunctions); + SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections); processedRowCount += 1; @@ -383,8 +381,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) if (shardConnections != NULL) { - CopySendBinaryFooters(rowOutputState); - CopyRowToPlacements(rowOutputState->fe_msgbuf, shardConnections); + CopySendBinaryFooters(copyOutState); + SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections); } connectionList = ConnectionList(shardConnectionHash); @@ -725,10 +723,11 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId) /* - * CopyRowToPlacements copies a row to a list of placements for a shard. + * SendCopyDataToPlacements copies given copy data to a list of placements for + * a shard. */ static void -CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections) +SendCopyDataToPlacements(StringInfo lineBuf, ShardConnections *shardConnections) { ListCell *connectionCell = NULL; foreach(connectionCell, shardConnections->connectionList) @@ -907,7 +906,7 @@ ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat) /* - * OutputRow serializes one row using the column output functions, + * CopySendRow serializes one row using the column output functions, * and appends the data to the row output state object's message buffer. * This function is modeled after the CopyOneRowTo() function in * commands/copy.c, but only implements a subset of that functionality. @@ -915,8 +914,8 @@ ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat) * to not bloat memory usage. */ void -OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, - OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions) +CopySendRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, + CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions) { MemoryContext oldContext = NULL; uint32 columnIndex = 0; @@ -993,7 +992,7 @@ OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, /* Append binary headers to the copy buffer in headerOutputState. */ void -CopySendBinaryHeaders(OutputCopyState headerOutputState) +CopySendBinaryHeaders(CopyOutState headerOutputState) { const int32 zero = 0; @@ -1012,7 +1011,7 @@ CopySendBinaryHeaders(OutputCopyState headerOutputState) /* Append binary footers to the copy buffer in footerOutputState. */ void -CopySendBinaryFooters(OutputCopyState footerOutputState) +CopySendBinaryFooters(CopyOutState footerOutputState) { int16 negative = -1; @@ -1024,7 +1023,7 @@ CopySendBinaryFooters(OutputCopyState footerOutputState) /* *INDENT-OFF* */ /* Append data to the copy buffer in outputState */ static void -CopySendData(OutputCopyState outputState, const void *databuf, int datasize) +CopySendData(CopyOutState outputState, const void *databuf, int datasize) { appendBinaryStringInfo(outputState->fe_msgbuf, databuf, datasize); } @@ -1032,7 +1031,7 @@ CopySendData(OutputCopyState outputState, const void *databuf, int datasize) /* Append a striong to the copy buffer in outputState. */ static void -CopySendString(OutputCopyState outputState, const char *str) +CopySendString(CopyOutState outputState, const char *str) { appendBinaryStringInfo(outputState->fe_msgbuf, str, strlen(str)); } @@ -1040,7 +1039,7 @@ CopySendString(OutputCopyState outputState, const char *str) /* Append a char to the copy buffer in outputState. */ static void -CopySendChar(OutputCopyState outputState, char c) +CopySendChar(CopyOutState outputState, char c) { appendStringInfoCharMacro(outputState->fe_msgbuf, c); } @@ -1048,7 +1047,7 @@ CopySendChar(OutputCopyState outputState, char c) /* Append an int32 to the copy buffer in outputState. */ static void -CopySendInt32(OutputCopyState outputState, int32 val) +CopySendInt32(CopyOutState outputState, int32 val) { uint32 buf = htonl((uint32) val); CopySendData(outputState, &buf, sizeof(buf)); @@ -1057,7 +1056,7 @@ CopySendInt32(OutputCopyState outputState, int32 val) /* Append an int16 to the copy buffer in outputState. */ static void -CopySendInt16(OutputCopyState outputState, int16 val) +CopySendInt16(CopyOutState outputState, int16 val) { uint16 buf = htons((uint16) val); CopySendData(outputState, &buf, sizeof(buf)); @@ -1071,7 +1070,7 @@ CopySendInt16(OutputCopyState outputState, int16 val) * our coding style. The function should be kept in sync with copy.c. */ static void -CopyAttributeOutText(OutputCopyState cstate, char *string) +CopyAttributeOutText(CopyOutState cstate, char *string) { char *pointer = NULL; char *start = NULL; @@ -1164,7 +1163,7 @@ CopyAttributeOutText(OutputCopyState cstate, char *string) /* *INDENT-ON* */ /* Helper function to send pending copy output */ static inline void -CopyFlushOutput(OutputCopyState cstate, char *start, char *pointer) +CopyFlushOutput(CopyOutState cstate, char *start, char *pointer) { if (pointer > start) { diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index 1690eeee9..7bc46dbde 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -64,8 +64,8 @@ static void FilterAndPartitionTable(const char *filterQuery, FileOutputStream *partitionFileArray, uint32 fileCount); static int ColumnIndex(TupleDesc rowDescriptor, const char *columnName); -static OutputCopyState InitRowOutputState(void); -static void ClearRowOutputState(OutputCopyState copyState); +static CopyOutState InitRowOutputState(void); +static void ClearRowOutputState(CopyOutState copyState); static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount); static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount); static uint32 RangePartitionId(Datum partitionValue, const void *context); @@ -720,7 +720,7 @@ FilterAndPartitionTable(const char *filterQuery, FileOutputStream *partitionFileArray, uint32 fileCount) { - OutputCopyState rowOutputState = NULL; + CopyOutState rowOutputState = NULL; FmgrInfo *columnOutputFunctions = NULL; int partitionColumnIndex = 0; Oid partitionColumnTypeId = InvalidOid; @@ -814,9 +814,8 @@ FilterAndPartitionTable(const char *filterQuery, /* deconstruct the tuple; this is faster than repeated heap_getattr */ heap_deform_tuple(row, rowDescriptor, valueArray, isNullArray); - OutputRow(valueArray, isNullArray, rowDescriptor, rowOutputState, - columnOutputFunctions); - + CopySendRow(valueArray, isNullArray, rowDescriptor, rowOutputState, + columnOutputFunctions); rowText = rowOutputState->fe_msgbuf; partitionFile = partitionFileArray[partitionId]; @@ -881,11 +880,10 @@ ColumnIndex(TupleDesc rowDescriptor, const char *columnName) * must match one another. Therefore, any changes to the default values in the * copy command must be propagated to this function. */ -static OutputCopyState +static CopyOutState InitRowOutputState(void) { - OutputCopyState rowOutputState = - (OutputCopyState) palloc0(sizeof(OutputCopyStateData)); + CopyOutState rowOutputState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); int fileEncoding = pg_get_client_encoding(); int databaseEncoding = GetDatabaseEncoding(); @@ -942,7 +940,7 @@ InitRowOutputState(void) /* Clears copy state used for outputting row data. */ static void -ClearRowOutputState(OutputCopyState rowOutputState) +ClearRowOutputState(CopyOutState rowOutputState) { Assert(rowOutputState != NULL); @@ -969,10 +967,10 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount) { /* Generate header for a binary copy */ FileOutputStream partitionFile = { 0, 0, 0 }; - OutputCopyStateData headerOutputStateData; - OutputCopyState headerOutputState = (OutputCopyState) & headerOutputStateData; + CopyOutStateData headerOutputStateData; + CopyOutState headerOutputState = (CopyOutState) & headerOutputStateData; - memset(headerOutputState, 0, sizeof(OutputCopyStateData)); + memset(headerOutputState, 0, sizeof(CopyOutStateData)); headerOutputState->fe_msgbuf = makeStringInfo(); CopySendBinaryHeaders(headerOutputState); @@ -995,10 +993,10 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount) { /* Generate footer for a binary copy */ FileOutputStream partitionFile = { 0, 0, 0 }; - OutputCopyStateData footerOutputStateData; - OutputCopyState footerOutputState = (OutputCopyState) & footerOutputStateData; + CopyOutStateData footerOutputStateData; + CopyOutState footerOutputState = (CopyOutState) & footerOutputStateData; - memset(footerOutputState, 0, sizeof(OutputCopyStateData)); + memset(footerOutputState, 0, sizeof(CopyOutStateData)); footerOutputState->fe_msgbuf = makeStringInfo(); CopySendBinaryFooters(footerOutputState); diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 8aab98747..e31558878 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -27,7 +27,7 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; * necessary to copy out results. While it'd be a bit nicer to share code, * it'd require changing core postgres code. */ -typedef struct OutputCopyStateData +typedef struct CopyOutStateData { StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for * dest == COPY_NEW_FE in COPY FROM */ @@ -39,17 +39,17 @@ typedef struct OutputCopyStateData char *delim; /* column delimiter (must be 1 byte) */ MemoryContext rowcontext; /* per-row evaluation context */ -} OutputCopyStateData; +} CopyOutStateData; -typedef struct OutputCopyStateData *OutputCopyState; +typedef struct CopyOutStateData *CopyOutState; /* function declarations for copying into a distributed table */ extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); -extern void OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, - OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions); -extern void CopySendBinaryHeaders(OutputCopyState headerOutputState); -extern void CopySendBinaryFooters(OutputCopyState footerOutputState); +extern void CopySendRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, + CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions); +extern void CopySendBinaryHeaders(CopyOutState headerOutputState); +extern void CopySendBinaryFooters(CopyOutState footerOutputState); extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);