Rename functions and structs

pull/366/head
Metin Doslu 2016-03-18 15:23:03 -07:00
parent eb41a249c9
commit 7afac2a377
3 changed files with 58 additions and 61 deletions

View File

@ -125,18 +125,18 @@ static void OpenCopyTransactions(CopyStmt *copyStatement,
ShardConnections *shardConnections, ShardConnections *shardConnections,
int64 shardId); int64 shardId);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId); static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
static void CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections); static void SendCopyDataToPlacements(StringInfo lineBuf,
ShardConnections *shardConnections);
static List * ConnectionList(HTAB *connectionHash); static List * ConnectionList(HTAB *connectionHash);
static void EndRemoteCopy(List *connectionList, bool stopOnFailure); static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
static void ReportCopyError(PGconn *connection, PGresult *result); static void ReportCopyError(PGconn *connection, PGresult *result);
static void CopySendData(OutputCopyState outputState, const void *databuf, int datasize); static void CopySendData(CopyOutState outputState, const void *databuf, int datasize);
static void CopySendString(OutputCopyState outputState, const char *str); static void CopySendString(CopyOutState outputState, const char *str);
static void CopySendChar(OutputCopyState outputState, char c); static void CopySendChar(CopyOutState outputState, char c);
static void CopySendInt32(OutputCopyState outputState, int32 val); static void CopySendInt32(CopyOutState outputState, int32 val);
static void CopySendInt16(OutputCopyState outputState, int16 val); static void CopySendInt16(CopyOutState outputState, int16 val);
static void CopyAttributeOutText(OutputCopyState outputState, char *string); static void CopyAttributeOutText(CopyOutState outputState, char *string);
static inline void CopyFlushOutput(OutputCopyState outputState, char *start, static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer);
char *pointer);
/* /*
@ -171,7 +171,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
uint64 processedRowCount = 0; uint64 processedRowCount = 0;
ErrorContextCallback errorCallback; ErrorContextCallback errorCallback;
ShardConnections *shardConnections = NULL; ShardConnections *shardConnections = NULL;
OutputCopyState rowOutputState = NULL; CopyOutState copyOutState = NULL;
FmgrInfo *columnOutputFunctions = NULL; FmgrInfo *columnOutputFunctions = NULL;
/* disallow COPY to/from file or program except for superusers */ /* disallow COPY to/from file or program except for superusers */
@ -294,13 +294,12 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MAXSIZE);
rowOutputState = (OutputCopyState) palloc0(sizeof(OutputCopyStateData)); copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
rowOutputState->binary = true; copyOutState->binary = true;
rowOutputState->fe_msgbuf = makeStringInfo(); copyOutState->fe_msgbuf = makeStringInfo();
rowOutputState->rowcontext = tupleContext; copyOutState->rowcontext = tupleContext;
columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
rowOutputState->binary);
/* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */ /* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */
PG_TRY(); PG_TRY();
@ -366,15 +365,14 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
OpenCopyTransactions(copyStatement, shardConnections, shardId); OpenCopyTransactions(copyStatement, shardConnections, shardId);
CopySendBinaryHeaders(rowOutputState); CopySendBinaryHeaders(copyOutState);
CopyRowToPlacements(rowOutputState->fe_msgbuf, shardConnections); SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections);
} }
OutputRow(columnValues, columnNulls, tupleDescriptor, rowOutputState,
columnOutputFunctions);
/* Replicate row to all shard placements */ /* Replicate row to all shard placements */
CopyRowToPlacements(rowOutputState->fe_msgbuf, shardConnections); CopySendRow(columnValues, columnNulls, tupleDescriptor, copyOutState,
columnOutputFunctions);
SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections);
processedRowCount += 1; processedRowCount += 1;
@ -383,8 +381,8 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
if (shardConnections != NULL) if (shardConnections != NULL)
{ {
CopySendBinaryFooters(rowOutputState); CopySendBinaryFooters(copyOutState);
CopyRowToPlacements(rowOutputState->fe_msgbuf, shardConnections); SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections);
} }
connectionList = ConnectionList(shardConnectionHash); connectionList = ConnectionList(shardConnectionHash);
@ -725,10 +723,11 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId)
/* /*
* CopyRowToPlacements copies a row to a list of placements for a shard. * SendCopyDataToPlacements copies given copy data to a list of placements for
* a shard.
*/ */
static void static void
CopyRowToPlacements(StringInfo lineBuf, ShardConnections *shardConnections) SendCopyDataToPlacements(StringInfo lineBuf, ShardConnections *shardConnections)
{ {
ListCell *connectionCell = NULL; ListCell *connectionCell = NULL;
foreach(connectionCell, shardConnections->connectionList) foreach(connectionCell, shardConnections->connectionList)
@ -907,7 +906,7 @@ ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
/* /*
* OutputRow serializes one row using the column output functions, * CopySendRow serializes one row using the column output functions,
* and appends the data to the row output state object's message buffer. * and appends the data to the row output state object's message buffer.
* This function is modeled after the CopyOneRowTo() function in * This function is modeled after the CopyOneRowTo() function in
* commands/copy.c, but only implements a subset of that functionality. * commands/copy.c, but only implements a subset of that functionality.
@ -915,8 +914,8 @@ ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
* to not bloat memory usage. * to not bloat memory usage.
*/ */
void void
OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, CopySendRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions) CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions)
{ {
MemoryContext oldContext = NULL; MemoryContext oldContext = NULL;
uint32 columnIndex = 0; uint32 columnIndex = 0;
@ -993,7 +992,7 @@ OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
/* Append binary headers to the copy buffer in headerOutputState. */ /* Append binary headers to the copy buffer in headerOutputState. */
void void
CopySendBinaryHeaders(OutputCopyState headerOutputState) CopySendBinaryHeaders(CopyOutState headerOutputState)
{ {
const int32 zero = 0; const int32 zero = 0;
@ -1012,7 +1011,7 @@ CopySendBinaryHeaders(OutputCopyState headerOutputState)
/* Append binary footers to the copy buffer in footerOutputState. */ /* Append binary footers to the copy buffer in footerOutputState. */
void void
CopySendBinaryFooters(OutputCopyState footerOutputState) CopySendBinaryFooters(CopyOutState footerOutputState)
{ {
int16 negative = -1; int16 negative = -1;
@ -1024,7 +1023,7 @@ CopySendBinaryFooters(OutputCopyState footerOutputState)
/* *INDENT-OFF* */ /* *INDENT-OFF* */
/* Append data to the copy buffer in outputState */ /* Append data to the copy buffer in outputState */
static void static void
CopySendData(OutputCopyState outputState, const void *databuf, int datasize) CopySendData(CopyOutState outputState, const void *databuf, int datasize)
{ {
appendBinaryStringInfo(outputState->fe_msgbuf, databuf, datasize); appendBinaryStringInfo(outputState->fe_msgbuf, databuf, datasize);
} }
@ -1032,7 +1031,7 @@ CopySendData(OutputCopyState outputState, const void *databuf, int datasize)
/* Append a striong to the copy buffer in outputState. */ /* Append a striong to the copy buffer in outputState. */
static void static void
CopySendString(OutputCopyState outputState, const char *str) CopySendString(CopyOutState outputState, const char *str)
{ {
appendBinaryStringInfo(outputState->fe_msgbuf, str, strlen(str)); appendBinaryStringInfo(outputState->fe_msgbuf, str, strlen(str));
} }
@ -1040,7 +1039,7 @@ CopySendString(OutputCopyState outputState, const char *str)
/* Append a char to the copy buffer in outputState. */ /* Append a char to the copy buffer in outputState. */
static void static void
CopySendChar(OutputCopyState outputState, char c) CopySendChar(CopyOutState outputState, char c)
{ {
appendStringInfoCharMacro(outputState->fe_msgbuf, c); appendStringInfoCharMacro(outputState->fe_msgbuf, c);
} }
@ -1048,7 +1047,7 @@ CopySendChar(OutputCopyState outputState, char c)
/* Append an int32 to the copy buffer in outputState. */ /* Append an int32 to the copy buffer in outputState. */
static void static void
CopySendInt32(OutputCopyState outputState, int32 val) CopySendInt32(CopyOutState outputState, int32 val)
{ {
uint32 buf = htonl((uint32) val); uint32 buf = htonl((uint32) val);
CopySendData(outputState, &buf, sizeof(buf)); CopySendData(outputState, &buf, sizeof(buf));
@ -1057,7 +1056,7 @@ CopySendInt32(OutputCopyState outputState, int32 val)
/* Append an int16 to the copy buffer in outputState. */ /* Append an int16 to the copy buffer in outputState. */
static void static void
CopySendInt16(OutputCopyState outputState, int16 val) CopySendInt16(CopyOutState outputState, int16 val)
{ {
uint16 buf = htons((uint16) val); uint16 buf = htons((uint16) val);
CopySendData(outputState, &buf, sizeof(buf)); CopySendData(outputState, &buf, sizeof(buf));
@ -1071,7 +1070,7 @@ CopySendInt16(OutputCopyState outputState, int16 val)
* our coding style. The function should be kept in sync with copy.c. * our coding style. The function should be kept in sync with copy.c.
*/ */
static void static void
CopyAttributeOutText(OutputCopyState cstate, char *string) CopyAttributeOutText(CopyOutState cstate, char *string)
{ {
char *pointer = NULL; char *pointer = NULL;
char *start = NULL; char *start = NULL;
@ -1164,7 +1163,7 @@ CopyAttributeOutText(OutputCopyState cstate, char *string)
/* *INDENT-ON* */ /* *INDENT-ON* */
/* Helper function to send pending copy output */ /* Helper function to send pending copy output */
static inline void static inline void
CopyFlushOutput(OutputCopyState cstate, char *start, char *pointer) CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
{ {
if (pointer > start) if (pointer > start)
{ {

View File

@ -64,8 +64,8 @@ static void FilterAndPartitionTable(const char *filterQuery,
FileOutputStream *partitionFileArray, FileOutputStream *partitionFileArray,
uint32 fileCount); uint32 fileCount);
static int ColumnIndex(TupleDesc rowDescriptor, const char *columnName); static int ColumnIndex(TupleDesc rowDescriptor, const char *columnName);
static OutputCopyState InitRowOutputState(void); static CopyOutState InitRowOutputState(void);
static void ClearRowOutputState(OutputCopyState copyState); static void ClearRowOutputState(CopyOutState copyState);
static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount); static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount);
static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount); static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount);
static uint32 RangePartitionId(Datum partitionValue, const void *context); static uint32 RangePartitionId(Datum partitionValue, const void *context);
@ -720,7 +720,7 @@ FilterAndPartitionTable(const char *filterQuery,
FileOutputStream *partitionFileArray, FileOutputStream *partitionFileArray,
uint32 fileCount) uint32 fileCount)
{ {
OutputCopyState rowOutputState = NULL; CopyOutState rowOutputState = NULL;
FmgrInfo *columnOutputFunctions = NULL; FmgrInfo *columnOutputFunctions = NULL;
int partitionColumnIndex = 0; int partitionColumnIndex = 0;
Oid partitionColumnTypeId = InvalidOid; Oid partitionColumnTypeId = InvalidOid;
@ -814,9 +814,8 @@ FilterAndPartitionTable(const char *filterQuery,
/* deconstruct the tuple; this is faster than repeated heap_getattr */ /* deconstruct the tuple; this is faster than repeated heap_getattr */
heap_deform_tuple(row, rowDescriptor, valueArray, isNullArray); heap_deform_tuple(row, rowDescriptor, valueArray, isNullArray);
OutputRow(valueArray, isNullArray, rowDescriptor, rowOutputState, CopySendRow(valueArray, isNullArray, rowDescriptor, rowOutputState,
columnOutputFunctions); columnOutputFunctions);
rowText = rowOutputState->fe_msgbuf; rowText = rowOutputState->fe_msgbuf;
partitionFile = partitionFileArray[partitionId]; partitionFile = partitionFileArray[partitionId];
@ -881,11 +880,10 @@ ColumnIndex(TupleDesc rowDescriptor, const char *columnName)
* must match one another. Therefore, any changes to the default values in the * must match one another. Therefore, any changes to the default values in the
* copy command must be propagated to this function. * copy command must be propagated to this function.
*/ */
static OutputCopyState static CopyOutState
InitRowOutputState(void) InitRowOutputState(void)
{ {
OutputCopyState rowOutputState = CopyOutState rowOutputState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
(OutputCopyState) palloc0(sizeof(OutputCopyStateData));
int fileEncoding = pg_get_client_encoding(); int fileEncoding = pg_get_client_encoding();
int databaseEncoding = GetDatabaseEncoding(); int databaseEncoding = GetDatabaseEncoding();
@ -942,7 +940,7 @@ InitRowOutputState(void)
/* Clears copy state used for outputting row data. */ /* Clears copy state used for outputting row data. */
static void static void
ClearRowOutputState(OutputCopyState rowOutputState) ClearRowOutputState(CopyOutState rowOutputState)
{ {
Assert(rowOutputState != NULL); Assert(rowOutputState != NULL);
@ -969,10 +967,10 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount)
{ {
/* Generate header for a binary copy */ /* Generate header for a binary copy */
FileOutputStream partitionFile = { 0, 0, 0 }; FileOutputStream partitionFile = { 0, 0, 0 };
OutputCopyStateData headerOutputStateData; CopyOutStateData headerOutputStateData;
OutputCopyState headerOutputState = (OutputCopyState) & headerOutputStateData; CopyOutState headerOutputState = (CopyOutState) & headerOutputStateData;
memset(headerOutputState, 0, sizeof(OutputCopyStateData)); memset(headerOutputState, 0, sizeof(CopyOutStateData));
headerOutputState->fe_msgbuf = makeStringInfo(); headerOutputState->fe_msgbuf = makeStringInfo();
CopySendBinaryHeaders(headerOutputState); CopySendBinaryHeaders(headerOutputState);
@ -995,10 +993,10 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount)
{ {
/* Generate footer for a binary copy */ /* Generate footer for a binary copy */
FileOutputStream partitionFile = { 0, 0, 0 }; FileOutputStream partitionFile = { 0, 0, 0 };
OutputCopyStateData footerOutputStateData; CopyOutStateData footerOutputStateData;
OutputCopyState footerOutputState = (OutputCopyState) & footerOutputStateData; CopyOutState footerOutputState = (CopyOutState) & footerOutputStateData;
memset(footerOutputState, 0, sizeof(OutputCopyStateData)); memset(footerOutputState, 0, sizeof(CopyOutStateData));
footerOutputState->fe_msgbuf = makeStringInfo(); footerOutputState->fe_msgbuf = makeStringInfo();
CopySendBinaryFooters(footerOutputState); CopySendBinaryFooters(footerOutputState);

View File

@ -27,7 +27,7 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
* necessary to copy out results. While it'd be a bit nicer to share code, * necessary to copy out results. While it'd be a bit nicer to share code,
* it'd require changing core postgres code. * it'd require changing core postgres code.
*/ */
typedef struct OutputCopyStateData typedef struct CopyOutStateData
{ {
StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
* dest == COPY_NEW_FE in COPY FROM */ * dest == COPY_NEW_FE in COPY FROM */
@ -39,17 +39,17 @@ typedef struct OutputCopyStateData
char *delim; /* column delimiter (must be 1 byte) */ char *delim; /* column delimiter (must be 1 byte) */
MemoryContext rowcontext; /* per-row evaluation context */ MemoryContext rowcontext; /* per-row evaluation context */
} OutputCopyStateData; } CopyOutStateData;
typedef struct OutputCopyStateData *OutputCopyState; typedef struct CopyOutStateData *CopyOutState;
/* function declarations for copying into a distributed table */ /* function declarations for copying into a distributed table */
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
extern void OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, extern void CopySendRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions); CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions);
extern void CopySendBinaryHeaders(OutputCopyState headerOutputState); extern void CopySendBinaryHeaders(CopyOutState headerOutputState);
extern void CopySendBinaryFooters(OutputCopyState footerOutputState); extern void CopySendBinaryFooters(CopyOutState footerOutputState);
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag); extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);