diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index 7f00adf37..c5a7065dc 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -70,7 +70,9 @@ static void ClearRowOutputState(OutputCopyState copyState); static void OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, OutputCopyState rowOutputState, FmgrInfo *columnOutputFunctions); static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount); +static void CopySendBinaryHeaders(OutputCopyState headerOutputState); static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount); +static void CopySendBinaryFooters(OutputCopyState footerOutputState); static void CopySendData(OutputCopyState outputState, const void *databuf, int datasize); static void CopySendString(OutputCopyState outputState, const char *str); static void CopySendChar(OutputCopyState outputState, char c); @@ -1097,7 +1099,6 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount) for (fileIndex = 0; fileIndex < fileCount; fileIndex++) { /* Generate header for a binary copy */ - const int32 zero = 0; FileOutputStream partitionFile = { 0, 0, 0 }; OutputCopyStateData headerOutputStateData; OutputCopyState headerOutputState = (OutputCopyState) & headerOutputStateData; @@ -1105,14 +1106,7 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount) memset(headerOutputState, 0, sizeof(OutputCopyStateData)); headerOutputState->fe_msgbuf = makeStringInfo(); - /* Signature */ - CopySendData(headerOutputState, BinarySignature, 11); - - /* Flags field (no OIDs) */ - CopySendInt32(headerOutputState, zero); - - /* No header extension */ - CopySendInt32(headerOutputState, zero); + CopySendBinaryHeaders(headerOutputState); partitionFile = partitionFileArray[fileIndex]; FileOutputStreamWrite(partitionFile, headerOutputState->fe_msgbuf); @@ -1120,6 +1114,23 @@ OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount) } +/* Append binary headers to the copy buffer in headerOutputState. */ +static void +CopySendBinaryHeaders(OutputCopyState headerOutputState) +{ + const int32 zero = 0; + + /* Signature */ + CopySendData(headerOutputState, BinarySignature, 11); + + /* Flags field (no OIDs) */ + CopySendInt32(headerOutputState, zero); + + /* No header extension */ + CopySendInt32(headerOutputState, zero); +} + + /* * Write the footer of postgres' binary serialization format to each partition file. * This function is used when binary_worker_copy_format is enabled. @@ -1131,7 +1142,6 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount) for (fileIndex = 0; fileIndex < fileCount; fileIndex++) { /* Generate footer for a binary copy */ - int16 negative = -1; FileOutputStream partitionFile = { 0, 0, 0 }; OutputCopyStateData footerOutputStateData; OutputCopyState footerOutputState = (OutputCopyState) & footerOutputStateData; @@ -1139,7 +1149,7 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount) memset(footerOutputState, 0, sizeof(OutputCopyStateData)); footerOutputState->fe_msgbuf = makeStringInfo(); - CopySendInt16(footerOutputState, negative); + CopySendBinaryFooters(footerOutputState); partitionFile = partitionFileArray[fileIndex]; FileOutputStreamWrite(partitionFile, footerOutputState->fe_msgbuf); @@ -1147,6 +1157,16 @@ OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount) } +/* Append binary footers to the copy buffer in footerOutputState. */ +static void +CopySendBinaryFooters(OutputCopyState footerOutputState) +{ + int16 negative = -1; + + CopySendInt16(footerOutputState, negative); +} + + /* *INDENT-OFF* */ /* Append data to the copy buffer in outputState */ static void