write_intermediate_results
Hadi Moshayedi 2019-12-05 13:53:02 -08:00
parent 35c433ff94
commit f5b9b76100
2 changed files with 23 additions and 10 deletions

View File

@ -1632,8 +1632,12 @@ AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
uint32 totalColumnCount = (uint32) rowDescriptor->natts; uint32 totalColumnCount = (uint32) rowDescriptor->natts;
uint32 availableColumnCount = AvailableColumnCount(rowDescriptor); uint32 availableColumnCount = AvailableColumnCount(rowDescriptor);
uint32 appendedColumnCount = 0; uint32 appendedColumnCount = 0;
MemoryContext oldContext = NULL;
MemoryContext oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext); if (rowOutputState->rowcontext)
{
oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext);
}
if (rowOutputState->binary) if (rowOutputState->binary)
{ {
@ -1709,7 +1713,10 @@ AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
#endif #endif
} }
MemoryContextSwitchTo(oldContext); if (rowOutputState->rowcontext)
{
MemoryContextSwitchTo(oldContext);
}
} }

View File

@ -126,8 +126,8 @@ write_intermediate_result_sfunc(PG_FUNCTION_ARGS)
{ {
MemoryContext agg_context; MemoryContext agg_context;
text *resultId = PG_GETARG_TEXT_P(1); text *filePath = PG_GETARG_TEXT_P(1);
char *resultIdString = text_to_cstring(resultId); char *filePathString = text_to_cstring(filePath);
if (!AggCheckCallContext(fcinfo, &agg_context)) if (!AggCheckCallContext(fcinfo, &agg_context))
{ {
@ -154,23 +154,28 @@ write_intermediate_result_sfunc(PG_FUNCTION_ARGS)
copyOutState->null_print_client = (char *) nullPrintCharacter; copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->binary = false;/*CanUseBinaryCopyFormat(state->tupleDescriptor); */ copyOutState->binary = false;/*CanUseBinaryCopyFormat(state->tupleDescriptor); */
copyOutState->fe_msgbuf = makeStringInfo(); copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = copyOutState->rowcontext = NULL;
AllocSetContextCreate(agg_context, "COPY TO", ALLOCSET_DEFAULT_SIZES);
state->copyOutState = copyOutState; state->copyOutState = copyOutState;
/* make sure the directory exists */ /* make sure the directory exists */
CreateIntermediateResultsDirectory(); CreateIntermediateResultsDirectory();
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY); const int fileFlags = (O_APPEND | O_CREAT | O_WRONLY | O_TRUNC | PG_BINARY);
const int fileMode = (S_IRUSR | S_IWUSR); const int fileMode = (S_IRUSR | S_IWUSR);
char *filePath = QueryResultFileName(resultIdString); /* char *filePath = QueryResultFileName(resultIdString); */
state->fileCompat = state->fileCompat =
FileCompatFromFileStart(FileOpenForTransmit(filePath, fileFlags, fileMode)); FileCompatFromFileStart(FileOpenForTransmit(filePathString, fileFlags,
fileMode));
state->columnOutputFunctions = ColumnOutputFunctions(state->tupleDescriptor, state->columnOutputFunctions = ColumnOutputFunctions(state->tupleDescriptor,
copyOutState->binary); copyOutState->binary);
StringInfo copyData = state->copyOutState->fe_msgbuf;
enlargeStringInfo(copyData, 128 * 1024);
MemoryContextSwitchTo(old_context); MemoryContextSwitchTo(old_context);
} }
@ -190,7 +195,8 @@ write_intermediate_result_sfunc(PG_FUNCTION_ARGS)
state->columnOutputFunctions, NULL); state->columnOutputFunctions, NULL);
StringInfo copyData = state->copyOutState->fe_msgbuf; StringInfo copyData = state->copyOutState->fe_msgbuf;
if (copyData->len >= 1024 * 1024)
if (copyData->len >= 128 * 1024)
{ {
FileWriteCompat(&state->fileCompat, copyData->data, FileWriteCompat(&state->fileCompat, copyData->data,
copyData->len, PG_WAIT_IO); copyData->len, PG_WAIT_IO);