Refactor OutputRow()

- Pull heap_deform_tuple() out of OutputRow().
- Use same valueArray and isNullArray for multiple rows similar
to CopyTo() in copy.c.
pull/390/head
Metin Doslu 2016-03-16 14:41:25 -07:00
parent 89411f9f52
commit 040faea16e
1 changed files with 19 additions and 15 deletions

View File

@ -67,7 +67,7 @@ static int ColumnIndex(TupleDesc rowDescriptor, const char *columnName);
static FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
static PartialCopyState InitRowOutputState(void);
static void ClearRowOutputState(PartialCopyState copyState);
static void OutputRow(HeapTuple row, TupleDesc rowDescriptor,
static void OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
PartialCopyState rowOutputState, FmgrInfo *columnOutputFunctions);
static void OutputBinaryHeaders(FileOutputStream *partitionFileArray, uint32 fileCount);
static void OutputBinaryFooters(FileOutputStream *partitionFileArray, uint32 fileCount);
@ -786,11 +786,15 @@ FilterAndPartitionTable(const char *filterQuery,
while (SPI_processed > 0)
{
TupleDesc rowDescriptor = SPI_tuptable->tupdesc;
uint32 columnCount = (uint32) rowDescriptor->natts;
Datum *valueArray = (Datum *) palloc0(columnCount * sizeof(Datum));
bool *isNullArray = (bool *) palloc0(columnCount * sizeof(bool));
int rowIndex = 0;
for (rowIndex = 0; rowIndex < SPI_processed; rowIndex++)
{
HeapTuple row = SPI_tuptable->vals[rowIndex];
TupleDesc rowDescriptor = SPI_tuptable->tupdesc;
FileOutputStream partitionFile = { 0, 0, 0 };
StringInfo rowText = NULL;
Datum partitionKey = 0;
@ -815,7 +819,12 @@ FilterAndPartitionTable(const char *filterQuery,
partitionId = 0;
}
OutputRow(row, rowDescriptor, rowOutputState, columnOutputFunctions);
/* deconstruct the tuple; this is faster than repeated heap_getattr */
heap_deform_tuple(row, rowDescriptor, valueArray, isNullArray);
OutputRow(valueArray, isNullArray, rowDescriptor, rowOutputState,
columnOutputFunctions);
rowText = rowOutputState->fe_msgbuf;
partitionFile = partitionFileArray[partitionId];
@ -824,6 +833,9 @@ FilterAndPartitionTable(const char *filterQuery,
resetStringInfo(rowText);
}
pfree(valueArray);
pfree(isNullArray);
SPI_freetuptable(SPI_tuptable);
SPI_cursor_fetch(queryPortal, fetchForward, prefetchCount);
@ -997,23 +1009,17 @@ ClearRowOutputState(PartialCopyState rowOutputState)
* commands/copy.c, but only implements a subset of that functionality.
*/
static void
OutputRow(HeapTuple row, TupleDesc rowDescriptor,
OutputRow(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
PartialCopyState rowOutputState, FmgrInfo *columnOutputFunctions)
{
MemoryContext oldContext = NULL;
uint32 columnIndex = 0;
uint32 columnCount = (uint32) rowDescriptor->natts;
Datum *valueArray = (Datum *) palloc0(columnCount * sizeof(Datum));
bool *isNullArray = (bool *) palloc0(columnCount * sizeof(bool));
/* deconstruct the tuple; this is faster than repeated heap_getattr */
heap_deform_tuple(row, rowDescriptor, valueArray, isNullArray);
uint32 columnCount = 0;
/* reset previous tuple's output data, and the temporary memory context */
resetStringInfo(rowOutputState->fe_msgbuf);
MemoryContextReset(rowOutputState->rowcontext);
oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext);
if (rowOutputState->binary)
@ -1021,6 +1027,7 @@ OutputRow(HeapTuple row, TupleDesc rowDescriptor,
CopySendInt16(rowOutputState, rowDescriptor->natts);
}
columnCount = (uint32) rowDescriptor->natts;
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
Datum value = valueArray[columnIndex];
@ -1076,9 +1083,6 @@ OutputRow(HeapTuple row, TupleDesc rowDescriptor,
}
MemoryContextSwitchTo(oldContext);
pfree(valueArray);
pfree(isNullArray);
}