From f5b9b761002fc69a06afb15cc7142c2e99a489d8 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 5 Dec 2019 13:53:02 -0800 Subject: [PATCH] b --- src/backend/distributed/commands/multi_copy.c | 11 ++++++++-- .../executor/intermediate_results.c | 22 ++++++++++++------- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 8cfa56695..5b0df9762 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -1632,8 +1632,12 @@ AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, uint32 totalColumnCount = (uint32) rowDescriptor->natts; uint32 availableColumnCount = AvailableColumnCount(rowDescriptor); uint32 appendedColumnCount = 0; + MemoryContext oldContext = NULL; - MemoryContext oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext); + if (rowOutputState->rowcontext) + { + oldContext = MemoryContextSwitchTo(rowOutputState->rowcontext); + } if (rowOutputState->binary) { @@ -1709,7 +1713,10 @@ AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, #endif } - MemoryContextSwitchTo(oldContext); + if (rowOutputState->rowcontext) + { + MemoryContextSwitchTo(oldContext); + } } diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 71bb8a01c..4ffbd43a7 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -126,8 +126,8 @@ write_intermediate_result_sfunc(PG_FUNCTION_ARGS) { MemoryContext agg_context; - text *resultId = PG_GETARG_TEXT_P(1); - char *resultIdString = text_to_cstring(resultId); + text *filePath = PG_GETARG_TEXT_P(1); + char *filePathString = text_to_cstring(filePath); if (!AggCheckCallContext(fcinfo, &agg_context)) { @@ -154,23 +154,28 @@ write_intermediate_result_sfunc(PG_FUNCTION_ARGS) copyOutState->null_print_client = (char *) nullPrintCharacter; copyOutState->binary = false;/*CanUseBinaryCopyFormat(state->tupleDescriptor); */ copyOutState->fe_msgbuf = makeStringInfo(); - copyOutState->rowcontext = - AllocSetContextCreate(agg_context, "COPY TO", ALLOCSET_DEFAULT_SIZES); + copyOutState->rowcontext = NULL; state->copyOutState = copyOutState; /* make sure the directory exists */ CreateIntermediateResultsDirectory(); - const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY); + const int fileFlags = (O_APPEND | O_CREAT | O_WRONLY | O_TRUNC | PG_BINARY); const int fileMode = (S_IRUSR | S_IWUSR); - char *filePath = QueryResultFileName(resultIdString); + /* char *filePath = QueryResultFileName(resultIdString); */ state->fileCompat = - FileCompatFromFileStart(FileOpenForTransmit(filePath, fileFlags, fileMode)); + FileCompatFromFileStart(FileOpenForTransmit(filePathString, fileFlags, + fileMode)); state->columnOutputFunctions = ColumnOutputFunctions(state->tupleDescriptor, copyOutState->binary); + + StringInfo copyData = state->copyOutState->fe_msgbuf; + enlargeStringInfo(copyData, 128 * 1024); + + MemoryContextSwitchTo(old_context); } @@ -190,7 +195,8 @@ write_intermediate_result_sfunc(PG_FUNCTION_ARGS) state->columnOutputFunctions, NULL); StringInfo copyData = state->copyOutState->fe_msgbuf; - if (copyData->len >= 1024 * 1024) + + if (copyData->len >= 128 * 1024) { FileWriteCompat(&state->fileCompat, copyData->data, copyData->len, PG_WAIT_IO);