From b20c64dd67325048238d49123c5a783d97ce23c1 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Wed, 11 Mar 2020 15:04:52 -0700 Subject: [PATCH] Address feedback --- .../executor/intermediate_result_encoder.c | 51 +++---------------- .../executor/intermediate_results.c | 32 ++++++++---- .../worker/worker_sql_task_protocol.c | 27 ++++++---- .../distributed/intermediate_results.h | 11 ++-- 4 files changed, 54 insertions(+), 67 deletions(-) diff --git a/src/backend/distributed/executor/intermediate_result_encoder.c b/src/backend/distributed/executor/intermediate_result_encoder.c index 817ca1b13..08a52ce86 100644 --- a/src/backend/distributed/executor/intermediate_result_encoder.c +++ b/src/backend/distributed/executor/intermediate_result_encoder.c @@ -43,8 +43,6 @@ #include "utils/syscache.h" -#define ENCODER_BUFFER_SIZE (4 * 1024 * 1024) - /* internal state of intermediate result file encoder */ struct IntermediateResultEncoder { @@ -56,15 +54,6 @@ struct IntermediateResultEncoder */ StringInfo outputBuffer; - /* - * Used for returning the flushed result of encoding, at which time move the - * data pointer from outputBuffer to flushBuffer before reseting length of - * outputBuffer. - * - * This is kept here to avoid allocating it everytime we need to flush some data. - */ - StringInfo flushBuffer; - IntermediateResultFormat format; TupleDesc tupleDescriptor; @@ -74,7 +63,7 @@ struct IntermediateResultEncoder }; /* forward declaration of local functions */ -static void ReadCopyFileIntoTupleStore(char *fileName, char *copyFormat, +static void ReadCopyFileIntoTupleStore(const char *fileName, const char *copyFormat, TupleDesc tupleDescriptor, Tuplestorestate *tupstore); static Relation StubRelation(TupleDesc tupleDescriptor); @@ -88,13 +77,13 @@ static Relation StubRelation(TupleDesc tupleDescriptor); IntermediateResultEncoder * IntermediateResultEncoderCreate(TupleDesc tupleDesc, IntermediateResultFormat format, - MemoryContext tupleContext) + MemoryContext tupleContext, + StringInfo outputBuffer) { IntermediateResultEncoder *encoder = palloc0(sizeof(IntermediateResultEncoder)); encoder->format = format; encoder->tupleDescriptor = CreateTupleDescCopy(tupleDesc); - encoder->outputBuffer = makeStringInfo(); - encoder->flushBuffer = makeStringInfo(); + encoder->outputBuffer = outputBuffer; if (format == TEXT_COPY_FORMAT || format == BINARY_COPY_FORMAT) { @@ -141,7 +130,7 @@ IntermediateResultEncoderCreate(TupleDesc tupleDesc, /* * IntermediateResultEncoderReceive encodes the next row with the given encoder. */ -StringInfo +void IntermediateResultEncoderReceive(IntermediateResultEncoder *encoder, Datum *values, bool *nulls) { @@ -151,18 +140,6 @@ IntermediateResultEncoderReceive(IntermediateResultEncoder *encoder, AppendCopyRowData(values, nulls, encoder->tupleDescriptor, encoder->copyOutState, encoder->columnOutputFunctions, NULL); } - - if (encoder->outputBuffer->len > ENCODER_BUFFER_SIZE) - { - encoder->flushBuffer->data = encoder->outputBuffer->data; - encoder->flushBuffer->len = encoder->outputBuffer->len; - - encoder->outputBuffer->len = 0; - - return encoder->flushBuffer; - } - - return NULL; } @@ -170,7 +147,7 @@ IntermediateResultEncoderReceive(IntermediateResultEncoder *encoder, * IntermediateResultEncoderDone tells the encoder that there is no more work * to do. Encoder possibly can add footer data at this stage. */ -StringInfo +void IntermediateResultEncoderDone(IntermediateResultEncoder *encoder) { if (encoder->format == TEXT_COPY_FORMAT || @@ -182,18 +159,6 @@ IntermediateResultEncoderDone(IntermediateResultEncoder *encoder) AppendCopyBinaryFooters(copyOutState); } } - - if (encoder->outputBuffer->len > 0) - { - encoder->flushBuffer->data = encoder->outputBuffer->data; - encoder->flushBuffer->len = encoder->outputBuffer->len; - - encoder->outputBuffer->len = 0; - - return encoder->flushBuffer; - } - - return NULL; } @@ -237,7 +202,7 @@ ReadFileIntoTupleStore(char *fileName, IntermediateResultFormat format, * store. */ static void -ReadCopyFileIntoTupleStore(char *fileName, char *copyFormat, +ReadCopyFileIntoTupleStore(const char *fileName, const char *copyFormat, TupleDesc tupleDescriptor, Tuplestorestate *tupstore) { @@ -258,7 +223,7 @@ ReadCopyFileIntoTupleStore(char *fileName, char *copyFormat, List *copyOptions = NIL; int location = -1; /* "unknown" token location */ - DefElem *copyOption = makeDefElem("format", (Node *) makeString(copyFormat), + DefElem *copyOption = makeDefElem("format", (Node *) makeString((char *) copyFormat), location); copyOptions = lappend(copyOptions, copyOption); diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index c0f755636..5c143cbd4 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -71,6 +71,9 @@ typedef struct RemoteFileDestReceiver /* state on how to copy out data types */ IntermediateResultEncoder *encoder; + /* buffer to which encoder writes its output */ + StringInfo outputBuffer; + /* number of tuples sent */ uint64 tuplesSent; } RemoteFileDestReceiver; @@ -238,8 +241,10 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, MemoryContext tupleContext = GetPerTupleMemoryContext(resultDest->executorState); IntermediateResultFormat format = ResultFileFormatForTupleDesc(inputTupleDescriptor); + resultDest->outputBuffer = makeStringInfo(); resultDest->encoder = IntermediateResultEncoderCreate(inputTupleDescriptor, - format, tupleContext); + format, tupleContext, + resultDest->outputBuffer); if (resultDest->writeLocalFile) { @@ -348,18 +353,21 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) Datum *columnValues = slot->tts_values; bool *columnNulls = slot->tts_isnull; - StringInfo bufferToFlush = - IntermediateResultEncoderReceive(resultDest->encoder, columnValues, columnNulls); - if (bufferToFlush != NULL) + IntermediateResultEncoderReceive(resultDest->encoder, columnValues, columnNulls); + + StringInfo outputBuffer = resultDest->outputBuffer; + if (outputBuffer->len >= ENCODER_BUFFER_SIZE_THRESHOLD) { /* send row to nodes */ - BroadcastCopyData(bufferToFlush, connectionList); + BroadcastCopyData(outputBuffer, connectionList); /* write to local file (if applicable) */ if (resultDest->writeLocalFile) { - WriteToLocalFile(bufferToFlush, &resultDest->fileCompat); + WriteToLocalFile(outputBuffer, &resultDest->fileCompat); } + + resetStringInfo(outputBuffer); } MemoryContextSwitchTo(oldContext); @@ -400,17 +408,21 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver) RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver; List *connectionList = resultDest->connectionList; - StringInfo bufferToFlush = IntermediateResultEncoderDone(resultDest->encoder); - if (bufferToFlush != NULL) + IntermediateResultEncoderDone(resultDest->encoder); + + StringInfo outputBuffer = resultDest->outputBuffer; + if (outputBuffer->len > 0) { /* send row to nodes */ - BroadcastCopyData(bufferToFlush, connectionList); + BroadcastCopyData(outputBuffer, connectionList); /* write to local file (if applicable) */ if (resultDest->writeLocalFile) { - WriteToLocalFile(bufferToFlush, &resultDest->fileCompat); + WriteToLocalFile(outputBuffer, &resultDest->fileCompat); } + + resetStringInfo(outputBuffer); } /* close the COPY input */ diff --git a/src/backend/distributed/worker/worker_sql_task_protocol.c b/src/backend/distributed/worker/worker_sql_task_protocol.c index d6d987361..df089a738 100644 --- a/src/backend/distributed/worker/worker_sql_task_protocol.c +++ b/src/backend/distributed/worker/worker_sql_task_protocol.c @@ -24,8 +24,6 @@ /* necessary to get S_IRUSR, S_IWUSR definitions on illumos */ #include -#define COPY_BUFFER_SIZE (4 * 1024 * 1024) - /* TaskFileDestReceiver can be used to stream results into a file */ typedef struct TaskFileDestReceiver { @@ -49,6 +47,9 @@ typedef struct TaskFileDestReceiver /* state on how to copy out data types */ IntermediateResultEncoder *encoder; + /* buffer to which encoder writes its output */ + StringInfo outputBuffer; + /* statistics */ uint64 tuplesSent; uint64 bytesSent; @@ -169,9 +170,11 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation, taskFileDest->tupleDescriptor = inputTupleDescriptor; /* define how tuples will be serialised */ + taskFileDest->outputBuffer = makeStringInfo(); taskFileDest->encoder = IntermediateResultEncoderCreate(inputTupleDescriptor, taskFileDest->format, - taskFileDest->tupleContext); + taskFileDest->tupleContext, + taskFileDest->outputBuffer); taskFileDest->fileCompat = FileCompatFromFileStart(FileOpenForTransmit( taskFileDest->filePath, @@ -202,11 +205,13 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) Datum *columnValues = slot->tts_values; bool *columnNulls = slot->tts_isnull; - StringInfo bufferToFlush = IntermediateResultEncoderReceive(encoder, columnValues, - columnNulls); - if (bufferToFlush != NULL) + IntermediateResultEncoderReceive(encoder, columnValues, columnNulls); + + StringInfo outputBuffer = taskFileDest->outputBuffer; + if (outputBuffer->len > ENCODER_BUFFER_SIZE_THRESHOLD) { - WriteToLocalFile(bufferToFlush, taskFileDest); + WriteToLocalFile(outputBuffer, taskFileDest); + resetStringInfo(outputBuffer); } MemoryContextSwitchTo(oldContext); @@ -248,10 +253,12 @@ TaskFileDestReceiverShutdown(DestReceiver *destReceiver) TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver; IntermediateResultEncoder *encoder = taskFileDest->encoder; - StringInfo bufferToFlush = IntermediateResultEncoderDone(encoder); - if (bufferToFlush != NULL) + IntermediateResultEncoderDone(encoder); + + StringInfo outputBuffer = taskFileDest->outputBuffer; + if (outputBuffer->len > 0) { - WriteToLocalFile(bufferToFlush, taskFileDest); + WriteToLocalFile(outputBuffer, taskFileDest); } FileClose(taskFileDest->fileCompat.fd); diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h index 5ab586690..10e156513 100644 --- a/src/include/distributed/intermediate_results.h +++ b/src/include/distributed/intermediate_results.h @@ -22,6 +22,8 @@ #include "utils/palloc.h" +#define ENCODER_BUFFER_SIZE_THRESHOLD (4 * 1024 * 1024) + /* * DistributedResultFragment represents a fragment of a distributed result. */ @@ -82,10 +84,11 @@ extern char * CreateIntermediateResultsDirectory(void); extern IntermediateResultEncoder * IntermediateResultEncoderCreate(TupleDesc tupleDesc, IntermediateResultFormat format, MemoryContext - tupleContext); -extern StringInfo IntermediateResultEncoderReceive(IntermediateResultEncoder *encoder, - Datum *values, bool *nulls); -extern StringInfo IntermediateResultEncoderDone(IntermediateResultEncoder *encoder); + tupleContext, + StringInfo outputBuffer); +extern void IntermediateResultEncoderReceive(IntermediateResultEncoder *encoder, + Datum *values, bool *nulls); +extern void IntermediateResultEncoderDone(IntermediateResultEncoder *encoder); extern void IntermediateResultEncoderDestroy(IntermediateResultEncoder *encoder); extern void ReadFileIntoTupleStore(char *fileName, IntermediateResultFormat format, TupleDesc tupleDescriptor, Tuplestorestate *tupstore);