Refactor binary header and footer functions

pull/390/head
Metin Doslu 2016-03-16 15:44:01 -07:00
parent 54d13a8325
commit 3eaab16980
1 changed files with 31 additions and 11 deletions

View File

@ -70,7 +70,9 @@ static void ClearRowOutputState(OutputCopyState copyState);
static void OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, static void OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions); OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions);
static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount); static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount);
static void CopySendBinaryHeaders(OutputCopyState headerOutputState);
static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount); static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount);
static void CopySendBinaryFooters(OutputCopyState footerOutputState);
static void CopySendData(OutputCopyState outputState, const void *databuf, int datasize); static void CopySendData(OutputCopyState outputState, const void *databuf, int datasize);
static void CopySendString(OutputCopyState outputState, const char *str); static void CopySendString(OutputCopyState outputState, const char *str);
static void CopySendChar(OutputCopyState outputState, char c); static void CopySendChar(OutputCopyState outputState, char c);
@ -1097,7 +1099,6 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount)
for (fileIndex = 0; fileIndex < fileCount; fileIndex++) for (fileIndex = 0; fileIndex < fileCount; fileIndex++)
{ {
/* Generate header for a binary copy */ /* Generate header for a binary copy */
const int32 zero = 0;
FileOutputStream partitionFile = { 0, 0, 0 }; FileOutputStream partitionFile = { 0, 0, 0 };
OutputCopyStateData headerOutputStateData; OutputCopyStateData headerOutputStateData;
OutputCopyState headerOutputState = (OutputCopyState) & headerOutputStateData; OutputCopyState headerOutputState = (OutputCopyState) & headerOutputStateData;
@ -1105,14 +1106,7 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount)
memset(headerOutputState, 0, sizeof(OutputCopyStateData)); memset(headerOutputState, 0, sizeof(OutputCopyStateData));
headerOutputState->fe_msgbuf = makeStringInfo(); headerOutputState->fe_msgbuf = makeStringInfo();
/* Signature */ CopySendBinaryHeaders(headerOutputState);
CopySendData(headerOutputState, BinarySignature, 11);
/* Flags field (no OIDs) */
CopySendInt32(headerOutputState, zero);
/* No header extension */
CopySendInt32(headerOutputState, zero);
partitionFile = partitionFileArray[fileIndex]; partitionFile = partitionFileArray[fileIndex];
FileOutputStreamWrite(partitionFile, headerOutputState->fe_msgbuf); FileOutputStreamWrite(partitionFile, headerOutputState->fe_msgbuf);
@ -1120,6 +1114,23 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount)
} }
/* Append binary headers to the copy buffer in headerOutputState. */
static void
CopySendBinaryHeaders(OutputCopyState headerOutputState)
{
const int32 zero = 0;
/* Signature */
CopySendData(headerOutputState, BinarySignature, 11);
/* Flags field (no OIDs) */
CopySendInt32(headerOutputState, zero);
/* No header extension */
CopySendInt32(headerOutputState, zero);
}
/* /*
* Write the footer of postgres' binary serialization format to each partition file. * Write the footer of postgres' binary serialization format to each partition file.
* This function is used when binary_worker_copy_format is enabled. * This function is used when binary_worker_copy_format is enabled.
@ -1131,7 +1142,6 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount)
for (fileIndex = 0; fileIndex < fileCount; fileIndex++) for (fileIndex = 0; fileIndex < fileCount; fileIndex++)
{ {
/* Generate footer for a binary copy */ /* Generate footer for a binary copy */
int16 negative = -1;
FileOutputStream partitionFile = { 0, 0, 0 }; FileOutputStream partitionFile = { 0, 0, 0 };
OutputCopyStateData footerOutputStateData; OutputCopyStateData footerOutputStateData;
OutputCopyState footerOutputState = (OutputCopyState) & footerOutputStateData; OutputCopyState footerOutputState = (OutputCopyState) & footerOutputStateData;
@ -1139,7 +1149,7 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount)
memset(footerOutputState, 0, sizeof(OutputCopyStateData)); memset(footerOutputState, 0, sizeof(OutputCopyStateData));
footerOutputState->fe_msgbuf = makeStringInfo(); footerOutputState->fe_msgbuf = makeStringInfo();
CopySendInt16(footerOutputState, negative); CopySendBinaryFooters(footerOutputState);
partitionFile = partitionFileArray[fileIndex]; partitionFile = partitionFileArray[fileIndex];
FileOutputStreamWrite(partitionFile, footerOutputState->fe_msgbuf); FileOutputStreamWrite(partitionFile, footerOutputState->fe_msgbuf);
@ -1147,6 +1157,16 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount)
} }
/* Append binary footers to the copy buffer in footerOutputState. */
static void
CopySendBinaryFooters(OutputCopyState footerOutputState)
{
int16 negative = -1;
CopySendInt16(footerOutputState, negative);
}
/* *INDENT-OFF* */ /* *INDENT-OFF* */
/* Append data to the copy buffer in outputState */ /* Append data to the copy buffer in outputState */
static void static void